|
@@ -1,90 +1,67 @@
|
|
|
+import eventlet
|
|
|
+
|
|
|
+from collections import defaultdict
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
from time import time
|
|
|
from types import GeneratorType
|
|
|
-from typing import Callable, List, Generator, Optional, Any
|
|
|
-
|
|
|
-import eventlet
|
|
|
+from typing import Any
|
|
|
+from typing import Callable
|
|
|
+from typing import Generator
|
|
|
+from typing import List
|
|
|
+from typing import Optional
|
|
|
# from eventlet import spawn, spawn_n, tpool
|
|
|
from eventlet.event import Event
|
|
|
|
|
|
|
|
|
from pycs.database.Project import Project
|
|
|
from pycs.jobs.Job import Job
|
|
|
+from pycs.frontend.notifications.NotificationManager import NotificationManager
|
|
|
from pycs.jobs.JobGroupBusyException import JobGroupBusyException
|
|
|
+from pycs.util import Callbacks
|
|
|
+from pycs.util import GreenWorker
|
|
|
+from pycs.util import Singleton
|
|
|
|
|
|
-from pycs.util.green_worker import GreenWorker
|
|
|
|
|
|
-class JobRunner(GreenWorker):
|
|
|
+class JobRunner(GreenWorker, Singleton):
|
|
|
"""
|
|
|
run jobs in a thread pool, but track progress and process results in eventlet queue
|
|
|
"""
|
|
|
|
|
|
# pylint: disable=too-many-arguments
|
|
|
+ """ Because it is a singleton (as described here:
|
|
|
+ https://www.python.org/download/releases/2.2/descrintro/#__new__),
|
|
|
+ __init__ is called every time the singleton object
|
|
|
+ is requested. Hence, we do the actual initialization
|
|
|
+ in init()!
|
|
|
+ """
|
|
|
def __init__(self):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def init(self):
|
|
|
super().__init__()
|
|
|
+ super().init()
|
|
|
self.__jobs = []
|
|
|
self.__groups = {}
|
|
|
|
|
|
self.__executor = ThreadPoolExecutor(1)
|
|
|
|
|
|
- self.__create_listeners = []
|
|
|
- self.__start_listeners = []
|
|
|
- self.__progress_listeners = []
|
|
|
- self.__finish_listeners = []
|
|
|
- self.__remove_listeners = []
|
|
|
+ self.__listeners = defaultdict(Callbacks)
|
|
|
|
|
|
- def list(self) -> List[Job]:
|
|
|
- """
|
|
|
- get a list of all jobs including finished ones
|
|
|
+ def init_notifications(self, nm: NotificationManager):
|
|
|
|
|
|
- :return: list of job objects
|
|
|
- """
|
|
|
- return self.__jobs
|
|
|
+ self.on("create", nm.create_job)
|
|
|
+ self.on("start", nm.edit_job)
|
|
|
+ self.on("progress", nm.edit_job)
|
|
|
+ self.on("finish", nm.edit_job)
|
|
|
+ self.on("remove", nm.remove_job)
|
|
|
|
|
|
- def on_create(self, callback: Callable[[Job], None]) -> None:
|
|
|
- """
|
|
|
- register a callback that is executed each time a job is created
|
|
|
-
|
|
|
- :param callback: callback function
|
|
|
- :return:
|
|
|
- """
|
|
|
- self.__create_listeners.append(callback)
|
|
|
-
|
|
|
- def on_start(self, callback: Callable[[Job], None]) -> None:
|
|
|
- """
|
|
|
- register a callback that is executed each time a job is started
|
|
|
-
|
|
|
- :param callback: callback function
|
|
|
- :return:
|
|
|
- """
|
|
|
- self.__start_listeners.append(callback)
|
|
|
-
|
|
|
- def on_progress(self, callback: Callable[[Job], None]) -> None:
|
|
|
- """
|
|
|
- register a callback that is executed each time a job changes it's progress
|
|
|
-
|
|
|
- :param callback: callback function
|
|
|
- :return:
|
|
|
- """
|
|
|
- self.__progress_listeners.append(callback)
|
|
|
-
|
|
|
- def on_finish(self, callback: Callable[[Job], None]) -> None:
|
|
|
- """
|
|
|
- register a callback that is executed each time a job is finished
|
|
|
-
|
|
|
- :param callback: callback function
|
|
|
- :return:
|
|
|
- """
|
|
|
- self.__finish_listeners.append(callback)
|
|
|
-
|
|
|
- def on_remove(self, callback: Callable[[Job], None]) -> None:
|
|
|
- """
|
|
|
- register a callback that is executed each time a job is removed
|
|
|
+ @classmethod
|
|
|
+ def Run(cls, *args, **kwargs):
|
|
|
+ cls().run(*args, **kwargs)
|
|
|
|
|
|
- :param callback: callback function
|
|
|
- :return:
|
|
|
- """
|
|
|
- self.__remove_listeners.append(callback)
|
|
|
+ @classmethod
|
|
|
+ def Remove(cls, *args, **kwargs):
|
|
|
+ cls().remove(*args, **kwargs)
|
|
|
|
|
|
def remove(self, id):
|
|
|
"""
|
|
@@ -146,8 +123,7 @@ class JobRunner(GreenWorker):
|
|
|
self.__jobs.append(job)
|
|
|
|
|
|
# execute create listeners
|
|
|
- for callback in self.__create_listeners:
|
|
|
- callback(job)
|
|
|
+ self.__listeners["create"](job)
|
|
|
|
|
|
# add to execution queue
|
|
|
# self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))
|
|
@@ -156,21 +132,37 @@ class JobRunner(GreenWorker):
|
|
|
# return job object
|
|
|
return job
|
|
|
|
|
|
+ def list(self) -> List[Job]:
|
|
|
+ """
|
|
|
+ get a list of all jobs including finished ones
|
|
|
+
|
|
|
+ :return: list of job objects
|
|
|
+ """
|
|
|
+ return self.__jobs
|
|
|
+
|
|
|
+ def on(self, operation, callback: Callable[[Job], None]) -> None:
|
|
|
+ """
|
|
|
+ register a callback that is executed each time a operation
|
|
|
+ (create, start, progress, finish, or remove) on a job is performed
|
|
|
+
|
|
|
+ :param callback: callback function
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ assert operation in ["create", "start", "progress", "finish", "remove"], \
|
|
|
+ f"invalid operation: {operation}"
|
|
|
+ self.__listeners[operation].append(callback)
|
|
|
|
|
|
def _job_started(self, job):
|
|
|
job.start()
|
|
|
- for callback in self.__start_listeners:
|
|
|
- callback(job)
|
|
|
+ self.__listeners["start"](job)
|
|
|
|
|
|
def _job_progress(self, job, progress):
|
|
|
job.update(progress=progress)
|
|
|
- for callback in self.__progress_listeners:
|
|
|
- callback(job)
|
|
|
+ self.__listeners["progress"](job)
|
|
|
|
|
|
def _job_finished(self, job):
|
|
|
job.finish()
|
|
|
- for callback in self.__finish_listeners:
|
|
|
- callback(job)
|
|
|
+ self.__listeners["finish"](job)
|
|
|
|
|
|
|
|
|
def process_iterator(self, generator, job, progress_fun):
|
|
@@ -232,86 +224,3 @@ class JobRunner(GreenWorker):
|
|
|
del self.__groups[group]
|
|
|
|
|
|
self._job_finished(job)
|
|
|
-
|
|
|
- def __run(self):
|
|
|
-
|
|
|
- while True:
|
|
|
-
|
|
|
- # get execution function and job from queue
|
|
|
- group, executable, job, progress_fun, result_fun, result_event, args, kwargs \
|
|
|
- = self.__queue.get(block=True)
|
|
|
-
|
|
|
- # execute start listeners
|
|
|
- job.started = int(time())
|
|
|
- job.updated = int(time())
|
|
|
-
|
|
|
- for callback in self.__start_listeners:
|
|
|
- callback(job)
|
|
|
-
|
|
|
- # run function and track progress
|
|
|
- try:
|
|
|
- # result = generator = executable(*args, **kwargs)
|
|
|
- future = self.__executor.submit(executable, *args, **kwargs)
|
|
|
- result = generator = tpool.execute(future.result)
|
|
|
-
|
|
|
- if isinstance(generator, GeneratorType):
|
|
|
- iterator = iter(generator)
|
|
|
-
|
|
|
- try:
|
|
|
- while True:
|
|
|
- # run until next progress event
|
|
|
- future = self.__executor.submit(next, iterator)
|
|
|
- progress = tpool.execute(future.result)
|
|
|
- # progress = next(iterator)
|
|
|
-
|
|
|
-
|
|
|
- # execute progress function
|
|
|
- if progress_fun is not None:
|
|
|
- if isinstance(progress, tuple):
|
|
|
- progress = progress_fun(*progress)
|
|
|
- else:
|
|
|
- progress = progress_fun(progress)
|
|
|
-
|
|
|
- # execute progress listeners
|
|
|
- job.progress = progress
|
|
|
- job.updated = int(time())
|
|
|
-
|
|
|
- for callback in self.__progress_listeners:
|
|
|
- callback(job)
|
|
|
- except StopIteration as stop_iteration_exception:
|
|
|
- result = stop_iteration_exception.value
|
|
|
-
|
|
|
- # update progress
|
|
|
- job.progress = 1
|
|
|
- job.updated = int(time())
|
|
|
-
|
|
|
- for callback in self.__progress_listeners:
|
|
|
- callback(job)
|
|
|
-
|
|
|
- # execute result function
|
|
|
- if result_fun is not None:
|
|
|
- if isinstance(result, tuple):
|
|
|
- result_fun(*result)
|
|
|
- else:
|
|
|
- result_fun(result)
|
|
|
-
|
|
|
- # execute event
|
|
|
- if result_event is not None:
|
|
|
- result_event.send(result)
|
|
|
-
|
|
|
- # save exceptions to show in ui
|
|
|
- except Exception as exception:
|
|
|
- import traceback
|
|
|
- traceback.print_exc()
|
|
|
- job.exception = f'{type(exception).__name__} ({str(exception)})'
|
|
|
-
|
|
|
- # remove from group dict
|
|
|
- if group is not None:
|
|
|
- del self.__groups[group]
|
|
|
-
|
|
|
- # finish job
|
|
|
- job.finished = int(time())
|
|
|
- job.updated = int(time())
|
|
|
-
|
|
|
- for callback in self.__finish_listeners:
|
|
|
- callback(job)
|