# PipelineCache.py
  1. import eventlet
  2. import datetime as dt
  3. from queue import Queue
  4. from threading import Lock
  5. from time import sleep
  6. # from time import time
  7. from collections import namedtuple
  8. from dataclasses import dataclass
  9. from eventlet import spawn_n
  10. from eventlet import tpool
  11. from pycs import app
  12. from pycs.database.Project import Project
  13. from pycs.interfaces.Pipeline import Pipeline
  14. from pycs.jobs.JobRunner import JobRunner
  15. from pycs.util.PipelineUtil import load_from_root_folder
  16. from pycs.util.green_worker import GreenWorker
  17. @dataclass
  18. class PipelineEntry(object):
  19. last_used: int = -1
  20. pipeline: Pipeline = None
  21. pipeline_name: str = None
  22. project_id: int = -1
  23. def __post_init__(self):
  24. if self.pipeline is not None:
  25. self.pipeline_name = self.pipeline.__class__.__name__
  26. self.poke()
  27. def poke(self):
  28. self.last_used = dt.datetime.now()
  29. def __str__(self):
  30. return f"<Pipeline '{self.pipeline_name}' for project #{self.project_id} (last_used: {self.last_used})>"
  31. class PipelineCache(GreenWorker):
  32. CLOSE_TIMER = dt.timedelta(seconds=120)
  33. def __init__(self, jobs: JobRunner):
  34. super().__init__()
  35. self.__jobs = jobs
  36. self.__pipelines: dict[PipelineEntry] = {}
  37. self.__lock = Lock()
  38. def load_from_root_folder(self, project: Project, root_folder: str, no_cache: bool = False) -> Pipeline:
  39. """
  40. load configuration.json and create an instance from the included code object
  41. :param projeventletect: associated project
  42. :param root_folder: path to model root folder
  43. :return: Pipeline instance
  44. """
  45. # check if instance is cached
  46. with self.__lock:
  47. if root_folder in self.__pipelines:
  48. entry: PipelineEntry = self.__pipelines[root_folder]
  49. entry.poke()
  50. app.logger.info(f"[{self.ident}] Using {entry}")
  51. return entry.pipeline
  52. # load pipeline
  53. pipeline = load_from_root_folder(root_folder)
  54. if no_cache:
  55. return pipeline
  56. # save instance to cache
  57. with self.__lock:
  58. entry = PipelineEntry(pipeline=pipeline, project_id=project.id)
  59. app.logger.info(f"[{self.ident}] Cached {entry}")
  60. self.__pipelines[root_folder] = entry
  61. # return
  62. return pipeline
  63. def free_instance(self, root_folder: str):
  64. """
  65. Change an instance's status to unused and start the timer to call it's `close` function
  66. after `CLOSE_TIMER` seconds. The next call to `load_from_root_folder` in this interval
  67. will disable this timer.
  68. :param root_folder: path to model root folder
  69. """
  70. # abort if no pipeline with this root folder is loaded
  71. with self.__lock:
  72. if root_folder not in self.__pipelines:
  73. return
  74. self.queue.put((root_folder,))
  75. # executed as coroutine in the main thread
  76. def start_work(self, *args):
  77. # delegate to work method in a separate thread
  78. res = super().start_work(*args)
  79. if res is None:
  80. # an error occured in the execution
  81. return
  82. pipeline, project_id = res
  83. if pipeline is None:
  84. # pipeline vanished
  85. return
  86. project = Project.query.get(project_id)
  87. # create job to close pipeline
  88. self.__jobs.run(project,
  89. 'Model Interaction',
  90. f'{project.name} (close pipeline)',
  91. f'{project.name}/model-interaction',
  92. pipeline.close
  93. )
  94. # executed in a separate thread
  95. def work(self, root_folder):
  96. with self.__lock:
  97. entry = self.__pipelines.get(root_folder)
  98. if entry is None:
  99. app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
  100. return
  101. app.logger.info(f"[{self.ident}] Starting checks for {entry}...")
  102. while True:
  103. now = dt.datetime.now()
  104. with self.__lock:
  105. entry = self.__pipelines.get(root_folder)
  106. if entry is None:
  107. app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
  108. return None, None
  109. delay = entry.last_used + self.CLOSE_TIMER - now
  110. if delay.seconds > 0:
  111. sleep(delay.seconds)
  112. continue
  113. with self.__lock:
  114. entry = self.__pipelines.pop(root_folder, None)
  115. app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
  116. return None, None
  117. app.logger.info(f"[{self.ident}] Removed {entry} from cache")
  118. return entry.pipeline, entry.project_id
  119. # def __get(self):
  120. # while True:
  121. # # get element from queue
  122. # root_folder, timestamp = self.__queue.get()
  123. # # sleep if needed
  124. # delay = int(timestamp + self.CLOSE_TIMER - time())
  125. # if delay > 0:
  126. # eventlet.sleep(delay)
  127. # # lock and access __pipelines
  128. # with self.__lock:
  129. # instance = self.__pipelines[root_folder]
  130. # # reference counter greater than 1
  131. # if instance.counter > 1:
  132. # # decrease reference counter
  133. # instance.counter -= 1
  134. # # reference counter equals 1
  135. # else:
  136. # # delete instance from __pipelines and return to call `close` function
  137. # del self.__pipelines[root_folder]
  138. # return instance.pipeline, instance.project_id
  139. # def __run(self):
  140. # while True:
  141. # # get pipeline
  142. # pipeline, project_id = tpool.execute(self.__get)
  143. # project = Project.query.get(project_id)
  144. # # create job to close pipeline
  145. # self.__jobs.run(project,
  146. # 'Model Interaction',
  147. # f'{project.name} (close pipeline)',
  148. # f'{project.name}/model-interaction',
  149. # pipeline.close
  150. # )