green_worker.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import abc
  2. import atexit
  3. import eventlet
  4. import threading
  5. # from concurrent.futures import ThreadPoolExecutor
  6. from pycs import app
  7. from eventlet import tpool
  8. class GreenWorker(abc.ABC):
  9. def __init__(self):
  10. super(GreenWorker, self).__init__()
  11. self.pool = eventlet.GreenPool()
  12. self.stop_event = eventlet.Event()
  13. self.pool_finished = eventlet.Event()
  14. self.queue = eventlet.Queue()
  15. # self.executor = ThreadPoolExecutor()
  16. self.__sleep_time = 0.1
  17. self.__running = False
  18. def start(self):
  19. if self.__running:
  20. return
  21. # self._thread = self.pool.
  22. eventlet.spawn(self.__run__)
  23. self.__running = True
  24. def stop(self):
  25. if self.stop_event.has_result():
  26. # do not re-send this event
  27. return
  28. # print(self, self.stop_event, "sending stop_event")
  29. self.stop_event.send(True)
  30. self.wait_for_empty_queue()
  31. # self.pool.waitall()
  32. pool_id = self.pool_finished.wait()
  33. self.pool_finished.reset()
  34. # print(f"pool_id #{pool_id} finished")
  35. self.__running = False
  36. def wait_for_empty_queue(self):
  37. while not self.queue.empty():
  38. eventlet.sleep(self.__sleep_time)
  39. continue
  40. def __run__(self):
  41. while True:
  42. if self.queue.empty():
  43. # print("Queue was empty, checking for stop")
  44. if self.stop_event.ready() and \
  45. self.stop_event.wait(self.__sleep_time):
  46. # print("Stop event received")
  47. self.stop_event.reset()
  48. break
  49. else:
  50. eventlet.sleep(self.__sleep_time)
  51. # print("no stop event received")
  52. continue
  53. app.logger.info(f"starting job in thread #{self.ident}...")
  54. args = self.queue.get(block=True)
  55. self.start_work(*args)
  56. app.logger.info(f"pool #{self.ident} sending finish event...")
  57. # if not self.pool_finished.has_result():
  58. self.pool_finished.send(threading.get_ident())
  59. def start_work(self, *args):
  60. app.logger.info(f"[{self.__class__.__name__} - {self.ident}] starting work")
  61. res = tpool.execute(self.work, *args)
  62. app.logger.info(f"[{self.__class__.__name__} - {self.ident}] work finished")
  63. return res
  64. @property
  65. def ident(self):
  66. return threading.get_ident()
  67. @abc.abstractmethod
  68. def work(self, *args):
  69. pass
  70. if __name__ == '__main__':
  71. import _thread as thread
  72. class Foo(GreenWorker):
  73. def work(self, value):
  74. print(thread.get_ident(), value)
  75. worker = Foo()
  76. print("Main:", thread.get_ident())
  77. worker.start()
  78. worker.queue.put(("value1",))
  79. worker.queue.put(("value2",))
  80. worker.queue.put(("value3",))
  81. # eventlet.sleep(.01)
  82. worker.stop()