
refactored green worker methods. Some import changes in Project module

Dimitri Korsch 3 years ago
parent
commit
e5aac193a8
4 changed files with 72 additions and 46 deletions
  1. .gitlab-ci.yml (+1 -2)
  2. pycs/database/Project.py (+5 -7)
  3. pycs/frontend/WebServer.py (+2 -0)
  4. pycs/util/green_worker.py (+64 -37)

+ 1 - 2
.gitlab-ci.yml

@@ -33,8 +33,7 @@ webui:
     - python -V
     - python -m venv env
     - source env/bin/activate
-    - pip install numpy opencv-python Pillow scipy
-    - pip install eventlet flask python-socketio
+    - pip install -r requirements.txt
     - pip install coverage pylint
   script:
     - coverage run --source=pycs/ -m unittest discover test/
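
The two hand-maintained pip install lines are replaced by the project's requirements file. Judging from the removed lines, requirements.txt presumably lists at least the following packages; this is a sketch, since the file itself is not part of this diff:

numpy
opencv-python
Pillow
scipy
eventlet
flask
python-socketio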

+ 5 - 7
pycs/database/Project.py

@@ -1,13 +1,11 @@
+import os
 import typing as T
 
 from contextlib import closing
 from datetime import datetime
-from os.path import join
-from typing import List, Optional, Tuple, Iterator
 
 from pycs import db
 from pycs.database.base import NamedBaseModel
-
 from pycs.database.Collection import Collection
 from pycs.database.File import File
 from pycs.database.Label import Label
@@ -101,7 +99,7 @@ class Project(NamedBaseModel):
         return self.collections.filter_by(reference=reference).one_or_none()
 
     def create_label(self, name: str, reference: str = None,
-                     parent_id: int = None) -> Tuple[Optional[Label], bool]:
+                     parent_id: int = None) -> T.Tuple[T.Optional[Label], bool]:
         """
         create a label for this project. If there is already a label with the same reference
         in the database its name is updated.
@@ -152,7 +150,7 @@ class Project(NamedBaseModel):
         :param fps: frames per second
         :return: file
         """
-        path = join(self.data_folder, filename + extension)
+        path = os.path.join(self.data_folder, filename + extension)
 
         file, is_new = File.get_or_create(project=self, path=path)
 
@@ -213,7 +211,7 @@ class Project(NamedBaseModel):
             ''', [self.id])
             return cursor.fetchone()[0]
 
-    def files_without_results(self) -> Iterator[File]:
+    def files_without_results(self) -> T.Iterator[File]:
         """
         get an iterator of files without associated results
 
@@ -233,7 +231,7 @@ class Project(NamedBaseModel):
             for row in cursor:
                 yield File(self.database, row)
 
-    def files_without_collection(self, offset: int = 0, limit: int = -1) -> Iterator[File]:
+    def files_without_collection(self, offset: int = 0, limit: int = -1) -> T.Iterator[File]:
         """
        get an iterator of files not associated with any collection
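
The per-name typing imports are replaced by a single import typing as T alias, and the bare join() by os.path.join. A minimal sketch of the convention used above; the function and its arguments are illustrative, not the project's model API:

import os
import typing as T

def create_label(name: str, reference: str = None) -> T.Tuple[T.Optional[str], bool]:
    # annotations go through the typing alias; paths through os.path.join
    path = os.path.join("labels", name)
    return (reference or path), True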
 

+ 2 - 0
pycs/frontend/WebServer.py

@@ -6,6 +6,7 @@ from logging.config import dictConfig
 
 from flask import send_from_directory
 from flask_socketio import SocketIO
+from logging import config
 
 from pycs import app
 from pycs.database.Database import Database
@@ -60,6 +61,7 @@ class WebServer:
     def __init__(self, app, settings: dict):
 
         dictConfig(settings["logging"])
+        config.dictConfig(settings["logging"])
         # initialize flask app instance
         self.app = app
 

+ 64 - 37
pycs/util/green_worker.py

@@ -5,10 +5,13 @@ import threading
 
 # from concurrent.futures import ThreadPoolExecutor
 from pycs import app
-from eventlet import tpool
 
 class GreenWorker(abc.ABC):
+    STOP_QUEUE = True
+    CONTINUE_QUEUE = False
+
     def __init__(self):
+
         super(GreenWorker, self).__init__()
 
         self.pool = eventlet.GreenPool()
@@ -21,12 +24,15 @@ class GreenWorker(abc.ABC):
         self.__running = False
 
 
+    @property
+    def ident(self):
+        return threading.get_ident()
+
 
     def start(self):
         if self.__running:
             return
-        # self._thread = self.pool.
-        eventlet.spawn(self.__run__)
+        self._thread = self.pool.spawn_n(self.__run__)
         self.__running = True
 
     def stop(self):
@@ -50,52 +56,73 @@ class GreenWorker(abc.ABC):
             eventlet.sleep(self.__sleep_time)
             continue
 
+    def __log(self, log_func, msg):
+        log_func(f"[{self.ident}] {self.__class__.__name__}: {msg}")
+
+    def debug(self, msg):
+        self.__log(app.logger.debug, msg)
+
+    def info(self, msg):
+        self.__log(app.logger.info, msg)
+
+    def error(self, msg):
+        self.__log(app.logger.error, msg)
+
+    def warning(self, msg):
+        self.__log(app.logger.warning, msg)
+
+    def check_stop_event(self):
+        if self.stop_event.ready() and \
+            self.stop_event.wait(self.__sleep_time):
+
+            self.debug("Stop event received.")
+            self.stop_event.reset()
+            return True
+
+        eventlet.sleep(self.__sleep_time)
+        self.debug(f"No stop event received. Waiting for {self.__sleep_time} seconds.")
+        return False
+
+    def check_queue(self):
+        if self.queue.empty():
+            self.debug("Queue was empty, checking for stop...")
+            return self.STOP_QUEUE if self.check_stop_event() else self.CONTINUE_QUEUE
+
+        return self.queue.get(block=True)
+
+
     def __run__(self):
         while True:
-            if self.queue.empty():
-                # print("Queue was empty, checking for stop")
-                if self.stop_event.ready() and \
-                    self.stop_event.wait(self.__sleep_time):
-                    # print("Stop event received")
-                    self.stop_event.reset()
-                    break
-                else:
-                    eventlet.sleep(self.__sleep_time)
-                    # print("no stop event received")
-                    continue
-
-            app.logger.info(f"starting job in thread #{self.ident}...")
-            args = self.queue.get(block=True)
-
-            self.start_work(*args)
-
-        app.logger.info(f"pool #{self.ident} sending finish event...")
-        # if not self.pool_finished.has_result():
-        self.pool_finished.send(threading.get_ident())
+            res = self.check_queue()
+            if res is self.STOP_QUEUE:
+                break
 
-    def start_work(self, *args):
-        app.logger.info(f"[{self.__class__.__name__} - {self.ident}] starting work")
+            elif res is self.CONTINUE_QUEUE:
+                continue
 
-        try:
-            return tpool.execute(self.work, *args)
+            self.info("Got a job from the cache queue")
+            try:
+                self.info("Starting work")
+                self.start_work(*res)
+            except Exception as e:
+                self.error(f"Error occurred: {e}")
 
-        except Exception as e:
-            app.logger.error(f"[{self.__class__.__name__} - {self.ident}] error occurred: {e}")
+            finally:
+                self.info(f"Work finished")
 
-        finally:
-            app.logger.info(f"[{self.__class__.__name__} - {self.ident}] work finished")
+        self._finish()
 
-    @property
-    def ident(self):
-        return threading.get_ident()
+    def _finish(self):
+        self.info("Sending finish event")
+        # if not self.pool_finished.has_result():
+        self.pool_finished.send(threading.get_ident())
 
+    def start_work(self, *args):
+        return eventlet.tpool.execute(self.work, *args)
 
-    @abc.abstractmethod
     def work(self, *args):
         pass
 
-
-
 if __name__ == '__main__':
     import _thread as thread
     class Foo(GreenWorker):
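
For orientation, a minimal self-contained sketch of the green-pool worker pattern this module implements: a queue consumed by a green thread that offloads blocking work to eventlet's thread pool and honors a stop event. DemoWorker and its submit() method are illustrative assumptions, not the repository's API:

import eventlet
from eventlet import tpool
from eventlet.event import Event
from eventlet.queue import Queue


class DemoWorker:
    """Consume jobs from a green queue and run them in eventlet's thread pool."""

    def __init__(self, sleep_time=0.5):
        self.pool = eventlet.GreenPool()
        self.queue = Queue()
        self.stop_event = Event()
        self.sleep_time = sleep_time

    def start(self):
        # run the consumer loop as a green thread
        self.pool.spawn_n(self._run)

    def stop(self):
        self.stop_event.send(True)

    def submit(self, *args):
        self.queue.put(args)

    def _run(self):
        while True:
            if self.queue.empty():
                if self.stop_event.ready():
                    break                        # stop requested and queue drained
                eventlet.sleep(self.sleep_time)  # yield to other green threads
                continue
            args = self.queue.get(block=True)
            tpool.execute(self.work, *args)      # blocking work runs in an OS thread

    def work(self, *args):
        print("working on", args)


if __name__ == "__main__":
    worker = DemoWorker()
    worker.start()
    worker.submit("job-1")
    worker.stop()
    worker.pool.waitall()                        # wait for the consumer loop to exit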