6
0

PipelineCache.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. from queue import Queue
  2. from threading import Lock
  3. from time import time, sleep
  4. from eventlet import tpool, spawn_n
  5. from pycs.database.Project import Project
  6. from pycs.interfaces.Pipeline import Pipeline
  7. from pycs.jobs.JobRunner import JobRunner
  8. from pycs.util.PipelineUtil import load_from_root_folder
  9. class PipelineCache:
  10. CLOSE_TIMER = 120
  11. def __init__(self, jobs: JobRunner):
  12. self.__jobs = jobs
  13. self.__pipelines = {}
  14. self.__queue = Queue()
  15. self.__lock = Lock()
  16. spawn_n(self.__run)
  17. def load_from_root_folder(self, project: Project, root_folder: str) -> Pipeline:
  18. """
  19. load configuration.json and create an instance from the included code object
  20. :param project: associated project
  21. :param root_folder: path to model root folder
  22. :return: Pipeline instance
  23. """
  24. # check if instance is cached
  25. with self.__lock:
  26. if root_folder in self.__pipelines:
  27. instance = self.__pipelines[root_folder]
  28. # increase reference counter
  29. instance[0] += 1
  30. # return instance
  31. return instance[1]
  32. # load pipeline
  33. pipeline = load_from_root_folder(root_folder)
  34. # save instance to cache
  35. with self.__lock:
  36. self.__pipelines[root_folder] = [1, pipeline, project]
  37. # return
  38. return pipeline
  39. def free_instance(self, root_folder: str):
  40. """
  41. Change an instance's status to unused and start the timer to call it's `close` function
  42. after `CLOSE_TIMER` seconds. The next call to `load_from_root_folder` in this interval
  43. will disable this timer.
  44. :param root_folder: path to model root folder
  45. """
  46. # abort if no pipeline with this root folder is loaded
  47. with self.__lock:
  48. if root_folder not in self.__pipelines:
  49. return
  50. # start timeout
  51. timestamp = time()
  52. self.__queue.put((root_folder, timestamp))
  53. def __get(self):
  54. while True:
  55. # get element from queue
  56. root_folder, timestamp = self.__queue.get()
  57. # sleep if needed
  58. delay = int(timestamp + self.CLOSE_TIMER - time())
  59. if delay > 0:
  60. sleep(delay)
  61. # lock and access __pipelines
  62. with self.__lock:
  63. instance = self.__pipelines[root_folder]
  64. # reference counter greater than 1
  65. if instance[0] > 1:
  66. # decrease reference counter
  67. instance[0] -= 1
  68. # reference counter equals 1
  69. else:
  70. # delete instance from __pipelines and return to call `close` function
  71. del self.__pipelines[root_folder]
  72. return instance[1], instance[2]
  73. def __run(self):
  74. while True:
  75. # get pipeline
  76. pipeline, project = tpool.execute(self.__get)
  77. # create job to close pipeline
  78. self.__jobs.run(project,
  79. 'Model Interaction',
  80. f'{project.name} (close pipeline)',
  81. f'{project.name}/model-interaction',
  82. pipeline.close
  83. )