6
0

green_worker.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. import abc
  2. import atexit
  3. import eventlet
  4. import threading
  5. # from concurrent.futures import ThreadPoolExecutor
  6. from pycs import app
  7. class GreenWorker(abc.ABC):
  8. STOP_QUEUE = True
  9. CONTINUE_QUEUE = False
  10. def __init__(self):
  11. super(GreenWorker, self).__init__()
  12. self.pool = eventlet.GreenPool()
  13. self.stop_event = eventlet.Event()
  14. self.pool_finished = eventlet.Event()
  15. self.queue = eventlet.Queue()
  16. # self.executor = ThreadPoolExecutor()
  17. self.__sleep_time = 0.1
  18. self.__running = False
  19. @property
  20. def ident(self):
  21. return threading.get_ident()
  22. @property
  23. def running(self):
  24. return self.__running
  25. def start(self):
  26. if self.running:
  27. return
  28. app.logger.info(f'Starting {self.__class__.__name__}... ')
  29. self._thread = self.pool.spawn_n(self.__run__)
  30. self.__running = True
  31. def stop(self):
  32. if self.stop_event.has_result():
  33. # do not re-send this event
  34. return
  35. # print(self, self.stop_event, "sending stop_event")
  36. self.stop_event.send(True)
  37. self.wait_for_empty_queue()
  38. # self.pool.waitall()
  39. pool_id = self.pool_finished.wait()
  40. self.pool_finished.reset()
  41. # print(f"pool_id #{pool_id} finished")
  42. self.__running = False
  43. def wait_for_empty_queue(self):
  44. while not self.queue.empty():
  45. eventlet.sleep(self.__sleep_time)
  46. continue
  47. def __log(self, log_func, msg):
  48. log_func(f"[0x{self.ident:x}] {self.__class__.__name__}: {msg}")
  49. def debug(self, msg):
  50. self.__log(app.logger.debug, msg)
  51. def info(self, msg):
  52. self.__log(app.logger.info, msg)
  53. def error(self, msg):
  54. self.__log(app.logger.error, msg)
  55. def warning(self, msg):
  56. self.__log(app.logger.warning, msg)
  57. def check_stop_event(self):
  58. if self.stop_event.ready() and \
  59. self.stop_event.wait(self.__sleep_time):
  60. self.debug("Stop event received.")
  61. self.stop_event.reset()
  62. return True
  63. eventlet.sleep(self.__sleep_time)
  64. self.debug(f"No stop event received. Waiting for {self.__sleep_time} seconds.")
  65. return False
  66. def check_queue(self):
  67. if self.queue.empty():
  68. self.debug("Queue was empty, checking for stop...")
  69. return self.STOP_QUEUE if self.check_stop_event() else self.CONTINUE_QUEUE
  70. return self.queue.get(block=True)
  71. def __run__(self):
  72. while True:
  73. res = self.check_queue()
  74. if res is self.STOP_QUEUE:
  75. break
  76. elif res is self.CONTINUE_QUEUE:
  77. continue
  78. self.info(f"Got a job from the cache queue")
  79. try:
  80. self.info("Starting work")
  81. self.start_work(*res)
  82. except Exception as e:
  83. self.error(f"Error occurred: {e}")
  84. finally:
  85. self.info(f"Work finished")
  86. self._finish()
  87. def _finish(self):
  88. self.info("Sending finish event")
  89. # if not self.pool_finished.has_result():
  90. self.pool_finished.send(threading.get_ident())
  91. def start_work(self, *args):
  92. return eventlet.tpool.execute(self.work, *args)
  93. def work(self, *args):
  94. pass
  95. if __name__ == '__main__':
  96. import _thread as thread
  97. class Foo(GreenWorker):
  98. def work(self, value):
  99. print(thread.get_ident(), value)
  100. worker = Foo()
  101. print("Main:", thread.get_ident())
  102. worker.start()
  103. worker.queue.put(("value1",))
  104. worker.queue.put(("value2",))
  105. worker.queue.put(("value3",))
  106. # eventlet.sleep(.01)
  107. worker.stop()