PipelineCache.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import warnings
  2. from queue import Queue
  3. from threading import Lock
  4. from time import time, sleep
  5. from eventlet import tpool, spawn_n
  6. from pycs import app
  7. from pycs.database.Project import Project
  8. from pycs.interfaces.Pipeline import Pipeline
  9. from pycs.jobs.JobRunner import JobRunner
  10. from pycs.util.PipelineUtil import load_from_root_folder
  class PipelineCache:
      """
      Store initialized pipelines and call `close` after `CLOSE_TIMER` seconds if they are not
      requested another time.

      Each cached entry is a list ``[reference_count, pipeline, project_id]`` keyed by the
      pipeline's root folder. `load_from_root_folder` increments the counter, every call to
      `free_instance` queues a delayed decrement, and the background worker started by `start`
      schedules the pipeline's `close` (as a job) once the last reference is released.
      """
      CLOSE_TIMER = 120

      def __init__(self, jobs: JobRunner, cache_time: float = None):
          """
          :param jobs: job runner used to schedule the `close` call of evicted pipelines
          :param cache_time: seconds an unused pipeline stays cached;
                             ``None`` selects `CLOSE_TIMER`
          """
          self.__jobs = jobs
          self.__pipelines = {}
          self.__is_running = False
          self.__queue = Queue()
          self.__lock = Lock()
          # compare against None so an explicit 0 (close as soon as possible) is honored
          self._cache_time = self.CLOSE_TIMER if cache_time is None else cache_time
          app.logger.info(f"Initialized Pipeline cache (pipelines are closed after {self._cache_time:.3f} sec)")

      def start(self):
          """ starts the main worker method """
          if self.__is_running:
              warnings.warn("Pipeline cache is already started")
              return

          # mark as running before spawning, so a second `start` issued before the
          # worker gets scheduled does not spawn an additional worker
          self.__is_running = True
          spawn_n(self.__run)

      @property
      def is_empty(self):
          """ checks whether the pipeline cache is empty """
          return len(self.__pipelines) == 0

      def shutdown(self):
          """ puts None in the queue to signal the worker to stop """
          self.__queue.put(None)

      def load_from_root_folder(self, project_id: int, root_folder: str) -> Pipeline:
          """
          load configuration.json and create an instance from the included code object

          A cached instance for `root_folder` is reused and its reference counter increased.

          :param project_id: associated project ID
          :param root_folder: path to model root folder
          :return: Pipeline instance
          """
          # check if instance is cached
          with self.__lock:
              instance = self.__pipelines.get(root_folder)
              if instance is not None:
                  # increase reference counter and return the cached pipeline
                  instance[0] += 1
                  return instance[1]

          # load pipeline outside the lock, loading may take a while
          pipeline = load_from_root_folder(root_folder)

          # save instance to cache
          with self.__lock:
              if not self.__is_running:
                  warnings.warn("[save instance] pipeline cache was not started yet!")

              instance = self.__pipelines.get(root_folder)
              if instance is None:
                  self.__pipelines[root_folder] = [1, pipeline, project_id]
                  return pipeline

              # another caller cached this folder while we were loading. Overwriting the
              # entry would reset its reference counter and let the pipeline be closed
              # while still in use, so share the cached instance instead.
              instance[0] += 1
              cached = instance[1]

          # the freshly loaded duplicate is never handed out; release it right away
          pipeline.close()
          return cached

      def free_instance(self, root_folder: str):
          """
          Change an instance's status to unused and start the timer to call its `close` function
          after `_cache_time` seconds. A call to `load_from_root_folder` for the same folder
          within this interval keeps the instance alive (its reference counter is still greater
          than one when the timer expires).

          :param root_folder: path to model root folder
          """
          # abort if no pipeline with this root folder is loaded
          with self.__lock:
              if root_folder not in self.__pipelines:
                  return

          # start timeout
          timestamp = time()
          self.__queue.put((root_folder, timestamp))

          if not self.__is_running:
              warnings.warn("[free instance] pipeline cache was not started yet!")

      def __get(self):
          """
          Block until a cached pipeline has to be closed (runs in a tpool thread).

          :return: ``(pipeline, project_id)`` of the pipeline to close, or ``None`` once
                   `shutdown` was requested
          """
          while True:
              # get element from queue
              entry = self.__queue.get()
              if entry is None:
                  # closing pipeline cache
                  return None

              root_folder, timestamp = entry

              # sleep if needed; keep the fractional part, truncating to int would
              # evict pipelines up to one second early
              delay = timestamp + self._cache_time - time()
              if delay > 0:
                  app.logger.info(f"Cache sleeps for {delay:.3f} sec")
                  sleep(delay)

              # lock and access __pipelines
              with self.__lock:
                  instance = self.__pipelines.get(root_folder)
                  if instance is None:
                      # surplus free_instance call for an already evicted pipeline;
                      # skip it instead of crashing the worker with a KeyError
                      app.logger.warning(f"Pipeline {root_folder} is not cached anymore, ignoring")
                      continue

                  # reference counter greater than 1: somebody still uses the pipeline
                  if instance[0] > 1:
                      instance[0] -= 1
                      continue

                  # last reference released: delete the instance and return it so that
                  # its `close` function gets called
                  app.logger.info("Removing pipeline from cache")
                  del self.__pipelines[root_folder]
                  return instance[1], instance[2]

      def __run(self):
          """ worker loop: waits for evicted pipelines and schedules their `close` call """
          self.__is_running = True
          while True:
              # get pipeline (blocking call, therefore executed in a native thread)
              result = tpool.execute(self.__get)
              if result is None:
                  self.__is_running = False
                  return

              pipeline, project_id = result
              project = Project.query.get(project_id)

              if project is None:
                  # the project vanished meanwhile, so no job can be attached to it;
                  # close the pipeline directly instead of crashing the worker
                  app.logger.warning(f"Project {project_id} not found, closing pipeline directly")
                  try:
                      pipeline.close()
                  except Exception:  # pylint: disable=broad-except
                      app.logger.exception("Closing pipeline failed")
                  continue

              # create job to close pipeline
              self.__jobs.run(project,
                              'Model Interaction',
                              f'{project.name} (close pipeline)',
                              f'{project.name}/model-interaction',
                              pipeline.close
                              )