import datetime as dt

from dataclasses import dataclass
from threading import Lock
from time import sleep
# from time import time
from typing import Optional

import eventlet
from eventlet import tpool

from pycs import app
from pycs.database.Project import Project
from pycs.interfaces.Pipeline import Pipeline
from pycs.jobs.JobRunner import JobRunner
from pycs.util import GreenWorker
from pycs.util.PipelineUtil import load_from_root_folder


@dataclass
class PipelineEntry:
    """Cache entry holding a loaded pipeline and the time it was last used."""

    last_used: Optional[dt.datetime] = None
    pipeline: Optional[Pipeline] = None
    pipeline_name: Optional[str] = None
    project_id: int = -1

    def __post_init__(self):
        if self.pipeline is not None:
            self.pipeline_name = self.pipeline.__class__.__name__
        self.poke()

    def poke(self):
        """Reset the last-used timestamp to now."""
        self.last_used = dt.datetime.now()

    def __str__(self):
        return (
            f"<Pipeline '{self.pipeline_name}' for project "
            f"#{self.project_id} (last_used: {self.last_used})>"
        )


class PipelineCache(GreenWorker):
    """
    Store initialized pipelines and call their `close` method after `CLOSE_TIMER`
    if they have not been requested again in the meantime.
    """

    CLOSE_TIMER = dt.timedelta(seconds=120)

    def __init__(self):
        super().__init__()
        # maps a project's model root folder to its cached PipelineEntry
        self.__pipelines: dict[str, PipelineEntry] = {}
        self.__lock = Lock()

    def load_from_root_folder(self, project: Project, no_cache: bool = False) -> Pipeline:
        """
        Load configuration.json and create an instance from the included code object.

        :param project: associated project
        :param no_cache: if True, bypass the cache and return a fresh instance
        :return: Pipeline instance
        """
        root_folder = project.model.root_folder

        # check if an instance is cached
        with self.__lock:
            if root_folder in self.__pipelines:
                entry: PipelineEntry = self.__pipelines[root_folder]
                entry.poke()
                self.info(f"Using {entry}")
                return entry.pipeline

        # load the pipeline
        pipeline = load_from_root_folder(root_folder)
        if no_cache:
            return pipeline

        # save the instance to the cache and schedule the background close check
        with self.__lock:
            entry = PipelineEntry(pipeline=pipeline, project_id=project.id)
            self.info(f"Cached {entry}")
            self.__pipelines[root_folder] = entry
        self.queue.put((root_folder,))

        return pipeline
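
    # Usage sketch (assumptions: `cache` is the application's PipelineCache
    # instance and `project` a loaded Project; both names are illustrative):
    #
    #   pipeline = cache.load_from_root_folder(project)
    #   # ... cached: the worker closes it CLOSE_TIMER after the last poke
    #
    #   fresh = cache.load_from_root_folder(project, no_cache=True)
    #   # ... uncached: the caller is responsible for calling fresh.close()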

    def free_instance(self, project: Project):
        """
        Mark an instance as unused by resetting its last-used timestamp. The
        background worker calls the instance's `close` function once `CLOSE_TIMER`
        has elapsed; any call to `load_from_root_folder` in that interval defers
        the close again.

        :param project: associated project
        """
        root_folder = project.model.root_folder
        with self.__lock:
            if root_folder in self.__pipelines:
                # reset "last used" to now
                self.__pipelines[root_folder].poke()
            # otherwise no pipeline with this root folder is loaded: nothing to do
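
    # Typical lifecycle (sketch; `pipeline.execute(...)` is a hypothetical call,
    # not an API defined in this module):
    #
    #   pipeline = cache.load_from_root_folder(project)
    #   try:
    #       pipeline.execute(...)
    #   finally:
    #       cache.free_instance(project)   # closed after CLOSE_TIMER of inactivity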

    # executed as a coroutine in the main thread
    def __run__(self):
        while True:
            # wait for the next cache entry to expire; the blocking wait runs
            # in a native thread so other greenlets keep running
            res = tpool.execute(self.work)
            if res is self.STOP_QUEUE:
                break

            pipeline, project_id = res
            if pipeline is None:
                # pipeline vanished from the cache
                continue

            project = Project.query.get(project_id)
            if project is None:
                # project does not exist anymore
                continue

            # create a job to close the pipeline
            JobRunner.run(project,
                          'Model Interaction',
                          f'{project.name} (close pipeline)',
                          f'{project.name}/model-interaction',
                          pipeline.close
                          )

        self._finish()
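
    # The pattern above is eventlet's standard way to mix green and native
    # threads: `tpool.execute(fn, *args)` runs `fn` in a thread pool and only
    # its return value crosses back into the green thread. Minimal sketch
    # (illustrative; `blocking_io` is a hypothetical blocking function):
    #
    #   from eventlet import tpool
    #
    #   def blocking_io():
    #       ...  # e.g. a blocking queue.get() or file read
    #
    #   result = tpool.execute(blocking_io)  # greenlet yields while it runs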

    # executed in a separate thread
    def work(self):
        while True:
            res = self.check_queue()
            if res is self.STOP_QUEUE:
                return res
            if res is self.CONTINUE_QUEUE:
                continue
            # an entry was found in the queue
            return self._check_cache_entry(*res)

    def _check_cache_entry(self, key):
        with self.__lock:
            entry = self.__pipelines.get(key)
            if entry is None:
                self.info(f"Entry for {key} already gone")
                return None, None
            self.info(f"Starting checks for {entry}...")

        while True:
            now = dt.datetime.now()
            with self.__lock:
                entry = self.__pipelines.get(key)
                if entry is None:
                    self.info(f"Entry for {key} already gone")
                    return None, None
                delay = entry.last_used + self.CLOSE_TIMER - now

            # NOTE: `timedelta.seconds` is non-negative even for negative deltas,
            # so the expiry test has to use `total_seconds()`
            if delay.total_seconds() > 0:
                sleep(delay.total_seconds())
                continue

            with self.__lock:
                entry = self.__pipelines.pop(key, None)
                if entry is None:
                    self.info(f"Entry for {key} already gone")
                    return None, None
                self.info(f"Removed {entry} from cache")
                return entry.pipeline, entry.project_id
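
    # Worked example of the expiry check above (illustrative values,
    # CLOSE_TIMER = 120 s):
    #
    #   last_used = 12:00:00, now = 12:01:30
    #   delay = last_used + CLOSE_TIMER - now = +30 s  -> sleep(30), re-check
    #   next pass: delay <= 0 -> the entry is popped and its pipeline closed,
    #   unless a poke() in the meantime pushed last_used forward again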

    # Legacy eventlet-based implementation, kept for reference:
    #
    # def __get(self):
    #     while True:
    #         # get an element from the queue
    #         root_folder, timestamp = self.__queue.get()
    #
    #         # sleep if needed
    #         delay = int(timestamp + self.CLOSE_TIMER - time())
    #         if delay > 0:
    #             eventlet.sleep(delay)
    #
    #         # lock and access __pipelines
    #         with self.__lock:
    #             instance = self.__pipelines[root_folder]
    #
    #             if instance.counter > 1:
    #                 # reference counter greater than 1: decrease it
    #                 instance.counter -= 1
    #             else:
    #                 # reference counter equals 1: delete the instance from
    #                 # __pipelines and return to call its `close` function
    #                 del self.__pipelines[root_folder]
    #                 return instance.pipeline, instance.project_id
    #
    # def __run(self):
    #     while True:
    #         # get pipeline
    #         pipeline, project_id = tpool.execute(self.__get)
    #         project = Project.query.get(project_id)
    #
    #         # create a job to close the pipeline
    #         self.__jobs.run(project,
    #                         'Model Interaction',
    #                         f'{project.name} (close pipeline)',
    #                         f'{project.name}/model-interaction',
    #                         pipeline.close
    #                         )