from os import getcwd
from os import path

from eventlet import tpool

from pycs.ApplicationStatus import ApplicationStatus
from pycs.pipeline.Job import Job
from pycs.pipeline.tf1.pipeline import Pipeline as TF1Pipeline


class PipelineManager:
    """
    Loads the inference pipeline configured for a project, runs prediction jobs
    against the project's data objects and closes the pipeline afterwards.
    """

    def __init__(self, app_status: ApplicationStatus):
        self.app_status = app_status
        # app_status['projects'].subscribe(self.__update)
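
    # run() is the public entry point: given a project identifier and a file
    # identifier it predicts every data object of the project that does not
    # have prediction results yet.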
    def run(self, project_identifier, file_identifier):
        # find project
        opened_projects = list(filter(lambda x: x['id'] == project_identifier, self.app_status['projects']))
        if len(opened_projects) == 0:
            return

        current_project = opened_projects[0]

        # the requested file has to exist in the project; prediction itself runs
        # over every data object that has no results yet
        data_objects = list(filter(lambda x: x['id'] == file_identifier, current_project['data']))
        if len(data_objects) == 0:
            return

        target_objects = list(filter(lambda o: 'predictionResults' not in o.keys(), current_project['data']))
        if len(target_objects) == 0:
            return

        # load pipeline
        pipeline = tpool.execute(self.__load_pipeline, current_project['pipeline']['model-distribution'])

        # create and execute one job per remaining data object
        for target_object in target_objects:
            print('>>>', target_object)

            # TODO update job progress
            job = Job('detect-faces', current_project['id'], target_object)
            result = tpool.execute(pipeline.execute, job)
            target_object['predictionResults'] = result.predictions

            print('<<<', target_object)

        # close pipeline
        pipeline.close()
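
    # __update() is the (currently disabled) subscription callback for
    # app_status['projects']; it predicts every data object of the open project
    # that has no result yet.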
    def __update(self, data):
        # get current project
        opened_projects = list(filter(lambda x: x['status'] == 'open', data))
        if len(opened_projects) == 0:
            return

        current_project = opened_projects[0]

        # find images to predict
        if 'data' not in current_project.keys() or len(current_project['data']) == 0:
            return

        # load pipeline
        pipeline = tpool.execute(self.__load_pipeline, current_project['pipeline']['model-distribution'])

        # create and execute one job per data object without a result
        for d in current_project['data']:
            print('keys:', d.keys())

            if 'result' not in d.keys():
                # TODO update job progress
                job = Job('detect-faces', current_project['id'], d)
                result = tpool.execute(pipeline.execute, job)
                d['result'] = result.predictions

        # close pipeline
        pipeline.close()
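
    # __load_pipeline() resolves a model distribution from the application
    # status and instantiates the matching pipeline implementation.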
    def __load_pipeline(self, pipeline_identifier):
        model_distribution = self.app_status['models'][pipeline_identifier]

        if model_distribution['mode'] == 'tf1':
            model_root = path.join(getcwd(), 'models', model_distribution['name'])

            pipeline = TF1Pipeline()
            pipeline.load(model_root, model_distribution['pipeline'])

            return pipeline

        # unsupported modes previously fell through and returned None; fail loudly instead
        raise ValueError(f"unsupported pipeline mode: {model_distribution['mode']}")
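

# Usage sketch (illustrative only; `app_status`, the project identifier and the
# file identifier are assumed to exist in the running application):
#
#     manager = PipelineManager(app_status)
#     manager.run('some-project-id', 'some-file-id')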