import abc import atexit import eventlet import threading # from concurrent.futures import ThreadPoolExecutor from pycs import app from eventlet import tpool class GreenWorker(abc.ABC): def __init__(self): super(GreenWorker, self).__init__() self.pool = eventlet.GreenPool() self.stop_event = eventlet.Event() self.pool_finished = eventlet.Event() self.queue = eventlet.Queue() # self.executor = ThreadPoolExecutor() self.__sleep_time = 0.1 self.__running = False def start(self): if self.__running: return # self._thread = self.pool. eventlet.spawn(self.__run__) self.__running = True def stop(self): if self.stop_event.has_result(): # do not re-send this event return # print(self, self.stop_event, "sending stop_event") self.stop_event.send(True) self.wait_for_empty_queue() # self.pool.waitall() pool_id = self.pool_finished.wait() self.pool_finished.reset() # print(f"pool_id #{pool_id} finished") self.__running = False def wait_for_empty_queue(self): while not self.queue.empty(): eventlet.sleep(self.__sleep_time) continue def __run__(self): while True: if self.queue.empty(): # print("Queue was empty, checking for stop") if self.stop_event.ready() and \ self.stop_event.wait(self.__sleep_time): # print("Stop event received") self.stop_event.reset() break else: eventlet.sleep(self.__sleep_time) # print("no stop event received") continue app.logger.info(f"starting job in thread #{self.ident}...") args = self.queue.get(block=True) self.start_work(*args) app.logger.info(f"pool #{self.ident} sending finish event...") # if not self.pool_finished.has_result(): self.pool_finished.send(threading.get_ident()) def start_work(self, *args): app.logger.info(f"[{self.__class__.__name__} - {self.ident}] starting work") res = tpool.execute(self.work, *args) app.logger.info(f"[{self.__class__.__name__} - {self.ident}] work finished") return res @property def ident(self): return threading.get_ident() @abc.abstractmethod def work(self, *args): pass if __name__ == '__main__': import _thread as thread class Foo(GreenWorker): def work(self, value): print(thread.get_ident(), value) worker = Foo() print("Main:", thread.get_ident()) worker.start() worker.queue.put(("value1",)) worker.queue.put(("value2",)) worker.queue.put(("value3",)) # eventlet.sleep(.01) worker.stop()