from os import path from uuid import uuid1 from eventlet import tpool from flask import make_response, request, abort from flask.views import View from werkzeug import formparser from pycs.database.Database import Database from pycs.frontend.notifications.NotificationManager import NotificationManager from pycs.util.FileParser import file_info class UploadFile(View): """ save file uploads """ # pylint: disable=arguments-differ methods = ['POST'] def __init__(self, db: Database, nm: NotificationManager): # pylint: disable=invalid-name self.db = db self.nm = nm self.data_folder = None self.file_id = None self.file_name = None self.file_extension = None self.file_size = None def dispatch_request(self, identifier): # find project project = self.db.project(identifier) if project is None: return abort(404, "Project not found") # abort if external storage is used if project.external_data: return abort(400, "Project uses external data, but a file was uploaded") # get upload path and id self.data_folder = project.data_folder self.file_id = str(uuid1()) # parse upload data _, _, files = tpool.execute(formparser.parse_form_data, request.environ, stream_factory=self.custom_stream_factory) # abort if there is no file entry in uploaded data if 'file' not in files.keys(): return abort(400, "No file entry was found in uploaded data") # detect file type try: ftype, frames, fps = tpool.execute(file_info, self.data_folder, self.file_id, self.file_extension) except ValueError as e: return abort(400, str(e)) # add to project files with self.db: file, _ = project.add_file(self.file_id, ftype, self.file_name, self.file_extension, self.file_size, self.file_id, frames, fps) # send update self.nm.create_file(file) # return default success response return make_response() def custom_stream_factory(self, total_content_length, filename, content_type, content_length=None): """ save some useful information and open a file handler to save the uploaded file to :param total_content_length: :param filename: :param content_type: :param content_length: :return: """ # pylint: disable=unused-argument # set relevant properties self.file_name, self.file_extension = path.splitext(filename) if content_length is not None and content_length > 0: self.file_size = content_length else: self.file_size = total_content_length # open file handler file_path = path.join(self.data_folder, f'{self.file_id}{self.file_extension}') return open(file_path, 'wb')