from glob import glob from json import load, dump from os import path, mkdir from shutil import rmtree from time import time from uuid import uuid1 from eventlet import spawn_after from pycs import ApplicationStatus from pycs.observable import ObservableDict from pycs.pipeline.PipelineManager import PipelineManager from pycs.projects.Project import Project class ProjectManager(ObservableDict): DEFAULT_PIPELINE_TIMEOUT = 120 def __init__(self, app_status: ApplicationStatus): self.pipeline_manager = None self.quit_pipeline_thread = None # TODO create projects folder if it does not exist self.app_status = app_status # initialize observable dict with no keys and # app_status object as parent super().__init__({}, app_status) app_status['projects'] = self # find projects for folder in glob('projects/*'): # load project.json with open(path.join(folder, 'project.json'), 'r') as file: project = Project(load(file), self) self[project['id']] = project def write_project(self, uuid): with open(path.join('projects', uuid, 'project.json'), 'w') as file: copy = self[uuid].copy() del copy['jobs'] dump(copy, file, indent=4) def create_project(self, name, description, model): # create dict representation uuid = str(uuid1()) self[uuid] = Project({ 'id': uuid, 'name': name, 'description': description, 'created': int(time()), 'pipeline': { 'model-distribution': model }, 'data': {}, 'labels': {}, 'jobs': {} }, self) # create project directory folder = path.join('projects', uuid) mkdir(folder) # create project.json self.write_project(uuid) def update_project(self, uuid, update): # abort if uuid is no valid key if uuid not in self.keys(): return # set values and write to disk self[uuid].update_properties(update) self.write_project(uuid) def delete_project(self, uuid): # abort if uuid is no valid key if uuid not in self.keys(): return # delete project folder folder = path.join('projects', uuid) rmtree(folder) # delete project data del self[uuid] def predict(self, uuid, identifiers): # abort if uuid is no valid key if uuid not in self.keys(): return project = self[uuid] # abort pipeline termination if self.quit_pipeline_thread is not None: self.quit_pipeline_thread.cancel() self.quit_pipeline_thread = None # create pipeline if it does not exist already if self.pipeline_manager is None: # abort if pipeline id is no valid key pipeline_identifier = project['pipeline']['model-distribution'] pipeline = self.parent['models'][pipeline_identifier] self.pipeline_manager = PipelineManager(project, pipeline) # run jobs for file_id in identifiers: if file_id in project['data'].keys(): self.pipeline_manager.run(project['data'][file_id]) # quit timeout thread if self.quit_pipeline_thread is not None: self.quit_pipeline_thread.cancel() self.quit_pipeline_thread = None # schedule timeout thread self.quit_pipeline_thread = spawn_after(self.DEFAULT_PIPELINE_TIMEOUT, self.__quit_pipeline) def __quit_pipeline(self): if self.pipeline_manager is not None: self.pipeline_manager.close() self.pipeline_manager = None self.quit_pipeline_thread = None