Pipeline.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from json import dump, load
  2. from os import path
  3. from typing import List
  4. from pycs.pipeline.Fit import Fit
  5. from pycs.pipeline.Job import Job
  6. from pycs.pipeline.Pipeline import Pipeline as Interface
  7. class Pipeline(Interface):
  8. def __init__(self, root_folder, distribution):
  9. print('fmv1 init')
  10. self.root_folder = root_folder
  11. def close(self):
  12. print('fmv1 close')
  13. def execute(self, job: Job) -> List[dict]:
  14. print('fmv1 execute')
  15. data_file = path.join(self.root_folder, 'data.json')
  16. if path.exists(data_file):
  17. with open(data_file, 'r') as file:
  18. result = load(file)
  19. else:
  20. result = {}
  21. if job.path in result:
  22. return result[job.path]
  23. else:
  24. return []
  25. def fit(self, fit: List[Fit]):
  26. print('fmv1 fit')
  27. result = {}
  28. for f in fit:
  29. result[f.path] = f.result
  30. data_file = path.join(self.root_folder, 'data.json')
  31. with open(data_file, 'w') as file:
  32. dump(result, file, indent=4)