import traceback
from concurrent.futures import ThreadPoolExecutor
from time import time
from types import GeneratorType
from typing import Any
from typing import Callable
from typing import Generator
from typing import List
from typing import Optional

from eventlet import spawn_n
from eventlet import tpool
from eventlet.event import Event
from eventlet.queue import Queue

from pycs.database.Project import Project
from pycs.jobs.Job import Job
from pycs.jobs.JobGroupBusyException import JobGroupBusyException


class JobRunner:
    """
    run jobs in a thread pool, but track progress and process results in eventlet queue

    jobs are queued by `run` and processed one after another by a worker loop that is
    spawned in the constructor; listeners can be registered for every stage of a job's
    life cycle (create, start, progress, finish, remove)
    """

    # pylint: disable=too-many-arguments
    def __init__(self):
        """
        create the (empty) job list, the executor and queue, and spawn the worker loop
        """
        # all known jobs, including finished ones (until they are removed)
        self.__jobs = []
        # maps a group identifier to the job currently occupying that group
        self.__groups = {}

        # a single worker thread: executables are run one at a time
        self.__executor = ThreadPoolExecutor(1)
        self.__queue = Queue()

        self.__create_listeners = []
        self.__start_listeners = []
        self.__progress_listeners = []
        self.__finish_listeners = []
        self.__remove_listeners = []

        spawn_n(self.__run)

    def list(self) -> List[Job]:
        """
        get a list of all jobs including finished ones

        :return: list of job objects (the runner's internal list, not a copy)
        """
        return self.__jobs

    def on_create(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is created

        :param callback: callback function
        :return:
        """
        self.__create_listeners.append(callback)

    def on_start(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is started

        :param callback: callback function
        :return:
        """
        self.__start_listeners.append(callback)

    def on_progress(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job changes its progress

        :param callback: callback function
        :return:
        """
        self.__progress_listeners.append(callback)

    def on_finish(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is finished

        :param callback: callback function
        :return:
        """
        self.__finish_listeners.append(callback)

    def on_remove(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is removed

        :param callback: callback function
        :return:
        """
        self.__remove_listeners.append(callback)

    def remove(self, identifier):
        """
        remove a job using its unique identifier

        only finished jobs are removed; if the matching job has not finished yet
        (or no job matches) nothing happens

        :param identifier: job identifier
        :return:
        """
        for index, job in enumerate(self.__jobs):
            if job.identifier != identifier:
                continue

            if job.finished is not None:
                # safe to delete while iterating because we return right afterwards
                del self.__jobs[index]

                for callback in self.__remove_listeners:
                    callback(job)

            return

    def run(self,
            project: Project,
            job_type: str,
            name: str,
            group: str,
            executable: Callable[[Any], Optional[Generator[float, None, None]]],
            *args,
            progress: Optional[Callable[[Any], float]] = None,
            result: Optional[Callable[[Any], None]] = None,
            result_event: Optional[Event] = None,
            **kwargs) -> Job:
        """
        add a job to run it in a thread pool

        :param project: project the job is associated with
        :param job_type: job type
        :param name: job name
        :param group: job group (raises JobGroupBusyException if there is already a job
                      running with the same group identifier), may be None
        :param executable: function to execute
        :param args: arguments for executable
        :param progress: is called everytime executable yields a value
        :param result: is called as soon as executable returns a value
        :param result_event: eventlet event to be called as soon as executable returns a value
        :param kwargs: named arguments for executable
        :return: job object
        :raises JobGroupBusyException: if the group is already occupied by another job
        """
        # create job object
        job = Job(project, job_type, name)

        # abort if group is busy
        # otherwise add to groups dict
        if group is not None:
            if group in self.__groups:
                raise JobGroupBusyException

            self.__groups[group] = job

        # add to job list
        self.__jobs.append(job)

        # execute create listeners
        for callback in self.__create_listeners:
            callback(job)

        # add to execution queue
        self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))

        # return job object
        return job

    def __consume(self, job, generator, progress_fun):
        """
        drive a generator returned by an executable until it is exhausted

        every yielded value is passed through the progress function (if any), stored
        as the job's progress and announced to the progress listeners

        :param job: job the generator belongs to
        :param generator: generator object returned by the executable
        :param progress_fun: optional function mapping a yielded value to a progress value
        :return: return value of the generator (value of its StopIteration)
        """
        iterator = iter(generator)

        while True:
            # run until next progress event
            # only the next() call is guarded: a StopIteration escaping from the progress
            # function or a listener must not be mistaken for the end of the generator
            try:
                future = self.__executor.submit(next, iterator)
                progress = tpool.execute(future.result)
            except StopIteration as stop_iteration_exception:
                return stop_iteration_exception.value

            # execute progress function (tuples are unpacked into positional arguments)
            if progress_fun is not None:
                if isinstance(progress, tuple):
                    progress = progress_fun(*progress)
                else:
                    progress = progress_fun(progress)

            # execute progress listeners
            job.progress = progress
            job.updated = int(time())

            for callback in self.__progress_listeners:
                callback(job)

    def __run(self):
        """
        worker loop: take queued jobs one at a time, execute them and notify listeners

        exceptions raised while a job is processed are printed and stored on the job
        (`job.exception`); the job's group is released in every case
        """
        while True:
            # get execution function and job from queue
            group, executable, job, progress_fun, result_fun, result_event, args, kwargs \
                = self.__queue.get(block=True)

            job.started = int(time())
            job.updated = int(time())

            try:
                # execute start listeners
                # (inside the try block so that a failing listener is recorded on the
                # job instead of terminating the worker loop with the group still busy)
                for callback in self.__start_listeners:
                    callback(job)

                # run function in the executor thread; waiting for the result is
                # delegated to tpool
                future = self.__executor.submit(executable, *args, **kwargs)
                result = tpool.execute(future.result)

                # generators report progress via yield and the result via return
                if isinstance(result, GeneratorType):
                    result = self.__consume(job, result, progress_fun)

                # update progress
                job.progress = 1
                job.updated = int(time())

                for callback in self.__progress_listeners:
                    callback(job)

                # execute result function (tuples are unpacked into positional arguments)
                if result_fun is not None:
                    if isinstance(result, tuple):
                        result_fun(*result)
                    else:
                        result_fun(result)

                # execute event
                if result_event is not None:
                    result_event.send(result)

            # save exceptions to show in ui
            except Exception as exception:  # pylint: disable=broad-except
                # NOTE(review): result_event is only sent on success; confirm whether
                # waiters should be notified on failure as well
                traceback.print_exc()
                job.exception = f'{type(exception).__name__} ({str(exception)})'

            # remove from group dict, whatever happened above
            finally:
                if group is not None:
                    self.__groups.pop(group, None)

            # finish job
            job.finished = int(time())
            job.updated = int(time())

            for callback in self.__finish_listeners:
                callback(job)