import os import shutil import typing as T from datetime import datetime from pycs import app from pycs import db from pycs.database.base import NamedBaseModel from pycs.database.Collection import Collection from pycs.database.File import File from pycs.database.Label import Label from pycs.database.util import commit_on_return class Project(NamedBaseModel): """ DB Model for projects """ description = db.Column(db.String) created = db.Column(db.DateTime, default=datetime.utcnow, index=True, nullable=False) model_id = db.Column( db.Integer, db.ForeignKey("model.id", ondelete="SET NULL")) label_provider_id = db.Column( db.Integer, db.ForeignKey("label_provider.id", ondelete="SET NULL")) root_folder = db.Column(db.String, nullable=False, unique=True) external_data = db.Column(db.Boolean, nullable=False) data_folder = db.Column(db.String, nullable=False) # contraints __table_args__ = () # relationships to other models files = db.relationship( "File", backref="project", lazy="dynamic", passive_deletes=True, ) labels = db.relationship( "Label", backref="project", lazy="dynamic", passive_deletes=True, ) collections = db.relationship( "Collection", backref="project", lazy="dynamic", passive_deletes=True, ) serialize_only = NamedBaseModel.serialize_only + ( "created", "description", "model_id", "label_provider_id", "root_folder", "external_data", "data_folder", ) @commit_on_return def delete(self) -> T.Tuple[dict, dict]: # pylint: disable=unexpected-keyword-arg dump = super().delete(commit=False) model_dump = {} if self.model_id is not None: # pylint: disable=unexpected-keyword-arg model_dump = self.model.delete(commit=False) if os.path.exists(self.root_folder): # remove from file system shutil.rmtree(self.root_folder) return dump, model_dump def label(self, identifier: int) -> T.Optional[Label]: """ get a label using its unique identifier :param identifier: unique identifier :return: label """ return self.labels.filter(Label.id == identifier).one_or_none() def label_by_reference(self, reference: str) -> T.Optional[Label]: """ get a label using its reference string :param reference: reference string :return: label """ return self.labels.filter(Label.reference == reference).one_or_none() def file(self, identifier: int) -> T.Optional[Label]: """ get a file using its unique identifier :param identifier: unique identifier :return: file """ return self.files.filter(File.id == identifier).one_or_none() def collection(self, identifier: int) -> T.Optional[Collection]: """ get a collection using its unique identifier :param identifier: unique identifier :return: collection """ return self.collections.filter(Collection.id == identifier).one_or_none() def collection_by_reference(self, reference: str) -> T.Optional[Collection]: """ get a collection using its unique identifier :param identifier: unique identifier :return: collection """ return self.collections.filter(Collection.reference == reference).one_or_none() @commit_on_return def create_label(self, name: str, reference: str = None, parent: T.Optional[T.Union[int, str, Label]] = None, hierarchy_level: str = None) -> T.Tuple[T.Optional[Label], bool]: """ create a label for this project. If there is already a label with the same reference in the database its name is updated. :param name: label name :param reference: label reference :param parent: parent label. Either a reference string, a Label id or a Label instance :param hierarchy_level: hierarchy level name :return: created or edited label, insert """ label = None is_new = False if reference is not None: label = Label.query.filter_by(project_id=self.id, reference=reference).one_or_none() if label is None: label = Label.new(commit=False, project_id=self.id, reference=reference) is_new = True label.set_name(name, commit=False) label.set_parent(parent, commit=False) label.hierarchy_level = hierarchy_level return label, is_new @commit_on_return def bulk_create_labels(self, labels: T.List[T.Dict], clean_old_labels: bool = True): """ Inserts a all labels at once. :raises: - AssertionError if project_id and reference are not unique - ValueError if a cycle in the hierarchy is found """ if clean_old_labels: self.labels.delete() for label in labels: label["project_id"] = self.id self.__check_labels(labels) app.logger.info(f"Inserting {len(labels):,d} labels") db.engine.execute(Label.__table__.insert(), labels) self.__set_parents(labels) return labels def __set_parents(self, labels): """ after the bul insert, we need to set correct parent_ids """ app.logger.info("Setting parents of the labels") self.flush() for label in labels: if label["parent"] is None: continue label_obj = self.label_by_reference(label["reference"]) parent_label_obj = self.label_by_reference(label["parent"]) label_obj.parent_id = parent_label_obj.id # pylint: disable=no-self-use def __check_labels(self, labels): """ check labels for unique keys and cycles """ unique_keys = dict() for label in labels: key = (label["project_id"], label["reference"]) assert key not in unique_keys, \ f"{key} was not unique: ({label=} vs {unique_keys[key]=})!" unique_keys[key] = label # pylint: disable=too-many-arguments @commit_on_return def create_collection(self, reference: str, name: str, description: str, position: int, autoselect: bool) -> T.Tuple[Collection, bool]: """ create a new collection associated with this project :param reference: collection reference string :param name: collection name :param description: collection description :param position: position in menus :param autoselect: automatically select this collection on session load :return: collection object, insert """ collection, is_new = Collection.get_or_create( project_id=self.id, reference=reference) collection.name = name collection.description = description collection.position = position collection.autoselect = autoselect return collection, is_new # pylint: disable=too-many-arguments @commit_on_return def add_file(self, uuid: str, file_type: str, name: str, extension: str, size: int, filename: str, frames: int = None, fps: float = None) -> T.Tuple[File, bool]: """ add a file to this project :param uuid: unique identifier which is used for temporary files :param file_type: file type (either image or video) :param name: file name :param extension: file extension :param size: file size :param filename: actual name in filesystem :param frames: frame count :param fps: frames per second :return: file """ path = os.path.join(self.data_folder, f"{filename}{extension}") file, is_new = File.get_or_create( project_id=self.id, path=path) file.uuid = uuid file.type = file_type file.name = name file.extension = extension file.size = size file.frames = frames file.fps = fps return file, is_new def get_files(self, *filters, offset: int = 0, limit: int = -1) -> T.List[File]: """ get an iterator of files associated with this project :param offset: file offset :param limit: file limit :return: iterator of files """ return self.files.filter(*filters).order_by(File.id).offset(offset).limit(limit) def _files_without_results(self): """ get files without any results :return: a query object """ # pylint: disable=no-member return self.files.filter(~File.results.any()) def count_files_without_results(self) -> int: """ count files without associated results :return: count """ return self._files_without_results().count() def files_without_results(self) -> T.List[File]: """ get a list of files without associated results :return: list of files """ return self._files_without_results().all() def _files_without_collection(self, offset: int = 0, limit: int = -1): """ get files without a collection :return: a query object """ # pylint: disable=no-member return self.get_files(File.collection_id.is_(None), offset=offset, limit=limit) def files_without_collection(self, offset: int = 0, limit: int = -1) -> T.List[File]: """ get a list of files without a collection :return: list of files """ return self._files_without_collection(offset=offset, limit=limit).all() def count_files_without_collection(self) -> int: """ count files associated with this project but without a collection :return: count """ return self._files_without_collection().count()