JobRunner.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. from concurrent.futures import ThreadPoolExecutor
  2. from time import time
  3. from types import GeneratorType
  4. from typing import Callable, List, Generator, Optional, Any
  5. from eventlet import spawn_n, tpool
  6. from eventlet.event import Event
  7. from eventlet.queue import Queue
  8. from pycs.database.Project import Project
  9. from pycs.jobs.Job import Job
  10. from pycs.jobs.JobGroupBusyException import JobGroupBusyException
  11. class JobRunner:
  12. """
  13. run jobs in a thread pool, but track progress and process results in eventlet queue
  14. """
  15. # pylint: disable=too-many-arguments
  16. def __init__(self):
  17. self.__jobs = []
  18. self.__groups = {}
  19. self.__executor = ThreadPoolExecutor(1)
  20. self.__queue = Queue()
  21. self.__create_listeners = []
  22. self.__start_listeners = []
  23. self.__progress_listeners = []
  24. self.__finish_listeners = []
  25. self.__remove_listeners = []
  26. spawn_n(self.__run)
  27. def list(self) -> List[Job]:
  28. """
  29. get a list of all jobs including finished ones
  30. :return: list of job objects
  31. """
  32. return self.__jobs
  33. def on_create(self, callback: Callable[[Job], None]) -> None:
  34. """
  35. register a callback that is executed each time a job is created
  36. :param callback: callback function
  37. :return:
  38. """
  39. self.__create_listeners.append(callback)
  40. def on_start(self, callback: Callable[[Job], None]) -> None:
  41. """
  42. register a callback that is executed each time a job is started
  43. :param callback: callback function
  44. :return:
  45. """
  46. self.__start_listeners.append(callback)
  47. def on_progress(self, callback: Callable[[Job], None]) -> None:
  48. """
  49. register a callback that is executed each time a job changes it's progress
  50. :param callback: callback function
  51. :return:
  52. """
  53. self.__progress_listeners.append(callback)
  54. def on_finish(self, callback: Callable[[Job], None]) -> None:
  55. """
  56. register a callback that is executed each time a job is finished
  57. :param callback: callback function
  58. :return:
  59. """
  60. self.__finish_listeners.append(callback)
  61. def on_remove(self, callback: Callable[[Job], None]) -> None:
  62. """
  63. register a callback that is executed each time a job is removed
  64. :param callback: callback function
  65. :return:
  66. """
  67. self.__remove_listeners.append(callback)
  68. def remove(self, identifier):
  69. """
  70. remove a job using its unique identifier
  71. :param identifier: job identifier
  72. :return:
  73. """
  74. for i in range(len(self.__jobs)):
  75. if self.__jobs[i].identifier == identifier:
  76. if self.__jobs[i].finished is not None:
  77. job = self.__jobs[i]
  78. del self.__jobs[i]
  79. for callback in self.__remove_listeners:
  80. callback(job)
  81. return
  82. def run(self,
  83. project: Project,
  84. job_type: str,
  85. name: str,
  86. group: str,
  87. executable: Callable[[Any], Optional[Generator[float, None, None]]],
  88. *args,
  89. progress: Callable[[Any], float] = None,
  90. result: Callable[[Any], None] = None,
  91. result_event: Event = None,
  92. **kwargs) -> Job:
  93. """
  94. add a job to run it in a thread pool
  95. :param project: project the job is associated with
  96. :param job_type: job type
  97. :param name: job name
  98. :param group: job group (raises JobGroupBusyException if there is already a job running
  99. with the same group identifier)
  100. :param executable: function to execute
  101. :param args: arguments for executable
  102. :param progress: is called everytime executable yields a value
  103. :param result: is called as soon as executable returns a value
  104. :param result_event: eventlet event to be called as soon as executable returns a value
  105. :param kwargs: named arguments for executable
  106. :return: job object
  107. """
  108. # create job object
  109. job = Job(project, job_type, name)
  110. # abort if group is busy
  111. # otherwise add to groups dict
  112. if group is not None:
  113. if group in self.__groups:
  114. raise JobGroupBusyException
  115. self.__groups[group] = job
  116. # add to job list
  117. self.__jobs.append(job)
  118. # execute create listeners
  119. for callback in self.__create_listeners:
  120. callback(job)
  121. # add to execution queue
  122. self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))
  123. # return job object
  124. return job
  125. def __run(self):
  126. while True:
  127. # get execution function and job from queue
  128. group, executable, job, progress_fun, result_fun, result_event, args, kwargs \
  129. = self.__queue.get(block=True)
  130. # execute start listeners
  131. job.started = int(time())
  132. job.updated = int(time())
  133. for callback in self.__start_listeners:
  134. callback(job)
  135. # run function and track progress
  136. try:
  137. future = self.__executor.submit(executable, *args, **kwargs)
  138. generator = tpool.execute(future.result)
  139. result = generator
  140. if isinstance(generator, GeneratorType):
  141. iterator = iter(generator)
  142. try:
  143. while True:
  144. # run until next progress event
  145. future = self.__executor.submit(next, iterator)
  146. progress = tpool.execute(future.result)
  147. # execute progress function
  148. if progress_fun is not None:
  149. if isinstance(progress, tuple):
  150. progress = progress_fun(*progress)
  151. else:
  152. progress = progress_fun(progress)
  153. # execute progress listeners
  154. job.progress = progress
  155. job.updated = int(time())
  156. for callback in self.__progress_listeners:
  157. callback(job)
  158. except StopIteration as stop_iteration_exception:
  159. result = stop_iteration_exception.value
  160. # update progress
  161. job.progress = 1
  162. job.updated = int(time())
  163. for callback in self.__progress_listeners:
  164. callback(job)
  165. # execute result function
  166. if result_fun is not None:
  167. if isinstance(result, tuple):
  168. result_fun(*result)
  169. else:
  170. result_fun(result)
  171. # execute event
  172. if result_event is not None:
  173. result_event.send(result)
  174. # save exceptions to show in ui
  175. except Exception as exception:
  176. job.exception = f'{type(exception).__name__} ({str(exception)})'
  177. # remove from group dict
  178. if group is not None:
  179. del self.__groups[group]
  180. # finish job
  181. job.finished = int(time())
  182. job.updated = int(time())
  183. for callback in self.__finish_listeners:
  184. callback(job)