from contextlib import closing
from os.path import join
from time import time
from typing import List, Optional, Tuple, Iterator, Union

from pycs.database.Collection import Collection
from pycs.database.File import File
from pycs.database.Label import Label
from pycs.database.LabelProvider import LabelProvider
from pycs.database.Model import Model
from pycs.database.util.TreeNodeLabel import TreeNodeLabel


class Project:
    """
    database class for projects
    """

    def __init__(self, database, row):
        """
        create a project object from a database row

        :param database: database object providing the connection (`database.con`)
                         and the `model` / `label_provider` lookups
        :param row: row with the columns identifier, name, description, created,
                    model id, label provider id, root folder, external data flag
                    and data folder (in this order)
        """
        self.database = database

        self.identifier = row[0]
        self.name = row[1]
        self.description = row[2]
        self.created = row[3]
        self.model_id = row[4]
        self.label_provider_id = row[5]
        self.root_folder = row[6]
        # stored as a number in the row, exposed as a bool
        self.external_data = bool(row[7])
        self.data_folder = row[8]

    def _upsert(self, lookup_query: str, lookup_params: tuple,
                upsert_query: str, upsert_params: tuple) -> Tuple[int, bool]:
        """
        execute an `INSERT ... ON CONFLICT DO UPDATE` statement and report which
        row it touched and whether this row was newly inserted

        `cursor.lastrowid` can not be used to detect the conflict case: it mirrors
        the connection wide `sqlite3_last_insert_rowid()`, which keeps the id of an
        earlier insert (possibly into another table) if the statement only updates
        an existing row. Therefore the unique key is looked up *before* the
        statement is executed and `lastrowid` is only trusted for real inserts.

        :param lookup_query: query selecting the id of the row with the unique key
        :param lookup_params: parameters of the lookup query
        :param upsert_query: upsert statement
        :param upsert_params: parameters of the upsert statement
        :return: row id, insert
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute(lookup_query, lookup_params)
            existing = cursor.fetchone()

            cursor.execute(upsert_query, upsert_params)

            if existing is not None:
                # the on conflict clause applied, the row kept its id
                return existing[0], False

            # no row matched the unique key (this includes NULL keys, which never
            # compare equal), so a row was inserted and lastrowid belongs to it
            return cursor.lastrowid, True

    def model(self) -> Model:
        """
        get the model this project is associated with

        :return: model
        """
        return self.database.model(self.model_id)

    def label_provider(self) -> Optional[LabelProvider]:
        """
        get the label provider this project is associated with

        :return: label provider or None if the project has none
        """
        if self.label_provider_id is None:
            return None

        return self.database.label_provider(self.label_provider_id)

    def labels(self) -> List[Label]:
        """
        get a list of labels associated with this project

        :return: list of labels
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('SELECT * FROM labels WHERE project = ?', [self.identifier])

            return [Label(self.database, row) for row in cursor.fetchall()]

    def label_tree(self) -> List[TreeNodeLabel]:
        """
        get a list of root labels associated with this project

        Every returned label carries its descendants in its `children` list.

        :return: list of labels
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('''
                WITH RECURSIVE
                    tree AS (
                        SELECT labels.* FROM labels
                            WHERE project = ? AND parent IS NULL
                        UNION ALL
                        SELECT labels.* FROM labels
                            JOIN tree ON labels.parent = tree.id
                    )
                SELECT * FROM tree
            ''', [self.identifier])

            result = []
            lookup = {}

            # a parent has to be returned before its children, otherwise the
            # lookup of the parent below fails with a KeyError
            for row in cursor.fetchall():
                label = TreeNodeLabel(self.database, row)
                lookup[label.identifier] = label

                if label.parent_id is None:
                    result.append(label)
                else:
                    lookup[label.parent_id].children.append(label)

            return result

    def label(self, identifier: int) -> Optional[Label]:
        """
        get a label using its unique identifier

        :param identifier: unique identifier
        :return: label or None if this project has no such label
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('SELECT * FROM labels WHERE id = ? AND project = ?',
                           (identifier, self.identifier))
            row = cursor.fetchone()

            if row is None:
                return None

            return Label(self.database, row)

    def label_by_reference(self, reference: str) -> Optional[Label]:
        """
        get a label using its reference string

        :param reference: reference string
        :return: label or None if this project has no such label
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('SELECT * FROM labels WHERE reference = ? AND project = ?',
                           (reference, self.identifier))
            row = cursor.fetchone()

            if row is None:
                return None

            return Label(self.database, row)

    def create_label(self, name: str, reference: str = None,
                     parent: Union[Label, int, str] = None,
                     hierarchy_level: str = None) -> Tuple[Optional[Label], bool]:
        """
        create a label for this project. If there is already a label with the same
        reference in the database its name, parent and hierarchy level are updated.

        :param name: label name
        :param reference: label reference
        :param parent: either parent identifier, parent reference string or `Label` object
        :param hierarchy_level: hierarchy level name
        :return: created or edited label, insert
        """
        created = int(time())

        # resolve the parent to its identifier; a reference string which matches
        # no label resolves to None, i.e. the label gets no parent
        if isinstance(parent, str):
            parent = self.label_by_reference(parent)

        if isinstance(parent, Label):
            parent = parent.identifier

        row_id, insert = self._upsert(
            'SELECT id FROM labels WHERE project = ? AND reference = ?',
            (self.identifier, reference),
            '''
                INSERT INTO labels (project, parent, created, reference, name, hierarchy_level)
                VALUES (?, ?, ?, ?, ?, ?)
                ON CONFLICT (project, reference) DO
                UPDATE SET parent = ?, name = ?, hierarchy_level = ?
            ''',
            (self.identifier, parent, created, reference, name, hierarchy_level,
             parent, name, hierarchy_level)
        )

        return self.label(row_id), insert

    def collections(self) -> List[Collection]:
        """
        get a list of collections associated with this project

        :return: list of collections ordered by their position
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('SELECT * FROM collections WHERE project = ? ORDER BY position ASC',
                           [self.identifier])

            return [Collection(self.database, row) for row in cursor.fetchall()]

    def collection(self, identifier: int) -> Optional[Collection]:
        """
        get a collection using its unique identifier

        :param identifier: unique identifier
        :return: collection or None if this project has no such collection
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('SELECT * FROM collections WHERE id = ? AND project = ?',
                           (identifier, self.identifier))
            row = cursor.fetchone()

            if row is None:
                return None

            return Collection(self.database, row)

    def collection_by_reference(self, reference: str) -> Optional[Collection]:
        """
        get a collection using its reference string

        :param reference: reference string
        :return: collection or None if this project has no such collection
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('SELECT * FROM collections WHERE reference = ? AND project = ?',
                           (reference, self.identifier))
            row = cursor.fetchone()

            if row is None:
                return None

            return Collection(self.database, row)

    def create_collection(self,
                          reference: str,
                          name: str,
                          description: str,
                          position: int,
                          autoselect: bool) -> Tuple[Collection, bool]:
        """
        create a new collection associated with this project. If there is already
        a collection with the same reference its name, description, position and
        autoselect flag are updated.

        :param reference: collection reference string
        :param name: collection name
        :param description: collection description
        :param position: position in menus
        :param autoselect: automatically select this collection on session load
        :return: collection object, insert
        """
        # the flag is stored as a number
        autoselect = 1 if autoselect else 0

        row_id, insert = self._upsert(
            'SELECT id FROM collections WHERE project = ? AND reference = ?',
            (self.identifier, reference),
            '''
                INSERT INTO collections
                    (project, reference, name, description, position, autoselect)
                VALUES (?, ?, ?, ?, ?, ?)
                ON CONFLICT (project, reference) DO
                UPDATE SET name = ?, description = ?, position = ?, autoselect = ?
            ''',
            (self.identifier, reference, name, description, position, autoselect,
             name, description, position, autoselect)
        )

        return self.collection(row_id), insert

    def remove(self) -> None:
        """
        remove this project from the database

        :return:
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('DELETE FROM projects WHERE id = ?', [self.identifier])

    def set_name(self, name: str) -> None:
        """
        set this projects name

        :param name: new name
        :return:
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('UPDATE projects SET name = ? WHERE id = ?',
                           (name, self.identifier))

        # keep the object in sync with the database
        self.name = name

    def set_description(self, description: str) -> None:
        """
        set this projects description

        :param description: new description
        :return:
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('UPDATE projects SET description = ? WHERE id = ?',
                           (description, self.identifier))

        # keep the object in sync with the database
        self.description = description

    def count_files(self) -> int:
        """
        count files associated with this project

        :return: count
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('SELECT COUNT(*) FROM files WHERE project = ?', [self.identifier])
            return cursor.fetchone()[0]

    def files(self, offset: int = 0, limit: int = -1) -> Iterator[File]:
        """
        get an iterator of files associated with this project

        The query is executed immediately, only the `File` objects are created
        lazily while iterating.

        :param offset: file offset
        :param limit: file limit (SQLite treats a negative limit as no limit)
        :return: iterator of files
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('SELECT * FROM files WHERE project = ? ORDER BY id ASC LIMIT ? OFFSET ?',
                           (self.identifier, limit, offset))
            rows = cursor.fetchall()

        return (File(self.database, row) for row in rows)

    def count_files_without_results(self) -> int:
        """
        count files without associated results

        :return: count
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('''
                SELECT COUNT(*)
                FROM files
                LEFT JOIN results ON files.id = results.file
                WHERE files.project = ? AND results.id IS NULL
            ''', [self.identifier])
            return cursor.fetchone()[0]

    def files_without_results(self) -> Iterator[File]:
        """
        get an iterator of files without associated results

        This is a generator: the query is executed on the first iteration and
        the cursor stays open until the generator is exhausted or closed.

        :return: iterator of files
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('''
                SELECT files.*
                FROM files
                LEFT JOIN results ON files.id = results.file
                WHERE files.project = ? AND results.id IS NULL
                ORDER BY id ASC
            ''', [self.identifier])

            for row in cursor:
                yield File(self.database, row)

    def count_files_without_collection(self) -> int:
        """
        count files associated with this project but with no collection

        :return: count
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('SELECT COUNT(*) FROM files WHERE project = ? AND collection IS NULL',
                           [self.identifier])
            return cursor.fetchone()[0]

    def files_without_collection(self, offset: int = 0, limit: int = -1) -> Iterator[File]:
        """
        get an iterator of files not associated with any collection

        This is a generator: the query is executed on the first iteration and
        the cursor stays open until the generator is exhausted or closed.

        :param offset: file offset
        :param limit: file limit (SQLite treats a negative limit as no limit)
        :return: iterator of files
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('''
                SELECT *
                FROM files
                WHERE files.project = ? AND files.collection IS NULL
                ORDER BY id ASC LIMIT ? OFFSET ?
            ''', (self.identifier, limit, offset))

            for row in cursor:
                yield File(self.database, row)

    def file(self, identifier: int) -> Optional[File]:
        """
        get a file using its unique identifier

        :param identifier: unique identifier
        :return: file or None if this project has no such file
        """
        with closing(self.database.con.cursor()) as cursor:
            cursor.execute('SELECT * FROM files WHERE id = ? AND project = ?',
                           (identifier, self.identifier))
            row = cursor.fetchone()

            if row is None:
                return None

            return File(self.database, row)

    def add_file(self, uuid: str, file_type: str, name: str, extension: str, size: int,
                 filename: str, frames: int = None, fps: float = None) -> Tuple[File, bool]:
        """
        add a file to this project. If this project already has a file with the
        same path its type, name, extension, size, frames and fps are updated.

        :param uuid: unique identifier which is used for temporary files
        :param file_type: file type (either image or video)
        :param name: file name
        :param extension: file extension
        :param size: file size
        :param filename: actual name in filesystem
        :param frames: frame count
        :param fps: frames per second
        :return: file, insert
        """
        created = int(time())
        # the extension is appended as is, no separator is added here
        path = join(self.data_folder, filename + extension)

        row_id, insert = self._upsert(
            'SELECT id FROM files WHERE project = ? AND path = ?',
            (self.identifier, path),
            '''
                INSERT INTO files (
                    uuid, project, type, name, extension, size, created, path, frames, fps
                )
                VALUES (
                    ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
                )
                ON CONFLICT (project, path) DO
                UPDATE SET type = ?, name = ?, extension = ?, size = ?, frames = ?, fps = ?
            ''',
            (uuid, self.identifier, file_type, name, extension, size, created, path, frames, fps,
             file_type, name, extension, size, frames, fps)
        )

        return self.file(row_id), insert