123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- import typing as T
- from contextlib import closing
- from datetime import datetime
- from os.path import join
- from typing import List, Optional, Tuple, Iterator
- from pycs import db
- from pycs.database.base import NamedBaseModel
- from pycs.database.Collection import Collection
- from pycs.database.File import File
- from pycs.database.Label import Label
class Project(NamedBaseModel):
    """
    database model of a project; a project owns files, labels and collections
    """

    description = db.Column(db.String)

    # creation time, defaults to the current (naive) UTC time
    created = db.Column(db.DateTime, default=datetime.utcnow,
                        index=True, nullable=False)

    model_id = db.Column(
        db.Integer,
        db.ForeignKey("model.id", ondelete="SET NULL"))

    label_provider_id = db.Column(
        db.Integer,
        db.ForeignKey("label_provider.id", ondelete="SET NULL"))

    root_folder = db.Column(db.String, nullable=False, unique=True)

    external_data = db.Column(db.Boolean, nullable=False)

    data_folder = db.Column(db.String, nullable=False)

    # constraints
    __table_args__ = ()

    # relationships to other models ("dynamic" relationships are queries,
    # so they can be filtered, ordered and counted in the database)
    files = db.relationship("File", backref="project", lazy="dynamic")
    labels = db.relationship("Label", backref="project", lazy="dynamic")
    collections = db.relationship("Collection", backref="project", lazy="dynamic")

    # NOTE(review): presumably consumed by the base model's serializer to
    # leave the relationships out of the serialized project -- confirm
    serialize_rules = (
        '-files',
        '-labels',
        '-collections',
    )

    def label(self, id: int) -> T.Optional[Label]:
        """
        get a label of this project using its unique identifier

        :param id: unique identifier
        :return: label or None if this project has no such label
        """
        # Query.get() must not be used on a query that already carries
        # criteria (the dynamic relationship filters by project), hence
        # an explicit filter is used instead
        return self.labels.filter_by(id=id).one_or_none()

    def file(self, id: int) -> T.Optional[File]:
        """
        get a file of this project using its unique identifier

        :param id: unique identifier
        :return: file or None if this project has no such file
        """
        return self.files.filter_by(id=id).one_or_none()

    def collection(self, id: int) -> T.Optional[Collection]:
        """
        get a collection of this project using its unique identifier

        :param id: unique identifier
        :return: collection or None if this project has no such collection
        """
        return self.collections.filter_by(id=id).one_or_none()

    def collection_by_reference(self, reference: str) -> T.Optional[Collection]:
        """
        get a collection of this project using its reference

        :param reference: collection reference
        :return: collection or None if no collection has this reference
        """
        # one() would raise if nothing matches, which contradicts the
        # Optional return type; one_or_none() returns None in that case
        return self.collections.filter_by(reference=reference).one_or_none()

    def create_label(self, name: str, reference: T.Optional[str] = None,
                     parent_id: T.Optional[int] = None
                     ) -> T.Tuple[T.Optional[Label], bool]:
        """
        create a label for this project. If there is already a label with the same reference
        in the database its name is updated.

        :param name: label name
        :param reference: label reference
        :param parent_id: parent's identifier
        :return: created or edited label, insert
        """
        label, is_new = Label.get_or_create(project=self, reference=reference)

        label.set_name(name)
        label.set_parent(parent_id)
        self.commit()

        return label, is_new

    def create_collection(self,
                          reference: str,
                          name: str,
                          description: str,
                          position: int,
                          autoselect: bool) -> T.Tuple[Collection, bool]:
        """
        create a collection for this project. If there is already a collection
        with the same reference its attributes are updated.

        :param reference: collection reference
        :param name: collection name
        :param description: collection description
        :param position: value stored as the collection's position
        :param autoselect: value stored as the collection's autoselect flag
        :return: created or edited collection, insert
        """
        collection, is_new = Collection.get_or_create(project=self, reference=reference)

        collection.name = name
        collection.description = description
        collection.position = position
        collection.autoselect = autoselect
        self.commit()

        return collection, is_new

    def add_file(self, uuid: str, file_type: str, name: str, extension: str, size: int,
                 filename: str, frames: T.Optional[int] = None,
                 fps: T.Optional[float] = None) -> T.Tuple[File, bool]:
        """
        add a file to this project. If there is already a file with the
        resulting path its attributes are updated.

        :param uuid: unique identifier which is used for temporary files
        :param file_type: file type (either image or video)
        :param name: file name
        :param extension: file extension
        :param size: file size
        :param filename: actual name in filesystem
        :param frames: frame count
        :param fps: frames per second
        :return: created or edited file, insert
        """
        # the extension is appended verbatim to the file name
        path = join(self.data_folder, filename + extension)

        file, is_new = File.get_or_create(project=self, path=path)

        file.uuid = uuid
        file.type = file_type
        file.name = name
        file.extension = extension
        file.size = size
        file.frames = frames
        file.fps = fps
        self.commit()

        return file, is_new

    def set_description(self, description: str):
        """
        set this projects description (the change is not committed here)

        :param description: new description
        :return:
        """
        self.description = description

    def count_files(self) -> int:
        """
        count files associated with this project

        :return: count
        """
        return self.files.count()

    def _page(self, query, offset: int, limit: T.Optional[int]):
        """
        order a file query by id and apply offset and limit to it

        :param query: file query, all filters have to be applied already
        :param offset: number of files to skip
        :param limit: maximum number of files, negative or None for no limit
        :return: ordered and paginated query
        """
        # a negative LIMIT means "no limit" on SQLite but is rejected by
        # other backends; None omits the limit regardless of the backend
        if limit is not None and limit < 0:
            limit = None

        return query.order_by(File.id).offset(offset).limit(limit)

    def get_files(self, offset: int = 0, limit: int = -1) -> T.Iterator[File]:
        """
        get the files associated with this project ordered by their id

        :param offset: file offset
        :param limit: file limit, a negative value disables the limit
        :return: query that yields the files when iterated
        """
        return self._page(self.files, offset, limit)

    def count_files_without_results(self) -> int:
        """
        count files without associated results

        :return: count
        :raises NotImplementedError: always, this query is not implemented yet
        """
        raise NotImplementedError

    def files_without_results(self) -> T.Iterator[File]:
        """
        get an iterator of files without associated results

        :return: list of files
        :raises NotImplementedError: always, this query is not implemented yet
        """
        # raised directly on call; there is no generator body that would
        # defer the error to the first iteration
        raise NotImplementedError

    def files_without_collection(self, offset: int = 0, limit: int = -1) -> T.Iterator[File]:
        """
        get the files of this project which are not associated with any
        collection, ordered by their id

        :param offset: file offset
        :param limit: file limit, a negative value disables the limit
        :return: query that yields the files when iterated
        """
        # the filter has to be applied before offset and limit, a query
        # can not be filtered anymore once it is paginated
        unassigned = self.files.filter(File.collection_id.is_(None))
        return self._page(unassigned, offset, limit)

    def count_files_without_collection(self) -> int:
        """
        count files associated with this project but with no collection

        :return: count
        """
        return self.files.filter(File.collection_id.is_(None)).count()
|