# Database model for projects.
- import os
- import shutil
- import typing as T
- from datetime import datetime
- from pycs import app
- from pycs import db
- from pycs.database.base import NamedBaseModel
- from pycs.database.Collection import Collection
- from pycs.database.File import File
- from pycs.database.Label import Label
- from pycs.database.util import commit_on_return
class Project(NamedBaseModel):
    """ DB Model for projects """

    description = db.Column(db.String)

    created = db.Column(db.DateTime, default=datetime.utcnow,
                        index=True, nullable=False)

    model_id = db.Column(
        db.Integer,
        db.ForeignKey("model.id", ondelete="SET NULL"))

    label_provider_id = db.Column(
        db.Integer,
        db.ForeignKey("label_provider.id", ondelete="SET NULL"))

    root_folder = db.Column(db.String, nullable=False, unique=True)

    external_data = db.Column(db.Boolean, nullable=False)

    data_folder = db.Column(db.String, nullable=False)

    # constraints
    __table_args__ = ()

    # relationships to other models
    files = db.relationship(
        "File",
        backref="project",
        lazy="dynamic",
        passive_deletes=True,
    )

    labels = db.relationship(
        "Label",
        backref="project",
        lazy="dynamic",
        passive_deletes=True,
    )

    collections = db.relationship(
        "Collection",
        backref="project",
        lazy="dynamic",
        passive_deletes=True,
    )

    serialize_only = NamedBaseModel.serialize_only + (
        "created",
        "description",
        "model_id",
        "label_provider_id",
        "root_folder",
        "external_data",
        "data_folder",
    )

    @commit_on_return
    def delete(self) -> T.Tuple[dict, dict]:
        """
        delete this project, the model it references (if any) and
        its root folder in the file system

        :return: result of the project deletion, result of the model deletion
                 (an empty dict if the project references no model)
        """
        # pylint: disable=unexpected-keyword-arg
        dump = super().delete(commit=False)

        model_dump = {}
        if self.model_id is not None:
            # pylint: disable=unexpected-keyword-arg
            model_dump = self.model.delete(commit=False)

        if os.path.exists(self.root_folder):
            # remove from file system
            shutil.rmtree(self.root_folder)

        return dump, model_dump

    def label(self, identifier: int) -> T.Optional[Label]:
        """
        get a label using its unique identifier

        :param identifier: unique identifier
        :return: label or None if this project has no such label
        """
        return self.labels.filter(Label.id == identifier).one_or_none()

    def label_by_reference(self, reference: str) -> T.Optional[Label]:
        """
        get a label using its reference string

        :param reference: reference string
        :return: label or None if this project has no such label
        """
        return self.labels.filter(Label.reference == reference).one_or_none()

    def file(self, identifier: int) -> T.Optional[File]:
        """
        get a file using its unique identifier

        :param identifier: unique identifier
        :return: file or None if this project has no such file
        """
        return self.files.filter(File.id == identifier).one_or_none()

    def collection(self, identifier: int) -> T.Optional[Collection]:
        """
        get a collection using its unique identifier

        :param identifier: unique identifier
        :return: collection or None if this project has no such collection
        """
        return self.collections.filter(Collection.id == identifier).one_or_none()

    def collection_by_reference(self, reference: str) -> T.Optional[Collection]:
        """
        get a collection using its reference string

        :param reference: reference string
        :return: collection or None if this project has no such collection
        """
        return self.collections.filter(Collection.reference == reference).one_or_none()

    @commit_on_return
    def create_label(self, name: str,
                     reference: T.Optional[str] = None,
                     parent: T.Optional[T.Union[int, str, Label]] = None,
                     hierarchy_level: T.Optional[str] = None
                     ) -> T.Tuple[T.Optional[Label], bool]:
        """
        create a label for this project. If there is already a label with the same reference
        in the database its name is updated.

        :param name: label name
        :param reference: label reference
        :param parent: parent label. Either a reference string, a Label id or a Label instance
        :param hierarchy_level: hierarchy level name
        :return: created or edited label, insert
        """
        label = None
        is_new = False

        # without a reference there is nothing to look up: always insert
        if reference is not None:
            label = Label.query.filter_by(project_id=self.id, reference=reference).one_or_none()

        if label is None:
            label = Label.new(commit=False, project_id=self.id, reference=reference)
            is_new = True

        label.set_name(name, commit=False)
        label.set_parent(parent, commit=False)
        label.hierarchy_level = hierarchy_level

        return label, is_new

    @commit_on_return
    def bulk_create_labels(self, labels: T.List[T.Dict], clean_old_labels: bool = True):
        """
        Inserts all labels at once.

        Every dictionary gets the id of this project assigned as "project_id"
        (the passed dictionaries are modified in place).

        :param labels: list of label dictionaries. The keys "reference" and
                       "parent" (reference of the parent label or None) are
                       evaluated here, the dictionaries are passed to the insert
        :param clean_old_labels: delete the current labels of this project
                                 before the new ones are inserted
        :return: the passed list of label dictionaries
        :raises:
            - AssertionError if project_id and reference are not unique
            - ValueError if a cycle in the hierarchy is found or
              a parent reference can not be resolved
        """
        for label in labels:
            label["project_id"] = self.id

        # validate before anything is deleted, so that invalid input
        # does not wipe the existing labels
        self.__check_labels(labels)

        if clean_old_labels:
            self.labels.delete()

        # executing an insert with an empty parameter list would be
        # treated as a single insert without any values
        if labels:
            app.logger.info(f"Inserting {len(labels):,d} labels")
            db.engine.execute(Label.__table__.insert(), labels)
            self.__set_parents(labels)

        return labels

    def __set_parents(self, labels):
        """
        after the bulk insert, we need to set correct parent_ids

        :param labels: list of label dictionaries (see bulk_create_labels)
        :raises: ValueError if a label or its parent is not found in this project
        """
        app.logger.info("Setting parents of the labels")
        self.flush()

        # a single query for all labels instead of two queries per label
        by_reference = {obj.reference: obj for obj in self.labels.all()}

        for label in labels:
            parent_reference = label.get("parent")
            if parent_reference is None:
                continue

            label_obj = by_reference.get(label["reference"])
            if label_obj is None:
                raise ValueError(
                    f"Label {label['reference']!r} was not found in project {self.id}!")

            parent_label_obj = by_reference.get(parent_reference)
            if parent_label_obj is None:
                raise ValueError(
                    f"Parent {parent_reference!r} of label {label['reference']!r} "
                    f"was not found in project {self.id}!")

            label_obj.parent_id = parent_label_obj.id

    # pylint: disable=no-self-use
    def __check_labels(self, labels):
        """
        check labels for unique keys and cycles

        :param labels: list of label dictionaries with the keys "project_id"
                       and "reference" and optionally "parent"
        :raises:
            - AssertionError if a (project_id, reference) pair occurs twice
            - ValueError if the parent references of the passed labels
              form a cycle
        """
        unique_keys = {}

        for label in labels:
            key = (label["project_id"], label["reference"])

            # raised explicitly instead of using an assert statement, so that
            # the check is not skipped if python runs with optimizations (-O)
            if key in unique_keys:
                raise AssertionError(
                    f"{key} was not unique: ({label=} vs {unique_keys[key]=})!")

            unique_keys[key] = label

        # parent reference for every label of this batch
        parents = {label["reference"]: label.get("parent") for label in labels}

        # references that are already known not to lead into a cycle
        checked = set()

        for start in parents:
            path = set()
            current = start

            # follow the parents until the batch is left, the root is reached
            # or an already verified label is hit
            while current in parents and current not in checked:
                if current in path:
                    raise ValueError(
                        f"Cycle in the label hierarchy found at {current!r}!")

                path.add(current)
                current = parents[current]

            checked.update(path)

    # pylint: disable=too-many-arguments
    @commit_on_return
    def create_collection(self,
                          reference: str,
                          name: str,
                          description: str,
                          position: int,
                          autoselect: bool) -> T.Tuple[Collection, bool]:
        """
        create a new collection associated with this project. If there is
        already a collection with the same reference, it is updated.

        :param reference: collection reference string
        :param name: collection name
        :param description: collection description
        :param position: position in menus
        :param autoselect: automatically select this collection on session load
        :return: collection object, insert
        """
        collection, is_new = Collection.get_or_create(
            project_id=self.id, reference=reference)

        collection.name = name
        collection.description = description
        collection.position = position
        collection.autoselect = autoselect

        return collection, is_new

    # pylint: disable=too-many-arguments
    @commit_on_return
    def add_file(self,
                 uuid: str,
                 file_type: str,
                 name: str,
                 extension: str,
                 size: int,
                 filename: str,
                 frames: T.Optional[int] = None,
                 fps: T.Optional[float] = None) -> T.Tuple[File, bool]:
        """
        add a file to this project. If there is already a file with the
        resulting path, it is updated.

        :param uuid: unique identifier which is used for temporary files
        :param file_type: file type (either image or video)
        :param name: file name
        :param extension: file extension
        :param size: file size
        :param filename: actual name in filesystem
        :param frames: frame count
        :param fps: frames per second
        :return: file, insert
        """
        # the file is identified by its path inside of the data folder
        path = os.path.join(self.data_folder, f"{filename}{extension}")

        file, is_new = File.get_or_create(
            project_id=self.id, path=path)

        file.uuid = uuid
        file.type = file_type
        file.name = name
        file.extension = extension
        file.size = size
        file.frames = frames
        file.fps = fps

        return file, is_new

    def get_files(self, *filters, offset: int = 0, limit: int = -1) -> T.Iterable[File]:
        """
        get the files associated with this project

        :param filters: filter criteria which are applied to the query
        :param offset: file offset
        :param limit: file limit. A negative value (default) or None
                      disables the limit
        :return: query object yielding the files ordered by their id
        """
        # "LIMIT -1" is not understood by every database,
        # hence a negative limit is translated to "no limit"
        if limit is not None and limit < 0:
            limit = None

        return self.files.filter(*filters).order_by(File.id).offset(offset).limit(limit)

    def _files_without_results(self):
        """
        get files without any results

        :return: a query object
        """
        # pylint: disable=no-member
        return self.files.filter(~File.results.any())

    def count_files_without_results(self) -> int:
        """
        count files without associated results

        :return: count
        """
        return self._files_without_results().count()

    def files_without_results(self) -> T.List[File]:
        """
        get a list of files without associated results

        :return: list of files
        """
        return self._files_without_results().all()

    def _files_without_collection(self, offset: int = 0, limit: int = -1):
        """
        get files without a collection

        :param offset: file offset
        :param limit: file limit (see get_files)
        :return: a query object
        """
        # pylint: disable=no-member
        return self.get_files(File.collection_id.is_(None), offset=offset, limit=limit)

    def files_without_collection(self, offset: int = 0, limit: int = -1) -> T.List[File]:
        """
        get a list of files without a collection

        :param offset: file offset
        :param limit: file limit (see get_files)
        :return: list of files
        """
        return self._files_without_collection(offset=offset, limit=limit).all()

    def count_files_without_collection(self) -> int:
        """
        count files associated with this project but without a collection

        :return: count
        """
        return self._files_without_collection().count()
|