import queue
import warnings

from threading import Lock
from time import sleep
from time import time

import eventlet

from eventlet import spawn_n
from eventlet import tpool

from pycs import app
from pycs.database.Project import Project
from pycs.interfaces.Pipeline import Pipeline
from pycs.jobs.JobRunner import JobRunner
from pycs.util.PipelineUtil import load_from_root_folder


class PipelineCache:
    """
    Store initialized pipelines and call `close` after `CLOSE_TIMER` seconds
    if they are not requested again in the meantime.
    """

    CLOSE_TIMER = 120

    def __init__(self, jobs: JobRunner, cache_time: float = None):
        self.__jobs = jobs

        self.__pipelines = {}
        self.__is_running = False

        self.__queue = queue.Queue()
        self.__lock = Lock()

        self._cache_time = cache_time or self.CLOSE_TIMER

        msg = ("Initialized Pipeline cache "
               f"(pipelines are closed after {self._cache_time:.3f} sec)")
        app.logger.info(msg)

    def start(self):
        """ starts the main worker method """
        if self.__is_running:
            warnings.warn("Pipeline cache is already started")
            return

        spawn_n(self.__run)

    @property
    def is_empty(self):
        """ checks whether the pipeline cache is empty """
        return len(self.__pipelines) == 0 and self.__queue.empty()

    def shutdown(self):
        """ puts None into the queue to signal the worker to stop """
        self.__queue.put(None)

    def load_from_root_folder(self, project_id: int, root_folder: str) -> Pipeline:
        """
        load configuration.json and create an instance from the included code object

        :param project_id: associated project ID
        :param root_folder: path to model root folder
        :return: Pipeline instance
        """
        # check if an instance is already cached
        with self.__lock:
            if root_folder in self.__pipelines:
                instance = self.__pipelines[root_folder]

                # increase the reference counter
                instance[0] += 1

                # return the cached instance
                return instance[1]

        # load the pipeline
        pipeline = load_from_root_folder(root_folder)

        # save the instance to the cache
        with self.__lock:
            if not self.__is_running:
                warnings.warn("[save instance] pipeline cache was not started yet!")

            self.__pipelines[root_folder] = [1, pipeline, project_id]

        return pipeline

    def free_instance(self, root_folder: str):
        """
        Mark an instance as unused and start the timer that calls its `close`
        function after `_cache_time` seconds. The next call to
        `load_from_root_folder` within this interval disables the timer.

        :param root_folder: path to model root folder
        """
        # abort if no pipeline with this root folder is loaded
        with self.__lock:
            if root_folder not in self.__pipelines:
                return

        # start the timeout
        timestamp = time()
        self.__queue.put((root_folder, timestamp))

        if not self.__is_running:
            warnings.warn("[free instance] pipeline cache was not started yet!")

    def __get(self):
        while True:
            # get an element from the queue
            while True:
                try:
                    entry = self.__queue.get(block=False)
                    break
                except queue.Empty:
                    eventlet.sleep(0.2)

            if entry is None:
                # closing the pipeline cache
                return None

            root_folder, timestamp = entry

            # sleep if needed
            delay = int(timestamp + self._cache_time - time())

            if delay > 0:
                app.logger.info(f"Cache sleeps for {delay:.3f} sec")
                sleep(delay)

            # lock and access __pipelines
            with self.__lock:
                app.logger.info("Removing pipeline from cache")
                instance = self.__pipelines[root_folder]

                # reference counter greater than 1
                if instance[0] > 1:
                    # decrease the reference counter
                    instance[0] -= 1

                # reference counter equals 1
                else:
                    # delete the instance from __pipelines and return it
                    # so that its `close` function can be called
                    del self.__pipelines[root_folder]
                    return instance[1], instance[2]

    def __run(self):
        while True:
            self.__is_running = True

            # get a pipeline whose timeout expired
            result = tpool.execute(self.__get)

            if result is None:
                self.__is_running = False
                return

            pipeline, project_id = result
            project = Project.query.get(project_id)

            # create a job to close the pipeline
            self.__jobs.run(project,
                            'Model Interaction',
                            f'{project.name} (close pipeline)',
                            f'{project.name}/model-interaction',
                            pipeline.close)
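

# Illustrative usage sketch (not part of the original module). It assumes a
# `JobRunner` instance `jobs` is already available elsewhere in the application
# (its construction is application-specific and not shown here) and that
# "<model root>" is a placeholder for a folder containing a configuration.json:
#
#   cache = PipelineCache(jobs, cache_time=60)
#   cache.start()
#
#   pipeline = cache.load_from_root_folder(project_id=1, root_folder="<model root>")
#   # ... run inference with `pipeline` ...
#   cache.free_instance("<model root>")  # `close` is scheduled after `cache_time` sec
#
#   cache.shutdown()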