瀏覽代碼

Resolve "pipeline cache"

Eric Tröbs 3 年之前
父節點
當前提交
76d7090deb

+ 6 - 1
app.py

@@ -6,6 +6,7 @@ from os import mkdir, path
 from pycs.database.Database import Database
 from pycs.frontend.WebServer import WebServer
 from pycs.jobs.JobRunner import JobRunner
+from pycs.util.PipelineCache import PipelineCache
 
 if __name__ == '__main__':
     # load settings
@@ -25,6 +26,10 @@ if __name__ == '__main__':
     print('- start job runner')
     jobs = JobRunner()
 
+    # create pipeline cache
+    print('- create pipeline cache')
+    pipelines = PipelineCache(jobs)
+
     # start web server
     print('- start web server')
-    web_server = WebServer(settings, database, jobs)
+    web_server = WebServer(settings, database, jobs, pipelines)

+ 12 - 7
pycs/frontend/WebServer.py

@@ -46,6 +46,7 @@ from pycs.frontend.endpoints.results.ResetResults import ResetResults
 from pycs.frontend.notifications.NotificationManager import NotificationManager
 from pycs.frontend.util.JSONEncoder import JSONEncoder
 from pycs.jobs.JobRunner import JobRunner
+from pycs.util.PipelineCache import PipelineCache
 
 
 class WebServer:
@@ -54,7 +55,7 @@ class WebServer:
     """
 
     # pylint: disable=line-too-long
-    def __init__(self, settings: dict, database: Database, jobs: JobRunner):
+    def __init__(self, settings: dict, database: Database, jobs: JobRunner, pipelines: PipelineCache):
         # initialize web server
         if exists('webui/index.html'):
             print('production build')
@@ -247,11 +248,13 @@ class WebServer:
         )
         self.__flask.add_url_rule(
             '/projects/<int:identifier>/label_provider',
-            view_func=ExecuteLabelProvider.as_view('execute_label_provider', database, notifications, jobs)
+            view_func=ExecuteLabelProvider.as_view('execute_label_provider', database,
+                                                   notifications, jobs)
         )
         self.__flask.add_url_rule(
             '/projects/<int:identifier>/external_storage',
-            view_func=ExecuteExternalStorage.as_view('execute_external_storage', database, notifications, jobs)
+            view_func=ExecuteExternalStorage.as_view('execute_external_storage', database,
+                                                     notifications, jobs)
         )
         self.__flask.add_url_rule(
             '/projects/<int:identifier>/remove',
@@ -263,21 +266,23 @@ class WebServer:
         )
         self.__flask.add_url_rule(
             '/projects/<int:identifier>/description',
-            view_func=EditProjectDescription.as_view('edit_project_description', database, notifications)
+            view_func=EditProjectDescription.as_view('edit_project_description', database,
+                                                     notifications)
         )
 
         # pipelines
         self.__flask.add_url_rule(
             '/projects/<int:project_id>/pipelines/fit',
-            view_func=FitModel.as_view('fit_model', database, jobs)
+            view_func=FitModel.as_view('fit_model', database, jobs, pipelines)
         )
         self.__flask.add_url_rule(
             '/projects/<int:project_id>/pipelines/predict',
-            view_func=PredictModel.as_view('predict_model', database, notifications, jobs)
+            view_func=PredictModel.as_view('predict_model', database, notifications, jobs,
+                                           pipelines)
         )
         self.__flask.add_url_rule(
             '/data/<int:file_id>/predict',
-            view_func=PredictFile.as_view('predict_file', database, notifications, jobs)
+            view_func=PredictFile.as_view('predict_file', database, notifications, jobs, pipelines)
         )
 
         # finally start web server

+ 6 - 5
pycs/frontend/endpoints/pipelines/FitModel.py

@@ -5,7 +5,7 @@ from pycs.database.Database import Database
 from pycs.interfaces.MediaStorage import MediaStorage
 from pycs.jobs.JobGroupBusyException import JobGroupBusyException
 from pycs.jobs.JobRunner import JobRunner
-from pycs.util.PipelineUtil import load_from_root_folder as load_pipeline
+from pycs.util.PipelineCache import PipelineCache
 
 
 class FitModel(View):
@@ -15,10 +15,11 @@ class FitModel(View):
     # pylint: disable=arguments-differ
     methods = ['POST']
 
-    def __init__(self, db: Database, jobs: JobRunner):
+    def __init__(self, db: Database, jobs: JobRunner, pipelines: PipelineCache):
         # pylint: disable=invalid-name
         self.db = db
         self.jobs = jobs
+        self.pipelines = pipelines
 
     def dispatch_request(self, project_id):
         # extract request data
@@ -45,7 +46,7 @@ class FitModel(View):
         return make_response()
 
     @staticmethod
-    def load_and_fit(database: Database, project_id: int):
+    def load_and_fit(database: Database, pipelines: PipelineCache, project_id: int):
         db = None
         pipeline = None
 
@@ -58,13 +59,13 @@ class FitModel(View):
 
             # load pipeline
             try:
-                pipeline = load_pipeline(model.root_folder)
+                pipeline = pipelines.load_from_root_folder(project, model.root_folder)
                 yield from pipeline.fit(storage)
             except TypeError:
                 pass
             finally:
                 if pipeline is not None:
-                    pipeline.close()
+                    pipelines.free_instance(model.root_folder)
         finally:
             if db is not None:
                 db.close()

+ 5 - 2
pycs/frontend/endpoints/pipelines/PredictFile.py

@@ -7,6 +7,7 @@ from pycs.frontend.notifications.NotificationList import NotificationList
 from pycs.frontend.notifications.NotificationManager import NotificationManager
 from pycs.jobs.JobGroupBusyException import JobGroupBusyException
 from pycs.jobs.JobRunner import JobRunner
+from pycs.util.PipelineCache import PipelineCache
 
 
 class PredictFile(View):
@@ -16,11 +17,13 @@ class PredictFile(View):
     # pylint: disable=arguments-differ
     methods = ['POST']
 
-    def __init__(self, db: Database, nm: NotificationManager, jobs: JobRunner):
+    def __init__(self,
+                 db: Database, nm: NotificationManager, jobs: JobRunner, pipelines: PipelineCache):
         # pylint: disable=invalid-name
         self.db = db
         self.nm = nm
         self.jobs = jobs
+        self.pipelines = pipelines
 
     def dispatch_request(self, file_id):
         # extract request data
@@ -46,7 +49,7 @@ class PredictFile(View):
                           f'{project.name} (create predictions)',
                           f'{project.name}/model-interaction',
                           Predict.load_and_predict,
-                          self.db, project.identifier, [file], notifications,
+                          self.db, self.pipelines, notifications, project.identifier, [file],
                           progress=Predict.progress)
         except JobGroupBusyException:
             return abort(400)

+ 10 - 7
pycs/frontend/endpoints/pipelines/PredictModel.py

@@ -10,7 +10,7 @@ from pycs.interfaces.MediaFile import MediaFile
 from pycs.interfaces.MediaStorage import MediaStorage
 from pycs.jobs.JobGroupBusyException import JobGroupBusyException
 from pycs.jobs.JobRunner import JobRunner
-from pycs.util.PipelineUtil import load_from_root_folder as load_pipeline
+from pycs.util.PipelineCache import PipelineCache
 
 
 class PredictModel(View):
@@ -20,11 +20,13 @@ class PredictModel(View):
     # pylint: disable=arguments-differ
     methods = ['POST']
 
-    def __init__(self, db: Database, nm: NotificationManager, jobs: JobRunner):
+    def __init__(self,
+                 db: Database, nm: NotificationManager, jobs: JobRunner, pipelines: PipelineCache):
         # pylint: disable=invalid-name
         self.db = db
         self.nm = nm
         self.jobs = jobs
+        self.pipelines = pipelines
 
     def dispatch_request(self, project_id):
         # extract request data
@@ -47,7 +49,8 @@ class PredictModel(View):
                           f'{project.name} (create predictions)',
                           f'{project.name}/model-interaction',
                           self.load_and_predict,
-                          self.db, project.identifier, data['predict'], notifications,
+                          self.db, self.pipelines, notifications,
+                          project.identifier, data['predict'],
                           progress=self.progress)
         except JobGroupBusyException:
             return abort(400)
@@ -55,8 +58,8 @@ class PredictModel(View):
         return make_response()
 
     @staticmethod
-    def load_and_predict(database: Database, project_id: int, file_filter: Any,
-                         notifications: NotificationList):
+    def load_and_predict(database: Database, pipelines: PipelineCache,
+                         notifications: NotificationList, project_id: int, file_filter: Any):
         db = None
         pipeline = None
 
@@ -84,7 +87,7 @@ class PredictModel(View):
 
             # load pipeline
             try:
-                pipeline = load_pipeline(model.root_folder)
+                pipeline = pipelines.load_from_root_folder(project, model.root_folder)
 
                 # iterate over files
                 index = 0
@@ -102,7 +105,7 @@ class PredictModel(View):
                     index += 1
             finally:
                 if pipeline is not None:
-                    pipeline.close()
+                    pipelines.free_instance(model.root_folder)
         finally:
             if db is not None:
                 db.close()

+ 108 - 0
pycs/util/PipelineCache.py

@@ -0,0 +1,108 @@
+from queue import Queue
+from threading import Lock
+from time import time, sleep
+
+from eventlet import tpool, spawn_n
+
+from pycs.database.Project import Project
+from pycs.interfaces.Pipeline import Pipeline
+from pycs.jobs.JobRunner import JobRunner
+from pycs.util.PipelineUtil import load_from_root_folder
+
+
class PipelineCache:
    """
    Reference-counting cache for loaded pipeline instances.

    `load_from_root_folder` returns a cached instance when available, otherwise
    loads one. `free_instance` marks an instance unused and schedules it for
    closing after `CLOSE_TIMER` seconds; re-acquiring it in that interval
    cancels the pending close (the timer entry then only decrements the
    reference counter).
    """

    # seconds an unused pipeline stays cached before its `close` is scheduled
    CLOSE_TIMER = 120

    def __init__(self, jobs: JobRunner):
        """
        :param jobs: job runner used to execute pipelines' `close` calls
        """
        self.__jobs = jobs

        # maps root_folder -> [reference_count, pipeline, project]
        self.__pipelines = {}
        # (root_folder, timestamp) entries scheduled for closing
        self.__queue = Queue()
        # guards all access to __pipelines
        self.__lock = Lock()

        # background green thread that closes timed-out pipelines
        spawn_n(self.__run)

    def load_from_root_folder(self, project: Project, root_folder: str) -> Pipeline:
        """
        load configuration.json and create an instance from the included code object

        :param project: associated project
        :param root_folder: path to model root folder
        :return: Pipeline instance
        """
        # return the cached instance (and keep it alive) if one exists
        with self.__lock:
            if root_folder in self.__pipelines:
                instance = self.__pipelines[root_folder]

                # increase reference counter
                instance[0] += 1

                # return instance
                return instance[1]

        # load pipeline outside the lock (loading may be slow)
        # NOTE(review): two threads missing the cache concurrently will both
        # load an instance and the later store wins — confirm loads are
        # serialized by the callers or accept the duplicate load
        pipeline = load_from_root_folder(root_folder)

        # save instance to cache
        with self.__lock:
            self.__pipelines[root_folder] = [1, pipeline, project]

        # return
        return pipeline

    def free_instance(self, root_folder: str):
        """
        Change an instance's status to unused and start the timer to call its `close` function
        after `CLOSE_TIMER` seconds. The next call to `load_from_root_folder` in this interval
        will disable this timer.

        :param root_folder: path to model root folder
        """
        # abort if no pipeline with this root folder is loaded
        with self.__lock:
            if root_folder not in self.__pipelines:
                return

        # start timeout
        timestamp = time()
        self.__queue.put((root_folder, timestamp))

    def __get(self):
        """
        Block until some pipeline's close timer has expired and its reference
        counter drops to zero, remove it from the cache and return it.

        :return: (pipeline, project) tuple of the instance to close
        """
        while True:
            # get element from queue
            root_folder, timestamp = self.__queue.get()

            # sleep until CLOSE_TIMER seconds have passed since free_instance
            delay = int(timestamp + self.CLOSE_TIMER - time())

            if delay > 0:
                sleep(delay)

            # lock and access __pipelines
            with self.__lock:
                # a stale or duplicate queue entry (e.g. free_instance called
                # twice for one acquisition) may reference an instance that was
                # already removed; skip it instead of raising KeyError and
                # killing the background thread
                if root_folder not in self.__pipelines:
                    continue

                instance = self.__pipelines[root_folder]

                # reference counter greater than 1
                if instance[0] > 1:
                    # decrease reference counter
                    instance[0] -= 1

                # reference counter equals 1
                else:
                    # delete instance from __pipelines and return to call `close` function
                    del self.__pipelines[root_folder]
                    return instance[1], instance[2]

    def __run(self):
        """
        Endless worker loop: wait for a closable pipeline (via the thread pool,
        so the green thread does not block the hub) and schedule its `close`
        call as a job in the associated project.
        """
        while True:
            # get pipeline
            pipeline, project = tpool.execute(self.__get)

            # create job to close pipeline
            self.__jobs.run(project,
                            'Model Interaction',
                            f'{project.name} (close pipeline)',
                            f'{project.name}/model-interaction',
                            pipeline.close
                            )