JobRunner.py

import eventlet

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from time import time
from types import GeneratorType
from typing import Any
from typing import Callable
from typing import Generator
from typing import List
from typing import Optional

# from eventlet import spawn, spawn_n, tpool
from eventlet.event import Event

from pycs.database.Project import Project
from pycs.frontend.notifications.NotificationManager import NotificationManager
from pycs.jobs.Job import Job
from pycs.jobs.JobGroupBusyException import JobGroupBusyException
from pycs.util import Callbacks
from pycs.util import GreenWorker
from pycs.util import Singleton


class JobRunner(GreenWorker, Singleton):
    """
    run jobs in a thread pool, but track progress and process results in an eventlet queue
    """
    """ Because it is a singleton (as described here:
    https://www.python.org/download/releases/2.2/descrintro/#__new__),
    __init__ is called every time the singleton object
    is requested. Hence, we do the actual initialization
    in init()!
    """

    def __init__(self):
        pass

    def init(self):
        super().__init__()
        super().init()

        self.__jobs = []
        self.__groups = {}
        self.__executor = ThreadPoolExecutor(1)
        self.__listeners = defaultdict(Callbacks)

        self.init_notifications()

    def init_notifications(self):
        nm = NotificationManager()
        self.on("create", nm.create("job"))
        self.on("start", nm.edit("job"))
        self.on("progress", nm.edit("job"))
        self.on("finish", nm.edit("job"))
        self.on("remove", nm.remove("job"))

    @classmethod
    def run(cls, *args, **kwargs):
        cls()._run(*args, **kwargs)

    @classmethod
    def remove(cls, *args, **kwargs):
        cls()._remove(*args, **kwargs)

    def _remove(self, id):
        """
        remove a job using its unique id

        :param id: job id
        :return:
        """
        for i in range(len(self.__jobs)):
            if self.__jobs[i].id == id:
                if self.__jobs[i].finished is not None:
                    job = self.__jobs[i]
                    del self.__jobs[i]

                    self.__listeners["remove"](job)

                return

    def _run(self,
             project: Project,
             job_type: str,
             name: str,
             group: str,
             executable: Callable[[Any], Optional[Generator[float, None, None]]],
             *args,
             progress: Callable[[Any], float] = None,
             result: Callable[[Any], None] = None,
             result_event: Event = None,
             **kwargs) -> Job:
        """
        add a job to run it in a thread pool

        :param project: project the job is associated with
        :param job_type: job type
        :param name: job name
        :param group: job group (raises JobGroupBusyException if there is already a job running
                      with the same group id)
        :param executable: function to execute
        :param args: arguments for executable
        :param progress: is called every time executable yields a value
        :param result: is called as soon as executable returns a value
        :param result_event: eventlet event that receives the result as soon as executable returns
        :param kwargs: named arguments for executable
        :return: job object
        """
        # create job object
        job = Job(project, job_type, name)

        # abort if group is busy
        # otherwise add to groups dict
        if group is not None:
            if group in self.__groups:
                raise JobGroupBusyException

            self.__groups[group] = job

        # add to job list
        self.__jobs.append(job)

        # execute create listeners
        self.__listeners["create"](job)

        # add to execution queue
        # self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))
        self.queue.put((group, executable, job, progress, result, result_event, args, kwargs))

        # return job object
        return job
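
    # Usage sketch (assumption, not part of this module): `project` and
    # `train_model` below are hypothetical names. The executable may be a plain
    # function or a generator: yielded values are reported as progress and the
    # return value becomes the job result.
    #
    #   def train_model(data_dir):
    #       for step in range(10):
    #           yield (step + 1) / 10          # reported via the progress listeners
    #       return "model.pth"                 # passed to the result callback
    #
    #   JobRunner.run(project, "training", "train model", "training-group",
    #                 train_model, "/data/train",
    #                 result=lambda path: print("stored", path))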

    def list(self) -> List[Job]:
        """
        get a list of all jobs including finished ones

        :return: list of job objects
        """
        return self.__jobs

    def on(self, operation, callback: Callable[[Job], None]) -> None:
        """
        register a callback that is executed each time an operation
        (create, start, progress, finish, or remove) on a job is performed

        :param operation: operation name (create, start, progress, finish, or remove)
        :param callback: callback function
        :return:
        """
        assert operation in ["create", "start", "progress", "finish", "remove"], \
            f"invalid operation: {operation}"

        self.__listeners[operation].append(callback)

    def _job_started(self, job):
        job.start()
        self.__listeners["start"](job)

    def _job_progress(self, job, progress):
        job.update(progress=progress)
        self.__listeners["progress"](job)

    def _job_finished(self, job):
        job.finish()
        self.__listeners["finish"](job)

    def process_iterator(self, generator, job, progress_fun):
        try:
            iterator = iter(generator)
            while True:
                # run until next progress event
                future = self.__executor.submit(next, iterator)
                progress = eventlet.tpool.execute(future.result)

                # execute progress function
                if progress_fun is not None:
                    if isinstance(progress, tuple):
                        progress = progress_fun(*progress)
                    else:
                        progress = progress_fun(progress)

                self._job_progress(job, progress=progress)

        except StopIteration as stop_iteration_exception:
            return stop_iteration_exception.value
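
    # Note on the contract assumed above: when the executable is a generator, its
    # return value travels in StopIteration.value (PEP 380) and is handed back to
    # start_work as the job result; yielded tuples are unpacked into positional
    # arguments of progress_fun. A hypothetical executable illustrating both:
    #
    #   def example_executable():
    #       yield 0.5                  # progress_fun(0.5) if progress_fun is given
    #       yield ("validation", 0.9)  # progress_fun("validation", 0.9)
    #       return {"accuracy": 0.93}  # returned via StopIteration.value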

    def start_work(self, group, executable, job, progress_fun, result_fun, result_event, args, kwargs):
        """ Started as a coroutine. Starts other tasks in separate green threads. """
        # execute start listeners
        self._job_started(job)

        try:
            # result = generator = executable(*args, **kwargs)
            future = self.__executor.submit(executable, *args, **kwargs)
            result = generator = eventlet.tpool.execute(future.result)

            if isinstance(generator, GeneratorType):
                result = self.process_iterator(generator, job, progress_fun)

            self._job_progress(job, progress=1)

            # execute result function
            if result_fun is not None:
                if isinstance(result, tuple):
                    result_fun(*result)
                else:
                    result_fun(result)

            # execute event
            if result_event is not None:
                result_event.send(result)

        # save exceptions to show in ui
        except Exception as e:
            import traceback
            traceback.print_exc()

            job.exception = f'{type(e).__name__} ({str(e)})'

        # remove from group dict
        if group is not None:
            del self.__groups[group]

        self._job_finished(job)
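

# Listener sketch (assumption: run from application setup code; `log_progress` is
# a hypothetical callback, and Job is assumed to expose `name` and `progress`
# attributes, as suggested by Job.update above). Callbacks registered via on()
# receive the Job object for every matching operation.
#
#   def log_progress(job):
#       print(job.name, job.progress)
#
#   JobRunner().on("progress", log_progress)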