
reworked threading a bit

Dimitri Korsch · 4 years ago
commit 2e42d38232
6 changed files with 118 additions and 71 deletions
  1. pycs/database/Database.py (+1 -1)
  2. pycs/frontend/WebServer.py (+6 -0)
  3. pycs/jobs/Job.py (+15 -0)
  4. pycs/jobs/JobRunner.py (+17 -22)
  5. pycs/util/PipelineCache.py (+66 -46)
  6. pycs/util/green_worker.py (+13 -2)

pycs/database/Database.py (+1 -1)

@@ -50,7 +50,7 @@ class Database:
         db.session.commit()

     def copy(self):
-        return Database(self.path, initialization=False, discovery=False)
+        return Database(initialization=False, discovery=False)

     def models(self) -> Iterator[Model]:
         """

pycs/frontend/WebServer.py (+6 -0)

@@ -1,11 +1,13 @@
 import os
 import glob
+import threading

 from logging.config import dictConfig

 from flask import send_from_directory
 from flask_socketio import SocketIO

+from pycs import app
 from pycs.database.Database import Database
 from pycs.frontend.endpoints.ListJobs import ListJobs
 from pycs.frontend.endpoints.ListLabelProviders import ListLabelProviders
@@ -84,11 +86,15 @@ class WebServer:
         # set json encoder so database objects are serialized correctly
         self.app.json_encoder = JSONEncoder

+        self.start_runner()
         self.init_notifications()
         self.define_routes()
         self.logger.info("Server initialized")

+
+
     def start_runner(self):
+        app.logger.info(f"Main Thread ID: {threading.get_ident()}")
         self.jobs.start()
         self.pipelines.start()

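The new log line records the main thread's identifier. Since eventlet green threads all execute inside a single native thread, this makes it easy to confirm from the logs which work actually left the main thread. A minimal, self-contained sketch of that effect (`report` is a made-up helper, not project code):

```python
import threading
import eventlet

def report(tag):
    # green threads share the spawning native thread,
    # so both calls print the same identifier
    print(tag, threading.get_ident())

report("main: ")
eventlet.spawn(report, "green:").wait()
```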
pycs/jobs/Job.py (+15 -0)

@@ -21,3 +21,18 @@ class Job:
         self.updated = int(time())
         self.started = None
         self.finished = None
+
+    def __str__(self):
+        return f"<Job {self.type}: name={self.name}, project={self.project_id} progress={self.progress}>"
+
+    def start(self):
+        self.started = int(time())
+        self.update(progress=0)
+
+    def finish(self):
+        self.finished = int(time())
+        self.update(progress=1)
+
+    def update(self, progress: float):
+        self.progress = progress
+        self.updated = int(time())

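The new helpers centralize the timestamp bookkeeping that was previously repeated inline in JobRunner: every state change also refreshes `updated`. A hypothetical usage sketch (Job's constructor signature is not part of this diff and is assumed here):

```python
# hypothetical usage; the constructor arguments are made up
job = Job(project, "Model Interaction", "demo job")

job.start()                 # sets `started`, resets progress to 0
for i in range(10):
    job.update(progress=(i + 1) / 10)   # also refreshes `updated`
job.finish()                # sets `finished`, pins progress at 1

print(job)                  # uses the new __str__
```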
pycs/jobs/JobRunner.py (+17 -22)

@@ -1,9 +1,9 @@
-# from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor
 from time import time
 from types import GeneratorType
 from typing import Callable, List, Generator, Optional, Any

-# import eventlet
+import eventlet
 # from eventlet import spawn, spawn_n, tpool
 from eventlet.event import Event

@@ -25,7 +25,7 @@ class JobRunner(GreenWorker):
         self.__jobs = []
         self.__groups = {}

-        # self.__executor = ThreadPoolExecutor(1)
+        self.__executor = ThreadPoolExecutor(1)

         self.__create_listeners = []
         self.__start_listeners = []
@@ -156,17 +156,17 @@ class JobRunner(GreenWorker):
         # return job object
         return job

-    def process_iterator(self, iterator, job, progress_fun):
+    def process_iterator(self, generator, job, progress_fun):
         try:
             iterator = iter(generator)

             while True:
                 # run until next progress event
-                # future = self.__executor.submit(next, iterator)
-                # progress = tpool.execute(future.result)
+                future = self.__executor.submit(next, iterator)
+                progress = eventlet.tpool.execute(future.result)

                 # progress = future.result()
-                progress = next(iterator)
+                # progress = next(iterator)

                 # execute progress function
                 if progress_fun is not None:
@@ -176,9 +176,7 @@ class JobRunner(GreenWorker):
                         progress = progress_fun(progress)

                 # execute progress listeners
-                job.progress = progress
-                job.updated = int(time())
-
+                job.update(progress=progress)
                 for callback in self.__progress_listeners:
                     callback(job)
 
@@ -188,22 +186,22 @@ class JobRunner(GreenWorker):

     # done in a separate green thread
     def work(self, group, executable, job, progress_fun, result_fun, result_event, args, kwargs):
-        # execute start listeners
-        job.started = int(time())
-        job.updated = int(time())

+        # execute start listeners
+        job.start()
         for callback in self.__start_listeners:
             callback(job)

         try:
-            result = generator = executable(*args, **kwargs)
+            # result = generator = executable(*args, **kwargs)
+            future = self.__executor.submit(executable, *args, **kwargs)
+            result = generator = eventlet.tpool.execute(future.result)
+
             if isinstance(generator, GeneratorType):
-                result = self.process_iterator(iterator, job, progress_fun)
+                result = self.process_iterator(generator, job, progress_fun)

-            # update progress
-            job.progress = 1
-            job.updated = int(time())

+            job.update(progress=1)
             for callback in self.__progress_listeners:
                 callback(job)
 
@@ -228,10 +226,7 @@ class JobRunner(GreenWorker):
         if group is not None:
             del self.__groups[group]

-        # finish job
-        job.finished = int(time())
-        job.updated = int(time())
-
+        job.finish()
         for callback in self.__finish_listeners:
             callback(job)

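The core of the rework is visible here: blocking calls (the `executable(...)` invocation and each `next()` on its generator) now run on a single-slot `ThreadPoolExecutor`, and the green thread waits on the future through `eventlet.tpool.execute`, so the event loop stays responsive. A minimal, self-contained sketch of that pattern, assuming eventlet is installed (`slow_steps` is a made-up stand-in for a job's generator):

```python
import time
from concurrent.futures import ThreadPoolExecutor
from eventlet import tpool

executor = ThreadPoolExecutor(1)  # one native worker, as in JobRunner

def slow_steps():
    for i in range(3):
        time.sleep(0.1)           # blocking work that would stall a green thread
        yield (i + 1) / 3

iterator = iter(slow_steps())
while True:
    try:
        # next(iterator) runs on the executor's native thread; waiting on
        # future.result goes through tpool, so other green threads keep running
        future = executor.submit(next, iterator)
        progress = tpool.execute(future.result)
    except StopIteration:
        break
    print("progress:", progress)
```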
pycs/util/PipelineCache.py (+66 -46)

@@ -5,15 +5,30 @@ from threading import Lock
 from time import time, sleep

 from eventlet import tpool, spawn_n
+from dataclasses import dataclass
 from collections import namedtuple

+from pycs import app
 from pycs.database.Project import Project
 from pycs.interfaces.Pipeline import Pipeline
 from pycs.jobs.JobRunner import JobRunner
 from pycs.util.PipelineUtil import load_from_root_folder
 from pycs.util.green_worker import GreenWorker

-PipelineEntry = namedtuple("PipelineEntry", "counter pipeline project_id")
+
+@dataclass
+class PipelineEntry(object):
+    counter: int = 1
+    pipeline: Pipeline = None
+    pipeline_name: str = None
+    project_id: int = -1
+
+    def __post_init__(self):
+        if self.pipeline is not None:
+            self.pipeline_name = self.pipeline.__class__.__name__
+
+    def __str__(self):
+        return f"<Pipeline '{self.pipeline_name}' for project #{self.project_id} (counter={self.counter})>"

 class PipelineCache(GreenWorker):
     CLOSE_TIMER = 120
@@ -44,6 +59,7 @@ class PipelineCache(GreenWorker):

                 # increase reference counter
                 entry.counter += 1
+                app.logger.info(f"Using {entry}")

                 # return entry
                 return entry.pipeline
@@ -53,7 +69,9 @@ class PipelineCache(GreenWorker):

         # save instance to cache
         with self.__lock:
-            self.__pipelines[root_folder] = PipelineEntry(1, pipeline, project.id)
+            entry = PipelineEntry(counter=1, pipeline=pipeline, project_id=project.id)
+            app.logger.info(f"Cached {entry}")
+            self.__pipelines[root_folder] = entry

         # return
         return pipeline
@@ -79,7 +97,7 @@ class PipelineCache(GreenWorker):
     def start_work(self, root_folder, timestamp):

         # delegate to work method in a separate thread
-        pipeline, project_id = super().run(root_folder, timestamp)
+        pipeline, project_id = super().start_work(root_folder, timestamp)

         project = Project.query.get(project_id)

@@ -98,60 +116,62 @@ class PipelineCache(GreenWorker):
             delay = int(timestamp + self.CLOSE_TIMER - time())

             if delay > 0:
-                eventlet.sleep(delay)
+                sleep(delay)

             # lock and access __pipelines
             with self.__lock:
-                instance: PipelineEntry = self.__pipelines[root_folder]
+                entry: PipelineEntry = self.__pipelines[root_folder]

                 # reference counter greater than 1
-                if instance.counter > 1:
+                if entry.counter > 1:
                     # decrease reference counter
-                    instance.counter -= 1
+                    entry.counter -= 1
+                    app.logger.info(f"Decreased counter of {entry}.")
                     continue

                 # reference counter equals 1
                 else:
                     # delete instance from __pipelines and return to call `close` function
                     del self.__pipelines[root_folder]
+                    app.logger.info(f"Removed {entry} from cache.")
                     return entry.pipeline, entry.project_id

-    def __get(self):
-        while True:
-            # get element from queue
-            root_folder, timestamp = self.__queue.get()
-
-            # sleep if needed
-            delay = int(timestamp + self.CLOSE_TIMER - time())
-
-            if delay > 0:
-                eventlet.sleep(delay)
-
-            # lock and access __pipelines
-            with self.__lock:
-                instance = self.__pipelines[root_folder]
-
-                # reference counter greater than 1
-                if instance[0] > 1:
-                    # decrease reference counter
-                    instance[0] -= 1
-
-                # reference counter equals 1
-                else:
-                    # delete instance from __pipelines and return to call `close` function
-                    del self.__pipelines[root_folder]
-                    return instance[1], instance[2]
-
-    def __run(self):
-        while True:
-            # get pipeline
-            pipeline, project_id = tpool.execute(self.__get)
-            project = Project.query.get(project_id)
-
-            # create job to close pipeline
-            self.__jobs.run(project,
-                            'Model Interaction',
-                            f'{project.name} (close pipeline)',
-                            f'{project.name}/model-interaction',
-                            pipeline.close
-                            )
+    # def __get(self):
+    #     while True:
+    #         # get element from queue
+    #         root_folder, timestamp = self.__queue.get()
+
+    #         # sleep if needed
+    #         delay = int(timestamp + self.CLOSE_TIMER - time())
+
+    #         if delay > 0:
+    #             eventlet.sleep(delay)
+
+    #         # lock and access __pipelines
+    #         with self.__lock:
+    #             instance = self.__pipelines[root_folder]
+
+    #             # reference counter greater than 1
+    #             if instance.counter > 1:
+    #                 # decrease reference counter
+    #                 instance.counter -= 1
+
+    #             # reference counter equals 1
+    #             else:
+    #                 # delete instance from __pipelines and return to call `close` function
+    #                 del self.__pipelines[root_folder]
+    #                 return instance.pipeline, instance.project_id
+
+    # def __run(self):
+    #     while True:
+    #         # get pipeline
+    #         pipeline, project_id = tpool.execute(self.__get)
+    #         project = Project.query.get(project_id)
+
+    #         # create job to close pipeline
+    #         self.__jobs.run(project,
+    #                         'Model Interaction',
+    #                         f'{project.name} (close pipeline)',
+    #                         f'{project.name}/model-interaction',
+    #                         pipeline.close
+    #                         )

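The eviction logic above boils down to reference counting with a grace period: a release request sleeps until `CLOSE_TIMER` has elapsed, then under the lock either decrements the counter or evicts the entry so its pipeline can be closed. A simplified, self-contained sketch of that rule (`RefCountedCache` is a made-up stand-in, not the real class):

```python
import threading
import time

CLOSE_TIMER = 120  # seconds an idle pipeline stays cached, as in PipelineCache

class RefCountedCache:
    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}  # key -> [counter, payload]

    def release(self, key, released_at):
        # wait out the grace period before touching the entry
        delay = released_at + CLOSE_TIMER - time.time()
        if delay > 0:
            time.sleep(delay)

        with self._lock:
            entry = self._entries[key]
            if entry[0] > 1:
                entry[0] -= 1        # other users remain: just decrement
                return None
            del self._entries[key]   # last user gone: evict, caller closes it
            return entry[1]
```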
pycs/util/green_worker.py (+13 -2)

@@ -4,6 +4,7 @@ import eventlet
 import threading

 # from concurrent.futures import ThreadPoolExecutor
+from pycs import app
 from eventlet import tpool

 class GreenWorker(abc.ABC):
@@ -63,15 +64,25 @@ class GreenWorker(abc.ABC):
                     # print("no stop event received")
                     continue

+            app.logger.info(f"starting job in thread #{self.ident}...")
             args = self.queue.get(block=True)
+
             self.start_work(*args)

-        # print(self.pool_finished)
+        app.logger.info(f"pool #{self.ident} sending finish event...")
         # if not self.pool_finished.has_result():
         self.pool_finished.send(threading.get_ident())

     def start_work(self, *args):
-        return tpool.execute(self.work, *args)
+        app.logger.info(f"[{self.__class__.__name__} - {self.ident}] starting work")
+        res = tpool.execute(self.work, *args)
+        app.logger.info(f"[{self.__class__.__name__} - {self.ident}] work finished")
+        return res
+
+    @property
+    def ident(self):
+        return threading.get_ident()
+

     @abc.abstractmethod
     def work(self, *args):
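
`GreenWorker` pairs a queue-consuming loop with `tpool.execute`, so each queued job's blocking `work()` runs on a native thread while the worker's green thread yields. A toy version of that dispatch under the same assumptions (`MiniWorker`/`EchoWorker` are made up; the real class also handles its queue and stop events):

```python
import abc
import threading
from eventlet import tpool

class MiniWorker(abc.ABC):
    """Toy sketch of GreenWorker's dispatch, not the real implementation."""

    def start_work(self, *args):
        # run the blocking work() on a tpool native thread; the calling
        # green thread yields until it finishes
        return tpool.execute(self.work, *args)

    @property
    def ident(self):
        return threading.get_ident()

    @abc.abstractmethod
    def work(self, *args):
        ...

class EchoWorker(MiniWorker):
    def work(self, msg):
        print(f"[thread {self.ident}] {msg}")  # runs on a tpool thread

EchoWorker().start_work("hello")
```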