import typing as T from contextlib import closing from datetime import datetime from os.path import join from typing import List, Optional, Tuple, Iterator from pycs import db from pycs.database.base import NamedBaseModel from pycs.database.Collection import Collection from pycs.database.File import File from pycs.database.Label import Label class Project(NamedBaseModel): description = db.Column(db.String) created = db.Column(db.DateTime, default=datetime.utcnow, index=True, nullable=False) model_id = db.Column( db.Integer, db.ForeignKey("model.id", ondelete="SET NULL")) label_provider_id = db.Column( db.Integer, db.ForeignKey("label_provider.id", ondelete="SET NULL")) root_folder = db.Column(db.String, nullable=False, unique=True) external_data = db.Column(db.Boolean, nullable=False) data_folder = db.Column(db.String, nullable=False) # contraints __table_args__ = () # relationships to other models files = db.relationship("File", backref="project", lazy="dynamic", passive_deletes=True) labels = db.relationship("Label", backref="project", lazy="dynamic", passive_deletes=True) collections = db.relationship("Collection", backref="project", lazy="dynamic", passive_deletes=True) serialize_only = ( "id", "name", "created", "description", "model_id", "label_provider_id", "root_folder", "external_data", "data_folder", ) def label(self, id: int) -> T.Optional[Label]: """ get a label using its unique identifier :param identifier: unique identifier :return: label """ return self.labels.filter_by(id=id).one_or_none() def file(self, id: int) -> T.Optional[Label]: """ get a file using its unique identifier :param identifier: unique identifier :return: file """ return self.files.filter_by(id=id).one_or_none() def collection(self, id: int) -> T.Optional[Collection]: """ get a collection using its unique identifier :param identifier: unique identifier :return: collection """ return self.collections.filter_by(id=id).one_or_none() def collection_by_reference(self, reference: str) -> T.Optional[Collection]: """ get a collection using its unique identifier :param identifier: unique identifier :return: collection """ return self.collections.filter_by(reference=reference).one_or_none() def create_label(self, name: str, reference: str = None, parent_id: int = None) -> Tuple[Optional[Label], bool]: """ create a label for this project. If there is already a label with the same reference in the database its name is updated. :param name: label name :param reference: label reference :param parent_id: parent's identifier :return: created or edited label, insert """ label, is_new = Label.get_or_create(project=self, reference=reference) label.set_name(name) label.set_parent(parent_id) self.commit() return label, is_new def create_collection(self, reference: str, name: str, description: str, position: int, autoselect: bool): collection, is_new = Collection.get_or_create(project=self, reference=reference) collection.name = name collection.description = description collection.position = position collection.autoselect = autoselect self.commit() return collection, is_new def add_file(self, uuid: str, file_type: str, name: str, extension: str, size: int, filename: str, frames: int = None, fps: float = None) -> T.Tuple[File, bool]: """ add a file to this project :param uuid: unique identifier which is used for temporary files :param file_type: file type (either image or video) :param name: file name :param extension: file extension :param size: file size :param filename: actual name in filesystem :param frames: frame count :param fps: frames per second :return: file """ path = join(self.data_folder, filename + extension) file, is_new = File.get_or_create(project=self, path=path) file.uuid = uuid file.type = file_type file.name = name file.extension = extension file.size = size file.frames = frames file.fps = fps self.commit() return file, is_new def set_description(self, description: str): """ set this projects description :param description: new description :return: """ self.description = description self.commit() def count_files(self) -> int: """ count files associated with this project :return: count """ return self.files.count() def get_files(self, offset: int = 0, limit: int = -1) -> T.Iterator[File]: """ get an iterator of files associated with this project :param offset: file offset :param limit: file limit :return: iterator of files """ return self.files.order_by(File.id).offset(offset).limit(limit) def count_files_without_results(self) -> int: """ count files without associated results :return: count """ raise NotImplementedError with closing(self.database.con.cursor()) as cursor: cursor.execute(''' SELECT COUNT(*) FROM files LEFT JOIN results ON files.id = results.file WHERE files.project = ? AND results.id IS NULL ''', [self.id]) return cursor.fetchone()[0] def files_without_results(self) -> Iterator[File]: """ get an iterator of files without associated results :return: list of files """ raise NotImplementedError with closing(self.database.con.cursor()) as cursor: cursor.execute(''' SELECT files.* FROM files LEFT JOIN results ON files.id = results.file WHERE files.project = ? AND results.id IS NULL ORDER BY id ASC ''', [self.id]) for row in cursor: yield File(self.database, row) def files_without_collection(self, offset: int = 0, limit: int = -1) -> Iterator[File]: """ get an iterator of files without not associated with any collection :return: list of files """ return self.get_files(offset, limit).filter(File.collection_id == None) def count_files_without_collection(self) -> int: """ count files associated with this project but with no collection :return: count """ return self.files_without_collection().count()