6
0

PipelineCache.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. import warnings
  2. from queue import Queue
  3. from threading import Lock
  4. from time import time, sleep
  5. from eventlet import tpool, spawn_n
  6. from pycs import app
  7. from pycs.database.Project import Project
  8. from pycs.interfaces.Pipeline import Pipeline
  9. from pycs.jobs.JobRunner import JobRunner
  10. from pycs.util.PipelineUtil import load_from_root_folder
  11. class PipelineCache:
  12. """
  13. Store initialized pipelines and call `close` after `CLOSE_TIMER` if they are not requested
  14. another time.
  15. """
  16. CLOSE_TIMER = 120
  17. def __init__(self, jobs: JobRunner, cache_time: float = None):
  18. self.__jobs = jobs
  19. self.__pipelines = {}
  20. self.__is_running = False
  21. self.__queue = Queue()
  22. self.__lock = Lock()
  23. self._cache_time = cache_time or self.CLOSE_TIMER
  24. msg = ("Initialized Pipeline cache "
  25. f"(pipelines are closed after {self._cache_time:.3f} sec)")
  26. app.logger.info(msg)
  27. def start(self):
  28. """ starts the main worker method """
  29. if self.__is_running:
  30. warnings.warn("Pipeline cache is already started")
  31. return
  32. spawn_n(self.__run)
  33. @property
  34. def is_empty(self):
  35. """ checks whether the pipeline cache is empty """
  36. return len(self.__pipelines) == 0
  37. def shutdown(self):
  38. """ puts None in the queue to signal the worker to stop """
  39. self.__queue.put(None)
  40. def load_from_root_folder(self, project_id: int, root_folder: str) -> Pipeline:
  41. """
  42. load configuration.json and create an instance from the included code object
  43. :param project_id: associated project ID
  44. :param root_folder: path to model root folder
  45. :return: Pipeline instance
  46. """
  47. # check if instance is cached
  48. with self.__lock:
  49. if root_folder in self.__pipelines:
  50. instance = self.__pipelines[root_folder]
  51. # increase reference counter
  52. instance[0] += 1
  53. # return instance
  54. return instance[1]
  55. # load pipeline
  56. pipeline = load_from_root_folder(root_folder)
  57. # save instance to cache
  58. with self.__lock:
  59. if not self.__is_running:
  60. warnings.warn("[save instance] pipeline cache was not started yet!")
  61. self.__pipelines[root_folder] = [1, pipeline, project_id]
  62. # return
  63. return pipeline
  64. def free_instance(self, root_folder: str):
  65. """
  66. Change an instance's status to unused and start the timer to call it's `close` function
  67. after `_cache_time` seconds. The next call to `load_from_root_folder` in this interval
  68. will disable this timer.
  69. :param root_folder: path to model root folder
  70. """
  71. # abort if no pipeline with this root folder is loaded
  72. with self.__lock:
  73. if root_folder not in self.__pipelines:
  74. return
  75. # start timeout
  76. timestamp = time()
  77. self.__queue.put((root_folder, timestamp))
  78. if not self.__is_running:
  79. warnings.warn("[free instance] pipeline cache was not started yet!")
  80. def __get(self):
  81. while True:
  82. # get element from queue
  83. entry = self.__queue.get()
  84. if entry is None:
  85. # closing pipeline cache
  86. return None
  87. root_folder, timestamp = entry
  88. # sleep if needed
  89. delay = int(timestamp + self._cache_time - time())
  90. if delay > 0:
  91. app.logger.info(f"Cache sleeps for {delay:.3f} sec")
  92. sleep(delay)
  93. # lock and access __pipelines
  94. with self.__lock:
  95. app.logger.info("Removing pipeline from cache")
  96. instance = self.__pipelines[root_folder]
  97. # reference counter greater than 1
  98. if instance[0] > 1:
  99. # decrease reference counter
  100. instance[0] -= 1
  101. # reference counter equals 1
  102. else:
  103. # delete instance from __pipelines and return to call `close` function
  104. del self.__pipelines[root_folder]
  105. return instance[1], instance[2]
  106. def __run(self):
  107. while True:
  108. self.__is_running = True
  109. # get pipeline
  110. result = tpool.execute(self.__get)
  111. if result is None:
  112. self.__is_running = False
  113. return
  114. pipeline, project_id = result
  115. project = Project.query.get(project_id)
  116. # create job to close pipeline
  117. self.__jobs.run(project,
  118. 'Model Interaction',
  119. f'{project.name} (close pipeline)',
  120. f'{project.name}/model-interaction',
  121. pipeline.close
  122. )