from __future__ import annotations import os import typing as T import warnings from datetime import datetime from pathlib import Path from pycs import db from pycs.database.Collection import Collection from pycs.database.Result import Result from pycs.database.Label import Label from pycs.database.base import NamedBaseModel from pycs.database.util import commit_on_return class File(NamedBaseModel): """ DB Model for files """ # table columns uuid = db.Column(db.String, nullable=False) extension = db.Column(db.String, nullable=False) type = db.Column(db.String, nullable=False) size = db.Column(db.Integer, nullable=False) created = db.Column(db.DateTime, default=datetime.utcnow, index=True, nullable=False) path = db.Column(db.String, nullable=False) frames = db.Column(db.Integer) fps = db.Column(db.Float) project_id = db.Column( db.Integer, db.ForeignKey("project.id", ondelete="CASCADE"), nullable=False) collection_id = db.Column( db.Integer, db.ForeignKey("collection.id", ondelete="SET NULL")) # contraints __table_args__ = ( db.UniqueConstraint('project_id', 'path'), ) results = db.relationship("Result", backref="file", lazy="dynamic", passive_deletes=True, ) serialize_only = NamedBaseModel.serialize_only + ( "uuid", "extension", "type", "size", "created", "path", "frames", "fps", "project_id", "collection_id", ) @property def filename(self): """ filename consisting of a name and an extension """ return f"{self.name}{self.extension}" @property def absolute_path(self) -> str: """ returns an absolute of the file """ path = Path(self.path) if path.is_absolute(): return str(path) return str(Path.cwd() / path) # pylint: disable=arguments-differ def delete(self, commit: bool = True): """ after the object is deleted, the according physical file is also delete if commit was True """ # pylint: disable=unexpected-keyword-arg dump = super().delete(commit=commit) if commit: os.remove(self.path) # TODO: remove temp files warnings.warn("Temporary files may still exist!") return dump @commit_on_return def set_collection(self, collection_id: T.Optional[int]): """ set this file's collection :param collection_id: new collection id :return: """ self.collection_id = collection_id @commit_on_return def set_collection_by_reference(self, collection_reference: T.Optional[str]): """ set this file's collection :param collection_reference: collection reference :return: """ if self.collection_reference is None: self.set_collection(None) collection = Collection.query.filter_by(reference=collection_reference).one() self.collection_id = collection.id def _get_another_file(self, *query) -> T.Optional[File]: """ get the first file matching the query ordered by descending id :return: another file or None """ return File.query.filter(File.project_id == self.project_id, *query) def next(self) -> T.Optional[File]: """ get the successor of this file :return: another file or None """ return self._get_another_file(File.id > self.id)\ .order_by(File.id).first() def previous(self) -> T.Optional[File]: """ get the predecessor of this file :return: another file or None """ # pylint: disable=no-member return self._get_another_file(File.id < self.id)\ .order_by(File.id.desc()).first() def next_in_collection(self) -> T.Optional[File]: """ get the predecessor of this file :return: another file or None """ return self._get_another_file( File.id > self.id, File.collection_id == self.collection_id)\ .order_by(File.id).first() def previous_in_collection(self) -> T.Optional[File]: """ get the predecessor of this file :return: another file or None """ # pylint: disable=no-member return self._get_another_file( File.id < self.id, File.collection_id == self.collection_id)\ .order_by(File.id.desc()).first() def result(self, identifier: int) -> T.Optional[Result]: """ get one of the file's results :return: result object or None """ return self.results.get(identifier) @commit_on_return def create_result(self, origin: str, result_type: str, label: T.Optional[T.Union[Label, int]] = None, data: T.Optional[dict] = None) -> Result: """ Creates a result and returns the created object :return: result object """ result = Result.new(commit=False, file_id=self.id, origin=origin, type=result_type) result.data = data if label is not None: assert isinstance(label, (int, Label)), f"Wrong label type: {type(label)}" if isinstance(label, Label): label = label.id result.label_id = label return result def remove_results(self, origin='pipeline') -> T.List[Result]: """ Remove assigned results, but return them. :return: list of result objects """ results = Result.query.filter( Result.file_id == self.id, Result.origin == origin) _results = [r.serialize() for r in results.all()] results.delete() return _results