import typing as T from contextlib import closing from datetime import datetime from os.path import join from typing import Iterator from typing import List from typing import Optional from typing import Tuple from pycs import db from pycs.database.base import NamedBaseModel from pycs.database.Collection import Collection from pycs.database.File import File from pycs.database.Label import Label from pycs.database.util import commit_on_return from pycs.database.util.TreeNodeLabel import TreeNodeLabel class Project(NamedBaseModel): description = db.Column(db.String) created = db.Column(db.DateTime, default=datetime.utcnow, index=True, nullable=False) model_id = db.Column( db.Integer, db.ForeignKey("model.id", ondelete="SET NULL")) label_provider_id = db.Column( db.Integer, db.ForeignKey("label_provider.id", ondelete="SET NULL")) root_folder = db.Column(db.String, nullable=False, unique=True) external_data = db.Column(db.Boolean, nullable=False) data_folder = db.Column(db.String, nullable=False) # contraints __table_args__ = () # relationships to other models files = db.relationship("File", backref="project", lazy=True) labels = db.relationship("Label", backref="project", lazy=True) collections = db.relationship("Collection", backref="project", lazy=True) def label(self, id: int) -> T.Optional[Label]: """ get a label using its unique identifier :param identifier: unique identifier :return: label """ return self.labels.get(id) def file(self, id: int) -> T.Optional[Label]: """ get a file using its unique identifier :param identifier: unique identifier :return: file """ return self.files.get(id) def collection(self, id: int) -> T.Optional[Collection]: """ get a collection using its unique identifier :param identifier: unique identifier :return: collection """ return self.collections.get(id) def collection_by_reference(self, reference: str) -> T.Optional[Collection]: """ get a collection using its unique identifier :param identifier: unique identifier :return: collection """ return self.collections.filter_by(reference=reference).one() @commit_on_return def create_label(self, name: str, reference: str = None, parent_id: int = None, hierarchy_level: str = None) -> Tuple[Optional[Label], bool]: """ create a label for this project. If there is already a label with the same reference in the database its name is updated. :param name: label name :param reference: label reference :param parent_id: parent's identifier :param hierarchy_level: hierarchy level name :return: created or edited label, insert """ label = Label.query.get(project=self, reference=reference) is_new = False if label is None: label = Label.new(project=self, reference=reference) is_new = True label.set_name(name, commit=False) label.set_parent(parent_id, commit=False) label.hierarchy_level = hierarchy_level return label, is_new def label_tree(self) -> List[TreeNodeLabel]: """ get a list of root labels associated with this project :return: list of labels """ raise NotImplementedError with closing(self.database.con.cursor()) as cursor: cursor.execute(''' WITH RECURSIVE tree AS ( SELECT labels.* FROM labels WHERE project = ? AND parent IS NULL UNION ALL SELECT labels.* FROM labels JOIN tree ON labels.parent = tree.id ) SELECT * FROM tree ''', [self.identifier]) result = [] lookup = {} for row in cursor.fetchall(): label = TreeNodeLabel(self.database, row) lookup[label.identifier] = label if label.parent_id is None: result.append(label) else: lookup[label.parent_id].children.append(label) return result @commit_on_return def create_collection(self, reference: str, name: str, description: str, position: int, autoselect: bool) -> Tuple[Collection, bool]: """ create a new collection associated with this project :param reference: collection reference string :param name: collection name :param description: collection description :param position: position in menus :param autoselect: automatically select this collection on session load :return: collection object, insert """ collection = Collection.query.get(project=self, reference=reference) is_new = False if collection is None: collection = Collection.new(project=self, reference=reference) is_new = True collection.name = name collection.description = description collection.position = position collection.autoselect = autoselect return collection, is_new @commit_on_return def add_file(self, uuid: str, file_type: str, name: str, extension: str, size: int, filename: str, frames: int = None, fps: float = None) -> T.Tuple[File, bool]: """ add a file to this project :param uuid: unique identifier which is used for temporary files :param file_type: file type (either image or video) :param name: file name :param extension: file extension :param size: file size :param filename: actual name in filesystem :param frames: frame count :param fps: frames per second :return: file """ path = join(self.data_folder, filename + extension) file = File.objects.get(project=self, path=path) is_new = False if file is None: file = File.new(uuid=uuid, project=self, path=path) is_new = True file.type = file_type file.name = name file.extension = extension file.size = size file.frames = frames file.fps = fps return file, is_new def set_description(self, description: str): """ set this projects description :param description: new description :return: """ self.description = description self def count_files(self) -> int: """ count files associated with this project :return: count """ return self.files.count() def get_files(self, offset: int = 0, limit: int = -1) -> T.Iterator[File]: """ get an iterator of files associated with this project :param offset: file offset :param limit: file limit :return: iterator of files """ return self.files.order_by(File.id.acs()).offset(offset).limit(limit) def count_files_without_results(self) -> int: """ count files without associated results :return: count """ raise NotImplementedError with closing(self.database.con.cursor()) as cursor: cursor.execute(''' SELECT COUNT(*) FROM files LEFT JOIN results ON files.id = results.file WHERE files.project = ? AND results.id IS NULL ''', [self.identifier]) return cursor.fetchone()[0] def files_without_results(self) -> Iterator[File]: """ get an iterator of files without associated results :return: list of files """ raise NotImplementedError with closing(self.database.con.cursor()) as cursor: cursor.execute(''' SELECT files.* FROM files LEFT JOIN results ON files.id = results.file WHERE files.project = ? AND results.id IS NULL ORDER BY id ASC ''', [self.identifier]) for row in cursor: yield File(self.database, row) def files_without_collection(self, offset: int = 0, limit: int = -1) -> Iterator[File]: """ get an iterator of files without not associated with any collection :return: list of files """ return self.get_files(offset, limit).filter(File.collection_id == None) def count_files_without_collection(self) -> int: """ count files associated with this project but with no collection :return: count """ return self.files_without_collection().count()