import eventlet

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from time import time
from types import GeneratorType
from typing import Any
from typing import Callable
from typing import Generator
from typing import List
from typing import Optional

# from eventlet import spawn, spawn_n, tpool
from eventlet.event import Event

from pycs.database.Project import Project
from pycs.jobs.Job import Job
from pycs.frontend.notifications.NotificationManager import NotificationManager
from pycs.jobs.JobGroupBusyException import JobGroupBusyException
from pycs.util import Callbacks
from pycs.util import GreenWorker
from pycs.util import Singleton


class JobRunner(GreenWorker, Singleton):
    """
    run jobs in a thread pool, but track progress and process results in
    eventlet queue

    Because it is a singleton (as described here:
    https://www.python.org/download/releases/2.2/descrintro/#__new__),
    __init__ is called every time the singleton object is requested.
    Hence, we do the actual initialization in init()!
    """
    # pylint: disable=too-many-arguments

    def __init__(self):
        # intentionally empty: real initialization happens in init(),
        # because the Singleton pattern re-invokes __init__ on every access
        pass

    def init(self):
        """
        one-time initialization of the singleton instance

        sets up job bookkeeping, a single-threaded executor for the actual
        job payloads, lifecycle listeners, and frontend notifications
        """
        super().__init__()
        super().init()

        # all jobs ever created (including finished ones, see list())
        self.__jobs = []
        # group id -> currently running job, used to reject concurrent
        # jobs of the same group (see run())
        self.__groups = {}
        # one worker thread: job executables run strictly one at a time
        self.__executor = ThreadPoolExecutor(1)
        # operation name ("create", "start", ...) -> Callbacks collection
        self.__listeners = defaultdict(Callbacks)

        self.init_notifications()

    def init_notifications(self):
        """wire job lifecycle events to the frontend NotificationManager"""
        nm = NotificationManager()
        self.on("create", nm.create("job"))
        self.on("start", nm.edit("job"))
        self.on("progress", nm.edit("job"))
        self.on("finish", nm.edit("job"))
        self.on("remove", nm.remove("job"))

    @classmethod
    def Run(cls, *args, **kwargs):
        """convenience wrapper: call run() on the singleton instance"""
        cls().run(*args, **kwargs)

    @classmethod
    def Remove(cls, *args, **kwargs):
        """convenience wrapper: call remove() on the singleton instance"""
        cls().remove(*args, **kwargs)

    def remove(self, id):
        """
        remove a job using its unique id

        only finished jobs are removed; a matching unfinished job is
        left untouched

        :param id: job id
        :return:
        """
        for index, job in enumerate(self.__jobs):
            if job.id == id:
                if job.finished is not None:
                    del self.__jobs[index]
                    # BUG FIX: previously iterated self.__remove_listeners,
                    # which is never defined anywhere in this class
                    # (init() only creates self.__listeners) and raised
                    # AttributeError. Dispatch through the listeners dict,
                    # consistent with the other lifecycle events.
                    self.__listeners["remove"](job)
                return

    def run(self,
            project: Project,
            job_type: str,
            name: str,
            group: str,
            executable: Callable[[Any], Optional[Generator[float, None, None]]],
            *args,
            progress: Callable[[Any], float] = None,
            result: Callable[[Any], None] = None,
            result_event: Event = None,
            **kwargs) -> Job:
        """
        add a job to run it in a thread pool

        :param project: project the job is associated with
        :param job_type: job type
        :param name: job name
        :param group: job group (raises JobGroupBusyException if there is
            already a job running with the same group id)
        :param executable: function to execute
        :param args: arguments for executable
        :param progress: is called everytime executable yields a value
        :param result: is called as soon as executable returns a value
        :param result_event: eventlet event to be called as soon as executable
            returns a value
        :param kwargs: named arguments for executable
        :return: job object
        """
        # create job object
        job = Job(project, job_type, name)

        # abort if group is busy
        # otherwise add to groups dict
        if group is not None:
            if group in self.__groups:
                raise JobGroupBusyException

            self.__groups[group] = job

        # add to job list
        self.__jobs.append(job)

        # execute create listeners
        self.__listeners["create"](job)

        # add to execution queue (self.queue is provided by GreenWorker)
        # self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))
        self.queue.put((group, executable, job, progress, result, result_event, args, kwargs))

        # return job object
        return job

    def list(self) -> List[Job]:
        """
        get a list of all jobs including finished ones

        :return: list of job objects
        """
        return self.__jobs

    def on(self, operation, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a operation
        (create, start, progress, finish, or remove) on a job is performed

        :param operation: one of "create", "start", "progress", "finish",
            "remove"
        :param callback: callback function
        :return:
        """
        assert operation in ["create", "start", "progress", "finish", "remove"], \
            f"invalid operation: {operation}"

        self.__listeners[operation].append(callback)

    def _job_started(self, job):
        """mark the job started and notify "start" listeners"""
        job.start()
        self.__listeners["start"](job)

    def _job_progress(self, job, progress):
        """store the new progress value and notify "progress" listeners"""
        job.update(progress=progress)
        self.__listeners["progress"](job)

    def _job_finished(self, job):
        """mark the job finished and notify "finish" listeners"""
        job.finish()
        self.__listeners["finish"](job)

    def process_iterator(self, generator, job, progress_fun):
        """
        drive a job generator to completion, reporting every yielded value
        as job progress

        each next() call runs on the executor thread and is awaited via
        eventlet's tpool so green threads are not blocked

        :param generator: generator returned by the job executable
        :param job: job object to report progress on
        :param progress_fun: optional mapper from yielded value(s) to a
            progress float; tuples are unpacked as positional arguments
        :return: the generator's return value (StopIteration.value)
        """
        try:
            iterator = iter(generator)

            while True:
                # run until next progress event
                future = self.__executor.submit(next, iterator)
                progress = eventlet.tpool.execute(future.result)

                # execute progress function
                if progress_fun is not None:
                    if isinstance(progress, tuple):
                        progress = progress_fun(*progress)
                    else:
                        progress = progress_fun(progress)

                self._job_progress(job, progress=progress)

        except StopIteration as stop_iteration_exception:
            return stop_iteration_exception.value

    def start_work(self, group, executable, job, progress_fun,
                   result_fun, result_event, args, kwargs):
        """
        started as coroutine. Starts other tasks in separate green threads.
        """
        # execute start listeners
        self._job_started(job)

        try:
            # result = generator = executable(*args, **kwargs)
            future = self.__executor.submit(executable, *args, **kwargs)
            result = generator = eventlet.tpool.execute(future.result)

            # generators are driven to completion, yielding progress values;
            # plain callables just produce the result directly
            if isinstance(generator, GeneratorType):
                result = self.process_iterator(generator, job, progress_fun)
            self._job_progress(job, progress=1)

            # execute result function
            if result_fun is not None:
                if isinstance(result, tuple):
                    result_fun(*result)
                else:
                    result_fun(result)

            # execute event
            if result_event is not None:
                result_event.send(result)

        # save exceptions to show in ui
        except Exception as e:
            import traceback
            traceback.print_exc()
            job.exception = f'{type(e).__name__} ({str(e)})'

        # remove from group dict so the group accepts new jobs again
        if group is not None:
            del self.__groups[group]

        self._job_finished(job)