123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- import os
- import typing as T
- from contextlib import closing
- from datetime import datetime
- from pycs import db
- from pycs.database.base import NamedBaseModel
- from pycs.database.Collection import Collection
- from pycs.database.File import File
- from pycs.database.Label import Label
- class Project(NamedBaseModel):
- description = db.Column(db.String)
- created = db.Column(db.DateTime, default=datetime.utcnow,
- index=True, nullable=False)
- model_id = db.Column(
- db.Integer,
- db.ForeignKey("model.id", ondelete="SET NULL"))
- label_provider_id = db.Column(
- db.Integer,
- db.ForeignKey("label_provider.id", ondelete="SET NULL"))
- root_folder = db.Column(db.String, nullable=False, unique=True)
- external_data = db.Column(db.Boolean, nullable=False)
- data_folder = db.Column(db.String, nullable=False)
- # contraints
- __table_args__ = ()
- # relationships to other models
- files = db.relationship("File",
- backref="project",
- lazy="dynamic",
- passive_deletes=True)
- labels = db.relationship("Label",
- backref="project",
- lazy="dynamic",
- passive_deletes=True)
- collections = db.relationship("Collection",
- backref="project",
- lazy="dynamic",
- passive_deletes=True)
- serialize_only = NamedBaseModel.serialize_only + (
- "created",
- "description",
- "model_id",
- "label_provider_id",
- "root_folder",
- "external_data",
- "data_folder",
- )
- def file(self, id: int) -> T.Optional[File]:
- """
- get a file using its unique identifier
- :param id: unique identifier
- :return: file
- """
- return self.files.filter_by(id=id).one_or_none()
- def label_tree(self) -> T.List[Label]:
- """
- get a list of root labels associated with this project
- :return: list of labels
- """
- return self.labels.filter(Label.parent_id == None).all()
- def label(self, id: int) -> T.Optional[Label]:
- """
- get a label using its unique identifier
- :param id: unique identifier
- :return: label
- """
- return self.labels.filter_by(id=id).one_or_none()
- def label_by_reference(self, reference: str) -> T.Optional[Label]:
- """
- get a label using its reference string
- :param reference: reference string
- :return: label
- """
- return self.labels.filter_by(reference=reference).one_or_none()
- def create_label(self, name: str,
- reference: T.Optional[str] = None,
- parent_id: T.Union[Label, int, str] = None,
- hierarchy_level: T.Optional[str] = None,
- commit: bool = True) -> T.Tuple[T.Optional[Label], bool]:
- """
- create a label for this project. If there is already a label with the same reference
- in the database its name is updated.
- :param name: label name
- :param reference: label reference
- :param parent: either parent identifier, parent reference string or `Label` object
- :param hierarchy_level: hierarchy level name
- :return: created or edited label, insert
- """
- if isinstance(parent_id, str):
- parent_id = self.label_by_reference(parent_id)
- if isinstance(parent_id, Label):
- parent_id = parent_id.id
- label, is_new = Label.get_or_create(project=self, reference=reference)
- label.name = name
- label.hierarchy_level = hierarchy_level
- label.set_parent(parent_id, commit=False)
- if commit:
- self.commit()
- return label, is_new
- def collection(self, id: int) -> T.Optional[Collection]:
- """
- get a collection using its unique identifier
- :param identifier: unique identifier
- :return: collection
- """
- return self.collections.filter_by(id=id).one_or_none()
- def collection_by_reference(self, reference: str) -> T.Optional[Collection]:
- """
- get a collection using its unique identifier
- :param identifier: unique identifier
- :return: collection
- """
- return self.collections.filter_by(reference=reference).one_or_none()
- def create_collection(self,
- reference: str,
- name: str,
- description: str,
- position: int,
- autoselect: bool,
- commit: bool = True) -> T.Tuple[Collection, bool]:
- """
- create a new collection associated with this project
- :param reference: collection reference string
- :param name: collection name
- :param description: collection description
- :param position: position in menus
- :param autoselect: automatically select this collection on session load
- :return: collection object, insert
- """
- collection, is_new = Collection.get_or_create(project_id=self.id, reference=reference)
- collection.name = name
- collection.description = description
- collection.position = position
- collection.autoselect = autoselect
- if commit:
- self.commit()
- return collection, is_new
- def add_file(self, uuid: str, file_type: str, name: str, extension: str, size: int,
- filename: str, frames: int = None, fps: float = None, commit: bool = True) -> T.Tuple[File, bool]:
- """
- add a file to this project
- :param uuid: unique identifier which is used for temporary files
- :param file_type: file type (either image or video)
- :param name: file name
- :param extension: file extension
- :param size: file size
- :param filename: actual name in filesystem
- :param frames: frame count
- :param fps: frames per second
- :return: file
- """
- path = os.path.join(self.data_folder, filename + extension)
- file, is_new = File.get_or_create(project=self, path=path)
- file.uuid = uuid
- file.type = file_type
- file.name = name
- file.extension = extension
- file.size = size
- file.frames = frames
- file.fps = fps
- if commit:
- self.commit()
- return file, is_new
- def count_files(self) -> int:
- """
- count files associated with this project
- :return: count
- """
- return self.files.count()
- def get_files(self, offset: int = 0, limit: int = -1) -> T.Iterator[File]:
- """
- get an iterator of files associated with this project
- :param offset: file offset
- :param limit: file limit
- :return: iterator of files
- """
- return self.files.order_by(File.id).offset(offset).limit(limit)
- def _files_without_results(self):
- """
- get files without any results
- :return: a query object
- """
- return self.files.filter(~File.results.any())
- def count_files_without_results(self) -> int:
- """
- count files without associated results
- :return: count
- """
- return self._files_without_results().count()
- def files_without_results(self) -> T.Iterator[File]:
- """
- get an iterator of files without associated results
- :return: list of files
- """
- return self._files_without_results().all()
|