from __future__ import annotations

import os
import typing as T
import warnings

from datetime import datetime
from pathlib import Path

from pycs import db
from pycs.database.Collection import Collection
from pycs.database.Result import Result
from pycs.database.Label import Label
from pycs.database.base import NamedBaseModel
from pycs.database.util import commit_on_return


class File(NamedBaseModel):
    """
    DB Model for files.

    A file belongs to exactly one project, may belong to one collection and
    owns an arbitrary number of results.
    """

    # table columns
    uuid = db.Column(db.String, nullable=False)

    extension = db.Column(db.String, nullable=False)

    type = db.Column(db.String, nullable=False)

    size = db.Column(db.Integer, nullable=False)

    created = db.Column(db.DateTime, default=datetime.utcnow,
                        index=True, nullable=False)

    path = db.Column(db.String, nullable=False)

    frames = db.Column(db.Integer)

    fps = db.Column(db.Float)

    project_id = db.Column(
        db.Integer,
        db.ForeignKey("project.id", ondelete="CASCADE"),
        nullable=False)

    collection_id = db.Column(
        db.Integer,
        db.ForeignKey("collection.id", ondelete="SET NULL"))

    # constraints: a path may occur only once per project
    __table_args__ = (
        db.UniqueConstraint('project_id', 'path'),
    )

    # dynamic relationship: ``self.results`` is a query, not a list
    results = db.relationship("Result",
                              backref="file",
                              lazy="dynamic",
                              passive_deletes=True,
                              )

    serialize_only = NamedBaseModel.serialize_only + (
        "uuid",
        "extension",
        "type",
        "size",
        "created",
        "path",
        "frames",
        "has_annotations",
        "fps",
        "project_id",
        "collection_id",
    )

    @property
    def filename(self):
        """ filename consisting of a name and an extension """
        return f"{self.name}{self.extension}"

    @property
    def has_annotations(self):
        """ check if there are any referenced results """
        return self.results.count() != 0

    @property
    def absolute_path(self) -> str:
        """
        returns the absolute path of the file

        A relative ``path`` is resolved against the current working directory.
        """
        path = Path(self.path)

        if path.is_absolute():
            return str(path)

        return str(Path.cwd() / path)

    # pylint: disable=arguments-differ
    def delete(self, commit: bool = True):
        """
        delete the database entry; if commit is True, the according
        physical file is deleted afterwards as well

        :param commit: forwarded to the parent's ``delete``; the file on
                       disk is only removed if this is True
        :return: the value returned by the parent's ``delete``
        """
        # remember the path up front, so that removing the physical file
        # does not depend on attribute access on an already deleted instance
        path = self.path

        # pylint: disable=unexpected-keyword-arg
        dump = super().delete(commit=commit)

        if commit:
            os.remove(path)

            # TODO: remove temp files
            warnings.warn("Temporary files may still exist!")

        return dump

    @commit_on_return
    def set_collection(self, collection_id: T.Optional[int]):
        """
        set this file's collection

        :param collection_id: new collection id, None removes the file from
                              its collection
        :return:
        """
        self.collection_id = collection_id

    @commit_on_return
    def set_collection_by_reference(self, collection_reference: T.Optional[str]):
        """
        set this file's collection

        :param collection_reference: collection reference, None removes the
                                     file from its collection
        :return:
        """
        if collection_reference is None:
            # nothing to look up: just detach the file from its collection
            self.collection_id = None
            return

        # ``one()`` raises if the reference does not match exactly one collection
        collection = Collection.query.filter_by(reference=collection_reference).one()
        self.collection_id = collection.id

    def _get_another_file(self, *query, with_annotations=None):
        """
        build a query for files of the same project matching the given criteria

        The query is neither ordered nor executed here; the callers add the
        ordering and fetch the first match.

        :param query: additional filter criteria
        :param with_annotations: None applies no restriction, a truthy value
                                 selects only files with results and a falsy
                                 value only files without results
        :return: query object selecting the matching files
        """
        result = File.query.filter(File.project_id == self.project_id, *query)

        if with_annotations is None:
            return result

        annot_query = File.results.any()

        if not with_annotations:
            annot_query = ~annot_query

        return result.filter(annot_query)

    def next(self, **kwargs) -> T.Optional[File]:
        """
        get the successor of this file (ordered by path)

        :param kwargs: forwarded to ``_get_another_file``
        :return: another file or None
        """
        res = self._get_another_file(File.path > self.path, **kwargs)\
            .order_by(File.path)

        return res.first()

    def previous(self, **kwargs) -> T.Optional[File]:
        """
        get the predecessor of this file (ordered by path)

        :param kwargs: forwarded to ``_get_another_file``
        :return: another file or None
        """
        # pylint: disable=no-member
        res = self._get_another_file(File.path < self.path, **kwargs)\
            .order_by(File.path.desc())

        return res.first()

    def next_in_collection(self, **kwargs) -> T.Optional[File]:
        """
        get the successor of this file within the same collection

        :param kwargs: forwarded to ``_get_another_file``
        :return: another file or None
        """
        return self._get_another_file(
            File.path > self.path,
            File.collection_id == self.collection_id,
            **kwargs)\
            .order_by(File.path).first()

    def previous_in_collection(self, **kwargs) -> T.Optional[File]:
        """
        get the predecessor of this file within the same collection

        :param kwargs: forwarded to ``_get_another_file``
        :return: another file or None
        """
        # pylint: disable=no-member
        return self._get_another_file(
            File.path < self.path,
            File.collection_id == self.collection_id,
            **kwargs)\
            .order_by(File.path.desc()).first()

    def result(self, identifier: int) -> T.Optional[Result]:
        """
        get one of the file's results

        :param identifier: id of the requested result
        :return: result object or None
        """
        # filter explicitly, so that the lookup stays restricted to the
        # results of this file
        return self.results.filter(Result.id == identifier).one_or_none()

    @commit_on_return
    def create_result(self,
                      origin: str,
                      result_type: str,
                      origin_user: T.Optional[str] = None,
                      label: T.Optional[T.Union[Label, int, str]] = None,
                      data: T.Optional[dict] = None) -> Result:
        """
        Creates a result and returns the created object

        :param origin: origin of the result; for "pipeline" no user may be given
        :param result_type: stored as the result's ``type``
        :param origin_user: name of the user who created the result
        :param label: label object, label id or label reference; a reference
                      is looked up among the labels of this file's project
        :param data: stored as the result's ``data``
        :return: result object
        :raises ValueError: if origin is "pipeline" and origin_user is given
        """
        if origin == "pipeline" and origin_user is not None:
            raise ValueError("If an annotation was made by the pipeline no username "
                             "can be specified!")

        result = Result.new(commit=False,
                            file_id=self.id,
                            origin=origin,
                            type=result_type,
                            origin_user=origin_user)

        result.data = data

        if label is not None:
            assert isinstance(label, (int, Label, str)), \
                f"Label \"{label}\" has invalid type: {type(label)}"

            if isinstance(label, str):
                # an unknown reference yields None, which is passed on as is
                label = Label.query.filter(
                    Label.project_id == self.project_id,
                    Label.reference == label).one_or_none()

            if isinstance(label, Label):
                label = label.id

            result.set_label(label)

        return result

    def remove_result(self, result_id: int) -> T.List[T.Any]:
        """
        Remove the result with the given id.

        :param result_id: id of the result to delete
        :return: list with the serialized form of each removed result
        """
        # NOTE(review): nothing is committed here; presumably the caller
        # commits the session - confirm
        results = Result.query.filter(
            Result.file_id == self.id,
            Result.id == result_id)

        # serialize before deleting, the rows are gone afterwards
        _results = [r.serialize() for r in results.all()]
        results.delete()

        return _results

    def remove_results(self, origin='pipeline') -> T.List[T.Any]:
        """
        Remove assigned results, but return them.

        :param origin: only results with this origin are removed
        :return: list with the serialized form of each removed result
        """
        # NOTE(review): nothing is committed here; presumably the caller
        # commits the session - confirm
        results = Result.query.filter(
            Result.file_id == self.id,
            Result.origin == origin)

        # serialize before deleting, the rows are gone afterwards
        _results = [r.serialize() for r in results.all()]
        results.delete()

        return _results