from time import time from types import GeneratorType from typing import Callable, List, Generator, Optional, Any from eventlet import tpool, spawn_n from eventlet.event import Event from eventlet.queue import Queue from pycs.database.Project import Project from pycs.jobs.Job import Job from pycs.jobs.JobGroupBusyException import JobGroupBusyException class JobRunner: """ run jobs in a thread pool, but track progress and process results in eventlet queue """ # pylint: disable=too-many-arguments def __init__(self): self.__jobs = [] self.__groups = {} self.__queue = Queue() self.__create_listeners = [] self.__start_listeners = [] self.__progress_listeners = [] self.__finish_listeners = [] self.__remove_listeners = [] spawn_n(self.__run) def list(self) -> List[Job]: """ get a list of all jobs including finished ones :return: list of job objects """ return self.__jobs def on_create(self, callback: Callable[[Job], None]) -> None: """ register a callback that is executed each time a job is created :param callback: callback function :return: """ self.__create_listeners.append(callback) def on_start(self, callback: Callable[[Job], None]) -> None: """ register a callback that is executed each time a job is started :param callback: callback function :return: """ self.__start_listeners.append(callback) def on_progress(self, callback: Callable[[Job], None]) -> None: """ register a callback that is executed each time a job changes it's progress :param callback: callback function :return: """ self.__progress_listeners.append(callback) def on_finish(self, callback: Callable[[Job], None]) -> None: """ register a callback that is executed each time a job is finished :param callback: callback function :return: """ self.__finish_listeners.append(callback) def on_remove(self, callback: Callable[[Job], None]) -> None: """ register a callback that is executed each time a job is removed :param callback: callback function :return: """ self.__remove_listeners.append(callback) def remove(self, identifier): """ remove a job using its unique identifier :param identifier: job identifier :return: """ for i in range(len(self.__jobs)): if self.__jobs[i].identifier == identifier: if self.__jobs[i].finished is not None: job = self.__jobs[i] del self.__jobs[i] for callback in self.__remove_listeners: callback(job) return def run(self, project: Project, job_type: str, name: str, group: str, executable: Callable[[Any], Optional[Generator[float, None, None]]], *args, progress: Callable[[Any], float] = None, result: Callable[[Any], None] = None, result_event: Event = None, **kwargs) -> Job: """ add a job to run it in a thread pool :param project: project the job is associated with :param job_type: job type :param name: job name :param group: job group (raises JobGroupBusyException if there is already a job running with the same group identifier) :param executable: function to execute :param args: arguments for executable :param progress: is called everytime executable yields a value :param result: is called as soon as executable returns a value :param result_event: eventlet event to be called as soon as executable returns a value :param kwargs: named arguments for executable :return: job object """ # create job object job = Job(project, job_type, name) # abort if group is busy # otherwise add to groups dict if group is not None: if group in self.__groups: raise JobGroupBusyException self.__groups[group] = job # add to job list self.__jobs.append(job) # execute create listeners for callback in self.__create_listeners: callback(job) # add to execution queue self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs)) # return job object return job def __run(self): while True: # get execution function and job from queue group, executable, job, progress_fun, result_fun, result_event, args, kwargs \ = self.__queue.get(block=True) # execute start listeners job.started = int(time()) job.updated = int(time()) for callback in self.__start_listeners: callback(job) # run function and track progress generator = tpool.execute(executable, *args, **kwargs) result = generator if isinstance(generator, GeneratorType): iterator = iter(generator) try: while True: # run until next progress event progress = tpool.execute(next, iterator) # execute progress function if progress_fun is not None: if isinstance(progress, tuple): progress = progress_fun(*progress) else: progress = progress_fun(progress) # execute progress listeners job.progress = progress job.updated = int(time()) for callback in self.__progress_listeners: callback(job) except StopIteration as stop_iteration_exception: result = stop_iteration_exception.value # update progress job.progress = 1 job.updated = int(time()) for callback in self.__progress_listeners: callback(job) # execute result function if result_fun is not None: if isinstance(result, tuple): result_fun(*result) else: result_fun(result) # execute event if result_event is not None: result_event.send(result) # remove from group dict if group is not None: del self.__groups[group] # finish job job.finished = int(time()) job.updated = int(time()) for callback in self.__finish_listeners: callback(job)