JobRunner.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. from time import time
  2. from types import GeneratorType
  3. from typing import Callable, List, Generator, Optional, Any
  4. from eventlet import tpool, spawn_n
  5. from eventlet.event import Event
  6. from eventlet.queue import Queue
  7. from pycs.database.Project import Project
  8. from pycs.jobs.Job import Job
  9. from pycs.jobs.JobGroupBusyException import JobGroupBusyException
  10. class JobRunner:
  11. """
  12. run jobs in a thread pool, but track progress and process results in eventlet queue
  13. """
  14. # pylint: disable=too-many-arguments
  15. def __init__(self):
  16. self.__jobs = []
  17. self.__groups = {}
  18. self.__queue = Queue()
  19. self.__create_listeners = []
  20. self.__start_listeners = []
  21. self.__progress_listeners = []
  22. self.__finish_listeners = []
  23. self.__remove_listeners = []
  24. spawn_n(self.__run)
  25. def list(self) -> List[Job]:
  26. """
  27. get a list of all jobs including finished ones
  28. :return: list of job objects
  29. """
  30. return self.__jobs
  31. def on_create(self, callback: Callable[[Job], None]) -> None:
  32. """
  33. register a callback that is executed each time a job is created
  34. :param callback: callback function
  35. :return:
  36. """
  37. self.__create_listeners.append(callback)
  38. def on_start(self, callback: Callable[[Job], None]) -> None:
  39. """
  40. register a callback that is executed each time a job is started
  41. :param callback: callback function
  42. :return:
  43. """
  44. self.__start_listeners.append(callback)
  45. def on_progress(self, callback: Callable[[Job], None]) -> None:
  46. """
  47. register a callback that is executed each time a job changes it's progress
  48. :param callback: callback function
  49. :return:
  50. """
  51. self.__progress_listeners.append(callback)
  52. def on_finish(self, callback: Callable[[Job], None]) -> None:
  53. """
  54. register a callback that is executed each time a job is finished
  55. :param callback: callback function
  56. :return:
  57. """
  58. self.__finish_listeners.append(callback)
  59. def on_remove(self, callback: Callable[[Job], None]) -> None:
  60. """
  61. register a callback that is executed each time a job is removed
  62. :param callback: callback function
  63. :return:
  64. """
  65. self.__remove_listeners.append(callback)
  66. def remove(self, identifier):
  67. """
  68. remove a job using its unique identifier
  69. :param identifier: job identifier
  70. :return:
  71. """
  72. for i in range(len(self.__jobs)):
  73. if self.__jobs[i].identifier == identifier:
  74. if self.__jobs[i].finished is not None:
  75. job = self.__jobs[i]
  76. del self.__jobs[i]
  77. for callback in self.__remove_listeners:
  78. callback(job)
  79. return
  80. def run(self,
  81. project: Project,
  82. job_type: str,
  83. name: str,
  84. group: str,
  85. executable: Callable[[Any], Optional[Generator[float, None, None]]],
  86. *args,
  87. progress: Callable[[Any], float] = None,
  88. result: Callable[[Any], None] = None,
  89. result_event: Event = None,
  90. **kwargs) -> Job:
  91. """
  92. add a job to run it in a thread pool
  93. :param project: project the job is associated with
  94. :param job_type: job type
  95. :param name: job name
  96. :param group: job group (raises JobGroupBusyException if there is already a job running
  97. with the same group identifier)
  98. :param executable: function to execute
  99. :param args: arguments for executable
  100. :param progress: is called everytime executable yields a value
  101. :param result: is called as soon as executable returns a value
  102. :param result_event: eventlet event to be called as soon as executable returns a value
  103. :param kwargs: named arguments for executable
  104. :return: job object
  105. """
  106. # create job object
  107. job = Job(project, job_type, name)
  108. # abort if group is busy
  109. # otherwise add to groups dict
  110. if group is not None:
  111. if group in self.__groups:
  112. raise JobGroupBusyException
  113. self.__groups[group] = job
  114. # add to job list
  115. self.__jobs.append(job)
  116. # execute create listeners
  117. for callback in self.__create_listeners:
  118. callback(job)
  119. # add to execution queue
  120. self.__queue.put((group, executable, job, progress, result, result_event, args, kwargs))
  121. # return job object
  122. return job
  123. def __run(self):
  124. while True:
  125. # get execution function and job from queue
  126. group, executable, job, progress_fun, result_fun, result_event, args, kwargs \
  127. = self.__queue.get(block=True)
  128. # execute start listeners
  129. job.started = int(time())
  130. job.updated = int(time())
  131. for callback in self.__start_listeners:
  132. callback(job)
  133. # run function and track progress
  134. generator = tpool.execute(executable, *args, **kwargs)
  135. result = generator
  136. if isinstance(generator, GeneratorType):
  137. iterator = iter(generator)
  138. try:
  139. while True:
  140. # run until next progress event
  141. progress = tpool.execute(next, iterator)
  142. # execute progress function
  143. if progress_fun is not None:
  144. if isinstance(progress, tuple):
  145. progress = progress_fun(*progress)
  146. else:
  147. progress = progress_fun(progress)
  148. # execute progress listeners
  149. job.progress = progress
  150. job.updated = int(time())
  151. for callback in self.__progress_listeners:
  152. callback(job)
  153. except StopIteration as stop_iteration_exception:
  154. result = stop_iteration_exception.value
  155. # update progress
  156. job.progress = 1
  157. job.updated = int(time())
  158. for callback in self.__progress_listeners:
  159. callback(job)
  160. # execute result function
  161. if result_fun is not None:
  162. if isinstance(result, tuple):
  163. result_fun(*result)
  164. else:
  165. result_fun(result)
  166. # execute event
  167. if result_event is not None:
  168. result_event.send(result)
  169. # remove from group dict
  170. if group is not None:
  171. del self.__groups[group]
  172. # finish job
  173. job.finished = int(time())
  174. job.updated = int(time())
  175. for callback in self.__finish_listeners:
  176. callback(job)