Explorar el Código

fixing some threading issues

Dimitri Korsch hace 3 años
padre
commit
7f84d96fb0

+ 1 - 1
pycs/database/File.py

@@ -233,7 +233,7 @@ class File(NamedBaseModel):
             Result.file_id == self.id,
             Result.origin == origin)
 
-        _results = results.all()
+        _results = [r.serialize() for r in results.all()]
         results.delete()
 
         return _results

+ 2 - 1
pycs/frontend/WebServer.py

@@ -116,7 +116,8 @@ class WebServer:
 
         # create notification manager
         self.jobs = JobRunner()
-        self.pipelines = PipelineCache(self.jobs)
+        self.pipelines = PipelineCache(self.jobs, settings.get("pipeline_cache_time"))
+        self.pipelines.start()
         self.notifications = NotificationManager(self.__sio)
 
         self.jobs.on_create(self.notifications.create_job)

+ 21 - 3
pycs/util/PipelineCache.py

@@ -1,3 +1,5 @@
+import warnings
+
 from queue import Queue
 from threading import Lock
 from time import time, sleep
@@ -17,15 +19,22 @@ class PipelineCache:
     """
     CLOSE_TIMER = 120
 
-    def __init__(self, jobs: JobRunner):
+    def __init__(self, jobs: JobRunner, cache_time: float = None):
         self.__jobs = jobs
 
         self.__pipelines = {}
+        self.__is_running = False
         self.__queue = Queue()
         self.__lock = Lock()
 
+        self._cache_time = cache_time or self.CLOSE_TIMER
+        print(f"Initialized Pipeline cache (pipelines are closed after {self._cache_time:.3f} sec)")
+
     def start(self):
         """ starts the main worker method """
+        if self.__is_running:
+            warnings.warn("Pipeline cache is already started")
+            return
         spawn_n(self.__run)
 
     @property
@@ -61,6 +70,8 @@ class PipelineCache:
 
         # save instance to cache
         with self.__lock:
+            if not self.__is_running:
+                warnings.warn("[save instance] pipeline cache was not started yet!")
             self.__pipelines[root_folder] = [1, pipeline, project_id]
 
         # return
@@ -69,7 +80,7 @@ class PipelineCache:
     def free_instance(self, root_folder: str):
         """
         Change an instance's status to unused and start the timer to call it's `close` function
-        after `CLOSE_TIMER` seconds. The next call to `load_from_root_folder` in this interval
+        after `_cache_time` seconds. The next call to `load_from_root_folder` in this interval
         will disable this timer.
 
         :param root_folder: path to model root folder
@@ -83,6 +94,9 @@ class PipelineCache:
         timestamp = time()
         self.__queue.put((root_folder, timestamp))
 
+        if not self.__is_running:
+            warnings.warn("[free instance] pipeline cache was not started yet!")
+
     def __get(self):
         while True:
             # get element from queue
@@ -93,13 +107,15 @@ class PipelineCache:
             root_folder, timestamp = entry
 
             # sleep if needed
-            delay = int(timestamp + self.CLOSE_TIMER - time())
+            delay = int(timestamp + self._cache_time - time())
 
             if delay > 0:
+                print(f"Cache sleeps for {delay:.3f} sec")
                 sleep(delay)
 
             # lock and access __pipelines
             with self.__lock:
+                print("Removing pipeline from cache")
                 instance = self.__pipelines[root_folder]
 
                 # reference counter greater than 1
@@ -115,9 +131,11 @@ class PipelineCache:
 
     def __run(self):
         while True:
+            self.__is_running = True
             # get pipeline
             result = tpool.execute(self.__get)
             if result is None:
+                self.__is_running = False
                 return
 
             pipeline, project_id = result

+ 2 - 2
settings.json

@@ -3,6 +3,6 @@
   "port": 5000,
   "allowedOrigins": [],
   "projects_folder": "projects",
-  "database": "data2.sqlite3"
-
+  "database": "data2.sqlite3",
+  "pipeline_cache_time": 5
 }

+ 16 - 0
tests/client/pipeline_tests.py

@@ -76,7 +76,23 @@ class PipelineTests(BaseTestCase):
 
     def test_predict_file(self):
         url = url_for("predict_file", file_id=self.file.id)
+        self.assertEqual(0, self.file.results.count())
         self.post(url, json=dict(predict=True))
+        self.wait_for_bg_jobs()
+        self.assertEqual(1, self.file.results.count())
+
+    def test_predict_file_multiple_times(self):
+        url = url_for("predict_file", file_id=self.file.id)
+
+        self.assertEqual(0, self.file.results.count())
+
+        self.post(url, json=dict(predict=True))
+        self.wait_for_bg_jobs()
+        self.assertEqual(1, self.file.results.count())
+
+        self.post(url, json=dict(predict=True))
+        self.wait_for_bg_jobs()
+        self.assertEqual(1, self.file.results.count())
 
     def test_predict_model_errors(self):
         self.post(url_for("predict_model", project_id=4242),

+ 1 - 1
tests/client/test_models/simple_model/model.py

@@ -13,7 +13,7 @@ class Model(Pipeline):
         print("Closing")
 
     def execute(self, storage: MediaStorage, file: MediaFile):
-        print("executing model")
+        file.add_bounding_box(0, 0, 1, 1/3)
 
     def fit(self, storage: MediaStorage):
         print("fitting model")