import eventlet
import datetime as dt

from queue import Queue
from threading import Lock
from time import sleep
# from time import time

from collections import namedtuple
from dataclasses import dataclass
from eventlet import spawn_n
from eventlet import tpool

from pycs import app
from pycs.database.Project import Project
from pycs.interfaces.Pipeline import Pipeline
from pycs.jobs.JobRunner import JobRunner
from pycs.util.PipelineUtil import load_from_root_folder
from pycs.util import GreenWorker


@dataclass
class PipelineEntry(object):
    """
    Cache record for one loaded pipeline.

    Holds the pipeline instance, the id of the project it was loaded for and
    the moment it was last requested.
    """

    # NOTE: declared as int, but `poke` (called from `__post_init__`) always
    # overwrites this with a `datetime.datetime`.
    last_used: int = -1
    pipeline: Pipeline = None
    pipeline_name: str = None
    project_id: int = -1

    def __post_init__(self):
        """Derive the pipeline name from its class and stamp the entry."""
        if self.pipeline is not None:
            self.pipeline_name = self.pipeline.__class__.__name__

        self.poke()

    def poke(self):
        """Mark the entry as used right now."""
        self.last_used = dt.datetime.now()

    def __str__(self):
        """Return a short description of the entry for log messages."""
        return (
            f"<Pipeline '{self.pipeline_name}' for project "
            f"#{self.project_id} (last_used: {self.last_used})>"
        )


class PipelineCache(GreenWorker):
    """
    Store initialized pipelines and call `close` after `CLOSE_TIMER`
    if they are not requested another time.
    """

    CLOSE_TIMER = dt.timedelta(seconds=120)

    def __init__(self):
        super().__init__()
        # maps a project's model root folder to its cache entry
        self.__pipelines: dict[str, PipelineEntry] = {}
        # guards every access to `__pipelines`
        self.__lock = Lock()

    def load_from_root_folder(self, project: Project, no_cache: bool = False) -> Pipeline:
        """
        load configuration.json and create an instance from the included code object

        A cached instance is returned (and its "last used" time refreshed) if
        one exists for the project's model root folder. Otherwise the pipeline
        is loaded; unless `no_cache` is set it is stored in the cache and its
        root folder is put on the worker queue so that it gets closed after
        `CLOSE_TIMER` without further use.

        :param project: associated project
        :param no_cache: if True, a freshly loaded pipeline is returned without
                         being stored in the cache (an already cached instance
                         is still reused)
        :return: Pipeline instance
        """
        root_folder = project.model.root_folder

        # check if instance is cached
        with self.__lock:
            cached = self.__pipelines.get(root_folder)
            if cached is not None:
                cached.poke()
                self.info(f"Using {cached}")
                return cached.pipeline

        # load pipeline (outside of the lock, loading may take a while)
        pipeline = load_from_root_folder(root_folder)

        if no_cache:
            return pipeline

        # save instance to cache
        with self.__lock:
            entry = PipelineEntry(pipeline=pipeline, project_id=project.id)
            self.info(f"Cached {entry}")
            self.__pipelines[root_folder] = entry

        # schedule the expiry check for this root folder
        self.queue.put((root_folder,))

        # return
        return pipeline

    def free_instance(self, project: Project):
        """
        Mark an instance as unused by resetting its "last used" time to now.
        Its `close` function is called once `CLOSE_TIMER` has elapsed since
        then, unless `load_from_root_folder` requests the instance again in
        this interval.

        Does nothing if no pipeline is cached for the project's root folder.

        :param project: associated project
        """
        root_folder = project.model.root_folder

        with self.__lock:
            entry = self.__pipelines.get(root_folder)
            if entry is not None:
                # reset "last used" to now
                entry.poke()

    # executed as coroutine in the main thread
    def __run__(self):
        """
        Wait for expired pipelines and create a job that closes each of them.

        Runs until `work` returns `STOP_QUEUE`, then calls `_finish`.
        """
        while True:
            # get pipeline
            res = tpool.execute(self.work)

            if res is self.STOP_QUEUE:
                break

            pipeline, project_id = res

            if pipeline is None:
                # pipeline vanished from cache
                continue

            project = Project.query.get(project_id)
            if project is None:
                # project does not exist anymore
                continue

            # create job to close pipeline
            JobRunner.run(project,
                          'Model Interaction',
                          f'{project.name} (close pipeline)',
                          f'{project.name}/model-interaction',
                          pipeline.close
                          )

        self._finish()

    # executed in a separate thread
    def work(self):
        """
        Block until a queued cache entry has expired.

        :return: `STOP_QUEUE` if the queue was stopped, otherwise the
                 `(pipeline, project_id)` tuple of `_check_cache_entry`
        """
        while True:
            res = self.check_queue()

            if res is self.STOP_QUEUE:
                return res

            if res is self.CONTINUE_QUEUE:
                continue

            # an entry was found in the queue
            return self._check_cache_entry(*res)

    def _check_cache_entry(self, key):
        """
        Wait until the entry stored under `key` has been unused for
        `CLOSE_TIMER`, then remove it from the cache.

        :param key: root folder the entry is cached under
        :return: `(pipeline, project_id)` of the removed entry, or
                 `(None, None)` if the entry disappeared from the cache
        """
        with self.__lock:
            entry = self.__pipelines.get(key)

        if entry is None:
            self.info(f"Entry for {key} already gone")
            return None, None

        self.info(f"Starting checks for {entry}...")
        while True:
            with self.__lock:
                entry = self.__pipelines.get(key)

                if entry is None:
                    self.info(f"Entry for {key} already gone")
                    return None, None

                # Use total_seconds(): `timedelta.seconds` is never negative
                # (an overdue entry yields days=-1, seconds=86399), which
                # would postpone closing by almost a day.
                deadline = entry.last_used + self.CLOSE_TIMER
                remaining = (deadline - dt.datetime.now()).total_seconds()

                if remaining <= 0:
                    # Expiry check and removal happen under the same lock so
                    # that a concurrent `poke` cannot slip in between.
                    del self.__pipelines[key]
                    self.info(f"Removed {entry} from cache")
                    return entry.pipeline, entry.project_id

            # not expired yet: wait (without holding the lock) and re-check,
            # the entry may have been poked or removed in the meantime
            sleep(remaining)

    # --- legacy implementation, kept for reference ---
    #
    # def __get(self):
    #     while True:
    #         # get element from queue
    #         root_folder, timestamp = self.__queue.get()
    #
    #         # sleep if needed
    #         delay = int(timestamp + self.CLOSE_TIMER - time())
    #
    #         if delay > 0:
    #             eventlet.sleep(delay)
    #
    #         # lock and access __pipelines
    #         with self.__lock:
    #             instance = self.__pipelines[root_folder]
    #
    #             # reference counter greater than 1
    #             if instance.counter > 1:
    #                 # decrease reference counter
    #                 instance.counter -= 1
    #
    #             # reference counter equals 1
    #             else:
    #                 # delete instance from __pipelines and return to call `close` function
    #                 del self.__pipelines[root_folder]
    #                 return instance.pipeline, instance.project_id
    #
    # def __run(self):
    #     while True:
    #         # get pipeline
    #         pipeline, project_id = tpool.execute(self.__get)
    #         project = Project.query.get(project_id)
    #
    #         # create job to close pipeline
    #         self.__jobs.run(project,
    #                         'Model Interaction',
    #                         f'{project.name} (close pipeline)',
    #                         f'{project.name}/model-interaction',
    #                         pipeline.close
    #                         )