123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- import os
- import uuid
- from eventlet import tpool
- from flask import make_response, request, abort
- from flask.views import View
- from werkzeug import formparser
- from pycs.database.Project import Project
- from pycs.frontend.notifications.NotificationManager import NotificationManager
- from pycs.util.FileOperations import file_info
- class UploadFile(View):
- """
- save file uploads
- """
- # pylint: disable=arguments-differ
- methods = ['POST']
- def __init__(self, nm: NotificationManager):
- # pylint: disable=invalid-name
- self.nm = nm
- self.data_folder = None
- self.uuid = None
- self.name = None
- self.extension = None
- self.size = None
- def dispatch_request(self, project_id: int):
- # find project
- project = Project.get_or_404(project_id)
- # abort if external storage is used
- if project.external_data:
- return abort(400, "Project uses external data, but a file was uploaded")
- # get upload path and id
- self.data_folder = project.data_folder
- self.uuid = str(uuid.uuid1())
- # parse upload data
- _, _, files = tpool.execute(formparser.parse_form_data,
- request.environ,
- stream_factory=self.custom_stream_factory)
- # abort if there is no file entry in uploaded data
- if 'file' not in files.keys():
- return abort(400, "No file entry was found in uploaded data")
- # detect file type
- try:
- ftype, frames, fps = tpool.execute(file_info,
- self.data_folder,
- self.uuid,
- self.extension)
- except ValueError as exception:
- return abort(400, str(exception))
- file, _ = project.add_file(
- uuid=self.uuid,
- file_type=ftype,
- name=self.name,
- extension=self.extension,
- size=self.size,
- filename=self.uuid,
- frames=frames,
- fps=fps)
- # send update
- self.nm.create_file(file)
- # return default success response
- return make_response()
- def custom_stream_factory(self, total_content_length, filename, content_type,
- content_length=None):
- """
- save some useful information and open a file handler to save the uploaded file to
- :param total_content_length:
- :param filename:
- :param content_type:
- :param content_length:
- :return:
- """
- # pylint: disable=unused-argument
- # set relevant properties
- self.name, self.extension = os.path.splitext(filename)
- if content_length is not None and content_length > 0:
- self.size = content_length
- else:
- self.size = total_content_length
- # open file handler
- file_path = os.path.join(self.data_folder, f'{self.uuid}{self.extension}')
- #pylint: disable=consider-using-with
- return open(file_path, 'wb')
|