1
1

PipelineCache.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import queue
  2. import warnings
  3. from threading import Lock
  4. from time import sleep
  5. from time import time
  6. import eventlet
  7. from eventlet import spawn_n
  8. from eventlet import tpool
  9. from pycs import app
  10. from pycs.database.Project import Project
  11. from pycs.interfaces.Pipeline import Pipeline
  12. from pycs.jobs.JobRunner import JobRunner
  13. from pycs.util.PipelineUtil import load_from_root_folder
  14. class PipelineCache:
  15. """
  16. Store initialized pipelines and call `close` after `CLOSE_TIMER` if they are not requested
  17. another time.
  18. """
  19. CLOSE_TIMER = 120
  20. def __init__(self, jobs: JobRunner, cache_time: float = None):
  21. self.__jobs = jobs
  22. self.__pipelines = {}
  23. self.__is_running = False
  24. self.__queue = queue.Queue()
  25. self.__lock = Lock()
  26. self._cache_time = cache_time or self.CLOSE_TIMER
  27. msg = ("Initialized Pipeline cache "
  28. f"(pipelines are closed after {self._cache_time:.3f} sec)")
  29. app.logger.info(msg)
  30. def start(self):
  31. """ starts the main worker method """
  32. if self.__is_running:
  33. warnings.warn("Pipeline cache is already started")
  34. return
  35. spawn_n(self.__run)
  36. @property
  37. def is_empty(self):
  38. """ checks whether the pipeline cache is empty """
  39. return len(self.__pipelines) == 0 and self.__queue.empty()
  40. def shutdown(self):
  41. """ puts None in the queue to signal the worker to stop """
  42. self.__queue.put(None)
  43. def load_from_root_folder(self, project_id: int, root_folder: str) -> Pipeline:
  44. """
  45. load configuration.json and create an instance from the included code object
  46. :param project_id: associated project ID
  47. :param root_folder: path to model root folder
  48. :return: Pipeline instance
  49. """
  50. # check if instance is cached
  51. with self.__lock:
  52. if root_folder in self.__pipelines:
  53. instance = self.__pipelines[root_folder]
  54. # increase reference counter
  55. instance[0] += 1
  56. # return instance
  57. return instance[1]
  58. # load pipeline
  59. pipeline = load_from_root_folder(root_folder)
  60. # save instance to cache
  61. with self.__lock:
  62. if not self.__is_running:
  63. warnings.warn("[save instance] pipeline cache was not started yet!")
  64. self.__pipelines[root_folder] = [1, pipeline, project_id]
  65. # return
  66. return pipeline
  67. def free_instance(self, root_folder: str):
  68. """
  69. Change an instance's status to unused and start the timer to call it's `close` function
  70. after `_cache_time` seconds. The next call to `load_from_root_folder` in this interval
  71. will disable this timer.
  72. :param root_folder: path to model root folder
  73. """
  74. # abort if no pipeline with this root folder is loaded
  75. with self.__lock:
  76. if root_folder not in self.__pipelines:
  77. return
  78. # start timeout
  79. timestamp = time()
  80. self.__queue.put((root_folder, timestamp))
  81. if not self.__is_running:
  82. warnings.warn("[free instance] pipeline cache was not started yet!")
  83. def __get(self):
  84. while True:
  85. # get element from queue
  86. while True:
  87. try:
  88. entry = self.__queue.get(block=False)
  89. break
  90. except queue.Empty:
  91. eventlet.sleep(0.2)
  92. if entry is None:
  93. # closing pipeline cache
  94. return None
  95. root_folder, timestamp = entry
  96. # sleep if needed
  97. delay = int(timestamp + self._cache_time - time())
  98. if delay > 0:
  99. app.logger.info(f"Cache sleeps for {delay:.3f} sec")
  100. sleep(delay)
  101. # lock and access __pipelines
  102. with self.__lock:
  103. app.logger.info("Removing pipeline from cache")
  104. instance = self.__pipelines[root_folder]
  105. # reference counter greater than 1
  106. if instance[0] > 1:
  107. # decrease reference counter
  108. instance[0] -= 1
  109. # reference counter equals 1
  110. else:
  111. # delete instance from __pipelines and return to call `close` function
  112. del self.__pipelines[root_folder]
  113. return instance[1], instance[2]
  114. def __run(self):
  115. while True:
  116. self.__is_running = True
  117. # get pipeline
  118. result = tpool.execute(self.__get)
  119. if result is None:
  120. self.__is_running = False
  121. return
  122. pipeline, project_id = result
  123. project = Project.query.get(project_id)
  124. # create job to close pipeline
  125. self.__jobs.run(project,
  126. 'Model Interaction',
  127. f'{project.name} (close pipeline)',
  128. f'{project.name}/model-interaction',
  129. pipeline.close
  130. )