import os import shutil import typing as T import warnings from datetime import datetime from pycs import db from pycs.database.base import NamedBaseModel from pycs.database.Collection import Collection from pycs.database.File import File from pycs.database.Label import Label from pycs.database.util import commit_on_return class Project(NamedBaseModel): """ DB Model for projects """ description = db.Column(db.String) created = db.Column(db.DateTime, default=datetime.utcnow, index=True, nullable=False) model_id = db.Column( db.Integer, db.ForeignKey("model.id", ondelete="SET NULL")) label_provider_id = db.Column( db.Integer, db.ForeignKey("label_provider.id", ondelete="SET NULL")) root_folder = db.Column(db.String, nullable=False, unique=True) external_data = db.Column(db.Boolean, nullable=False) data_folder = db.Column(db.String, nullable=False) # contraints __table_args__ = () # relationships to other models files = db.relationship( "File", backref="project", lazy="dynamic") labels = db.relationship( "Label", backref="project", lazy="dynamic") collections = db.relationship( "Collection", backref="project", lazy="dynamic") serialize_only = NamedBaseModel.serialize_only + ( "created", "description", "model_id", "label_provider_id", "root_folder", "external_data", "data_folder", ) @commit_on_return def delete(self) -> T.Tuple[dict, dict]: # pylint: disable=unexpected-keyword-arg dump = super().delete(commit=False) model_dump = {} if self.model_id is not None: # pylint: disable=unexpected-keyword-arg model_dump = self.model.delete(commit=False) if os.path.exists(self.root_folder): # remove from file system shutil.rmtree(self.root_folder) return dump, model_dump def label(self, identifier: int) -> T.Optional[Label]: """ get a label using its unique identifier :param identifier: unique identifier :return: label """ return self.labels.filter(Label.id == identifier).one_or_none() def label_by_reference(self, reference: str) -> T.Optional[Label]: """ get a label using its reference string :param reference: reference string :return: label """ return self.labels.filter(Label.reference == reference).one_or_none() def file(self, identifier: int) -> T.Optional[Label]: """ get a file using its unique identifier :param identifier: unique identifier :return: file """ return self.files.filter(File.id == identifier).one_or_none() def label_tree(self) -> T.List[Label]: """ get a list of root labels associated with this project :return: list of labels """ warnings.warn("Check performance of this method!") # pylint: disable=no-member return self.labels.filter(Label.parent_id.is_(None)).all() def label_tree_original(self): """ get a list of root labels associated with this project :return: list of labels """ raise NotImplementedError # pylint: disable=unreachable # pylint: disable=pointless-string-statement """ with closing(self.database.con.cursor()) as cursor: cursor.execute(''' WITH RECURSIVE tree AS ( SELECT labels.* FROM labels WHERE project = ? AND parent IS NULL UNION ALL SELECT labels.* FROM labels JOIN tree ON labels.parent = tree.id ) SELECT * FROM tree ''', [self.id]) result = [] lookup = {} for row in cursor.fetchall(): label = TreeNodeLabel(self.database, row) lookup[label.id] = label if label.parent_id is None: result.append(label) else: lookup[label.parent_id].children.append(label) return result """ def collection(self, identifier: int) -> T.Optional[Collection]: """ get a collection using its unique identifier :param identifier: unique identifier :return: collection """ return self.collections.filter(Collection.id == identifier).one_or_none() def collection_by_reference(self, reference: str) -> T.Optional[Collection]: """ get a collection using its unique identifier :param identifier: unique identifier :return: collection """ return self.collections.filter(Collection.reference == reference).one_or_none() @commit_on_return def create_label(self, name: str, reference: str = None, parent: T.Optional[T.Union[int, str, Label]] = None, hierarchy_level: str = None) -> T.Tuple[T.Optional[Label], bool]: """ create a label for this project. If there is already a label with the same reference in the database its name is updated. :param name: label name :param reference: label reference :param parent: parent label. Either a reference string, a Label id or a Label instance :param hierarchy_level: hierarchy level name :return: created or edited label, insert """ label = Label.query.get(project=self, reference=reference) is_new = False if label is None: label = Label.new(project=self, reference=reference) is_new = True label.set_name(name, commit=False) label.set_parent(parent, commit=False) label.hierarchy_level = hierarchy_level return label, is_new # pylint: disable=too-many-arguments @commit_on_return def create_collection(self, reference: str, name: str, description: str, position: int, autoselect: bool) -> T.Tuple[Collection, bool]: """ create a new collection associated with this project :param reference: collection reference string :param name: collection name :param description: collection description :param position: position in menus :param autoselect: automatically select this collection on session load :return: collection object, insert """ collection, is_new = Collection.get_or_create( project_id=self.id, reference=reference) collection.name = name collection.description = description collection.position = position collection.autoselect = autoselect return collection, is_new # pylint: disable=too-many-arguments @commit_on_return def add_file(self, uuid: str, file_type: str, name: str, extension: str, size: int, filename: str, frames: int = None, fps: float = None) -> T.Tuple[File, bool]: """ add a file to this project :param uuid: unique identifier which is used for temporary files :param file_type: file type (either image or video) :param name: file name :param extension: file extension :param size: file size :param filename: actual name in filesystem :param frames: frame count :param fps: frames per second :return: file """ path = os.path.join(self.data_folder, f"{filename}{extension}") file, is_new = File.get_or_create( project_id=self.id, path=path) file.uuid = uuid file.type = file_type file.name = name file.extension = extension file.size = size file.frames = frames file.fps = fps return file, is_new def get_files(self, offset: int = 0, limit: int = -1) -> T.List[File]: """ get an iterator of files associated with this project :param offset: file offset :param limit: file limit :return: iterator of files """ return self.files.order_by(File.id).offset(offset).limit(limit) def _files_without_results(self): """ get files without any results :return: a query object """ # pylint: disable=no-member return self.files.filter(~File.results.any()) def count_files_without_results(self) -> int: """ count files without associated results :return: count """ return self._files_without_results().count() def files_without_results(self) -> T.List[File]: """ get a list of files without associated results :return: list of files """ return self._files_without_results().all() def _files_without_collection(self, offset: int = 0, limit: int = -1): """ get files without a collection :return: a query object """ # pylint: disable=no-member return self.get_files(offset, limit).filter(File.collection_id.is_(None)) def files_without_collection(self, offset: int = 0, limit: int = -1) -> T.List[File]: """ get a list of files without a collection :return: list of files """ return self._files_without_collection(offset=offset, limit=limit).all() def count_files_without_collection(self) -> int: """ count files associated with this project but without a collection :return: count """ return self._files_without_collection().count()