- from os import path
- from eventlet import tpool
- from pycs.pipeline.Job import Job
- from pycs.projects.Project import Project
class PipelineManager:
    """Loads a pipeline implementation from a distribution descriptor and
    runs it on media files belonging to a project."""

    def __init__(self, project: Project, pipeline):
        """
        Dynamically import the pipeline class named in the descriptor and
        instantiate it.

        :param project: the project this pipeline operates on
        :param pipeline: pipeline distribution descriptor; reads keys
            'path' and 'code' ({'module': ..., 'class': ...}) —
            TODO confirm full schema against the caller
        """
        # Translate the module's filesystem path into a dotted module name
        # so it can be imported (handles both '/' and '\\' separators).
        code_path = path.join(pipeline['path'], pipeline['code']['module'])
        module_name = code_path.replace('/', '.').replace('\\', '.')
        class_name = pipeline['code']['class']

        module = __import__(module_name, fromlist=[class_name])
        pipeline_class = getattr(module, class_name)

        self.project = project
        self.pipeline = pipeline_class(pipeline['path'], pipeline)

    def close(self):
        """Release resources held by the underlying pipeline."""
        self.pipeline.close()
        print('PipelineManager', 'close')

    def run(self, media_file):
        """
        Execute the pipeline on a single media file, replacing any previous
        pipeline-generated results with the new predictions.

        :param media_file: media file object providing
            remove_pipeline_results() and add_result(prediction, origin=...)
        """
        # create job list
        # TODO update job progress
        job = Job('detect-faces', self.project['id'], media_file)

        # Run the (potentially blocking) pipeline in eventlet's thread pool.
        # tpool.execute(func, *args) calls func(*args), so the bound method
        # can be passed directly — no lambda wrapper needed.
        result = tpool.execute(self.pipeline.execute, job)

        # remove existing pipeline predictions from media_file
        media_file.remove_pipeline_results()

        # add new predictions
        for prediction in result.predictions:
            media_file.add_result(prediction, origin='pipeline')
# NOTE(review): removed two large triple-quoted blocks of commented-out code
# (a __load_pipeline stub and two near-duplicate __update implementations).
# They were no-op module-level string literals; dead code belongs in version
# control history, not in the source file.
|