6
0

PipelineCache.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. from queue import Queue
  2. from threading import Lock
  3. from time import time, sleep
  4. from eventlet import tpool, spawn_n
  5. from pycs.database.Project import Project
  6. from pycs.interfaces.Pipeline import Pipeline
  7. from pycs.jobs.JobRunner import JobRunner
  8. from pycs.util.PipelineUtil import load_from_root_folder
  9. class PipelineCache:
  10. """
  11. Store initialized pipelines and call `close` after `CLOSE_TIMER` if they are not requested
  12. another time.
  13. """
  14. CLOSE_TIMER = 120
  15. def __init__(self, jobs: JobRunner):
  16. self.__jobs = jobs
  17. self.__pipelines = {}
  18. self.__queue = Queue()
  19. self.__lock = Lock()
  20. spawn_n(self.__run)
  21. def load_from_root_folder(self, project: Project, root_folder: str) -> Pipeline:
  22. """
  23. load configuration.json and create an instance from the included code object
  24. :param project: associated project
  25. :param root_folder: path to model root folder
  26. :return: Pipeline instance
  27. """
  28. # check if instance is cached
  29. with self.__lock:
  30. if root_folder in self.__pipelines:
  31. instance = self.__pipelines[root_folder]
  32. # increase reference counter
  33. instance[0] += 1
  34. # return instance
  35. return instance[1]
  36. # load pipeline
  37. pipeline = load_from_root_folder(root_folder)
  38. # save instance to cache
  39. with self.__lock:
  40. self.__pipelines[root_folder] = [1, pipeline, project]
  41. # return
  42. return pipeline
  43. def free_instance(self, root_folder: str):
  44. """
  45. Change an instance's status to unused and start the timer to call it's `close` function
  46. after `CLOSE_TIMER` seconds. The next call to `load_from_root_folder` in this interval
  47. will disable this timer.
  48. :param root_folder: path to model root folder
  49. """
  50. # abort if no pipeline with this root folder is loaded
  51. with self.__lock:
  52. if root_folder not in self.__pipelines:
  53. return
  54. # start timeout
  55. timestamp = time()
  56. self.__queue.put((root_folder, timestamp))
  57. def __get(self):
  58. while True:
  59. # get element from queue
  60. root_folder, timestamp = self.__queue.get()
  61. # sleep if needed
  62. delay = int(timestamp + self.CLOSE_TIMER - time())
  63. if delay > 0:
  64. sleep(delay)
  65. # lock and access __pipelines
  66. with self.__lock:
  67. instance = self.__pipelines[root_folder]
  68. # reference counter greater than 1
  69. if instance[0] > 1:
  70. # decrease reference counter
  71. instance[0] -= 1
  72. # reference counter equals 1
  73. else:
  74. # delete instance from __pipelines and return to call `close` function
  75. del self.__pipelines[root_folder]
  76. return instance[1], instance[2]
  77. def __run(self):
  78. while True:
  79. # get pipeline
  80. pipeline, project = tpool.execute(self.__get)
  81. # create job to close pipeline
  82. self.__jobs.run(project,
  83. 'Model Interaction',
  84. f'{project.name} (close pipeline)',
  85. f'{project.name}/model-interaction',
  86. pipeline.close
  87. )