123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- from __future__ import annotations
- import os
- import typing as T
- import warnings
- from datetime import datetime
- from pathlib import Path
- from pycs import db
- from pycs.database.Collection import Collection
- from pycs.database.Result import Result
- from pycs.database.Label import Label
- from pycs.database.base import NamedBaseModel
- from pycs.database.util import commit_on_return
class File(NamedBaseModel):
    """ DB Model for files """

    # table columns
    uuid = db.Column(db.String, nullable=False)
    extension = db.Column(db.String, nullable=False)
    type = db.Column(db.String, nullable=False)
    size = db.Column(db.Integer, nullable=False)
    created = db.Column(db.DateTime, default=datetime.utcnow,
                        index=True, nullable=False)
    path = db.Column(db.String, nullable=False)
    # frames / fps are nullable columns
    # NOTE(review): presumably only populated for video files -- confirm
    frames = db.Column(db.Integer)
    fps = db.Column(db.Float)

    # the referenced project row is mandatory; the FK cascades on delete
    project_id = db.Column(
        db.Integer,
        db.ForeignKey("project.id", ondelete="CASCADE"),
        nullable=False)

    # optional collection; the FK is reset to NULL if the collection is deleted
    collection_id = db.Column(
        db.Integer,
        db.ForeignKey("collection.id", ondelete="SET NULL"))

    # constraints: a path may occur only once per project
    __table_args__ = (
        db.UniqueConstraint('project_id', 'path'),
    )

    # dynamic relationship: ``self.results`` is a query object, not a list
    results = db.relationship("Result",
                              backref="file",
                              lazy="dynamic",
                              passive_deletes=True,
                              )

    serialize_only = NamedBaseModel.serialize_only + (
        "uuid",
        "extension",
        "type",
        "size",
        "created",
        "path",
        "frames",
        "fps",
        "project_id",
        "collection_id",
    )

    @property
    def filename(self):
        """ filename consisting of the name followed by the extension """
        return f"{self.name}{self.extension}"

    @property
    def absolute_path(self) -> str:
        """
        returns the absolute path of the file; relative paths are
        resolved against the current working directory
        """
        path = Path(self.path)

        if path.is_absolute():
            return str(path)

        return str(Path.cwd() / path)

    # pylint: disable=arguments-differ
    def delete(self, commit: bool = True):
        """
        delete the database object; if commit is True, the according
        physical file is removed afterwards as well

        :param commit: forwarded to the parent's delete; also controls
                       whether the physical file is removed
        :return: whatever the parent's delete returns
        """
        # remember the path first, so that the file removal does not depend
        # on reading attributes of an already deleted instance
        path = self.path

        # pylint: disable=unexpected-keyword-arg
        dump = super().delete(commit=commit)

        if commit:
            os.remove(path)

            # TODO: remove temp files
            warnings.warn("Temporary files may still exist!")

        return dump

    @commit_on_return
    def set_collection(self, collection_id: T.Optional[int]):
        """
        set this file's collection

        :param collection_id: new collection id or None to unset it
        :return:
        """
        self.collection_id = collection_id

    @commit_on_return
    def set_collection_by_reference(self, collection_reference: T.Optional[str]):
        """
        set this file's collection

        :param collection_reference: collection reference or None to
                                     unset the collection
        :return:
        """
        # check the *argument*: the model itself has no attribute
        # "collection_reference"
        if collection_reference is None:
            self.collection_id = None
            # nothing to look up; without this return the query below
            # would be executed with reference=None
            return

        # .one() raises if there is not exactly one matching collection
        collection = Collection.query.filter_by(reference=collection_reference).one()
        self.collection_id = collection.id

    def _get_another_file(self, *query):
        """
        build a query for files of the same project, which additionally
        match the given criteria. The query is not executed here: the
        callers add the ordering and fetch the first row.

        :param query: additional filter criteria
        :return: query object
        """
        return File.query.filter(File.project_id == self.project_id, *query)

    def next(self) -> T.Optional[File]:
        """
        get the successor of this file (smallest id greater than own id
        within the same project)

        :return: another file or None
        """
        return self._get_another_file(File.id > self.id)\
            .order_by(File.id).first()

    def previous(self) -> T.Optional[File]:
        """
        get the predecessor of this file (greatest id smaller than own id
        within the same project)

        :return: another file or None
        """
        # pylint: disable=no-member
        return self._get_another_file(File.id < self.id)\
            .order_by(File.id.desc()).first()

    def next_in_collection(self) -> T.Optional[File]:
        """
        get the successor of this file within the same collection

        :return: another file or None
        """
        return self._get_another_file(
            File.id > self.id, File.collection_id == self.collection_id)\
            .order_by(File.id).first()

    def previous_in_collection(self) -> T.Optional[File]:
        """
        get the predecessor of this file within the same collection

        :return: another file or None
        """
        # pylint: disable=no-member
        return self._get_another_file(
            File.id < self.id, File.collection_id == self.collection_id)\
            .order_by(File.id.desc()).first()

    def result(self, identifier: int) -> T.Optional[Result]:
        """
        get one of the file's results

        :param identifier: id of the result
        :return: result object or None
        """
        return self.results.get(identifier)

    @commit_on_return
    def create_result(self,
                      origin: str,
                      result_type: str,
                      label: T.Optional[T.Union[Label, int]] = None,
                      data: T.Optional[dict] = None) -> Result:
        """
        Creates a result and returns the created object

        :param origin: origin of the result
        :param result_type: stored as the result's type
        :param label: optional label, either a Label object or a label id
        :param data: optional data assigned to the result
        :return: result object
        """
        # commit=False: the commit is left to the commit_on_return decorator
        result = Result.new(commit=False,
                            file_id=self.id,
                            origin=origin,
                            type=result_type)

        result.data = data

        if label is not None:
            assert isinstance(label, (int, Label)), f"Wrong label type: {type(label)}"

            # only the id is stored, so unwrap Label objects
            result.label_id = label.id if isinstance(label, Label) else label

        return result

    def remove_results(self, origin='pipeline') -> T.List[T.Any]:
        """
        Remove assigned results, but return them.

        :param origin: only results with this origin are removed
        :return: list of the serialized results (serialized before deletion)
        """
        results = Result.query.filter(
            Result.file_id == self.id,
            Result.origin == origin)

        # serialize first, the rows are gone after the bulk delete
        _results = [r.serialize() for r in results.all()]

        # NOTE(review): no commit is issued here -- presumably the caller
        # commits the session; confirm
        results.delete()

        return _results
|