# PipelineCache.py
  1. import eventlet
  2. import datetime as dt
  3. from queue import Queue
  4. from threading import Lock
  5. from time import sleep
  6. # from time import time
  7. from collections import namedtuple
  8. from dataclasses import dataclass
  9. from eventlet import spawn_n
  10. from eventlet import tpool
  11. from pycs import app
  12. from pycs.database.Project import Project
  13. from pycs.interfaces.Pipeline import Pipeline
  14. from pycs.jobs.JobRunner import JobRunner
  15. from pycs.util.PipelineUtil import load_from_root_folder
  16. from pycs.util import GreenWorker
  17. @dataclass
  18. class PipelineEntry(object):
  19. last_used: int = -1
  20. pipeline: Pipeline = None
  21. pipeline_name: str = None
  22. project_id: int = -1
  23. def __post_init__(self):
  24. if self.pipeline is not None:
  25. self.pipeline_name = self.pipeline.__class__.__name__
  26. self.poke()
  27. def poke(self):
  28. self.last_used = dt.datetime.now()
  29. def __str__(self):
  30. return f"<Pipeline '{self.pipeline_name}' for project #{self.project_id} (last_used: {self.last_used})>"
  31. class PipelineCache(GreenWorker):
  32. """
  33. Store initialized pipelines and call `close` after `CLOSE_TIMER` if they are not requested
  34. another time.
  35. """
  36. CLOSE_TIMER = dt.timedelta(seconds=120)
  37. def __init__(self):
  38. super().__init__()
  39. self.__pipelines: dict[PipelineEntry] = {}
  40. self.__lock = Lock()
  41. def load_from_root_folder(self, project: Project, no_cache: bool = False) -> Pipeline:
  42. """
  43. load configuration.json and create an instance from the included code object
  44. :param project: associated project
  45. :return: Pipeline instance
  46. """
  47. root_folder = project.model.root_folder
  48. # check if instance is cached
  49. with self.__lock:
  50. if root_folder in self.__pipelines:
  51. entry: PipelineEntry = self.__pipelines[root_folder]
  52. entry.poke()
  53. self.info(f"Using {entry}")
  54. return entry.pipeline
  55. # load pipeline
  56. pipeline = load_from_root_folder(root_folder)
  57. if no_cache:
  58. return pipeline
  59. # save instance to cache
  60. with self.__lock:
  61. entry = PipelineEntry(pipeline=pipeline, project_id=project.id)
  62. self.info(f"Cached {entry}")
  63. self.__pipelines[root_folder] = entry
  64. self.queue.put((root_folder,))
  65. # return
  66. return pipeline
  67. def free_instance(self, project: Project):
  68. """
  69. Change an instance's status to unused and start the timer to call it's `close` function
  70. after `CLOSE_TIMER` seconds. The next call to `load_from_root_folder` in this interval
  71. will disable this timer.
  72. :param project: associated project
  73. """
  74. root_folder = project.model.root_folder
  75. with self.__lock:
  76. if root_folder in self.__pipelines:
  77. # reset "last used" to now
  78. self.__pipelines[root_folder].poke()
  79. # abort if no pipeline with this root folder is loaded
  80. else:
  81. return
  82. # executed as coroutine in the main thread
  83. def __run__(self):
  84. while True:
  85. # get pipeline
  86. res = tpool.execute(self.work)
  87. if res is self.STOP_QUEUE:
  88. break
  89. pipeline, project_id = res
  90. if pipeline is None:
  91. # pipeline vanished from cache
  92. continue
  93. project = Project.query.get(project_id)
  94. if project is None:
  95. # project does not exist anymore
  96. continue
  97. # create job to close pipeline
  98. JobRunner.run(project,
  99. 'Model Interaction',
  100. f'{project.name} (close pipeline)',
  101. f'{project.name}/model-interaction',
  102. pipeline.close
  103. )
  104. self._finish()
  105. # executed in a separate thread
  106. def work(self):
  107. while True:
  108. res = self.check_queue()
  109. if res is self.STOP_QUEUE:
  110. return res
  111. elif res is self.CONTINUE_QUEUE:
  112. continue
  113. # an entry was found in the queue
  114. return self._check_cache_entry(*res)
  115. def _check_cache_entry(self, key):
  116. with self.__lock:
  117. entry = self.__pipelines.get(key)
  118. if entry is None:
  119. self.info(f"Entry for {key} already gone")
  120. return None, None
  121. self.info(f"Starting checks for {entry}...")
  122. while True:
  123. now = dt.datetime.now()
  124. with self.__lock:
  125. entry = self.__pipelines.get(key)
  126. if entry is None:
  127. self.info(f"Entry for {key} already gone")
  128. return None, None
  129. delay = entry.last_used + self.CLOSE_TIMER - now
  130. if delay.seconds > 0:
  131. sleep(delay.seconds)
  132. continue
  133. with self.__lock:
  134. entry = self.__pipelines.pop(key, None)
  135. if entry is None:
  136. self.info(f"Entry for {key} already gone")
  137. return None, None
  138. self.info(f"Removed {entry} from cache")
  139. return entry.pipeline, entry.project_id
  140. # def __get(self):
  141. # while True:
  142. # # get element from queue
  143. # root_folder, timestamp = self.__queue.get()
  144. # # sleep if needed
  145. # delay = int(timestamp + self.CLOSE_TIMER - time())
  146. # if delay > 0:
  147. # eventlet.sleep(delay)
  148. # # lock and access __pipelines
  149. # with self.__lock:
  150. # instance = self.__pipelines[root_folder]
  151. # # reference counter greater than 1
  152. # if instance.counter > 1:
  153. # # decrease reference counter
  154. # instance.counter -= 1
  155. # # reference counter equals 1
  156. # else:
  157. # # delete instance from __pipelines and return to call `close` function
  158. # del self.__pipelines[root_folder]
  159. # return instance.pipeline, instance.project_id
  160. # def __run(self):
  161. # while True:
  162. # # get pipeline
  163. # pipeline, project_id = tpool.execute(self.__get)
  164. # project = Project.query.get(project_id)
  165. # # create job to close pipeline
  166. # self.__jobs.run(project,
  167. # 'Model Interaction',
  168. # f'{project.name} (close pipeline)',
  169. # f'{project.name}/model-interaction',
  170. # pipeline.close
  171. # )