123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- from queue import Queue
- from threading import Lock
- from time import time, sleep
- from eventlet import tpool, spawn_n
- from pycs.database.Project import Project
- from pycs.interfaces.Pipeline import Pipeline
- from pycs.jobs.JobRunner import JobRunner
- from pycs.util.PipelineUtil import load_from_root_folder
- class PipelineCache:
- CLOSE_TIMER = 120
- def __init__(self, jobs: JobRunner):
- self.__jobs = jobs
- self.__pipelines = {}
- self.__queue = Queue()
- self.__lock = Lock()
- spawn_n(self.__run)
- def load_from_root_folder(self, project: Project, root_folder: str) -> Pipeline:
- """
- load configuration.json and create an instance from the included code object
- :param project: associated project
- :param root_folder: path to model root folder
- :return: Pipeline instance
- """
- # check if instance is cached
- with self.__lock:
- if root_folder in self.__pipelines:
- instance = self.__pipelines[root_folder]
- # increase reference counter
- instance[0] += 1
- # return instance
- return instance[1]
- # load pipeline
- pipeline = load_from_root_folder(root_folder)
- # save instance to cache
- with self.__lock:
- self.__pipelines[root_folder] = [1, pipeline, project]
- # return
- return pipeline
- def free_instance(self, root_folder: str):
- """
- Change an instance's status to unused and start the timer to call it's `close` function
- after `CLOSE_TIMER` seconds. The next call to `load_from_root_folder` in this interval
- will disable this timer.
- :param root_folder: path to model root folder
- """
- # abort if no pipeline with this root folder is loaded
- with self.__lock:
- if root_folder not in self.__pipelines:
- return
- # start timeout
- timestamp = time()
- self.__queue.put((root_folder, timestamp))
- def __get(self):
- while True:
- # get element from queue
- root_folder, timestamp = self.__queue.get()
- # sleep if needed
- delay = int(timestamp + self.CLOSE_TIMER - time())
- if delay > 0:
- sleep(delay)
- # lock and access __pipelines
- with self.__lock:
- instance = self.__pipelines[root_folder]
- # reference counter greater than 1
- if instance[0] > 1:
- # decrease reference counter
- instance[0] -= 1
- # reference counter equals 1
- else:
- # delete instance from __pipelines and return to call `close` function
- del self.__pipelines[root_folder]
- return instance[1], instance[2]
- def __run(self):
- while True:
- # get pipeline
- pipeline, project = tpool.execute(self.__get)
- # create job to close pipeline
- self.__jobs.run(project,
- 'Model Interaction',
- f'{project.name} (close pipeline)',
- f'{project.name}/model-interaction',
- pipeline.close
- )
|