# JobRunner.py
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from time import time
from types import GeneratorType
from typing import Any
from typing import Callable
from typing import Generator
from typing import List
from typing import Optional

import eventlet
from eventlet.event import Event

from pycs.database.Project import Project
from pycs.frontend.notifications.NotificationManager import NotificationManager
from pycs.jobs.Job import Job
from pycs.jobs.JobGroupBusyException import JobGroupBusyException
from pycs.util import Callbacks
from pycs.util import GreenWorker
from pycs.util import Singleton
  20. class JobRunner(GreenWorker, Singleton):
  21. """
  22. run jobs in a thread pool, but track progress and process results in eventlet queue
  23. """
  24. # pylint: disable=too-many-arguments
  25. """ Because it is a singleton (as described here:
  26. https://www.python.org/download/releases/2.2/descrintro/#__new__),
  27. __init__ is called every time the singleton object
  28. is requested. Hence, we do the actual initialization
  29. in init()!
  30. """
  31. def __init__(self):
  32. pass
  33. def init(self):
  34. super().__init__()
  35. super().init()
  36. self.__jobs = []
  37. self.__groups = {}
  38. self.__executor = ThreadPoolExecutor(1)
  39. self.__listeners = defaultdict(Callbacks)
  40. self.init_notifications()
  41. def init_notifications(self):
  42. nm = NotificationManager()
  43. self.on("create", nm.create("job"))
  44. self.on("start", nm.edit("job"))
  45. self.on("progress", nm.edit("job"))
  46. self.on("finish", nm.edit("job"))
  47. self.on("remove", nm.remove("job"))
  48. @classmethod
  49. def Run(cls, *args, **kwargs):
  50. cls().run(*args, **kwargs)
  51. @classmethod
  52. def Remove(cls, *args, **kwargs):
  53. cls().remove(*args, **kwargs)
  54. def remove(self, id):
  55. """
  56. remove a job using its unique id
  57. :param id: job id
  58. :return:
  59. """
  60. for i in range(len(self.__jobs)):
  61. if self.__jobs[i].id == id:
  62. if self.__jobs[i].finished is not None:
  63. job = self.__jobs[i]
  64. del self.__jobs[i]
  65. for callback in self.__remove_listeners:
  66. callback(job)
  67. return
  68. def run(self,
  69. project: Project,
  70. job_type: str,
  71. name: str,
  72. group: str,
  73. executable: Callable[[Any], Optional[Generator[float, None, None]]],
  74. *args,
  75. progress: Callable[[Any], float] = None,
  76. result: Callable[[Any], None] = None,
  77. result_event: Event = None,
  78. **kwargs) -> Job:
  79. """
  80. add a job to run it in a thread pool
  81. :param project: project the job is associated with
  82. :param job_type: job type
  83. :param name: job name
  84. :param group: job group (raises JobGroupBusyException if there is already a job running
  85. with the same group id)
  86. :param executable: function to execute
  87. :param args: arguments for executable
  88. :param progress: is called everytime executable yields a value
  89. :param result: is called as soon as executable returns a value
  90. :param result_event: eventlet event to be called as soon as executable returns a value
  91. :param kwargs: named arguments for executable
  92. :return: job object
  93. """
  94. # create job object
  95. job = Job(project, job_type, name)
  96. # abort if group is busy
  97. # otherwise add to groups dict
  98. if group is not None:
  99. if group in self.__groups:
  100. raise JobGroupBusyException
  101. self.__groups[group] = job
  102. # add to job list
  103. self.__jobs.append(job)
  104. # execute create listeners
  105. self.__listeners["create"](job)
  106. # add to execution queue
  107. # self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))
  108. self.queue.put((group, executable, job, progress, result, result_event, args, kwargs))
  109. # return job object
  110. return job
  111. def list(self) -> List[Job]:
  112. """
  113. get a list of all jobs including finished ones
  114. :return: list of job objects
  115. """
  116. return self.__jobs
  117. def on(self, operation, callback: Callable[[Job], None]) -> None:
  118. """
  119. register a callback that is executed each time a operation
  120. (create, start, progress, finish, or remove) on a job is performed
  121. :param callback: callback function
  122. :return:
  123. """
  124. assert operation in ["create", "start", "progress", "finish", "remove"], \
  125. f"invalid operation: {operation}"
  126. self.__listeners[operation].append(callback)
  127. def _job_started(self, job):
  128. job.start()
  129. self.__listeners["start"](job)
  130. def _job_progress(self, job, progress):
  131. job.update(progress=progress)
  132. self.__listeners["progress"](job)
  133. def _job_finished(self, job):
  134. job.finish()
  135. self.__listeners["finish"](job)
  136. def process_iterator(self, generator, job, progress_fun):
  137. try:
  138. iterator = iter(generator)
  139. while True:
  140. # run until next progress event
  141. future = self.__executor.submit(next, iterator)
  142. progress = eventlet.tpool.execute(future.result)
  143. # execute progress function
  144. if progress_fun is not None:
  145. if isinstance(progress, tuple):
  146. progress = progress_fun(*progress)
  147. else:
  148. progress = progress_fun(progress)
  149. self._job_progress(job, progress=progress)
  150. except StopIteration as stop_iteration_exception:
  151. return stop_iteration_exception.value
  152. def start_work(self, group, executable, job, progress_fun, result_fun, result_event, args, kwargs):
  153. """ started as coroutine. Starts other tasks in separate green threads. """
  154. # execute start listeners
  155. self._job_started(job)
  156. try:
  157. # result = generator = executable(*args, **kwargs)
  158. future = self.__executor.submit(executable, *args, **kwargs)
  159. result = generator = eventlet.tpool.execute(future.result)
  160. if isinstance(generator, GeneratorType):
  161. result = self.process_iterator(generator, job, progress_fun)
  162. self._job_progress(job, progress=1)
  163. # execute result function
  164. if result_fun is not None:
  165. if isinstance(result, tuple):
  166. result_fun(*result)
  167. else:
  168. result_fun(result)
  169. # execute event
  170. if result_event is not None:
  171. result_event.send(result)
  172. # save exceptions to show in ui
  173. except Exception as e:
  174. import traceback
  175. traceback.print_exc()
  176. job.exception = f'{type(e).__name__} ({str(e)})'
  177. # remove from group dict
  178. if group is not None:
  179. del self.__groups[group]
  180. self._job_finished(job)