green_worker.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import abc
  2. import atexit
  3. import eventlet
  4. import threading
  5. # from concurrent.futures import ThreadPoolExecutor
  6. from pycs import app
  7. class GreenWorker(abc.ABC):
  8. STOP_QUEUE = True
  9. CONTINUE_QUEUE = False
  10. def __init__(self):
  11. super(GreenWorker, self).__init__()
  12. self.pool = eventlet.GreenPool()
  13. self.stop_event = eventlet.Event()
  14. self.pool_finished = eventlet.Event()
  15. self.queue = eventlet.Queue()
  16. # self.executor = ThreadPoolExecutor()
  17. self.__sleep_time = 0.1
  18. self.__running = False
  19. @property
  20. def ident(self):
  21. return threading.get_ident()
  22. def start(self):
  23. if self.__running:
  24. return
  25. self._thread = self.pool.spawn_n(self.__run__)
  26. self.__running = True
  27. def stop(self):
  28. if self.stop_event.has_result():
  29. # do not re-send this event
  30. return
  31. # print(self, self.stop_event, "sending stop_event")
  32. self.stop_event.send(True)
  33. self.wait_for_empty_queue()
  34. # self.pool.waitall()
  35. pool_id = self.pool_finished.wait()
  36. self.pool_finished.reset()
  37. # print(f"pool_id #{pool_id} finished")
  38. self.__running = False
  39. def wait_for_empty_queue(self):
  40. while not self.queue.empty():
  41. eventlet.sleep(self.__sleep_time)
  42. continue
  43. def __log(self, log_func, msg):
  44. log_func(f"[{self.ident}] {self.__class__.__name__}: {msg}")
  45. def debug(self, msg):
  46. self.__log(app.logger.debug, msg)
  47. def info(self, msg):
  48. self.__log(app.logger.info, msg)
  49. def error(self, msg):
  50. self.__log(app.logger.error, msg)
  51. def warning(self, msg):
  52. self.__log(app.logger.warning, msg)
  53. def check_stop_event(self):
  54. if self.stop_event.ready() and \
  55. self.stop_event.wait(self.__sleep_time):
  56. self.debug("Stop event received.")
  57. self.stop_event.reset()
  58. return True
  59. eventlet.sleep(self.__sleep_time)
  60. self.debug(f"No stop event received. Waiting for {self.__sleep_time} seconds.")
  61. return False
  62. def check_queue(self):
  63. if self.queue.empty():
  64. self.debug("Queue was empty, checking for stop...")
  65. return self.STOP_QUEUE if self.check_stop_event() else self.CONTINUE_QUEUE
  66. return self.queue.get(block=True)
  67. def __run__(self):
  68. while True:
  69. res = self.check_queue()
  70. if res is self.STOP_QUEUE:
  71. break
  72. elif res is self.CONTINUE_QUEUE:
  73. continue
  74. self.info(f"Got a job from the cache queue")
  75. try:
  76. self.info("Starting work")
  77. self.start_work(*res)
  78. except Exception as e:
  79. self.error(f"Error occurred: {e}")
  80. finally:
  81. self.info(f"Work finished")
  82. self._finish()
  83. def _finish(self):
  84. self.info("Sending finish event")
  85. # if not self.pool_finished.has_result():
  86. self.pool_finished.send(threading.get_ident())
  87. def start_work(self, *args):
  88. return eventlet.tpool.execute(self.work, *args)
  89. def work(self, *args):
  90. pass
  91. if __name__ == '__main__':
  92. import _thread as thread
  93. class Foo(GreenWorker):
  94. def work(self, value):
  95. print(thread.get_ident(), value)
  96. worker = Foo()
  97. print("Main:", thread.get_ident())
  98. worker.start()
  99. worker.queue.put(("value1",))
  100. worker.queue.put(("value2",))
  101. worker.queue.put(("value3",))
  102. # eventlet.sleep(.01)
  103. worker.stop()