JobRunner.py
# from concurrent.futures import ThreadPoolExecutor
from time import time
from types import GeneratorType
from typing import Callable, List, Generator, Optional, Any

# import eventlet
# from eventlet import spawn, spawn_n, tpool
from eventlet.event import Event

from pycs.database.Project import Project
from pycs.jobs.Job import Job
from pycs.jobs.JobGroupBusyException import JobGroupBusyException
from pycs.util.green_worker import GreenWorker

class JobRunner(GreenWorker):
    """
    run jobs in a thread pool, but track progress and process results in an eventlet queue
    """
    # pylint: disable=too-many-arguments

    def __init__(self):
        super().__init__()

        self.__jobs = []
        self.__groups = {}

        # self.__executor = ThreadPoolExecutor(1)

        self.__create_listeners = []
        self.__start_listeners = []
        self.__progress_listeners = []
        self.__finish_listeners = []
        self.__remove_listeners = []
    def list(self) -> List[Job]:
        """
        get a list of all jobs including finished ones

        :return: list of job objects
        """
        return self.__jobs

    def on_create(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is created

        :param callback: callback function
        :return:
        """
        self.__create_listeners.append(callback)

    def on_start(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is started

        :param callback: callback function
        :return:
        """
        self.__start_listeners.append(callback)
    def on_progress(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job changes its progress

        :param callback: callback function
        :return:
        """
        self.__progress_listeners.append(callback)
    def on_finish(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is finished

        :param callback: callback function
        :return:
        """
        self.__finish_listeners.append(callback)

    def on_remove(self, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time a job is removed

        :param callback: callback function
        :return:
        """
        self.__remove_listeners.append(callback)
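    # Illustrative sketch, not part of the original module: a caller (e.g. a
    # websocket layer) could forward job updates to connected clients with
    #
    #   runner.on_progress(lambda job: publish('job-progress', job))
    #   runner.on_finish(lambda job: publish('job-finished', job))
    #
    # where `runner` is a JobRunner instance and `publish` is a hypothetical
    # broadcast helper.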
    def remove(self, id):
        """
        remove a job using its unique id

        :param id: job id
        :return:
        """
        for i in range(len(self.__jobs)):
            if self.__jobs[i].id == id:
                if self.__jobs[i].finished is not None:
                    job = self.__jobs[i]
                    del self.__jobs[i]

                    for callback in self.__remove_listeners:
                        callback(job)

                return
    def run(self,
            project: Project,
            job_type: str,
            name: str,
            group: str,
            executable: Callable[[Any], Optional[Generator[float, None, None]]],
            *args,
            progress: Callable[[Any], float] = None,
            result: Callable[[Any], None] = None,
            result_event: Event = None,
            **kwargs) -> Job:
        """
        add a job to run in the thread pool

        :param project: project the job is associated with
        :param job_type: job type
        :param name: job name
        :param group: job group (raises JobGroupBusyException if there is already a job running
                      with the same group id)
        :param executable: function to execute
        :param args: arguments for executable
        :param progress: is called every time executable yields a value
        :param result: is called as soon as executable returns a value
        :param result_event: eventlet event to be sent as soon as executable returns a value
        :param kwargs: named arguments for executable
        :return: job object
        """
        # create job object
        job = Job(project, job_type, name)

        # abort if the group is busy,
        # otherwise add the job to the groups dict
        if group is not None:
            if group in self.__groups:
                raise JobGroupBusyException

            self.__groups[group] = job

        # add to job list
        self.__jobs.append(job)

        # execute create listeners
        for callback in self.__create_listeners:
            callback(job)

        # add to execution queue
        # self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))
        self.queue.put((group, executable, job, progress, result, result_event, args, kwargs))

        # return job object
        return job
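    # Illustrative sketch, not part of the original module: `run()` is typically
    # handed a generator-style executable, so every yielded value becomes a
    # progress update and the return value becomes the job result, e.g.
    #
    #   def training_task(steps):             # hypothetical executable
    #       for i in range(steps):
    #           ...                            # one unit of work
    #           yield (i + 1) / steps          # progress in [0, 1]
    #       return 'model.ckpt'                # forwarded to the result callback
    #
    #   runner.run(project, 'training', 'fit model', 'training-group',
    #              training_task, 100,
    #              result=lambda path: print('stored', path))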
    def process_iterator(self, iterator, job, progress_fun):
        """drive a generator-style executable and publish each yielded progress value"""
        try:
            iterator = iter(iterator)
            while True:
                # run until the next progress event
                # future = self.__executor.submit(next, iterator)
                # progress = tpool.execute(future.result)
                # progress = future.result()
                progress = next(iterator)

                # execute progress function
                if progress_fun is not None:
                    if isinstance(progress, tuple):
                        progress = progress_fun(*progress)
                    else:
                        progress = progress_fun(progress)

                # execute progress listeners
                job.progress = progress
                job.updated = int(time())

                for callback in self.__progress_listeners:
                    callback(job)

        except StopIteration as stop_iteration_exception:
            # the generator's return value is carried by StopIteration
            return stop_iteration_exception.value
    # done in a separate green thread
    def work(self, group, executable, job, progress_fun, result_fun, result_event, args, kwargs):
        # execute start listeners
        job.started = int(time())
        job.updated = int(time())

        for callback in self.__start_listeners:
            callback(job)

        try:
            result = generator = executable(*args, **kwargs)

            if isinstance(generator, GeneratorType):
                result = self.process_iterator(generator, job, progress_fun)

            # update progress
            job.progress = 1
            job.updated = int(time())

            for callback in self.__progress_listeners:
                callback(job)

            # execute result function
            if result_fun is not None:
                if isinstance(result, tuple):
                    result_fun(*result)
                else:
                    result_fun(result)

            # execute event
            if result_event is not None:
                result_event.send(result)

        # save exceptions to show in ui
        except Exception as e:
            import traceback
            traceback.print_exc()
            job.exception = f'{type(e).__name__} ({str(e)})'

        # remove from group dict
        if group is not None:
            del self.__groups[group]

        # finish job
        job.finished = int(time())
        job.updated = int(time())

        for callback in self.__finish_listeners:
            callback(job)
    # NOTE: this appears to be the previous queue/ThreadPoolExecutor based loop.
    # It references self.__queue, self.__executor and tpool, which only occur in
    # commented-out code above, so it is not functional as written; jobs are now
    # processed through GreenWorker via work() instead.
    def __run(self):
        while True:
            # get execution function and job from queue
            group, executable, job, progress_fun, result_fun, result_event, args, kwargs \
                = self.__queue.get(block=True)

            # execute start listeners
            job.started = int(time())
            job.updated = int(time())

            for callback in self.__start_listeners:
                callback(job)

            # run function and track progress
            try:
                # result = generator = executable(*args, **kwargs)
                future = self.__executor.submit(executable, *args, **kwargs)
                result = generator = tpool.execute(future.result)

                if isinstance(generator, GeneratorType):
                    iterator = iter(generator)

                    try:
                        while True:
                            # run until next progress event
                            future = self.__executor.submit(next, iterator)
                            progress = tpool.execute(future.result)
                            # progress = next(iterator)

                            # execute progress function
                            if progress_fun is not None:
                                if isinstance(progress, tuple):
                                    progress = progress_fun(*progress)
                                else:
                                    progress = progress_fun(progress)

                            # execute progress listeners
                            job.progress = progress
                            job.updated = int(time())

                            for callback in self.__progress_listeners:
                                callback(job)
                    except StopIteration as stop_iteration_exception:
                        result = stop_iteration_exception.value

                # update progress
                job.progress = 1
                job.updated = int(time())

                for callback in self.__progress_listeners:
                    callback(job)

                # execute result function
                if result_fun is not None:
                    if isinstance(result, tuple):
                        result_fun(*result)
                    else:
                        result_fun(result)

                # execute event
                if result_event is not None:
                    result_event.send(result)

            # save exceptions to show in ui
            except Exception as e:
                import traceback
                traceback.print_exc()
                job.exception = f'{type(e).__name__} ({str(e)})'

            # remove from group dict
            if group is not None:
                del self.__groups[group]

            # finish job
            job.finished = int(time())
            job.updated = int(time())

            for callback in self.__finish_listeners:
                callback(job)
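
# Illustrative note, not part of the original module: because process_iterator
# unpacks tuple yields into the progress callback, an executable can report
# structured progress, e.g.
#
#   def detect(files):                         # hypothetical executable
#       for i, f in enumerate(files):
#           ...                                 # process one file
#           yield i + 1, len(files)             # (done, total)
#
#   runner.run(project, 'detection', 'find objects', None,
#              detect, files,
#              progress=lambda done, total: done / total)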