Przeglądaj źródła

Merge branch '130-improve-threading' into 'master'

Resolve "improve threading"

Closes #130

See merge request troebs/pycs!114
Eric Tröbs 3 lat temu
rodzic
commit
e4d642412a

+ 2 - 1
pycs/frontend/endpoints/data/UploadFile.py

@@ -54,7 +54,8 @@ class UploadFile(View):
 
         # detect file type
         try:
-            ftype, frames, fps = file_info(self.data_folder, self.file_id, self.file_extension)
+            ftype, frames, fps = tpool.execute(file_info,
+                                               self.data_folder, self.file_id, self.file_extension)
         except ValueError:
             return abort(400)
 

+ 2 - 2
pycs/interfaces/MediaFileList.py

@@ -39,10 +39,10 @@ class MediaFileList:
             source = self.__project
 
         if self.__label is None:
-            for file in source.files_iter():
+            for file in source.files():
                 yield MediaFile(file, self.__notifications)
         else:
-            for file in source.files_iter():
+            for file in source.files():
                 for result in file.results():
                     if result.label == self.__label:
                         yield MediaFile(file, self.__notifications)

+ 8 - 16
pycs/jobs/JobRunner.py

@@ -1,8 +1,9 @@
+from concurrent.futures import ThreadPoolExecutor
 from time import time
 from types import GeneratorType
 from typing import Callable, List, Generator, Optional, Any
 
-from eventlet import spawn_n, GreenPool
+from eventlet import spawn_n, tpool
 from eventlet.event import Event
 from eventlet.queue import Queue
 
@@ -20,6 +21,8 @@ class JobRunner:
     def __init__(self):
         self.__jobs = []
         self.__groups = {}
+
+        self.__executor = ThreadPoolExecutor(1)
         self.__queue = Queue()
 
         self.__create_listeners = []
@@ -152,16 +155,7 @@ class JobRunner:
         # return job object
         return job
 
-    @staticmethod
-    def __next(it):
-        try:
-            return next(it)
-        except StopIteration as e:
-            return e
-
     def __run(self):
-        pool = GreenPool(1)
-
         while True:
             # get execution function and job from queue
             group, executable, job, progress_fun, result_fun, result_event, args, kwargs \
@@ -176,7 +170,8 @@ class JobRunner:
 
             # run function and track progress
             try:
-                generator = pool.spawn(executable, *args, **kwargs).wait()
+                future = self.__executor.submit(executable, *args, **kwargs)
+                generator = tpool.execute(future.result)
                 result = generator
 
                 if isinstance(generator, GeneratorType):
@@ -185,11 +180,8 @@ class JobRunner:
                     try:
                         while True:
                             # run until next progress event
-                            progress = pool.spawn(self.__next, iterator).wait()
-
-                            # raise StopIteration if return is of this type
-                            if isinstance(progress, StopIteration):
-                                raise progress
+                            future = self.__executor.submit(next, iterator)
+                            progress = tpool.execute(future.result)
 
                             # execute progress function
                             if progress_fun is not None: