import datetime as dt

from dataclasses import dataclass
from threading import Lock
from time import sleep

from pycs import app
from pycs.database.Project import Project
from pycs.interfaces.Pipeline import Pipeline
from pycs.jobs.JobRunner import JobRunner
from pycs.util.PipelineUtil import load_from_root_folder
from pycs.util.green_worker import GreenWorker


@dataclass
class PipelineEntry(object):
    last_used: dt.datetime = None
    pipeline: Pipeline = None
    pipeline_name: str = None
    project_id: int = -1

    def __post_init__(self):
        if self.pipeline is not None:
            self.pipeline_name = self.pipeline.__class__.__name__
        self.poke()

    def poke(self):
        """mark this entry as used just now"""
        self.last_used = dt.datetime.now()

    def __str__(self):
        return f"<PipelineEntry: {self.pipeline_name} " \
            f"(project #{self.project_id}, last used: {self.last_used})>"


class PipelineCache(GreenWorker):
    """caches loaded pipeline instances and closes them after a period of inactivity"""

    CLOSE_TIMER = dt.timedelta(seconds=120)

    def __init__(self, jobs: JobRunner):
        super().__init__()

        self.__jobs = jobs
        self.__pipelines: dict[str, PipelineEntry] = {}
        self.__lock = Lock()

    def load_from_root_folder(self, project: Project, root_folder: str,
                              no_cache: bool = False) -> Pipeline:
        """
        load configuration.json and create an instance from the included code object

        :param project: associated project
        :param root_folder: path to model root folder
        :param no_cache: if True, load a fresh instance and bypass the cache
        :return: Pipeline instance
        """
        # check if an instance is already cached
        with self.__lock:
            if root_folder in self.__pipelines:
                entry: PipelineEntry = self.__pipelines[root_folder]
                entry.poke()
                app.logger.info(f"[{self.ident}] Using {entry}")
                return entry.pipeline

        # load pipeline
        pipeline = load_from_root_folder(root_folder)

        if no_cache:
            return pipeline

        # save instance to cache
        with self.__lock:
            entry = PipelineEntry(pipeline=pipeline, project_id=project.id)
            app.logger.info(f"[{self.ident}] Cached {entry}")
            self.__pipelines[root_folder] = entry

        return pipeline

    def free_instance(self, root_folder: str):
        """
        Change an instance's status to unused and start the timer to call its
        `close` function after `CLOSE_TIMER` seconds. The next call to
        `load_from_root_folder` in this interval will disable this timer.
        :param root_folder: path to model root folder
        """
        # abort if no pipeline with this root folder is loaded
        with self.__lock:
            if root_folder not in self.__pipelines:
                return

        # hand the folder to the worker, which waits out CLOSE_TIMER
        self.queue.put((root_folder,))

    # executed as a coroutine in the main thread
    def start_work(self, *args):
        # delegate to the work() method running in a separate thread
        res = super().start_work(*args)

        if res is None:
            # an error occurred during execution
            return

        pipeline, project_id = res

        if pipeline is None:
            # pipeline vanished from the cache in the meantime
            return

        project = Project.query.get(project_id)

        # create a job to close the pipeline
        self.__jobs.run(project,
                        'Model Interaction',
                        f'{project.name} (close pipeline)',
                        f'{project.name}/model-interaction',
                        pipeline.close
                        )

    # executed in a separate thread
    def work(self, root_folder):
        with self.__lock:
            entry = self.__pipelines.get(root_folder)

        if entry is None:
            app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
            return None, None

        app.logger.info(f"[{self.ident}] Starting checks for {entry}...")
        while True:
            now = dt.datetime.now()

            with self.__lock:
                entry = self.__pipelines.get(root_folder)

            if entry is None:
                app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
                return None, None

            # wait until CLOSE_TIMER has passed since the last use;
            # total_seconds() is required here because timedelta.seconds is
            # always non-negative, even for negative deltas
            delay = entry.last_used + self.CLOSE_TIMER - now
            if delay.total_seconds() > 0:
                sleep(delay.total_seconds())
                continue

            with self.__lock:
                entry = self.__pipelines.pop(root_folder, None)

            if entry is None:
                app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
                return None, None

            app.logger.info(f"[{self.ident}] Removed {entry} from cache")
            return entry.pipeline, entry.project_id
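
# Usage sketch (illustrative only -- `jobs`, `project`, and the model folder
# below are placeholders, not values defined in this module):
#
#   cache = PipelineCache(jobs)
#   pipeline = cache.load_from_root_folder(project, "<model root folder>")
#   try:
#       ...  # interact with the model through the pipeline instance
#   finally:
#       # mark the instance unused; the worker closes it after CLOSE_TIMER
#       # unless another load_from_root_folder() call pokes the entry first
#       cache.free_instance("<model root folder>")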