123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- from json import load
- from os import path, mkdir, listdir
- from os.path import splitext
- from uuid import uuid1
- from eventlet import spawn_after
- from pycs.observable import ObservableDict
- from pycs.pipeline.PipelineManager import PipelineManager
- from pycs.projects.ImageFile import ImageFile
- from pycs.projects.UnmanagedImageFile import UnmanagedImageFile
- from pycs.projects.UnmanagedVideoFile import UnmanagedVideoFile
- from pycs.projects.VideoFile import VideoFile
- from pycs.util.RecursiveDictionary import set_recursive
class Project(ObservableDict):
    """A project whose state lives in an observable dictionary.

    Media files are either *managed* (stored under ``self['data']``) or
    *unmanaged* (read from the directory in ``self['unmanaged']`` and kept in
    ``self.unmanaged_files``). A ``PipelineManager`` is created lazily for
    predictions / fitting and closed again after an idle timeout.
    """

    # seconds after the last predict/fit call before the pipeline is closed
    DEFAULT_PIPELINE_TIMEOUT = 120

    # extensions that create_media_file knows how to wrap
    IMAGE_EXTENSIONS = ('.jpg', '.png')
    VIDEO_EXTENSIONS = ('.mp4',)

    def __init__(self, obj: dict, parent):
        """Load the project described by *obj* and attach it to *parent*.

        :param obj: project dictionary; must contain ``'id'`` and
            ``'unmanaged'``. Missing ``'data'``, ``'labels'`` and ``'jobs'``
            entries are created in place. ``'model'`` is filled from
            ``projects/<id>/model/distribution.json``.
        :param parent: owner object; its ``write_project`` is called with this
            project's id whenever a subscribed change notification fires.
        """
        self.pipeline_manager = None
        self.quit_pipeline_thread = None
        self.unmanaged_files_keys = []
        self.unmanaged_files = {}

        # ensure all required object keys are available
        for key in ('data', 'labels', 'jobs'):
            if key not in obj:
                obj[key] = {}

        # load model data (JSON is UTF-8 by specification)
        folder = path.join('projects', obj['id'], 'model')
        with open(path.join(folder, 'distribution.json'), 'r', encoding='utf-8') as file:
            model = load(file)
        model['path'] = folder
        obj['model'] = model

        if obj['unmanaged']:
            self._load_unmanaged_files(obj)
        else:
            # replace the stored file descriptions with MediaFile objects
            for key in obj['data'].keys():
                obj['data'][key] = self.create_media_file(obj['data'][key], project=obj)

        # initialize super
        super().__init__(obj, parent)

        # create data and temp folders
        for sub_folder in ('data', 'temp'):
            folder_path = path.join('projects', self['id'], sub_folder)
            if not path.exists(folder_path):
                mkdir(folder_path)

        # subscribe to changes to write to disk afterwards
        self.subscribe(lambda d, k: self.parent.write_project(self['id']))

    def _load_unmanaged_files(self, obj):
        """Wrap every supported file in ``obj['unmanaged']`` and link neighbours."""
        directory = obj['unmanaged']
        supported = self.IMAGE_EXTENSIONS + self.VIDEO_EXTENSIONS

        previous = None
        # os.listdir returns entries in arbitrary order; sort so the
        # prev/next navigation order is stable across runs and platforms
        for filename in sorted(listdir(directory)):
            uuid, ext = splitext(filename)

            # skip sub-directories and files create_media_file cannot wrap,
            # instead of aborting the whole project load
            if ext not in supported or not path.isfile(path.join(directory, filename)):
                continue

            current = self.create_media_file({'id': uuid, 'extension': ext}, project=obj)
            if previous is not None:
                current.prev(previous)
                previous.next(current)
            previous = current

            self.unmanaged_files_keys.append(uuid)
            self.unmanaged_files[uuid] = current

        # every file is told how many files the folder holds in total
        length = len(self.unmanaged_files_keys)
        for media_file in self.unmanaged_files.values():
            media_file.length(length)

    def update_properties(self, update):
        """Merge the (possibly nested) dictionary *update* into this project."""
        set_recursive(update, self)

    def get_media_file(self, identifier):
        """Return the media file with *identifier*, or ``None`` if unknown."""
        if self['unmanaged']:
            # dict lookup: same keys as unmanaged_files_keys, but O(1)
            return self.unmanaged_files.get(identifier)

        data = self['data']
        if identifier not in data:
            return None
        return data[identifier]

    def new_media_file_path(self):
        """Return ``(folder, new_uuid)`` for storing a new managed media file."""
        return path.join('projects', self['id'], 'data'), str(uuid1())

    def create_media_file(self, file, project=None):
        """Wrap the file description *file* in the matching media file class.

        :param file: dictionary with at least ``'id'`` and ``'extension'``.
        :param project: project the file belongs to; defaults to ``self``
            (``__init__`` passes the raw dict before ``super().__init__`` ran).
        :raises NotImplementedError: if the extension is not supported.
        """
        if project is None:
            project = self

        extension = file['extension']
        if extension in self.IMAGE_EXTENSIONS:
            if project['unmanaged']:
                return UnmanagedImageFile(file, project)
            return ImageFile(file, project)
        if extension in self.VIDEO_EXTENSIONS:
            if project['unmanaged']:
                return UnmanagedVideoFile(file, project)
            return VideoFile(file, project)

        raise NotImplementedError(f'unsupported media file extension: {extension!r}')

    def add_media_file(self, uuid, name, extension, size, created):
        """Register a new managed media file under ``self['data'][uuid]``."""
        file = {
            'id': uuid,
            'name': name,
            'extension': extension,
            'size': size,
            'created': created,
        }
        self['data'][file['id']] = self.create_media_file(file)

    def remove_media_file(self, file_id):
        """Remove the managed media file *file_id* (``KeyError`` if unknown)."""
        del self['data'][file_id]

    def add_label(self, name, identifier=None):
        """Add a label called *name*; a uuid1 identifier is generated if omitted."""
        if identifier is None:
            identifier = str(uuid1())

        self['labels'][identifier] = {
            'id': identifier,
            'name': name,
        }

    def update_label(self, identifier, name):
        """Rename label *identifier*; unknown identifiers are ignored."""
        if identifier in self['labels']:
            self['labels'][identifier]['name'] = name

    def remove_label(self, identifier):
        """Delete label *identifier* and every managed prediction that uses it."""
        # abort if identifier is unknown
        if identifier not in self['labels']:
            return

        # collect first, delete afterwards: the dictionaries must not change
        # size while they are being iterated
        remove = []
        for data_id in self['data']:
            results = self['data'][data_id]['predictionResults']
            for pred_id in results:
                if 'label' in results[pred_id] and results[pred_id]['label'] == identifier:
                    remove.append((data_id, pred_id))

        for data_id, pred_id in remove:
            del self['data'][data_id]['predictionResults'][pred_id]

        # remove label from list
        del self['labels'][identifier]

    def predict(self, identifiers, unlabeled=False):
        """Run the pipeline on every known file in *identifiers*.

        :param identifiers: ids of the media files to predict; unknown ids
            are skipped.
        :param unlabeled: if true, only files without prediction results are run.
        """
        pipeline = self.__create_pipeline()
        try:
            if self['unmanaged']:
                for file_id in identifiers:
                    if file_id in self.unmanaged_files:
                        media_file = self.unmanaged_files[file_id]
                        if not unlabeled or len(media_file.get_data()['predictionResults']) == 0:
                            pipeline.run(media_file)
            else:
                for file_id in identifiers:
                    if file_id in self['data'].keys():
                        media_file = self['data'][file_id]
                        if not unlabeled or len(media_file['predictionResults'].keys()) == 0:
                            pipeline.run(media_file)
        finally:
            # always re-arm the idle timeout, even if a run fails, so the
            # pipeline manager is eventually closed instead of leaking
            self.__create_quit_pipeline_thread()

    def fit(self):
        """Fit the pipeline, then schedule its idle shutdown."""
        pipeline = self.__create_pipeline()
        try:
            pipeline.fit()
        finally:
            # re-arm the idle timeout even if fitting raises
            self.__create_quit_pipeline_thread()

    def __create_pipeline(self):
        """Cancel a pending shutdown and return the (lazily created) pipeline."""
        # abort pipeline termination
        self.__quit_pipeline_thread()

        # create pipeline if it does not exist already
        if self.pipeline_manager is None:
            self.pipeline_manager = PipelineManager(self)

        return self.pipeline_manager

    def __quit_pipeline(self):
        """Close the pipeline manager (if any) and forget the shutdown timer."""
        if self.pipeline_manager is not None:
            self.pipeline_manager.close()
            self.pipeline_manager = None
        self.quit_pipeline_thread = None

    def __create_quit_pipeline_thread(self):
        """(Re)schedule pipeline shutdown after DEFAULT_PIPELINE_TIMEOUT seconds."""
        # abort pipeline termination
        self.__quit_pipeline_thread()

        # create new thread
        self.quit_pipeline_thread = spawn_after(self.DEFAULT_PIPELINE_TIMEOUT, self.__quit_pipeline)

    def __quit_pipeline_thread(self):
        """Cancel the scheduled pipeline shutdown, if one is pending."""
        if self.quit_pipeline_thread is not None:
            self.quit_pipeline_thread.cancel()
            self.quit_pipeline_thread = None
|