# PipelineCache.py
  1. import warnings
  2. from queue import Queue
  3. from threading import Lock
  4. from time import time, sleep
  5. from eventlet import tpool, spawn_n
  6. from pycs.database.Project import Project
  7. from pycs.interfaces.Pipeline import Pipeline
  8. from pycs.jobs.JobRunner import JobRunner
  9. from pycs.util.PipelineUtil import load_from_root_folder
  10. class PipelineCache:
  11. """
  12. Store initialized pipelines and call `close` after `CLOSE_TIMER` if they are not requested
  13. another time.
  14. """
  15. CLOSE_TIMER = 120
  16. def __init__(self, jobs: JobRunner, cache_time: float = None):
  17. self.__jobs = jobs
  18. self.__pipelines = {}
  19. self.__is_running = False
  20. self.__queue = Queue()
  21. self.__lock = Lock()
  22. self._cache_time = cache_time or CLOSE_TIMER
  23. print(f"Initialized Pipeline cache (pipelines are closed after {self._cache_time:.3f} sec)")
  24. def start(self):
  25. """ starts the main worker method """
  26. if self.__is_running:
  27. warnings.warn("Pipeline cache is already started")
  28. return
  29. spawn_n(self.__run)
  30. @property
  31. def is_empty(self):
  32. """ checks whether the pipeline cache is empty """
  33. return len(self.__pipelines) == 0
  34. def shutdown(self):
  35. """ puts None in the queue to signal the worker to stop """
  36. self.__queue.put(None)
  37. def load_from_root_folder(self, project_id: int, root_folder: str) -> Pipeline:
  38. """
  39. load configuration.json and create an instance from the included code object
  40. :param project_id: associated project ID
  41. :param root_folder: path to model root folder
  42. :return: Pipeline instance
  43. """
  44. # check if instance is cached
  45. with self.__lock:
  46. if root_folder in self.__pipelines:
  47. instance = self.__pipelines[root_folder]
  48. # increase reference counter
  49. instance[0] += 1
  50. # return instance
  51. return instance[1]
  52. # load pipeline
  53. pipeline = load_from_root_folder(root_folder)
  54. # save instance to cache
  55. with self.__lock:
  56. if not self.__is_running:
  57. warnings.warn("[save instance] pipeline cache was not started yet!")
  58. self.__pipelines[root_folder] = [1, pipeline, project_id]
  59. # return
  60. return pipeline
  61. def free_instance(self, root_folder: str):
  62. """
  63. Change an instance's status to unused and start the timer to call it's `close` function
  64. after `_cache_time` seconds. The next call to `load_from_root_folder` in this interval
  65. will disable this timer.
  66. :param root_folder: path to model root folder
  67. """
  68. # abort if no pipeline with this root folder is loaded
  69. with self.__lock:
  70. if root_folder not in self.__pipelines:
  71. return
  72. # start timeout
  73. timestamp = time()
  74. self.__queue.put((root_folder, timestamp))
  75. if not self.__is_running:
  76. warnings.warn("[free instance] pipeline cache was not started yet!")
  77. def __get(self):
  78. while True:
  79. # get element from queue
  80. entry = self.__queue.get()
  81. if entry is None:
  82. # closing pipeline cache
  83. return None
  84. root_folder, timestamp = entry
  85. # sleep if needed
  86. delay = int(timestamp + self._cache_time - time())
  87. if delay > 0:
  88. print(f"Cache sleeps for {delay:.3f} sec")
  89. sleep(delay)
  90. # lock and access __pipelines
  91. with self.__lock:
  92. print("Removing pipeline from cache")
  93. instance = self.__pipelines[root_folder]
  94. # reference counter greater than 1
  95. if instance[0] > 1:
  96. # decrease reference counter
  97. instance[0] -= 1
  98. # reference counter equals 1
  99. else:
  100. # delete instance from __pipelines and return to call `close` function
  101. del self.__pipelines[root_folder]
  102. return instance[1], instance[2]
  103. def __run(self):
  104. while True:
  105. self.__is_running = True
  106. # get pipeline
  107. result = tpool.execute(self.__get)
  108. if result is None:
  109. self.__is_running = False
  110. return
  111. pipeline, project_id = result
  112. project = Project.query.get(project_id)
  113. # create job to close pipeline
  114. self.__jobs.run(project,
  115. 'Model Interaction',
  116. f'{project.name} (close pipeline)',
  117. f'{project.name}/model-interaction',
  118. pipeline.close
  119. )