PipelineManager.py 2.9 KB

from os import getcwd
from os import path

from eventlet import tpool

from pycs.ApplicationStatus import ApplicationStatus
from pycs.pipeline.Job import Job
from pycs.pipeline.tf1.pipeline import Pipeline as TF1Pipeline


class PipelineManager:
    def __init__(self, app_status: ApplicationStatus):
        self.app_status = app_status
        # app_status['projects'].subscribe(self.__update)

    def run(self, project_identifier, file_identifier):
        # find project
        opened_projects = list(filter(lambda x: x['id'] == project_identifier, self.app_status['projects']))
        if len(opened_projects) == 0:
            return

        current_project = opened_projects[0]

        # find data object
        data_objects = list(filter(lambda x: x['id'] == file_identifier, current_project['data']))
        if len(data_objects) == 0:
            return

        target_objects = list(filter(lambda o: 'predictionResults' not in o.keys(), current_project['data']))
        if len(target_objects) == 0:
            return

        # load pipeline
        pipeline = tpool.execute(self.__load_pipeline, current_project['pipeline']['model-distribution'])

        for target_object in target_objects:
            print('>>>', target_object)

            # create job list
            # TODO update job progress
            job = Job('detect-faces', current_project['id'], target_object)
            result = tpool.execute(lambda p, j: p.execute(j), pipeline, job)

            target_object['predictionResults'] = result.predictions
            print('<<<', target_object)

        # close pipeline
        pipeline.close()

    def __update(self, data):
        # get current project path
        opened_projects = list(filter(lambda x: x['status'] == 'open', data))
        if len(opened_projects) == 0:
            return

        current_project = opened_projects[0]

        # find images to predict
        if 'data' not in current_project.keys() or len(current_project['data']) == 0:
            return

        # load pipeline
        pipeline = tpool.execute(self.__load_pipeline, current_project['pipeline']['model-distribution'])

        # create job list
        for d in current_project['data']:
            print('keys:', d.keys())

            if 'result' not in d.keys():
                # TODO update job progress
                job = Job('detect-faces', current_project['id'], d)
                result = tpool.execute(lambda p, j: p.execute(j), pipeline, job)
                d['result'] = result.predictions

        # close pipeline
        pipeline.close()

    def __load_pipeline(self, pipeline_identifier):
        model_distribution = self.app_status['models'][pipeline_identifier]

        if model_distribution['mode'] == 'tf1':
            model_root = path.join(getcwd(), 'models', model_distribution['name'])

            pipeline = TF1Pipeline()
            pipeline.load(model_root, model_distribution['pipeline'])

            return pipeline
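
A minimal usage sketch of this class. It assumes the surrounding pycs application provides an ApplicationStatus whose 'projects' and 'models' entries are populated as the methods above expect; the import path, the ApplicationStatus constructor call, and the identifier strings are placeholders, not part of this file.

    # Hypothetical wiring; the module path and identifiers below are assumptions.
    from pycs.ApplicationStatus import ApplicationStatus
    from pycs.pipeline.PipelineManager import PipelineManager

    app_status = ApplicationStatus()  # assumed to already hold 'projects' and 'models'
    manager = PipelineManager(app_status)

    # Run predictions for every object of the matching project that has no
    # 'predictionResults' yet (see PipelineManager.run above).
    manager.run(project_identifier='example-project-id', file_identifier='example-file-id')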