6
0

PipelineManager.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. from os import getcwd
  2. from os import path
  3. from eventlet import tpool
  4. from pycs.ApplicationStatus import ApplicationStatus
  5. from pycs.pipeline.Job import Job
  6. from pycs.pipeline.tf1.pipeline import Pipeline as TF1Pipeline
  7. class PipelineManager:
  8. def __init__(self, app_status: ApplicationStatus):
  9. self.app_status = app_status
  10. app_status['projects'].subscribe(self.__update)
  11. def __update(self, data):
  12. # get current project path
  13. opened_projects = list(filter(lambda x: x['status'] == 'open', data))
  14. if len(opened_projects) == 0:
  15. return
  16. current_project = opened_projects[0]
  17. # find images to predict
  18. if 'data' not in current_project.keys() or len(current_project['data']) == 0:
  19. return
  20. # load pipeline
  21. pipeline = tpool.execute(self.__load_pipeline, current_project['pipeline']['model-distribution'])
  22. # create job list
  23. for d in current_project['data']:
  24. print('keys:', d.keys())
  25. if 'result' not in d.keys():
  26. # TODO update job progress
  27. job = Job('detect-faces', current_project['id'], d)
  28. result = tpool.execute(lambda p, j: p.execute(j), pipeline, job)
  29. d['result'] = result.predictions
  30. # close pipeline
  31. pipeline.close()
  32. def __load_pipeline(self, identifier):
  33. model_distribution = self.app_status['models'][identifier]
  34. if model_distribution['mode'] == 'tf1':
  35. model_root = path.join(getcwd(), 'models', model_distribution['name'])
  36. pipeline = TF1Pipeline()
  37. pipeline.load(model_root, model_distribution['pipeline'])
  38. return pipeline