from queue import Queue from threading import Lock from time import time, sleep from eventlet import tpool, spawn_n from pycs.database.Project import Project from pycs.interfaces.Pipeline import Pipeline from pycs.jobs.JobRunner import JobRunner from pycs.util.PipelineUtil import load_from_root_folder class PipelineCache: """ Store initialized pipelines and call `close` after `CLOSE_TIMER` if they are not requested another time. """ CLOSE_TIMER = 120 def __init__(self, jobs: JobRunner): self.__jobs = jobs self.__pipelines = {} self.__queue = Queue() self.__lock = Lock() spawn_n(self.__run) def load_from_root_folder(self, project: Project, root_folder: str) -> Pipeline: """ load configuration.json and create an instance from the included code object :param project: associated project :param root_folder: path to model root folder :return: Pipeline instance """ # check if instance is cached with self.__lock: if root_folder in self.__pipelines: instance = self.__pipelines[root_folder] # increase reference counter instance[0] += 1 # return instance return instance[1] # load pipeline pipeline = load_from_root_folder(root_folder) # save instance to cache with self.__lock: self.__pipelines[root_folder] = [1, pipeline, project] # return return pipeline def free_instance(self, root_folder: str): """ Change an instance's status to unused and start the timer to call it's `close` function after `CLOSE_TIMER` seconds. The next call to `load_from_root_folder` in this interval will disable this timer. :param root_folder: path to model root folder """ # abort if no pipeline with this root folder is loaded with self.__lock: if root_folder not in self.__pipelines: return # start timeout timestamp = time() self.__queue.put((root_folder, timestamp)) def __get(self): while True: # get element from queue root_folder, timestamp = self.__queue.get() # sleep if needed delay = int(timestamp + self.CLOSE_TIMER - time()) if delay > 0: sleep(delay) # lock and access __pipelines with self.__lock: instance = self.__pipelines[root_folder] # reference counter greater than 1 if instance[0] > 1: # decrease reference counter instance[0] -= 1 # reference counter equals 1 else: # delete instance from __pipelines and return to call `close` function del self.__pipelines[root_folder] return instance[1], instance[2] def __run(self): while True: # get pipeline pipeline, project = tpool.execute(self.__get) # create job to close pipeline self.__jobs.run(project, 'Model Interaction', f'{project.name} (close pipeline)', f'{project.name}/model-interaction', pipeline.close )