JobRunner.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. import traceback
  2. from concurrent.futures import ThreadPoolExecutor
  3. from time import time
  4. from types import GeneratorType
  5. from typing import Any
  6. from typing import Callable
  7. from typing import Generator
  8. from typing import List
  9. from typing import Optional
  10. from eventlet import spawn_n
  11. from eventlet import tpool
  12. from eventlet.event import Event
  13. from eventlet.queue import Queue
  14. from pycs.database.Project import Project
  15. from pycs.jobs.Job import Job
  16. from pycs.jobs.JobGroupBusyException import JobGroupBusyException
  17. class JobRunner:
  18. """
  19. run jobs in a thread pool, but track progress and process results in eventlet queue
  20. """
  21. # pylint: disable=too-many-arguments
  22. def __init__(self):
  23. self.__jobs = []
  24. self.__groups = {}
  25. self.__executor = ThreadPoolExecutor(1)
  26. self.__queue = Queue()
  27. self.__create_listeners = []
  28. self.__start_listeners = []
  29. self.__progress_listeners = []
  30. self.__finish_listeners = []
  31. self.__remove_listeners = []
  32. spawn_n(self.__run)
  33. def list(self) -> List[Job]:
  34. """
  35. get a list of all jobs including finished ones
  36. :return: list of job objects
  37. """
  38. return self.__jobs
  39. def on_create(self, callback: Callable[[Job], None]) -> None:
  40. """
  41. register a callback that is executed each time a job is created
  42. :param callback: callback function
  43. :return:
  44. """
  45. self.__create_listeners.append(callback)
  46. def on_start(self, callback: Callable[[Job], None]) -> None:
  47. """
  48. register a callback that is executed each time a job is started
  49. :param callback: callback function
  50. :return:
  51. """
  52. self.__start_listeners.append(callback)
  53. def on_progress(self, callback: Callable[[Job], None]) -> None:
  54. """
  55. register a callback that is executed each time a job changes it's progress
  56. :param callback: callback function
  57. :return:
  58. """
  59. self.__progress_listeners.append(callback)
  60. def on_finish(self, callback: Callable[[Job], None]) -> None:
  61. """
  62. register a callback that is executed each time a job is finished
  63. :param callback: callback function
  64. :return:
  65. """
  66. self.__finish_listeners.append(callback)
  67. def on_remove(self, callback: Callable[[Job], None]) -> None:
  68. """
  69. register a callback that is executed each time a job is removed
  70. :param callback: callback function
  71. :return:
  72. """
  73. self.__remove_listeners.append(callback)
  74. def remove(self, identifier):
  75. """
  76. remove a job using its unique identifier
  77. :param identifier: job identifier
  78. :return:
  79. """
  80. for i in range(len(self.__jobs)):
  81. if self.__jobs[i].identifier == identifier:
  82. if self.__jobs[i].finished is not None:
  83. job = self.__jobs[i]
  84. del self.__jobs[i]
  85. for callback in self.__remove_listeners:
  86. callback(job)
  87. return
  88. def run(self,
  89. project: Project,
  90. job_type: str,
  91. name: str,
  92. group: str,
  93. executable: Callable[[Any], Optional[Generator[float, None, None]]],
  94. *args,
  95. progress: Callable[[Any], float] = None,
  96. result: Callable[[Any], None] = None,
  97. result_event: Event = None,
  98. **kwargs) -> Job:
  99. """
  100. add a job to run it in a thread pool
  101. :param project: project the job is associated with
  102. :param job_type: job type
  103. :param name: job name
  104. :param group: job group (raises JobGroupBusyException if there is already a job running
  105. with the same group identifier)
  106. :param executable: function to execute
  107. :param args: arguments for executable
  108. :param progress: is called everytime executable yields a value
  109. :param result: is called as soon as executable returns a value
  110. :param result_event: eventlet event to be called as soon as executable returns a value
  111. :param kwargs: named arguments for executable
  112. :return: job object
  113. """
  114. # create job object
  115. job = Job(project, job_type, name)
  116. # abort if group is busy
  117. # otherwise add to groups dict
  118. if group is not None:
  119. if group in self.__groups:
  120. raise JobGroupBusyException
  121. self.__groups[group] = job
  122. # add to job list
  123. self.__jobs.append(job)
  124. # execute create listeners
  125. for callback in self.__create_listeners:
  126. callback(job)
  127. # add to execution queue
  128. self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))
  129. # return job object
  130. return job
  131. def __run(self):
  132. while True:
  133. # get execution function and job from queue
  134. group, executable, job, progress_fun, result_fun, result_event, args, kwargs \
  135. = self.__queue.get(block=True)
  136. # execute start listeners
  137. job.started = int(time())
  138. job.updated = int(time())
  139. for callback in self.__start_listeners:
  140. callback(job)
  141. # run function and track progress
  142. try:
  143. future = self.__executor.submit(executable, *args, **kwargs)
  144. generator = tpool.execute(future.result)
  145. result = generator
  146. if isinstance(generator, GeneratorType):
  147. iterator = iter(generator)
  148. try:
  149. while True:
  150. # run until next progress event
  151. future = self.__executor.submit(next, iterator)
  152. progress = tpool.execute(future.result)
  153. # execute progress function
  154. if progress_fun is not None:
  155. if isinstance(progress, tuple):
  156. progress = progress_fun(*progress)
  157. else:
  158. progress = progress_fun(progress)
  159. # execute progress listeners
  160. job.progress = progress
  161. job.updated = int(time())
  162. for callback in self.__progress_listeners:
  163. callback(job)
  164. except StopIteration as stop_iteration_exception:
  165. result = stop_iteration_exception.value
  166. # update progress
  167. job.progress = 1
  168. job.updated = int(time())
  169. for callback in self.__progress_listeners:
  170. callback(job)
  171. # execute result function
  172. if result_fun is not None:
  173. if isinstance(result, tuple):
  174. result_fun(*result)
  175. else:
  176. result_fun(result)
  177. # execute event
  178. if result_event is not None:
  179. result_event.send(result)
  180. # save exceptions to show in ui
  181. except Exception as exception:
  182. traceback.print_exc()
  183. job.exception = f'{type(exception).__name__} ({str(exception)})'
  184. # remove from group dict
  185. if group is not None:
  186. del self.__groups[group]
  187. # finish job
  188. job.finished = int(time())
  189. job.updated = int(time())
  190. for callback in self.__finish_listeners:
  191. callback(job)