import os
import typing as T

from contextlib import closing
from datetime import datetime

from pycs import db
from pycs.database.base import NamedBaseModel
from pycs.database.Collection import Collection
from pycs.database.File import File
from pycs.database.Label import Label


class Project(NamedBaseModel):
    """
    database model of a project

    A project owns files, labels and collections (see the relationships
    below) and optionally references a model and a label provider.
    """

    description = db.Column(db.String)

    created = db.Column(db.DateTime, default=datetime.utcnow,
                        index=True, nullable=False)

    # both references are declared with ON DELETE SET NULL
    model_id = db.Column(
        db.Integer,
        db.ForeignKey("model.id", ondelete="SET NULL"))

    label_provider_id = db.Column(
        db.Integer,
        db.ForeignKey("label_provider.id", ondelete="SET NULL"))

    root_folder = db.Column(db.String, nullable=False, unique=True)

    external_data = db.Column(db.Boolean, nullable=False)

    data_folder = db.Column(db.String, nullable=False)

    # constraints
    __table_args__ = ()

    # relationships to other models
    # lazy="dynamic" makes each attribute a query object, which is why the
    # methods below can call filter_by / order_by / count on them
    files = db.relationship(
        "File",
        backref="project",
        lazy="dynamic",
        passive_deletes=True)

    labels = db.relationship(
        "Label",
        backref="project",
        lazy="dynamic",
        passive_deletes=True)

    collections = db.relationship(
        "Collection",
        backref="project",
        lazy="dynamic",
        passive_deletes=True)

    serialize_only = NamedBaseModel.serialize_only + (
        "created",
        "description",
        "model_id",
        "label_provider_id",
        "root_folder",
        "external_data",
        "data_folder",
    )

    def file(self, id: int) -> T.Optional[File]:
        """
        get a file of this project using its unique identifier

        :param id: unique identifier
        :return: file or None if this project has no such file
        """
        return self.files.filter_by(id=id).one_or_none()

    def label_tree(self) -> T.List[Label]:
        """
        get a list of root labels (labels without a parent) associated
        with this project

        :return: list of labels
        """
        # is_(None) renders the same "IS NULL" as "== None" but is explicit
        return self.labels.filter(Label.parent_id.is_(None)).all()

    def label(self, id: int) -> T.Optional[Label]:
        """
        get a label of this project using its unique identifier

        :param id: unique identifier
        :return: label or None if this project has no such label
        """
        return self.labels.filter_by(id=id).one_or_none()

    def label_by_reference(self, reference: str) -> T.Optional[Label]:
        """
        get a label of this project using its reference string

        :param reference: reference string
        :return: label or None if this project has no such label
        """
        return self.labels.filter_by(reference=reference).one_or_none()

    def create_label(self, name: str,
                     reference: T.Optional[str] = None,
                     parent_id: T.Optional[T.Union[Label, int, str]] = None,
                     hierarchy_level: T.Optional[str] = None,
                     commit: bool = True) -> T.Tuple[T.Optional[Label], bool]:
        """
        create a label for this project. If there is already a label with
        the same reference in the database its name is updated.

        :param name: label name
        :param reference: label reference
        :param parent_id: either parent identifier, parent reference string
                          or `Label` object
        :param hierarchy_level: hierarchy level name
        :param commit: call `self.commit()` after the label was modified
        :return: created or edited label, insert
        """
        # a string is treated as a reference and resolved to a label first;
        # an unknown reference resolves to None and is passed on as such
        if isinstance(parent_id, str):
            parent_id = self.label_by_reference(parent_id)

        if isinstance(parent_id, Label):
            parent_id = parent_id.id

        label, is_new = Label.get_or_create(project=self, reference=reference)

        label.name = name
        label.hierarchy_level = hierarchy_level
        # committing is deferred to the single commit below
        label.set_parent(parent_id, commit=False)

        if commit:
            self.commit()

        return label, is_new

    def collection(self, id: int) -> T.Optional[Collection]:
        """
        get a collection of this project using its unique identifier

        :param id: unique identifier
        :return: collection or None if this project has no such collection
        """
        return self.collections.filter_by(id=id).one_or_none()

    def collection_by_reference(self, reference: str) -> T.Optional[Collection]:
        """
        get a collection of this project using its reference string

        :param reference: reference string
        :return: collection or None if this project has no such collection
        """
        return self.collections.filter_by(reference=reference).one_or_none()

    def create_collection(self,
                          reference: str,
                          name: str,
                          description: str,
                          position: int,
                          autoselect: bool,
                          commit: bool = True) -> T.Tuple[Collection, bool]:
        """
        create a new collection associated with this project. The
        collection is looked up (or created) by project id and reference;
        all other attributes are overwritten with the given values.

        :param reference: collection reference string
        :param name: collection name
        :param description: collection description
        :param position: position in menus
        :param autoselect: automatically select this collection on session load
        :param commit: call `self.commit()` after the collection was modified
        :return: collection object, insert
        """
        collection, is_new = Collection.get_or_create(
            project_id=self.id, reference=reference)

        collection.name = name
        collection.description = description
        collection.position = position
        collection.autoselect = autoselect

        if commit:
            self.commit()

        return collection, is_new

    def add_file(self,
                 uuid: str,
                 file_type: str,
                 name: str,
                 extension: str,
                 size: int,
                 filename: str,
                 frames: T.Optional[int] = None,
                 fps: T.Optional[float] = None,
                 commit: bool = True) -> T.Tuple[File, bool]:
        """
        add a file to this project. The file is looked up (or created) by
        its path, which is built from the project's data folder, `filename`
        and `extension`.

        :param uuid: unique identifier which is used for temporary files
        :param file_type: file type (either image or video)
        :param name: file name
        :param extension: file extension
        :param size: file size
        :param filename: actual name in filesystem
        :param frames: frame count
        :param fps: frames per second
        :param commit: call `self.commit()` after the file was modified
        :return: file, insert
        """
        # the extension is appended as is, no separator is inserted
        path = os.path.join(self.data_folder, filename + extension)

        file, is_new = File.get_or_create(project=self, path=path)

        file.uuid = uuid
        file.type = file_type
        file.name = name
        file.extension = extension
        file.size = size
        file.frames = frames
        file.fps = fps

        if commit:
            self.commit()

        return file, is_new

    def set_description(self, description: str):
        """
        set this project's description and commit the change

        :param description: new description
        :return:
        """
        self.description = description
        self.commit()

    def count_files(self) -> int:
        """
        count files associated with this project

        :return: count
        """
        return self.files.count()

    def get_files(self, offset: int = 0, limit: int = -1) -> T.Iterable[File]:
        """
        get the files associated with this project, ordered by their id

        :param offset: file offset
        :param limit: file limit; a negative value (the default) or None
                      means "no limit"
        :return: query object that yields the files when iterated
        """
        query = self.files.order_by(File.id).offset(offset)

        # A negative LIMIT is not accepted by every database backend, so
        # "no limit" is expressed by not setting a limit at all.
        if limit is not None and limit >= 0:
            query = query.limit(limit)

        return query

    def _files_without_results(self):
        """
        get files without any results

        :return: a query object
        """
        return self.files.filter(~File.results.any())

    def count_files_without_results(self) -> int:
        """
        count files without associated results

        :return: count
        """
        return self._files_without_results().count()

    def files_without_results(self) -> T.List[File]:
        """
        get all files without associated results

        :return: list of files
        """
        return self._files_without_results().all()