import datetime as dt

from dataclasses import dataclass
from threading import Lock
from time import sleep

# eventlet, tpool and time are only referenced by the commented-out
# legacy implementation at the bottom of this file
import eventlet
from eventlet import tpool
# from time import time

from pycs import app
from pycs.database.Project import Project
from pycs.interfaces.Pipeline import Pipeline
from pycs.jobs.JobRunner import JobRunner
from pycs.util.PipelineUtil import load_from_root_folder
from pycs.util.green_worker import GreenWorker


@dataclass
class PipelineEntry:
    last_used: dt.datetime = None
    pipeline: Pipeline = None
    pipeline_name: str = None
    project_id: int = -1

    def __post_init__(self):
        if self.pipeline is not None:
            self.pipeline_name = self.pipeline.__class__.__name__
        self.poke()

    def poke(self):
        """refresh the last-used timestamp"""
        self.last_used = dt.datetime.now()

    def __str__(self):
        return f"<Pipeline '{self.pipeline_name}' for project #{self.project_id} (last_used: {self.last_used})>"


class PipelineCache(GreenWorker):
    CLOSE_TIMER = dt.timedelta(seconds=120)

    def __init__(self, jobs: JobRunner):
        super().__init__()

        self.__jobs = jobs
        self.__pipelines: dict[str, PipelineEntry] = {}
        self.__lock = Lock()

    def load_from_root_folder(self, project: Project, root_folder: str, no_cache: bool = False) -> Pipeline:
        """
        load configuration.json and create an instance from the included code object

        :param project: associated project
        :param root_folder: path to model root folder
        :param no_cache: if True, bypass the cache and return a fresh instance
        :return: Pipeline instance
        """
        # check if an instance is already cached
        with self.__lock:
            if root_folder in self.__pipelines:
                entry: PipelineEntry = self.__pipelines[root_folder]
                entry.poke()
                app.logger.info(f"[{self.ident}] Using {entry}")
                return entry.pipeline

        # load pipeline
        pipeline = load_from_root_folder(root_folder)

        if no_cache:
            return pipeline

        # save instance to cache
        with self.__lock:
            entry = PipelineEntry(pipeline=pipeline, project_id=project.id)
            app.logger.info(f"[{self.ident}] Cached {entry}")
            self.__pipelines[root_folder] = entry

        return pipeline

    def free_instance(self, root_folder: str):
        """
        Mark an instance as unused and start the timer that calls its `close`
        method after `CLOSE_TIMER` seconds. Any call to `load_from_root_folder`
        within this interval cancels the close.

        :param root_folder: path to model root folder
        """
        # abort if no pipeline with this root folder is loaded
        with self.__lock:
            if root_folder not in self.__pipelines:
                return

        self.queue.put((root_folder,))
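
    # Illustrative call pattern from a caller's point of view (sketch only;
    # `cache`, `project.root_folder` and `pipeline.execute` are assumed names):
    #
    #   pipeline = cache.load_from_root_folder(project, project.root_folder)
    #   try:
    #       pipeline.execute(...)
    #   finally:
    #       cache.free_instance(project.root_folder)  # schedules the deferred close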

    # executed as a coroutine in the main thread
    def start_work(self, *args):
        # delegate to the work method in a separate thread
        res = super().start_work(*args)

        if res is None:
            # an error occurred during execution
            return

        pipeline, project_id = res

        if pipeline is None:
            # pipeline vanished from the cache in the meantime
            return

        project = Project.query.get(project_id)

        # create a job to close the pipeline
        self.__jobs.run(project,
                        'Model Interaction',
                        f'{project.name} (close pipeline)',
                        f'{project.name}/model-interaction',
                        pipeline.close
                        )
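
    # Note: `res` is the (pipeline, project_id) tuple that work() returns from
    # the worker thread; the GreenWorker base class presumably passes it
    # through unchanged (see pycs.util.green_worker).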

    # executed in a separate thread
    def work(self, root_folder):
        with self.__lock:
            entry = self.__pipelines.get(root_folder)

        if entry is None:
            app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
            return None, None

        app.logger.info(f"[{self.ident}] Starting checks for {entry}...")
        while True:
            now = dt.datetime.now()

            with self.__lock:
                entry = self.__pipelines.get(root_folder)

            if entry is None:
                app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
                return None, None

            # use total_seconds() here: a negative timedelta is normalized to
            # a negative `days` plus a *positive* `seconds` attribute, so
            # checking `delay.seconds` would trigger an almost day-long sleep
            delay = entry.last_used + self.CLOSE_TIMER - now
            if delay.total_seconds() > 0:
                sleep(delay.total_seconds())
                continue

            with self.__lock:
                entry = self.__pipelines.pop(root_folder, None)

            if entry is None:
                app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
                return None, None

            app.logger.info(f"[{self.ident}] Removed {entry} from cache")
            return entry.pipeline, entry.project_id
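
    # Illustrative values for the normalization pitfall above:
    #
    #   dt.timedelta(seconds=-5).seconds           # 86395 (normalized)
    #   dt.timedelta(seconds=-5).total_seconds()   # -5.0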

    # Legacy eventlet-based implementation, superseded by the
    # GreenWorker-based start_work()/work() methods above:
    #
    # def __get(self):
    #     while True:
    #         # get element from queue
    #         root_folder, timestamp = self.__queue.get()
    #
    #         # sleep if needed
    #         delay = int(timestamp + self.CLOSE_TIMER - time())
    #         if delay > 0:
    #             eventlet.sleep(delay)
    #
    #         # lock and access __pipelines
    #         with self.__lock:
    #             instance = self.__pipelines[root_folder]
    #
    #             # reference counter greater than 1
    #             if instance.counter > 1:
    #                 # decrease reference counter
    #                 instance.counter -= 1
    #
    #             # reference counter equals 1
    #             else:
    #                 # delete instance from __pipelines and return to call `close` function
    #                 del self.__pipelines[root_folder]
    #                 return instance.pipeline, instance.project_id
    #
    # def __run(self):
    #     while True:
    #         # get pipeline
    #         pipeline, project_id = tpool.execute(self.__get)
    #         project = Project.query.get(project_id)
    #
    #         # create job to close pipeline
    #         self.__jobs.run(project,
    #                         'Model Interaction',
    #                         f'{project.name} (close pipeline)',
    #                         f'{project.name}/model-interaction',
    #                         pipeline.close
    #                         )