PipelineManager.py

from os import path

from eventlet import tpool

from pycs.pipeline.Job import Job
from pycs.projects.Project import Project


class PipelineManager:
    def __init__(self, project: Project, pipeline):
        # resolve the pipeline implementation from the module and class named
        # in the pipeline configuration and import it dynamically
        code_path = path.join(pipeline['path'], pipeline['code']['module'])
        module_name = code_path.replace('/', '.').replace('\\', '.')
        class_name = pipeline['code']['class']

        mod = __import__(module_name, fromlist=[class_name])
        cl = getattr(mod, class_name)

        self.project = project
        self.pipeline = cl(pipeline['path'], pipeline)

    def close(self):
        self.pipeline.close()
        print('PipelineManager', 'close')

    def run(self, media_file):
        # create a job and execute the pipeline in a worker thread
        # TODO update job progress
        job = Job('detect-faces', self.project['id'], media_file)
        result = tpool.execute(lambda p, j: p.execute(j), self.pipeline, job)

        # remove existing pipeline predictions from media_file
        media_file.remove_pipeline_results()

        # add new predictions
        for prediction in result.predictions:
            media_file.add_result(prediction, origin='pipeline')

    '''
    def __load_pipeline(self, pipeline_identifier):
        model_distribution = self.project.parent.parent['models'][pipeline_identifier]

        if model_distribution['mode'] == 'tf1':
            model_root = path.join(getcwd(), 'models', model_distribution['name'])

            # pipeline = TF1Pipeline()
            # pipeline.load(model_root, model_distribution['pipeline'])
            # return pipeline
    '''

    '''
    def __update(self, data):
        # get current project path
        opened_projects = list(filter(lambda x: x['status'] == 'open', data))
        if len(opened_projects) == 0:
            return

        current_project = opened_projects[0]

        # find images to predict
        if 'data' not in current_project.keys() or len(current_project['data']) == 0:
            return

        # load pipeline
        pipeline = tpool.execute(self.__load_pipeline, current_project['pipeline']['model-distribution'])

        # create job list
        for d in current_project['data']:
            print('keys:', d.keys())
            if 'result' not in d.keys():
                # TODO update job progress
                job = Job('detect-faces', current_project['id'], d)
                result = tpool.execute(lambda p, j: p.execute(j), pipeline, job)
                d['result'] = result.predictions

        # close pipeline
        pipeline.close()
    '''

    '''
    def __update(self, data):
        for current_project in data:
            print('>>>>>>>>>>')

            # find images to predict
            if 'data' not in current_project.keys() or len(current_project['data']) == 0:
                return

            # load pipeline
            pipeline = tpool.execute(self.__load_pipeline, current_project['pipeline']['model-distribution'])

            # create job list
            for d in current_project['data']:
                print('keys:', d.keys())
                if 'result' not in d.keys():
                    # TODO update job progress
                    job = Job('detect-faces', current_project['id'], d)
                    result = tpool.execute(lambda p, j: p.execute(j), pipeline, job)
                    d['result'] = result.predictions

            # close pipeline
            pipeline.close()
    '''
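
# Usage sketch (illustrative only, not part of the original module). The manager
# expects a Project and a pipeline configuration dict whose 'path' and 'code'
# entries ('module', 'class') identify the pipeline class to import dynamically;
# the concrete values below are hypothetical placeholders:
#
#     pipeline_config = {
#         'path': 'models/example',
#         'code': {'module': 'pipeline', 'class': 'Pipeline'},
#     }
#     manager = PipelineManager(project, pipeline_config)
#     manager.run(media_file)   # runs the pipeline on one media file via tpool
#     manager.close()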