from time import time
from types import GeneratorType
from typing import Callable, List, Generator, Optional, Any

from eventlet import tpool, spawn_n
from eventlet.event import Event
from eventlet.queue import Queue

from pycs.database.Project import Project
from pycs.jobs.Job import Job
from pycs.jobs.JobGroupBusyException import JobGroupBusyException


class JobRunner:
    """
    run jobs in a thread pool, but track progress and process results in an eventlet queue
    """

    # pylint: disable=too-many-arguments
    def __init__(self):
        # job bookkeeping: every job ever created and the currently busy groups
        self.__jobs = []
        self.__groups = {}
        self.__queue = Queue()

        # listener callbacks for the job lifecycle events
        self.__create_listeners = []
        self.__start_listeners = []
        self.__progress_listeners = []
        self.__finish_listeners = []
        self.__remove_listeners = []

        # start the dispatcher greenlet that consumes the execution queue
        spawn_n(self.__run)
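
    # Creating a JobRunner is enough to start processing: spawn_n() above
    # launches __run() in a background greenlet that consumes the queue filled
    # by run(). A typical (hypothetical) setup keeps one shared instance:
    #
    #     runner = JobRunner()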

    def list(self) -> List[Job]:
        """
        get a list of all jobs including finished ones

        :return: list of job objects
        """
        return self.__jobs

    def on_create(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is created

        :param callback: callback function
        :return:
        """
        self.__create_listeners.append(callback)

    def on_start(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is started

        :param callback: callback function
        :return:
        """
        self.__start_listeners.append(callback)

    def on_progress(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job changes its progress

        :param callback: callback function
        :return:
        """
        self.__progress_listeners.append(callback)

    def on_finish(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is finished

        :param callback: callback function
        :return:
        """
        self.__finish_listeners.append(callback)

    def on_remove(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is removed

        :param callback: callback function
        :return:
        """
        self.__remove_listeners.append(callback)
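
    # A minimal usage sketch for these hooks (the ``runner`` and ``socketio``
    # names below are hypothetical, not part of this module); each callback
    # receives the affected Job object:
    #
    #     runner = JobRunner()
    #     runner.on_progress(lambda job: socketio.emit('job-progress', job.progress))
    #     runner.on_finish(lambda job: print(job.name, 'finished'))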

    def remove(self, identifier):
        """
        remove a job using its unique identifier

        :param identifier: job identifier
        :return:
        """
        # only finished jobs are removed; running or queued jobs are kept
        for i, job in enumerate(self.__jobs):
            if job.identifier == identifier:
                if job.finished is not None:
                    del self.__jobs[i]

                    for callback in self.__remove_listeners:
                        callback(job)

                return

    def run(self,
            project: Project,
            job_type: str,
            name: str,
            group: str,
            executable: Callable[[Any], Optional[Generator[float, None, None]]],
            *args,
            progress: Callable[[Any], float] = None,
            result: Callable[[Any], None] = None,
            result_event: Event = None,
            **kwargs) -> Job:
        """
        add a job to run it in a thread pool

        :param project: project the job is associated with
        :param job_type: job type
        :param name: job name
        :param group: job group (raises JobGroupBusyException if there is already a job running
                      with the same group identifier)
        :param executable: function to execute
        :param args: arguments for executable
        :param progress: called every time executable yields a value
        :param result: called as soon as executable returns a value
        :param result_event: eventlet event that receives the value returned by executable
        :param kwargs: named arguments for executable
        :return: job object
        """
        # create job object
        job = Job(project, job_type, name)

        # abort if group is busy
        # otherwise add to groups dict
        if group is not None:
            if group in self.__groups:
                raise JobGroupBusyException

            self.__groups[group] = job

        # add to job list
        self.__jobs.append(job)

        # execute create listeners
        for callback in self.__create_listeners:
            callback(job)

        # add to execution queue
        self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))

        # return job object
        return job
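
    # Hypothetical sketch of scheduling work through run() (``runner``,
    # ``project`` and ``detect`` are placeholder names, not defined here):
    #
    #     def detect(images):
    #         for i, image in enumerate(images):
    #             ...                            # long-running work per image
    #             yield (i + 1) / len(images)    # progress in [0, 1]
    #         return 'done'                      # forwarded to result / result_event
    #
    #     runner.run(project, 'detection', 'detect objects', 'detect-group',
    #                detect, images, result=print)
    #
    # A second call with the same group while that job is queued or running
    # raises JobGroupBusyException.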

    def __run(self):
        while True:
            # get execution function and job from queue
            group, executable, job, progress_fun, result_fun, result_event, args, kwargs \
                = self.__queue.get(block=True)

            # execute start listeners
            job.started = int(time())
            job.updated = int(time())

            for callback in self.__start_listeners:
                callback(job)

            # run function and track progress
            generator = tpool.execute(executable, *args, **kwargs)
            result = generator

            if isinstance(generator, GeneratorType):
                iterator = iter(generator)

                try:
                    while True:
                        # run until next progress event
                        progress = tpool.execute(next, iterator)

                        # execute progress function
                        if progress_fun is not None:
                            if isinstance(progress, tuple):
                                progress = progress_fun(*progress)
                            else:
                                progress = progress_fun(progress)

                        # execute progress listeners
                        job.progress = progress
                        job.updated = int(time())

                        for callback in self.__progress_listeners:
                            callback(job)
                except StopIteration as stop_iteration_exception:
                    result = stop_iteration_exception.value

            # update progress
            job.progress = 1
            job.updated = int(time())

            for callback in self.__progress_listeners:
                callback(job)

            # execute result function
            if result_fun is not None:
                if isinstance(result, tuple):
                    result_fun(*result)
                else:
                    result_fun(result)

            # execute event
            if result_event is not None:
                result_event.send(result)

            # remove from group dict
            if group is not None:
                del self.__groups[group]

            # finish job
            job.finished = int(time())
            job.updated = int(time())

            for callback in self.__finish_listeners:
                callback(job)
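
# Callers that need to wait for a job's result can pass an eventlet Event as
# ``result_event``; __run() sends the executable's return value into it. A
# hypothetical sketch (``runner``, ``project`` and ``export_annotations`` are
# placeholder names):
#
#     from eventlet.event import Event
#
#     event = Event()
#     runner.run(project, 'export', 'export annotations', None,
#                export_annotations, result_event=event)
#     result = event.wait()   # blocks the calling greenlet until the job is done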