from contextlib import closing from os.path import join from time import time from typing import List, Optional, Tuple, Iterator from pycs.database.Collection import Collection from pycs.database.File import File from pycs.database.Label import Label from pycs.database.LabelProvider import LabelProvider from pycs.database.Model import Model class Project: """ database class for projects """ def __init__(self, database, row): self.database = database self.identifier = row[0] self.name = row[1] self.description = row[2] self.created = row[3] self.model_id = row[4] self.label_provider_id = row[5] self.root_folder = row[6] self.external_data = bool(row[7]) self.data_folder = row[8] def model(self) -> Model: """ get the model this project is associated with :return: model """ return self.database.model(self.model_id) def label_provider(self) -> Optional[LabelProvider]: """ get the label provider this project is associated with :return: label provider """ if self.label_provider_id is not None: return self.database.label_provider(self.label_provider_id) return None def labels(self) -> List[Label]: """ get a list of labels associated with this project :return: list of labels """ with closing(self.database.con.cursor()) as cursor: cursor.execute('SELECT * FROM labels WHERE project = ?', [self.identifier]) return list(map( lambda row: Label(self.database, row), cursor.fetchall() )) def label(self, identifier: int) -> Optional[Label]: """ get a label using its unique identifier :param identifier: unique identifier :return: label """ with closing(self.database.con.cursor()) as cursor: cursor.execute('SELECT * FROM labels WHERE id = ? AND project = ?', (identifier, self.identifier)) row = cursor.fetchone() if row is not None: return Label(self.database, row) return None def create_label(self, name: str, reference: str = None, parent_id: int = None) -> Tuple[Optional[Label], bool]: """ create a label for this project. If there is already a label with the same reference in the database its name is updated. :param name: label name :param reference: label reference :param parent_id: parent's identifier :return: created or edited label, insert """ created = int(time()) with closing(self.database.con.cursor()) as cursor: cursor.execute(''' INSERT INTO labels (project, parent, created, reference, name) VALUES (?, ?, ?, ?, ?) ON CONFLICT (project, reference) DO UPDATE SET parent = ?, name = ? ''', (self.identifier, parent_id, created, reference, name, parent_id, name)) # lastrowid is 0 if on conflict clause applies. # If this is the case we do an extra query to receive the row id. if cursor.lastrowid > 0: row_id = cursor.lastrowid insert = True else: cursor.execute('SELECT id FROM labels WHERE project = ? AND reference = ?', (self.identifier, reference)) row_id = cursor.fetchone()[0] insert = False return self.label(row_id), insert def collections(self) -> List[Collection]: """ get a list of collections associated with this project :return: list of collections """ with closing(self.database.con.cursor()) as cursor: cursor.execute('SELECT * FROM collections WHERE project = ? ORDER BY position ASC', [self.identifier]) return list(map( lambda row: Collection(self.database, row), cursor.fetchall() )) def collection(self, identifier: int) -> Optional[Collection]: """ get a collection using its unique identifier :param identifier: unique identifier :return: collection """ with closing(self.database.con.cursor()) as cursor: cursor.execute('SELECT * FROM collections WHERE id = ? AND project = ?', (identifier, self.identifier)) row = cursor.fetchone() if row is not None: return Collection(self.database, row) return None def collection_by_reference(self, reference: str): """ get a collection using its reference string :param reference: reference string :return: collection """ with closing(self.database.con.cursor()) as cursor: cursor.execute('SELECT * FROM collections WHERE reference = ? AND project = ?', (reference, self.identifier)) row = cursor.fetchone() if row is not None: return Collection(self.database, row) return None def create_collection(self, reference: str, name: str, description: str, position: int, autoselect: bool): autoselect = 1 if autoselect else 0 with closing(self.database.con.cursor()) as cursor: cursor.execute(''' INSERT INTO collections (project, reference, name, description, position, autoselect) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT (project, reference) DO UPDATE SET name = ?, description = ?, position = ?, autoselect = ? ''', (self.identifier, reference, name, description, position, autoselect, name, description, position, autoselect)) # lastrowid is 0 if on conflict clause applies. # If this is the case we do an extra query to receive the row id. if cursor.lastrowid > 0: row_id = cursor.lastrowid insert = True else: cursor.execute('SELECT id FROM collections WHERE project = ? AND reference = ?', (self.identifier, reference)) row_id = cursor.fetchone()[0] insert = False return self.collection(row_id), insert def remove(self) -> None: """ remove this project from the database :return: """ with closing(self.database.con.cursor()) as cursor: cursor.execute('DELETE FROM projects WHERE id = ?', [self.identifier]) def set_name(self, name: str) -> None: """ set this projects name :param name: new name :return: """ with closing(self.database.con.cursor()) as cursor: cursor.execute('UPDATE projects SET name = ? WHERE id = ?', (name, self.identifier)) self.name = name def set_description(self, description: str) -> None: """ set this projects description :param description: new description :return: """ with closing(self.database.con.cursor()) as cursor: cursor.execute('UPDATE projects SET description = ? WHERE id = ?', (description, self.identifier)) self.description = description def count_files(self) -> int: """ count files associated with this project :return: count """ with closing(self.database.con.cursor()) as cursor: cursor.execute('SELECT COUNT(*) FROM files WHERE project = ?', [self.identifier]) return cursor.fetchone()[0] def files(self, offset: int = 0, limit: int = -1) -> List[File]: """ get a list of files associated with this project :param offset: file offset :param limit: file limit :return: list of files """ return list(self.files_iter(offset, limit)) def files_iter(self, offset: int = 0, limit: int = -1) -> Iterator[File]: """ get an iterator of files associated with this project :param offset: file offset :param limit: file limit :return: iterator of files """ with closing(self.database.con.cursor()) as cursor: cursor.execute('SELECT * FROM files WHERE project = ? ORDER BY id ASC LIMIT ? OFFSET ?', (self.identifier, limit, offset)) return map( lambda row: File(self.database, row), cursor.fetchall() ) def count_files_without_results(self) -> int: """ count files without associated results :return: count """ with closing(self.database.con.cursor()) as cursor: cursor.execute(''' SELECT COUNT(*) FROM files LEFT JOIN results ON files.id = results.file WHERE files.project = ? AND results.id IS NULL ''', [self.identifier]) return cursor.fetchone()[0] def files_without_results(self) -> List[File]: """ get a list of files without associated results :return: list of files """ with closing(self.database.con.cursor()) as cursor: cursor.execute(''' SELECT files.* FROM files LEFT JOIN results ON files.id = results.file WHERE files.project = ? AND results.id IS NULL ORDER BY id ASC ''', [self.identifier]) return list(map( lambda row: File(self.database, row), cursor.fetchall() )) def count_files_without_collection(self) -> int: """ count files associated with this project but with no collection :return: count """ with closing(self.database.con.cursor()) as cursor: cursor.execute('SELECT COUNT(*) FROM files WHERE project = ? AND collection IS NULL', [self.identifier]) return cursor.fetchone()[0] def files_without_collection(self, offset=0, limit=-1) -> List[File]: """ get a list of files without not associated with any collection :return: list of files """ with closing(self.database.con.cursor()) as cursor: cursor.execute(''' SELECT * FROM files WHERE files.project = ? AND files.collection IS NULL ORDER BY id ASC LIMIT ? OFFSET ? ''', (self.identifier, limit, offset)) return list(map( lambda row: File(self.database, row), cursor.fetchall() )) def file(self, identifier) -> Optional[File]: """ get a file using its unique identifier :param identifier: unique identifier :return: file """ with closing(self.database.con.cursor()) as cursor: cursor.execute('SELECT * FROM files WHERE id = ? AND project = ?', (identifier, self.identifier)) row = cursor.fetchone() if row is not None: return File(self.database, row) return None def add_file(self, uuid: str, file_type: str, name: str, extension: str, size: int, filename: str, frames: int = None, fps: float = None) -> Tuple[File, bool]: """ add a file to this project :param uuid: unique identifier which is used for temporary files :param file_type: file type (either image or video) :param name: file name :param extension: file extension :param size: file size :param filename: actual name in filesystem :param frames: frame count :param fps: frames per second :return: file """ created = int(time()) path = join(self.data_folder, filename + extension) with closing(self.database.con.cursor()) as cursor: cursor.execute(''' INSERT INTO files ( uuid, project, type, name, extension, size, created, path, frames, fps ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ) ON CONFLICT (project, path) DO UPDATE SET type = ?, name = ?, extension = ?, size = ?, frames = ?, fps = ? ''', (uuid, self.identifier, file_type, name, extension, size, created, path, frames, fps, file_type, name, extension, size, frames, fps)) # lastrowid is 0 if on conflict clause applies. # If this is the case we do an extra query to receive the row id. if cursor.lastrowid > 0: row_id = cursor.lastrowid insert = True else: cursor.execute('SELECT id FROM files WHERE project = ? AND path = ?', (self.identifier, path)) row_id = cursor.fetchone()[0] insert = False return self.file(row_id), insert