6
0
Prechádzať zdrojové kódy

reworked Pipeline cache a bit

Dimitri Korsch 4 rokov pred
rodič
commit
0896a6aef5
2 zmenil súbory, kde vykonal 71 pridanie a 46 odobranie
  1. 62 43
      pycs/util/PipelineCache.py
  2. 9 3
      pycs/util/green_worker.py

+ 62 - 43
pycs/util/PipelineCache.py

@@ -1,12 +1,15 @@
 import eventlet
+import datetime as dt
 
 from queue import Queue
 from threading import Lock
-from time import time, sleep
+from time import sleep
+# from time import time
 
-from eventlet import tpool, spawn_n
-from dataclasses import dataclass
 from collections import namedtuple
+from dataclasses import dataclass
+from eventlet import spawn_n
+from eventlet import tpool
 
 from pycs import app
 from pycs.database.Project import Project
@@ -18,7 +21,7 @@ from pycs.util.green_worker import GreenWorker
 
 @dataclass
 class PipelineEntry(object):
-    counter: int = 1
+    last_used: int = -1
     pipeline: Pipeline = None
     pipeline_name: str = None
     project_id: int = -1
@@ -26,25 +29,26 @@ class PipelineEntry(object):
     def __post_init__(self):
         if self.pipeline is not None:
             self.pipeline_name = self.pipeline.__class__.__name__
+        self.poke()
+
+    def poke(self):
+        self.last_used = dt.datetime.now()
 
     def __str__(self):
-        return f"<Pipeline '{self.pipeline_name}' for project #{self.project_id} (counter={self.counter})>"
+        return f"<Pipeline '{self.pipeline_name}' for project #{self.project_id} (last_used: {self.last_used})>"
 
 class PipelineCache(GreenWorker):
-    CLOSE_TIMER = 120
+    CLOSE_TIMER = dt.timedelta(seconds=120)
 
     def __init__(self, jobs: JobRunner):
         super().__init__()
         self.__jobs = jobs
 
         self.__pipelines: dict[PipelineEntry] = {}
-        # self.__queue = Queue()
         self.__lock = Lock()
 
-        # self.__greenpool = eventlet.GreenPool()
-        # spawn_n(self.__run)
 
-    def load_from_root_folder(self, project: Project, root_folder: str) -> Pipeline:
+    def load_from_root_folder(self, project: Project, root_folder: str, no_cache: bool = False) -> Pipeline:
         """
         load configuration.json and create an instance from the included code object
 
@@ -56,21 +60,22 @@ class PipelineCache(GreenWorker):
         with self.__lock:
             if root_folder in self.__pipelines:
                 entry: PipelineEntry = self.__pipelines[root_folder]
+                entry.poke()
 
-                # increase reference counter
-                entry.counter += 1
-                app.logger.info(f"Using {entry}")
+                app.logger.info(f"[{self.ident}] Using {entry}")
 
-                # return entry
                 return entry.pipeline
 
         # load pipeline
         pipeline = load_from_root_folder(root_folder)
 
+        if no_cache:
+            return pipeline
+
         # save instance to cache
         with self.__lock:
-            entry = PipelineEntry(counter=1, pipeline=pipeline, project_id=project.id)
-            app.logger.info(f"Cached {entry}")
+            entry = PipelineEntry(pipeline=pipeline, project_id=project.id)
+            app.logger.info(f"[{self.ident}] Cached {entry}")
             self.__pipelines[root_folder] = entry
 
         # return
@@ -89,52 +94,66 @@ class PipelineCache(GreenWorker):
             if root_folder not in self.__pipelines:
                 return
 
-        # start timeout
-        timestamp = time()
-        self.queue.put((root_folder, timestamp))
+        self.queue.put((root_folder,))
 
     # executed as coroutine in the main thread
-    def start_work(self, root_folder, timestamp):
+    def start_work(self, *args):
 
         # delegate to work method in a separate thread
-        pipeline, project_id = super().start_work(root_folder, timestamp)
+        res = super().start_work(*args)
 
-        project = Project.query.get(project_id)
+        if res is None:
+            # an error occurred in the execution
+            return
+
+        pipeline, project_id = res
 
+        if pipeline is None:
+            # pipeline vanished
+            return
+
+        project = Project.query.get(project_id)
         # create job to close pipeline
         self.__jobs.run(project,
                         'Model Interaction',
                         f'{project.name} (close pipeline)',
                         f'{project.name}/model-interaction',
                         pipeline.close
-                        )
+                    )
 
     # executed in a separate thread
-    def work(self, root_folder, timestamp):
+    def work(self, root_folder):
+        with self.__lock:
+            entry = self.__pipelines.get(root_folder)
+
+        if entry is None:
+            app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
+            return
+
+        app.logger.info(f"[{self.ident}] Starting checks for {entry}...")
         while True:
-            # sleep if needed
-            delay = int(timestamp + self.CLOSE_TIMER - time())
+            now = dt.datetime.now()
+
+            with self.__lock:
+                entry = self.__pipelines.get(root_folder)
+
+                if entry is None:
+                    app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
+                    return None, None
 
-            if delay > 0:
-                sleep(delay)
+                delay = entry.last_used + self.CLOSE_TIMER - now
+
+            if delay.seconds > 0:
+                sleep(delay.seconds)
+                continue
 
-            # lock and access __pipelines
             with self.__lock:
-                entry: PipelineEntry = self.__pipelines[root_folder]
+                entry = self.__pipelines.pop(root_folder, None)
+                app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
+                return None, None
 
-                # reference counter greater than 1
-                if entry.counter > 1:
-                    # decrease reference counter
-                    entry.counter -= 1
-                    app.logger.info(f"Decreased counter of {entry}.")
-                    continue
-
-                # reference counter equals 1
-                else:
-                    # delete instance from __pipelines and return to call `close` function
-                    del self.__pipelines[root_folder]
-                    app.logger.info(f"Removed {entry} from cache.")
-                    return entry.pipeline, entry.project_id
+            app.logger.info(f"[{self.ident}] Removed {entry} from cache")
+            return entry.pipeline, entry.project_id
 
     # def __get(self):
     #     while True:

+ 9 - 3
pycs/util/green_worker.py

@@ -75,9 +75,15 @@ class GreenWorker(abc.ABC):
 
     def start_work(self, *args):
         app.logger.info(f"[{self.__class__.__name__} - {self.ident}] starting work")
-        res = tpool.execute(self.work, *args)
-        app.logger.info(f"[{self.__class__.__name__} - {self.ident}] work finished")
-        return res
+
+        try:
+            return tpool.execute(self.work, *args)
+
+        except Exception as e:
+            app.logger.error(f"[{self.__class__.__name__} - {self.ident}] error occurred: {e}")
+
+        finally:
+            app.logger.info(f"[{self.__class__.__name__} - {self.ident}] work finished")
 
     @property
     def ident(self):