6
0

JobRunner.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
from concurrent.futures import ThreadPoolExecutor
from time import time
import traceback
from types import GeneratorType
from typing import Callable, List, Generator, Optional, Any

import eventlet
import eventlet.tpool
# from eventlet import spawn, spawn_n, tpool
from eventlet.event import Event

from pycs.database.Project import Project
from pycs.jobs.Job import Job
from pycs.jobs.JobGroupBusyException import JobGroupBusyException
from pycs.util.green_worker import GreenWorker
  12. class JobRunner(GreenWorker):
  13. """
  14. run jobs in a thread pool, but track progress and process results in eventlet queue
  15. """
  16. # pylint: disable=too-many-arguments
  17. def __init__(self):
  18. super().__init__()
  19. self.__jobs = []
  20. self.__groups = {}
  21. self.__executor = ThreadPoolExecutor(1)
  22. self.__create_listeners = []
  23. self.__start_listeners = []
  24. self.__progress_listeners = []
  25. self.__finish_listeners = []
  26. self.__remove_listeners = []
  27. def list(self) -> List[Job]:
  28. """
  29. get a list of all jobs including finished ones
  30. :return: list of job objects
  31. """
  32. return self.__jobs
  33. def on_create(self, callback: Callable[[Job], None]) -> None:
  34. """
  35. register a callback that is executed each time a job is created
  36. :param callback: callback function
  37. :return:
  38. """
  39. self.__create_listeners.append(callback)
  40. def on_start(self, callback: Callable[[Job], None]) -> None:
  41. """
  42. register a callback that is executed each time a job is started
  43. :param callback: callback function
  44. :return:
  45. """
  46. self.__start_listeners.append(callback)
  47. def on_progress(self, callback: Callable[[Job], None]) -> None:
  48. """
  49. register a callback that is executed each time a job changes it's progress
  50. :param callback: callback function
  51. :return:
  52. """
  53. self.__progress_listeners.append(callback)
  54. def on_finish(self, callback: Callable[[Job], None]) -> None:
  55. """
  56. register a callback that is executed each time a job is finished
  57. :param callback: callback function
  58. :return:
  59. """
  60. self.__finish_listeners.append(callback)
  61. def on_remove(self, callback: Callable[[Job], None]) -> None:
  62. """
  63. register a callback that is executed each time a job is removed
  64. :param callback: callback function
  65. :return:
  66. """
  67. self.__remove_listeners.append(callback)
  68. def remove(self, id):
  69. """
  70. remove a job using its unique id
  71. :param id: job id
  72. :return:
  73. """
  74. for i in range(len(self.__jobs)):
  75. if self.__jobs[i].id == id:
  76. if self.__jobs[i].finished is not None:
  77. job = self.__jobs[i]
  78. del self.__jobs[i]
  79. for callback in self.__remove_listeners:
  80. callback(job)
  81. return
  82. def run(self,
  83. project: Project,
  84. job_type: str,
  85. name: str,
  86. group: str,
  87. executable: Callable[[Any], Optional[Generator[float, None, None]]],
  88. *args,
  89. progress: Callable[[Any], float] = None,
  90. result: Callable[[Any], None] = None,
  91. result_event: Event = None,
  92. **kwargs) -> Job:
  93. """
  94. add a job to run it in a thread pool
  95. :param project: project the job is associated with
  96. :param job_type: job type
  97. :param name: job name
  98. :param group: job group (raises JobGroupBusyException if there is already a job running
  99. with the same group id)
  100. :param executable: function to execute
  101. :param args: arguments for executable
  102. :param progress: is called everytime executable yields a value
  103. :param result: is called as soon as executable returns a value
  104. :param result_event: eventlet event to be called as soon as executable returns a value
  105. :param kwargs: named arguments for executable
  106. :return: job object
  107. """
  108. # create job object
  109. job = Job(project, job_type, name)
  110. # abort if group is busy
  111. # otherwise add to groups dict
  112. if group is not None:
  113. if group in self.__groups:
  114. raise JobGroupBusyException
  115. self.__groups[group] = job
  116. # add to job list
  117. self.__jobs.append(job)
  118. # execute create listeners
  119. for callback in self.__create_listeners:
  120. callback(job)
  121. # add to execution queue
  122. # self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))
  123. self.queue.put((group, executable, job, progress, result, result_event, args, kwargs))
  124. # return job object
  125. return job
  126. def _job_started(self, job):
  127. job.start()
  128. for callback in self.__start_listeners:
  129. callback(job)
  130. def _job_progress(self, job, progress):
  131. job.update(progress=progress)
  132. for callback in self.__progress_listeners:
  133. callback(job)
  134. def _job_finished(self, job):
  135. job.finish()
  136. for callback in self.__finish_listeners:
  137. callback(job)
  138. def process_iterator(self, generator, job, progress_fun):
  139. try:
  140. iterator = iter(generator)
  141. while True:
  142. # run until next progress event
  143. future = self.__executor.submit(next, iterator)
  144. progress = eventlet.tpool.execute(future.result)
  145. # execute progress function
  146. if progress_fun is not None:
  147. if isinstance(progress, tuple):
  148. progress = progress_fun(*progress)
  149. else:
  150. progress = progress_fun(progress)
  151. self._job_progress(job, progress=progress)
  152. except StopIteration as stop_iteration_exception:
  153. return stop_iteration_exception.value
  154. def start_work(self, group, executable, job, progress_fun, result_fun, result_event, args, kwargs):
  155. """ started as coroutine. Starts other tasks in separate green threads. """
  156. # execute start listeners
  157. self._job_started(job)
  158. try:
  159. # result = generator = executable(*args, **kwargs)
  160. future = self.__executor.submit(executable, *args, **kwargs)
  161. result = generator = eventlet.tpool.execute(future.result)
  162. if isinstance(generator, GeneratorType):
  163. result = self.process_iterator(generator, job, progress_fun)
  164. self._job_progress(job, progress=1)
  165. # execute result function
  166. if result_fun is not None:
  167. if isinstance(result, tuple):
  168. result_fun(*result)
  169. else:
  170. result_fun(result)
  171. # execute event
  172. if result_event is not None:
  173. result_event.send(result)
  174. # save exceptions to show in ui
  175. except Exception as e:
  176. import traceback
  177. traceback.print_exc()
  178. job.exception = f'{type(e).__name__} ({str(e)})'
  179. # remove from group dict
  180. if group is not None:
  181. del self.__groups[group]
  182. self._job_finished(job)
  183. def __run(self):
  184. while True:
  185. # get execution function and job from queue
  186. group, executable, job, progress_fun, result_fun, result_event, args, kwargs \
  187. = self.__queue.get(block=True)
  188. # execute start listeners
  189. job.started = int(time())
  190. job.updated = int(time())
  191. for callback in self.__start_listeners:
  192. callback(job)
  193. # run function and track progress
  194. try:
  195. # result = generator = executable(*args, **kwargs)
  196. future = self.__executor.submit(executable, *args, **kwargs)
  197. result = generator = tpool.execute(future.result)
  198. if isinstance(generator, GeneratorType):
  199. iterator = iter(generator)
  200. try:
  201. while True:
  202. # run until next progress event
  203. future = self.__executor.submit(next, iterator)
  204. progress = tpool.execute(future.result)
  205. # progress = next(iterator)
  206. # execute progress function
  207. if progress_fun is not None:
  208. if isinstance(progress, tuple):
  209. progress = progress_fun(*progress)
  210. else:
  211. progress = progress_fun(progress)
  212. # execute progress listeners
  213. job.progress = progress
  214. job.updated = int(time())
  215. for callback in self.__progress_listeners:
  216. callback(job)
  217. except StopIteration as stop_iteration_exception:
  218. result = stop_iteration_exception.value
  219. # update progress
  220. job.progress = 1
  221. job.updated = int(time())
  222. for callback in self.__progress_listeners:
  223. callback(job)
  224. # execute result function
  225. if result_fun is not None:
  226. if isinstance(result, tuple):
  227. result_fun(*result)
  228. else:
  229. result_fun(result)
  230. # execute event
  231. if result_event is not None:
  232. result_event.send(result)
  233. # save exceptions to show in ui
  234. except Exception as exception:
  235. import traceback
  236. traceback.print_exc()
  237. job.exception = f'{type(exception).__name__} ({str(exception)})'
  238. # remove from group dict
  239. if group is not None:
  240. del self.__groups[group]
  241. # finish job
  242. job.finished = int(time())
  243. job.updated = int(time())
  244. for callback in self.__finish_listeners:
  245. callback(job)