123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- from __future__ import annotations
- import json
- import typing as T
- from contextlib import closing
- from datetime import datetime
- from pycs import db
- from pycs.database.Result import Result
- from pycs.database.Collection import Collection
- from pycs.database.base import NamedBaseModel
- class File(NamedBaseModel):
- # table columns
- uuid = db.Column(db.String, nullable=False)
- extension = db.Column(db.String, nullable=False)
- type = db.Column(db.String, nullable=False)
- size = db.Column(db.String, nullable=False)
- created = db.Column(db.DateTime, default=datetime.utcnow,
- index=True, nullable=False)
- path = db.Column(db.String, nullable=False)
- frames = db.Column(db.Integer)
- fps = db.Column(db.Float)
- project_id = db.Column(
- db.Integer,
- db.ForeignKey("project.id", ondelete="CASCADE"),
- nullable=False)
- collection_id = db.Column(
- db.Integer,
- db.ForeignKey("collection.id", ondelete="SET NULL"))
- # contraints
- __table_args__ = (
- db.UniqueConstraint('project_id', 'path'),
- )
- # relationships to other models
- results = db.relationship("Result", backref="file", lazy="dynamic")
- serialize_rules = ('-results',)
- def serialize(self):
- result = super().serialize()
- if result["data"] is not None:
- result["data"] = json.loads(result["data"])
- return result
- def set_collection(self, id: T.Optional[int]):
- """
- set this file's collection
- :param id: new collection id
- :return:
- """
- self.collection_id = id
- self.commit()
- def set_collection_by_reference(self, collection_reference: T.Optional[str]):
- """
- set this file's collection
- :param collection_reference: collection reference
- :return:
- """
- if self.collection_reference is None:
- self.set_collection(None)
- collection = Collection.query.filter_by(reference=collection_reference).one()
- self.collection = collection
- self.commit()
- def _get_another_file(self, *query) -> T.Optional[File]:
- """
- get the first file matching the query ordered by descending id
- :return: another file or None
- """
- return self.project.files.filter(*query)\
- .order_by(File.id.desc())\
- .first()
- def next(self) -> T.Optional[File]:
- """
- get the successor of this file
- :return: another file or None
- """
- query = File.id > self.id
- return self._get_another_file(*query)
- def previous(self) -> T.Optional[File]:
- """
- get the predecessor of this file
- :return: another file or None
- """
- query = File.id < self.id
- return self._get_another_file(*query)
- def next_in_collection(self) -> T.Optional[File]:
- """
- get the predecessor of this file
- :return: another file or None
- """
- query = File.id > self.id, Collection.id == self.collection_id
- return self._get_another_file(*query)
- def previous_in_collection(self) -> T.Optional[File]:
- """
- get the predecessor of this file
- :return: another file or None
- """
- query = File.id < self.id, Collection.id == self.collection_id
- return self._get_another_file(*query)
- def result(self, id: int) -> T.Optional[Result]:
- return self.results.get(id)
- def create_result(self, origin, result_type, label, data: T.Optional[dict] = None):
- data = data if data is None else json.dumps(data)
- result = Result.new(commit=True,
- file=self,
- origin=origin,
- type=result_type,
- label=label,
- data=data)
- return result
- def remove_results(self, origin='pipeline'):
- results = Result.query.filter(Result.file == self, Result.origin == origin)
- results.remove()
- return results
|