import abc
import atexit
import eventlet
import threading

from pycs import app


class GreenWorker(abc.ABC):
    """Background worker that processes queued jobs in an eventlet green thread.

    Jobs are put on ``self.queue`` as iterables of positional arguments.
    A single run loop (spawned by :meth:`start`) takes them off the queue
    one at a time and passes them to :meth:`work` through
    :meth:`start_work`.  Subclasses override :meth:`work`.

    ``STOP_QUEUE`` and ``CONTINUE_QUEUE`` are the sentinels returned by
    :meth:`check_queue` when the queue is empty.  They are the ``bool``
    singletons and the run loop compares by identity, so a queued item that
    is literally ``True`` or ``False`` would be mistaken for a sentinel.
    """

    STOP_QUEUE = True
    CONTINUE_QUEUE = False

    def __init__(self):
        super().__init__()

        self.pool = eventlet.GreenPool()
        # sent by stop(); consumed (reset) by the run loop
        self.stop_event = eventlet.Event()
        # sent by the run loop when it exits; consumed (reset) by stop()
        self.pool_finished = eventlet.Event()
        self.queue = eventlet.Queue()

        # polling interval (seconds) used by every wait loop in this class
        self.__sleep_time = 0.1
        self.__running = False

    @property
    def ident(self):
        """Identifier of the calling thread (``threading.get_ident()``)."""
        return threading.get_ident()

    @property
    def running(self):
        """``True`` between a :meth:`start` and the matching :meth:`stop`."""
        return self.__running

    def start(self):
        """Spawn the run loop in the green pool; no-op if already running."""
        if self.running:
            return

        app.logger.info(f'Starting {self.__class__.__name__}... ')
        # NOTE: GreenPool.spawn_n() is documented to return None, so
        # ``_thread`` does not hold a usable handle.  The attribute is kept
        # only because the original code defined it.
        self._thread = self.pool.spawn_n(self.__run__)
        self.__running = True

    def stop(self):
        """Ask the run loop to stop and wait until it has finished.

        Blocks until the queue has been drained and the run loop has
        signalled ``pool_finished``.  Returns immediately when the worker is
        not running or when a stop request is already pending.
        """
        if not self.running:
            # No run loop exists that could ever send ``pool_finished``;
            # waiting for it below would block forever.
            return

        if self.stop_event.has_result():
            # do not re-send this event
            return

        self.stop_event.send(True)
        self.wait_for_empty_queue()

        # the value sent with the event (a thread id) is not needed here
        self.pool_finished.wait()
        self.pool_finished.reset()
        self.__running = False

    def wait_for_empty_queue(self):
        """Poll, sleeping cooperatively, until ``self.queue`` is empty."""
        while not self.queue.empty():
            eventlet.sleep(self.__sleep_time)

    def __log(self, log_func, msg):
        """Emit *msg* via *log_func*, prefixed with thread id and class name."""
        log_func(f"[0x{self.ident:x}] {self.__class__.__name__}: {msg}")

    def debug(self, msg):
        """Log *msg* at debug level through ``app.logger``."""
        self.__log(app.logger.debug, msg)

    def info(self, msg):
        """Log *msg* at info level through ``app.logger``."""
        self.__log(app.logger.info, msg)

    def error(self, msg):
        """Log *msg* at error level through ``app.logger``."""
        self.__log(app.logger.error, msg)

    def warning(self, msg):
        """Log *msg* at warning level through ``app.logger``."""
        self.__log(app.logger.warning, msg)

    def check_stop_event(self):
        """Return ``True`` if a stop was requested, consuming the request.

        When the stop event has been sent with a truthy value, it is reset
        and ``True`` is returned.  Otherwise the method sleeps for the
        polling interval, logs a debug message and returns ``False``.
        """
        if self.stop_event.ready() and \
                self.stop_event.wait(self.__sleep_time):
            self.debug("Stop event received.")
            self.stop_event.reset()
            return True

        eventlet.sleep(self.__sleep_time)
        self.debug(f"No stop event received. Waiting for {self.__sleep_time} seconds.")
        return False

    def check_queue(self):
        """Return the next job, or a sentinel when the queue is empty.

        Returns ``STOP_QUEUE`` if the queue is empty and a stop was
        requested, ``CONTINUE_QUEUE`` if it is empty and no stop was
        requested, and otherwise the next item taken from the queue.
        """
        if self.queue.empty():
            self.debug("Queue was empty, checking for stop...")
            return self.STOP_QUEUE if self.check_stop_event() else self.CONTINUE_QUEUE

        return self.queue.get(block=True)

    def __run__(self):
        """Run loop: process jobs until a stop is requested on an empty queue.

        Each job is unpacked as positional arguments for :meth:`start_work`.
        Exceptions raised by a job are logged and do not end the loop.  On
        exit, :meth:`_finish` signals ``pool_finished``.
        """
        while True:
            res = self.check_queue()

            if res is self.STOP_QUEUE:
                break

            if res is self.CONTINUE_QUEUE:
                continue

            self.info("Got a job from the cache queue")
            try:
                self.info("Starting work")
                self.start_work(*res)
            except Exception as e:
                # boundary of the worker loop: one failing job must not
                # take the whole worker down
                self.error(f"Error occurred: {e}")
            finally:
                self.info("Work finished")

        self._finish()

    def _finish(self):
        """Signal ``pool_finished`` with the current thread id."""
        self.info("Sending finish event")
        self.pool_finished.send(threading.get_ident())

    def start_work(self, *args):
        """Run :meth:`work` with *args* via ``eventlet.tpool.execute``.

        Returns whatever ``tpool.execute`` returns for the call.
        """
        # A plain ``import eventlet`` is not guaranteed to load the ``tpool``
        # submodule, in which case ``eventlet.tpool`` raises AttributeError.
        # Import it explicitly where it is needed.
        from eventlet import tpool

        return tpool.execute(self.work, *args)

    def work(self, *args):
        """Process one job; the default implementation does nothing."""
        pass


if __name__ == '__main__':
    import _thread as thread

    class Foo(GreenWorker):
        """Demo worker that prints the executing thread id and the value."""

        def work(self, value):
            print(thread.get_ident(), value)

    worker = Foo()
    print("Main:", thread.get_ident())
    worker.start()

    worker.queue.put(("value1",))
    worker.queue.put(("value2",))
    worker.queue.put(("value3",))

    worker.stop()