from __future__ import annotations import json import os import typing as T from contextlib import closing from datetime import datetime from pycs import db from pycs.database.Collection import Collection from pycs.database.Label import Label from pycs.database.Result import Result from pycs.database.base import NamedBaseModel class File(NamedBaseModel): # table columns uuid = db.Column(db.String, nullable=False) extension = db.Column(db.String, nullable=False) type = db.Column(db.String, nullable=False) size = db.Column(db.String, nullable=False) created = db.Column(db.DateTime, default=datetime.utcnow, index=True, nullable=False) path = db.Column(db.String, nullable=False) frames = db.Column(db.Integer) fps = db.Column(db.Float) project_id = db.Column( db.Integer, db.ForeignKey("project.id", ondelete="CASCADE"), nullable=False) collection_id = db.Column( db.Integer, db.ForeignKey("collection.id", ondelete="SET NULL")) # contraints __table_args__ = ( db.UniqueConstraint('project_id', 'path'), ) # relationships to other models results = db.relationship("Result", backref="file", lazy="dynamic", passive_deletes=True) serialize_only = ( "id", "name", "uuid", "extension", "type", "size", "created", "path", "frames", "fps", "project_id", "collection_id", ) @property def filename(self): return f"{self.name}{self.extension}" @property def absolute_path(self): if os.path.isabs(self.path): return self.path return os.path.join(os.getcwd(), self.path) def set_collection(self, id: T.Optional[int]): """ set this file's collection :param id: new collection id :return: """ self.collection_id = id self.commit() def set_collection_by_reference(self, collection_reference: T.Optional[str]): """ set this file's collection :param collection_reference: collection reference :return: """ if self.collection_reference is None: self.set_collection(None) collection = Collection.query.filter_by(reference=collection_reference).one() self.collection = collection self.commit() def _get_another_file(self, *query) -> T.Optional[File]: """ get the first file matching the query ordered by descending id :return: another file or None """ return File.query.filter_by(project_id=self.project_id).filter(*query) def next(self) -> T.Optional[File]: """ get the successor of this file :return: another file or None """ return self._get_another_file(File.id > self.id)\ .order_by(File.id).first() def previous(self) -> T.Optional[File]: """ get the predecessor of this file :return: another file or None """ return self._get_another_file(File.id < self.id)\ .order_by(File.id.desc()).first() def next_in_collection(self) -> T.Optional[File]: """ get the predecessor of this file :return: another file or None """ return self._get_another_file(File.id > self.id, File.collection_id == self.collection_id)\ .order_by(File.id).first() def previous_in_collection(self) -> T.Optional[File]: """ get the predecessor of this file :return: another file or None """ return self._get_another_file(File.id < self.id, File.collection_id == self.collection_id)\ .order_by(File.id.desc()).first() def result(self, id: int) -> T.Optional[Result]: return self.results.get(id) def create_result(self, origin, result_type, label, data: T.Optional[dict] = None, commit: bool = True): result = Result.new(commit=False, file_id=self.id, origin=origin, type=result_type) result.data = data if label is not None: assert isinstance(label, (int, Label)), f"Wrong label type: {type(label)}" if isinstance(label, Label): label = label.id result.label_id = label if commit: self.commit() return result def remove_results(self, origin='pipeline') -> List[Result]: """ remove all results with the specified origin :param origin: either 'pipeline' or 'user' :return: list of removed results """ results = Result.query.filter(Result.file == self, Result.origin == origin) results.delete() return results