from __future__ import annotations import os import typing as T import warnings from datetime import datetime from pathlib import Path from pycs import db from pycs.database.Collection import Collection from pycs.database.Result import Result from pycs.database.base import NamedBaseModel from pycs.database.util import commit_on_return class File(NamedBaseModel): """ database class for files """ # table columns uuid = db.Column(db.String, nullable=False) extension = db.Column(db.String, nullable=False) type = db.Column(db.String, nullable=False) size = db.Column(db.String, nullable=False) created = db.Column(db.DateTime, default=datetime.utcnow, index=True, nullable=False) path = db.Column(db.String, nullable=False) frames = db.Column(db.Integer) fps = db.Column(db.Float) project_id = db.Column( db.Integer, db.ForeignKey("project.id", ondelete="CASCADE"), nullable=False) collection_id = db.Column( db.Integer, db.ForeignKey("collection.id", ondelete="SET NULL")) # contraints __table_args__ = ( db.UniqueConstraint('project_id', 'path'), ) results = db.relationship("Result", backref="file", lazy="dynamic", passive_deletes=True) serialize_only = NamedBaseModel.serialize_only + ( "uuid", "extension", "type", "size", "created", "path", "frames", "fps", "project_id", "collection_id", ) @property def filename(self): return f"{self.name}{self.extension}" @property def absolute_path(self) -> str: path = Path(self.path) if path.is_absolute(): return str(path) return str(Path.cwd() / path) def delete(self, commit: bool = True): super().delete(commit=commit) # remove file from folder os.remove(self.path) # TODO: remove temp files warnings.warn("Temporary files may still exist!") @commit_on_return def set_collection(self, collection_id: T.Optional[int]): """ set this file's collection :param collection_id: new collection id :return: """ self.collection_id = collection_id @commit_on_return def set_collection_by_reference(self, collection_reference: T.Optional[str]): """ set this file's collection :param collection_reference: collection reference :return: """ if self.collection_reference is None: self.set_collection(None) collection = Collection.query.filter_by(reference=collection_reference).one() self.collection_id = collection.id def _get_another_file(self, *query) -> T.Optional[File]: """ get the first file matching the query ordered by descending id :return: another file or None """ return File.query.filter(File.project_id == self.project_id, *query) def next(self) -> T.Optional[File]: """ get the successor of this file :return: another file or None """ return self._get_another_file(File.id > self.id)\ .order_by(File.id).first() def previous(self) -> T.Optional[File]: """ get the predecessor of this file :return: another file or None """ return self._get_another_file(File.id < self.id)\ .order_by(File.id.desc()).first() def next_in_collection(self) -> T.Optional[File]: """ get the predecessor of this file :return: another file or None """ return self._get_another_file( File.id > self.id, File.collection_id == self.collection_id)\ .order_by(File.id).first() def previous_in_collection(self) -> T.Optional[File]: """ get the predecessor of this file :return: another file or None """ return self._get_another_file( File.id < self.id, File.collection_id == self.collection_id)\ .order_by(File.id.desc()).first() def result(self, id: int) -> T.Optional[Result]: return self.results.get(id) @commit_on_return def create_result(self, origin: str, result_type: str, label: T.Optional[T.Union[Label, int]] = None, data: T.Optional[dict] = None) -> Result: result = Result.new(commit=False, file_id=self.id, origin=origin, type=result_type) result.data = data if label is not None: assert isinstance(label, (int, Label)), f"Wrong label type: {type(label)}" if isinstance(label, Label): label = label.id result.label_id = label return result def remove_results(self, origin='pipeline') -> T.List[Result]: results = Result.query.filter( Result.file_id == self.id, Result.origin == origin) _results = results.all() results.delete() return _results