6
0

PipelineCache.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import queue
  2. import warnings
  3. import eventlet
  4. from threading import Lock
  5. from time import sleep
  6. from time import time
  7. from eventlet import spawn_n
  8. from eventlet import tpool
  9. from pycs import app
  10. from pycs.database.Project import Project
  11. from pycs.interfaces.Pipeline import Pipeline
  12. from pycs.jobs.JobRunner import JobRunner
  13. from pycs.util.PipelineUtil import load_from_root_folder
  14. class PipelineCache:
  15. """
  16. Store initialized pipelines and call `close` after `CLOSE_TIMER` if they are not requested
  17. another time.
  18. """
  19. CLOSE_TIMER = 120
  20. def __init__(self, jobs: JobRunner, cache_time: float = None):
  21. self.__jobs = jobs
  22. self.__pipelines = {}
  23. self.__is_running = False
  24. self.__queue = queue.Queue()
  25. self.__lock = Lock()
  26. self._cache_time = cache_time or self.CLOSE_TIMER
  27. msg = ("Initialized Pipeline cache "
  28. f"(pipelines are closed after {self._cache_time:.3f} sec)")
  29. app.logger.info(msg)
  30. def start(self):
  31. """ starts the main worker method """
  32. if self.__is_running:
  33. warnings.warn("Pipeline cache is already started")
  34. return
  35. spawn_n(self.__run)
  36. @property
  37. def is_empty(self):
  38. """ checks whether the pipeline cache is empty """
  39. return len(self.__pipelines) == 0 and self.__queue.empty()
  40. def shutdown(self):
  41. """ puts None in the queue to signal the worker to stop """
  42. self.__queue.put(None)
  43. def load_from_root_folder(self, project_id: int, root_folder: str) -> Pipeline:
  44. """
  45. load configuration.json and create an instance from the included code object
  46. :param project_id: associated project ID
  47. :param root_folder: path to model root folder
  48. :return: Pipeline instance
  49. """
  50. # check if instance is cached
  51. with self.__lock:
  52. if root_folder in self.__pipelines:
  53. instance = self.__pipelines[root_folder]
  54. # increase reference counter
  55. instance[0] += 1
  56. # return instance
  57. return instance[1]
  58. # load pipeline
  59. pipeline = load_from_root_folder(root_folder)
  60. # save instance to cache
  61. with self.__lock:
  62. if not self.__is_running:
  63. warnings.warn("[save instance] pipeline cache was not started yet!")
  64. self.__pipelines[root_folder] = [1, pipeline, project_id]
  65. # return
  66. return pipeline
  67. def free_instance(self, root_folder: str):
  68. """
  69. Change an instance's status to unused and start the timer to call it's `close` function
  70. after `_cache_time` seconds. The next call to `load_from_root_folder` in this interval
  71. will disable this timer.
  72. :param root_folder: path to model root folder
  73. """
  74. # abort if no pipeline with this root folder is loaded
  75. with self.__lock:
  76. if root_folder not in self.__pipelines:
  77. return
  78. # start timeout
  79. timestamp = time()
  80. self.__queue.put((root_folder, timestamp))
  81. if not self.__is_running:
  82. warnings.warn("[free instance] pipeline cache was not started yet!")
  83. def __get(self):
  84. while True:
  85. # get element from queue
  86. while True:
  87. try:
  88. entry = self.__queue.get(block=False)
  89. break
  90. except queue.Empty:
  91. eventlet.sleep(0.2)
  92. pass
  93. if entry is None:
  94. # closing pipeline cache
  95. return None
  96. root_folder, timestamp = entry
  97. # sleep if needed
  98. delay = int(timestamp + self._cache_time - time())
  99. if delay > 0:
  100. app.logger.info(f"Cache sleeps for {delay:.3f} sec")
  101. sleep(delay)
  102. # lock and access __pipelines
  103. with self.__lock:
  104. app.logger.info("Removing pipeline from cache")
  105. instance = self.__pipelines[root_folder]
  106. # reference counter greater than 1
  107. if instance[0] > 1:
  108. # decrease reference counter
  109. instance[0] -= 1
  110. # reference counter equals 1
  111. else:
  112. # delete instance from __pipelines and return to call `close` function
  113. del self.__pipelines[root_folder]
  114. return instance[1], instance[2]
  115. def __run(self):
  116. while True:
  117. self.__is_running = True
  118. # get pipeline
  119. result = tpool.execute(self.__get)
  120. if result is None:
  121. self.__is_running = False
  122. return
  123. pipeline, project_id = result
  124. project = Project.query.get(project_id)
  125. # create job to close pipeline
  126. self.__jobs.run(project,
  127. 'Model Interaction',
  128. f'{project.name} (close pipeline)',
  129. f'{project.name}/model-interaction',
  130. pipeline.close
  131. )