Fixed JobRunner and PipelineCache. Added Makefile and .editorconfig.

Dimitri Korsch 3 years ago
parent
commit
abcbdeb9ae
4 changed files with 96 additions and 55 deletions
  1. .editorconfig: +12 -0
  2. Makefile: +8 -0
  3. pycs/jobs/JobRunner.py: +23 -20
  4. pycs/util/PipelineCache.py: +53 -35

+ 12 - 0
.editorconfig

@@ -0,0 +1,12 @@
+# editorconfig.org
+root = true
+
+[*]
+charset = utf-8
+end_of_line = lf
+trim_trailing_whitespace = true
+insert_final_newline = true
+
+[*.{py,sh,js,vue}]
+indent_style = space
+indent_size = 4

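Editors with EditorConfig support will pick these settings up automatically: UTF-8, LF line endings, trimmed trailing whitespace, and a final newline everywhere, with 4-space indentation scoped to Python, shell, JavaScript, and Vue sources via the [*.{py,sh,js,vue}] section.
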
+ 8 - 0
Makefile

@@ -0,0 +1,8 @@
+run:
+	python app.py
+
+install:
+	@echo "INSTALL MISSING!"
+
+run_tests:
+	python -m unittest discover test/

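With these targets in place, `make run` starts the app, `make run_tests` runs unittest discovery over test/, and `make install` is still a stub that only echoes a reminder.
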
+ 23 - 20
pycs/jobs/JobRunner.py

@@ -156,6 +156,23 @@ class JobRunner(GreenWorker):
         # return job object
         return job
 
+
+    def _job_started(self, job):
+        job.start()
+        for callback in self.__start_listeners:
+            callback(job)
+
+    def _job_progress(self, job, progress):
+        job.update(progress=progress)
+        for callback in self.__progress_listeners:
+            callback(job)
+
+    def _job_finished(self, job):
+        job.finish()
+        for callback in self.__finish_listeners:
+            callback(job)
+
+
     def process_iterator(self, generator, job, progress_fun):
         try:
             iterator = iter(generator)
@@ -165,9 +182,6 @@ class JobRunner(GreenWorker):
                 future = self.__executor.submit(next, iterator)
                 progress = eventlet.tpool.execute(future.result)
 
-                # progress = future.result()
-                # progress = next(iterator)
-
                 # execute progress function
                 if progress_fun is not None:
                     if isinstance(progress, tuple):
@@ -175,23 +189,17 @@ class JobRunner(GreenWorker):
                     else:
                         progress = progress_fun(progress)
 
-                # execute progress listeners
-                job.update(progress=progress)
-                for callback in self.__progress_listeners:
-                    callback(job)
+                self._job_progress(job, progress=progress)
 
         except StopIteration as stop_iteration_exception:
             return stop_iteration_exception.value
 
 
-    # done in a separate green thread
-    def work(self, group, executable, job, progress_fun, result_fun, result_event, args, kwargs):
+    def start_work(self, group, executable, job, progress_fun, result_fun, result_event, args, kwargs):
+        """ started as coroutine. Starts other tasks in separate green threads. """
 
         # execute start listeners
-        job.start()
-        for callback in self.__start_listeners:
-            callback(job)
-
+        self._job_started(job)
         try:
             # result = generator = executable(*args, **kwargs)
             future = self.__executor.submit(executable, *args, **kwargs)
@@ -200,10 +208,7 @@ class JobRunner(GreenWorker):
             if isinstance(generator, GeneratorType):
                 result = self.process_iterator(generator, job, progress_fun)
 
-
-            job.update(progress=1)
-            for callback in self.__progress_listeners:
-                callback(job)
+            self._job_progress(job, progress=1)
 
             # execute result function
             if result_fun is not None:
@@ -226,9 +231,7 @@ class JobRunner(GreenWorker):
         if group is not None:
             del self.__groups[group]
 
-        job.finish()
-        for callback in self.__finish_listeners:
-            callback(job)
+        self._job_finished(job)
 
     def __run(self):
 

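For context, the JobRunner change extracts the repeated "update the job, then notify listeners" sequences into the _job_started / _job_progress / _job_finished helpers, so each lifecycle event has exactly one dispatch point. A minimal sketch of that pattern, with a hypothetical Job class and listener lists standing in for the real ones:

# Sketch of the listener-notification pattern from the diff.
# Job and the listener lists are hypothetical stand-ins.
class Job:
    def start(self):
        self.state = "running"

    def update(self, progress):
        self.progress = progress

    def finish(self):
        self.state = "finished"


class Runner:
    def __init__(self):
        # callbacks registered by interested parties (e.g. a UI layer)
        self.start_listeners = []
        self.progress_listeners = []
        self.finish_listeners = []

    def _notify(self, listeners, job):
        # single place where listener dispatch happens
        for callback in listeners:
            callback(job)

    def _job_started(self, job):
        job.start()
        self._notify(self.start_listeners, job)

    def _job_progress(self, job, progress):
        job.update(progress=progress)
        self._notify(self.progress_listeners, job)

    def _job_finished(self, job):
        job.finish()
        self._notify(self.finish_listeners, job)


if __name__ == "__main__":
    runner = Runner()
    runner.progress_listeners.append(lambda j: print("progress:", j.progress))
    job = Job()
    runner._job_started(job)
    runner._job_progress(job, progress=0.5)
    runner._job_finished(job)
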
+ 53 - 35
pycs/util/PipelineCache.py

@@ -62,7 +62,7 @@ class PipelineCache(GreenWorker):
                 entry: PipelineEntry = self.__pipelines[root_folder]
                 entry.poke()
 
-                app.logger.info(f"[{self.ident}] Using {entry}")
+                self.info(f"Using {entry}")
 
                 return entry.pipeline
 
@@ -75,8 +75,9 @@ class PipelineCache(GreenWorker):
         # save instance to cache
         with self.__lock:
             entry = PipelineEntry(pipeline=pipeline, project_id=project.id)
-            app.logger.info(f"[{self.ident}] Cached {entry}")
+            self.info(f"Cached {entry}")
             self.__pipelines[root_folder] = entry
+            self.queue.put((root_folder,))
 
         # return
         return pipeline
@@ -89,56 +90,72 @@ class PipelineCache(GreenWorker):
 
         :param root_folder: path to model root folder
         """
-        # abort if no pipeline with this root folder is loaded
         with self.__lock:
-            if root_folder not in self.__pipelines:
+            if root_folder in self.__pipelines:
+                # reset "last used" to now
+                self.__pipelines[root_folder].poke()
+
+            # abort if no pipeline with this root folder is loaded
+            else:
                 return
 
-        self.queue.put((root_folder,))
 
     # executed as coroutine in the main thread
-    def start_work(self, *args):
-
-        # delegate to work method in a separate thread
-        res = super().start_work(*args)
+    def __run__(self):
+        while True:
+            # get pipeline
+            res = tpool.execute(self.work)
 
-        if res is None:
-            # an error occured in the execution
-            return
+            if res is self.STOP_QUEUE:
+                break
 
-        pipeline, project_id = res
+            pipeline, project_id = res
 
-        if pipeline is None:
-            # pipeline vanished
-            return
+            if pipeline is None:
+                # pipeline vanished from cache
+                continue
 
-        project = Project.query.get(project_id)
-        # create job to close pipeline
-        self.__jobs.run(project,
-                        'Model Interaction',
-                        f'{project.name} (close pipeline)',
-                        f'{project.name}/model-interaction',
-                        pipeline.close
-                    )
+            project = Project.query.get(project_id)
+            # create job to close pipeline
+            self.__jobs.run(project,
+                            'Model Interaction',
+                            f'{project.name} (close pipeline)',
+                            f'{project.name}/model-interaction',
+                            pipeline.close
+                        )
+        self._finish()
 
     # executed in a separate thread
-    def work(self, root_folder):
+    def work(self):
+        while True:
+            res = self.check_queue()
+
+            if res is self.STOP_QUEUE:
+                return res
+
+            elif res is self.CONTINUE_QUEUE:
+                continue
+
+            # an entry was found in the queue
+            return self._check_cache_entry(*res)
+
+    def _check_cache_entry(self, key):
         with self.__lock:
-            entry = self.__pipelines.get(root_folder)
+            entry = self.__pipelines.get(key)
 
         if entry is None:
-            app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
-            return
+            self.info(f"Entry for {key} already gone")
+            return None, None
 
-        app.logger.info(f"[{self.ident}] Starting checks for {entry}...")
+        self.info(f"Starting checks for {entry}...")
         while True:
             now = dt.datetime.now()
 
             with self.__lock:
-                entry = self.__pipelines.get(root_folder)
+                entry = self.__pipelines.get(key)
 
                 if entry is None:
-                    app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
+                    self.info(f"Entry for {key} already gone")
                     return None, None
 
                 delay = entry.last_used + self.CLOSE_TIMER - now
@@ -148,11 +165,12 @@ class PipelineCache(GreenWorker):
                 continue
 
             with self.__lock:
-                entry = self.__pipelines.pop(root_folder, None)
-                app.logger.info(f"[{self.ident}] Entry for {root_folder} already gone")
-                return None, None
+                entry = self.__pipelines.pop(key, None)
+                if entry is None:
+                    self.info(f"Entry for {key} already gone")
+                    return None, None
 
-            app.logger.info(f"[{self.ident}] Removed {entry} from cache")
+            self.info(f"Removed {entry} from cache")
             return entry.pipeline, entry.project_id
 
     # def __get(self):
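
The PipelineCache side replaces the per-call work(root_folder) with a long-running worker: cache writes enqueue the key, the coroutine __run__ repeatedly dispatches work via tpool.execute, and work drains the queue using STOP_QUEUE / CONTINUE_QUEUE sentinels. A minimal sketch of that queue-plus-sentinels loop using only the standard library (no eventlet); the names STOP_QUEUE, CONTINUE_QUEUE, and check_queue mirror what the diff suggests GreenWorker provides, and are assumptions here:

import queue

# Hypothetical stand-in for the GreenWorker base class seen in the diff.
class Worker:
    STOP_QUEUE = object()      # sentinel: shut the worker down
    CONTINUE_QUEUE = object()  # sentinel: nothing useful yet, poll again

    def __init__(self):
        self.queue = queue.Queue()

    def check_queue(self):
        # wait briefly for the next item; signal CONTINUE_QUEUE on timeout
        try:
            item = self.queue.get(timeout=0.1)
        except queue.Empty:
            return self.CONTINUE_QUEUE
        if item is self.STOP_QUEUE:
            return self.STOP_QUEUE
        return item  # a tuple of work arguments, e.g. (root_folder,)

    def work(self):
        # drain the queue until real work or a stop request arrives
        while True:
            res = self.check_queue()
            if res is self.STOP_QUEUE:
                return res
            if res is self.CONTINUE_QUEUE:
                continue
            return self._handle(*res)

    def _handle(self, key):
        return f"checked cache entry {key!r}"

    def run(self):
        # main loop; in the real code this runs as an eventlet coroutine
        # and delegates work() to a thread via tpool.execute
        while True:
            res = self.work()
            if res is self.STOP_QUEUE:
                break
            print(res)


worker = Worker()
worker.queue.put(("some/model/folder",))
worker.queue.put(Worker.STOP_QUEUE)
worker.run()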