from queue import Queue
from threading import Lock
from time import time, sleep

from eventlet import tpool, spawn_n

from pycs.database.Project import Project
from pycs.interfaces.Pipeline import Pipeline
from pycs.jobs.JobRunner import JobRunner
from pycs.util.PipelineUtil import load_from_root_folder


class PipelineCache:
    """
    Store initialized pipelines and call `close` after `CLOSE_TIMER` seconds
    if they are not requested another time.
    """

    # seconds an unused pipeline stays cached before its `close` is scheduled
    CLOSE_TIMER = 120

    def __init__(self, jobs: JobRunner):
        """
        :param jobs: job runner used to schedule the pipeline close jobs
        """
        self.__jobs = jobs
        # maps root_folder -> [reference_count, pipeline_instance, project_id]
        self.__pipelines = {}
        # holds (root_folder, timestamp) tuples; None is the shutdown signal
        self.__queue = Queue()
        # guards every access to __pipelines
        self.__lock = Lock()

    def start(self):
        """
        starts the main worker method
        """
        spawn_n(self.__run)

    @property
    def is_empty(self):
        """
        checks whether the pipeline cache is empty
        """
        return len(self.__pipelines) == 0

    def shutdown(self):
        """
        puts None in the queue to signal the worker to stop
        """
        self.__queue.put(None)

    def load_from_root_folder(self, project_id: int, root_folder: str) -> Pipeline:
        """
        load configuration.json and create an instance from the included code object

        :param project_id: associated project ID
        :param root_folder: path to model root folder
        :return: Pipeline instance
        """
        # check if instance is cached; if so, bump its reference counter
        with self.__lock:
            if root_folder in self.__pipelines:
                instance = self.__pipelines[root_folder]

                # increase reference counter
                instance[0] += 1

                # return instance
                return instance[1]

        # load pipeline outside the lock (loading may be slow)
        # NOTE(review): two concurrent callers can both miss the cache here and
        # both load; the second store below then overwrites the first entry and
        # its reference count. Confirm callers serialize loads per root folder,
        # or the first instance's `close` will never be scheduled.
        pipeline = load_from_root_folder(root_folder)

        # save instance to cache
        with self.__lock:
            self.__pipelines[root_folder] = [1, pipeline, project_id]

        # return
        return pipeline

    def free_instance(self, root_folder: str):
        """
        Change an instance's status to unused and start the timer to call its
        `close` function after `CLOSE_TIMER` seconds. The next call to
        `load_from_root_folder` in this interval will disable this timer.

        :param root_folder: path to model root folder
        """
        # abort if no pipeline with this root folder is loaded
        with self.__lock:
            if root_folder not in self.__pipelines:
                return

        # start timeout
        timestamp = time()
        self.__queue.put((root_folder, timestamp))

    def __get(self):
        """
        Blocking worker loop body: wait for a timeout entry, sleep until the
        close timer expires, then either decrement the reference counter or
        remove the entry and hand the pipeline back for closing.

        :return: (pipeline, project_id) of an instance to close,
                 or None on shutdown
        """
        while True:
            # get element from queue (blocks until one is available)
            entry = self.__queue.get()

            if entry is None:
                # closing pipeline cache
                return None

            root_folder, timestamp = entry

            # sleep if needed so the pipeline stays cached for CLOSE_TIMER
            # seconds after the free_instance call that queued this entry
            delay = int(timestamp + self.CLOSE_TIMER - time())

            if delay > 0:
                sleep(delay)

            # lock and access __pipelines
            with self.__lock:
                # the entry may already be gone: a second free_instance call
                # for the same root folder queues a second element, and the
                # first one deletes the cache entry. Skip instead of raising
                # KeyError, which would kill the worker.
                instance = self.__pipelines.get(root_folder)
                if instance is None:
                    continue

                # reference counter greater than 1
                if instance[0] > 1:
                    # decrease reference counter
                    instance[0] -= 1

                # reference counter equals 1
                else:
                    # delete instance from __pipelines and return
                    # to call `close` function
                    del self.__pipelines[root_folder]
                    return instance[1], instance[2]

    def __run(self):
        """
        Main worker loop: runs the blocking `__get` in eventlet's thread pool
        (so the hub is not blocked) and schedules a close job for each
        pipeline it yields; exits when `__get` returns None (shutdown).
        """
        while True:
            # get pipeline to close (None signals shutdown)
            result = tpool.execute(self.__get)
            if result is None:
                return

            pipeline, project_id = result
            project = Project.query.get(project_id)

            # create job to close pipeline
            self.__jobs.run(project,
                            'Model Interaction',
                            f'{project.name} (close pipeline)',
                            f'{project.name}/model-interaction',
                            pipeline.close
                            )