File.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. from __future__ import annotations
  2. import os
  3. import typing as T
  4. import warnings
  5. from datetime import datetime
  6. from pathlib import Path
  7. from pycs import db
  8. from pycs.database.Collection import Collection
  9. from pycs.database.Result import Result
  10. from pycs.database.Label import Label
  11. from pycs.database.base import NamedBaseModel
  12. from pycs.database.util import commit_on_return
  class File(NamedBaseModel):
      """
      DB model for a file that belongs to a project.

      Besides the table columns, this model offers helpers to navigate
      between the files of a project (ordered by ``path``), to assign the
      file to a collection, and to create or remove results attached to it.
      """

      # table columns
      uuid = db.Column(db.String, nullable=False)

      # the extension is appended to the name as-is (see ``filename``)
      extension = db.Column(db.String, nullable=False)

      type = db.Column(db.String, nullable=False)

      size = db.Column(db.Integer, nullable=False)

      # creation timestamp; defaults to the (naive) UTC time of the insert
      created = db.Column(db.DateTime, default=datetime.utcnow,
                          index=True, nullable=False)

      # location of the physical file; relative paths are resolved against
      # the current working directory (see ``absolute_path``)
      path = db.Column(db.String, nullable=False)

      # nullable columns -- presumably only set for video files; TODO confirm
      frames = db.Column(db.Integer)
      fps = db.Column(db.Float)

      project_id = db.Column(
          db.Integer,
          db.ForeignKey("project.id", ondelete="CASCADE"),
          nullable=False)

      collection_id = db.Column(
          db.Integer,
          db.ForeignKey("collection.id", ondelete="SET NULL"))

      # constraints: a path may occur only once per project
      __table_args__ = (
          db.UniqueConstraint('project_id', 'path'),
      )

      # lazy="dynamic" turns ``self.results`` into a query object, which is
      # why ``count()`` and ``get()`` can be called on it below
      results = db.relationship("Result",
                                backref="file",
                                lazy="dynamic",
                                passive_deletes=True,
                                )

      # fields included in the serialized representation
      serialize_only = NamedBaseModel.serialize_only + (
          "uuid",
          "extension",
          "type",
          "size",
          "created",
          "path",
          "frames",
          "has_annotations",
          "fps",
          "project_id",
          "collection_id",
      )

      @property
      def filename(self):
          """ filename consisting of the name directly followed by the extension """
          return f"{self.name}{self.extension}"

      @property
      def has_annotations(self):
          """ check if there are any referenced results """
          return self.results.count() != 0

      @property
      def absolute_path(self) -> str:
          """
          returns the absolute path of the file

          An absolute ``path`` is returned unchanged; a relative one is
          resolved against the current working directory.
          """
          path = Path(self.path)

          if path.is_absolute():
              return str(path)

          return str(Path.cwd() / path)

      # pylint: disable=arguments-differ
      def delete(self, commit: bool = True):
          """
          delete the database object; if commit is True, the corresponding
          physical file is removed afterwards as well

          :param commit: forwarded to the parent's ``delete``; the physical
                         file is only removed if this is True
          :return: whatever the parent's ``delete`` returns
          :raises OSError: if the physical file cannot be removed
          """
          # pylint: disable=unexpected-keyword-arg
          dump = super().delete(commit=commit)

          if commit:
              os.remove(self.path)

              # TODO: remove temp files
              warnings.warn("Temporary files may still exist!")

          return dump

      @commit_on_return
      def set_collection(self, collection_id: T.Optional[int]):
          """
          set this file's collection

          :param collection_id: new collection id or None to unset it
          :return:
          """
          self.collection_id = collection_id

      @commit_on_return
      def set_collection_by_reference(self, collection_reference: T.Optional[str]):
          """
          set this file's collection using the collection's reference

          :param collection_reference: collection reference or None to
                                       remove the file from its collection
          :return:
          :raises: the exception of ``Query.one()`` if the reference does
                   not match exactly one collection
          """
          # check the argument (not a non-existing attribute of self) and
          # stop here, otherwise the lookup below would run with None
          if collection_reference is None:
              self.collection_id = None
              return

          collection = Collection.query.filter_by(reference=collection_reference).one()
          self.collection_id = collection.id

      def _get_another_file(self, *query, with_annotations=None):
          """
          build a query for files of the same project

          The returned query is neither ordered nor executed; the callers
          add the ordering and fetch the first row.

          :param query: additional filter criteria
          :param with_annotations: None  -> no restriction,
                                   True  -> only files having results,
                                   False -> only files without results
          :return: query object selecting the matching files
          """
          files = File.query.filter(File.project_id == self.project_id, *query)

          if with_annotations is None:
              return files

          annot_query = File.results.any()

          if not with_annotations:
              annot_query = ~annot_query

          return files.filter(annot_query)

      def next(self, **kwargs) -> T.Optional[File]:
          """
          get the successor of this file (next greater path in the project)

          :param kwargs: forwarded to ``_get_another_file``
          :return: another file or None
          """
          res = self._get_another_file(File.path > self.path, **kwargs)\
              .order_by(File.path)
          return res.first()

      def previous(self, **kwargs) -> T.Optional[File]:
          """
          get the predecessor of this file (next smaller path in the project)

          :param kwargs: forwarded to ``_get_another_file``
          :return: another file or None
          """
          # pylint: disable=no-member
          res = self._get_another_file(File.path < self.path, **kwargs)\
              .order_by(File.path.desc())
          return res.first()

      def next_in_collection(self, **kwargs) -> T.Optional[File]:
          """
          get the successor of this file within the same collection

          :param kwargs: forwarded to ``_get_another_file``
          :return: another file or None
          """
          return self._get_another_file(
              File.path > self.path, File.collection_id == self.collection_id, **kwargs)\
              .order_by(File.path).first()

      def previous_in_collection(self, **kwargs) -> T.Optional[File]:
          """
          get the predecessor of this file within the same collection

          :param kwargs: forwarded to ``_get_another_file``
          :return: another file or None
          """
          # pylint: disable=no-member
          return self._get_another_file(
              File.path < self.path, File.collection_id == self.collection_id, **kwargs)\
              .order_by(File.path.desc()).first()

      def result(self, identifier: int) -> T.Optional[Result]:
          """
          get one of the file's results

          :param identifier: id of the result
          :return: result object or None
          """
          return self.results.get(identifier)

      @commit_on_return
      def create_result(self,
                        origin: str,
                        result_type: str,
                        origin_user: T.Optional[str] = None,
                        label: T.Optional[T.Union[Label, int, str]] = None,
                        data: T.Optional[dict] = None) -> Result:
          """
          Creates a result and returns the created object

          :param origin: origin of the result; "pipeline" results must not
                         carry a user name
          :param result_type: stored as the result's ``type``
          :param origin_user: name of the user who created the result
          :param label: label object, label id or label reference; a
                        reference is looked up among the labels of this
                        file's project and an unknown reference ends up
                        as "no label"
          :param data: stored as the result's ``data``
          :return: result object
          :raises ValueError: if origin is "pipeline" and a user is given
          """
          if origin == "pipeline" and origin_user is not None:
              raise ValueError("If an annotation was made by the pipeline no username "
                               "can be specified!")

          result = Result.new(commit=False,
                              file_id=self.id,
                              origin=origin,
                              type=result_type,
                              origin_user=origin_user)

          result.data = data

          if label is not None:
              assert isinstance(label, (int, Label, str)), \
                  f"Label \"{label}\" has invalid type: {type(label)}"

              # resolve a reference to the label object of this project
              if isinstance(label, str):
                  label = Label.query.filter(
                      Label.project_id == self.project_id,
                      Label.reference == label).one_or_none()

              # reduce a label object to its id
              if isinstance(label, Label):
                  label = label.id

              result.set_label(label, commit=False)

          return result

      def _pop_results(self, *criteria) -> T.List[T.Any]:
          """
          serialize this file's results matching the criteria, then delete them

          No commit is issued here.

          :param criteria: additional filter criteria on ``Result``
          :return: list of the serialized results
          """
          matching = Result.query.filter(Result.file_id == self.id, *criteria)

          # serialize first: the rows are gone after the bulk delete
          serialized = [entry.serialize() for entry in matching.all()]
          matching.delete()

          return serialized

      def remove_result(self, result_id: int) -> T.List[T.Any]:
          """
          Remove the result with the given id.

          :param result_id: id of the result to delete
          :return: list of the serialized results that were removed
          """
          return self._pop_results(Result.id == result_id)

      def remove_results(self, origin='pipeline') -> T.List[T.Any]:
          """
          Remove assigned results of the given origin, but return them.

          :param origin: only results with this origin are removed
          :return: list of the serialized results that were removed
          """
          return self._pop_results(Result.origin == origin)