- import queue
- import warnings
- from threading import Lock
- from time import sleep
- from time import time
- import eventlet
- from eventlet import spawn_n
- from eventlet import tpool
- from pycs import app
- from pycs.database.Project import Project
- from pycs.interfaces.Pipeline import Pipeline
- from pycs.jobs.JobRunner import JobRunner
- from pycs.util.PipelineUtil import load_from_root_folder
class PipelineCache:
    """
    Store initialized pipelines and call `close` after `CLOSE_TIMER` if they are not requested
    another time.
    """

    # default time (seconds) an unused pipeline is kept alive in the cache
    CLOSE_TIMER = 120

    def __init__(self, jobs: JobRunner, cache_time: float = None):
        """
        :param jobs: job runner used to schedule the `close` call of evicted pipelines
        :param cache_time: seconds to keep an unused pipeline cached
                           (falls back to CLOSE_TIMER when None or 0)
        """
        self.__jobs = jobs
        # root_folder -> [reference_count, pipeline_instance, project_id]
        self.__pipelines = {}
        self.__is_running = False
        # holds (root_folder, timestamp) eviction requests; None signals shutdown
        self.__queue = queue.Queue()
        self.__lock = Lock()
        self._cache_time = cache_time or self.CLOSE_TIMER

        msg = ("Initialized Pipeline cache "
               f"(pipelines are closed after {self._cache_time:.3f} sec)")
        app.logger.info(msg)

    def start(self):
        """ starts the main worker method """
        if self.__is_running:
            warnings.warn("Pipeline cache is already started")
            return

        # mark as running *before* spawning the worker greenlet; otherwise
        # load/free calls racing the worker startup emit spurious
        # "pipeline cache was not started yet" warnings
        self.__is_running = True
        spawn_n(self.__run)

    @property
    def is_empty(self):
        """ checks whether the pipeline cache is empty """
        return len(self.__pipelines) == 0 and self.__queue.empty()

    def shutdown(self):
        """ puts None in the queue to signal the worker to stop """
        self.__queue.put(None)

    def load_from_root_folder(self, project_id: int, root_folder: str) -> Pipeline:
        """
        load configuration.json and create an instance from the included code object

        :param project_id: associated project ID
        :param root_folder: path to model root folder
        :return: Pipeline instance
        """
        # check if instance is cached
        with self.__lock:
            if root_folder in self.__pipelines:
                instance = self.__pipelines[root_folder]

                # increase reference counter
                instance[0] += 1

                # return instance
                return instance[1]

        # load pipeline (outside the lock: loading may be slow)
        pipeline = load_from_root_folder(root_folder)

        # save instance to cache
        with self.__lock:
            if not self.__is_running:
                warnings.warn("[save instance] pipeline cache was not started yet!")
            self.__pipelines[root_folder] = [1, pipeline, project_id]

        # return
        return pipeline

    def free_instance(self, root_folder: str):
        """
        Change an instance's status to unused and start the timer to call it's `close` function
        after `_cache_time` seconds. The next call to `load_from_root_folder` in this interval
        will disable this timer.

        :param root_folder: path to model root folder
        """
        # abort if no pipeline with this root folder is loaded
        with self.__lock:
            if root_folder not in self.__pipelines:
                return

        # start timeout
        timestamp = time()
        self.__queue.put((root_folder, timestamp))

        if not self.__is_running:
            warnings.warn("[free instance] pipeline cache was not started yet!")

    def __get(self):
        """
        Block until a cached pipeline's timeout expires and it should be closed.

        :return: (pipeline, project_id) of the instance to close,
                 or None when shutdown was requested
        """
        while True:
            # poll the queue, yielding to other greenlets while it is empty
            while True:
                try:
                    entry = self.__queue.get(block=False)
                    break
                except queue.Empty:
                    eventlet.sleep(0.2)

            if entry is None:
                # closing pipeline cache
                return None

            root_folder, timestamp = entry

            # sleep until the cache time for this entry has elapsed;
            # keep the fractional seconds (the previous int() truncation
            # could wake up and close the pipeline up to 1 sec too early,
            # and the :.3f format below already expects a float)
            delay = timestamp + self._cache_time - time()

            if delay > 0:
                app.logger.info(f"Cache sleeps for {delay:.3f} sec")
                sleep(delay)

            # lock and access __pipelines
            with self.__lock:
                app.logger.info("Removing pipeline from cache")
                instance = self.__pipelines[root_folder]

                # reference counter greater than 1
                if instance[0] > 1:
                    # decrease reference counter
                    instance[0] -= 1

                # reference counter equals 1
                else:
                    # delete instance from __pipelines and return to call `close` function
                    del self.__pipelines[root_folder]
                    return instance[1], instance[2]

    def __run(self):
        """ worker loop: waits for expired cache entries and schedules their closing """
        while True:
            self.__is_running = True

            # run the blocking wait in a native thread via tpool so the
            # eventlet hub is not blocked
            result = tpool.execute(self.__get)

            if result is None:
                self.__is_running = False
                return

            pipeline, project_id = result
            project = Project.query.get(project_id)

            # create job to close pipeline
            self.__jobs.run(project,
                            'Model Interaction',
                            f'{project.name} (close pipeline)',
                            f'{project.name}/model-interaction',
                            pipeline.close
                            )
|