6
0

PipelineCache.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. from queue import Queue
  2. from threading import Lock
  3. from time import time, sleep
  4. from eventlet import tpool, spawn_n
  5. from pycs.database.Project import Project
  6. from pycs.interfaces.Pipeline import Pipeline
  7. from pycs.jobs.JobRunner import JobRunner
  8. from pycs.util.PipelineUtil import load_from_root_folder
  9. class PipelineCache:
  10. """
  11. Store initialized pipelines and call `close` after `CLOSE_TIMER` if they are not requested
  12. another time.
  13. """
  14. CLOSE_TIMER = 120
  15. def __init__(self, jobs: JobRunner):
  16. self.__jobs = jobs
  17. self.__pipelines = {}
  18. self.__queue = Queue()
  19. self.__lock = Lock()
  20. def start(self):
  21. """ starts the main worker method """
  22. spawn_n(self.__run)
  23. @property
  24. def is_empty(self):
  25. """ checks whether the pipeline cache is empty """
  26. return len(self.__pipelines) == 0
  27. def shutdown(self):
  28. """ puts None in the queue to signal the worker to stop """
  29. self.__queue.put(None)
  30. def load_from_root_folder(self, project_id: int, root_folder: str) -> Pipeline:
  31. """
  32. load configuration.json and create an instance from the included code object
  33. :param project_id: associated project ID
  34. :param root_folder: path to model root folder
  35. :return: Pipeline instance
  36. """
  37. # check if instance is cached
  38. with self.__lock:
  39. if root_folder in self.__pipelines:
  40. instance = self.__pipelines[root_folder]
  41. # increase reference counter
  42. instance[0] += 1
  43. # return instance
  44. return instance[1]
  45. # load pipeline
  46. pipeline = load_from_root_folder(root_folder)
  47. # save instance to cache
  48. with self.__lock:
  49. self.__pipelines[root_folder] = [1, pipeline, project_id]
  50. # return
  51. return pipeline
  52. def free_instance(self, root_folder: str):
  53. """
  54. Change an instance's status to unused and start the timer to call it's `close` function
  55. after `CLOSE_TIMER` seconds. The next call to `load_from_root_folder` in this interval
  56. will disable this timer.
  57. :param root_folder: path to model root folder
  58. """
  59. # abort if no pipeline with this root folder is loaded
  60. with self.__lock:
  61. if root_folder not in self.__pipelines:
  62. return
  63. # start timeout
  64. timestamp = time()
  65. self.__queue.put((root_folder, timestamp))
  66. def __get(self):
  67. while True:
  68. # get element from queue
  69. entry = self.__queue.get()
  70. if entry is None:
  71. # closing pipeline cache
  72. return None
  73. root_folder, timestamp = entry
  74. # sleep if needed
  75. delay = int(timestamp + self.CLOSE_TIMER - time())
  76. if delay > 0:
  77. sleep(delay)
  78. # lock and access __pipelines
  79. with self.__lock:
  80. instance = self.__pipelines[root_folder]
  81. # reference counter greater than 1
  82. if instance[0] > 1:
  83. # decrease reference counter
  84. instance[0] -= 1
  85. # reference counter equals 1
  86. else:
  87. # delete instance from __pipelines and return to call `close` function
  88. del self.__pipelines[root_folder]
  89. return instance[1], instance[2]
  90. def __run(self):
  91. while True:
  92. # get pipeline
  93. result = tpool.execute(self.__get)
  94. if result is None:
  95. return
  96. pipeline, project_id = result
  97. project = Project.query.get(project_id)
  98. # create job to close pipeline
  99. self.__jobs.run(project,
  100. 'Model Interaction',
  101. f'{project.name} (close pipeline)',
  102. f'{project.name}/model-interaction',
  103. pipeline.close
  104. )