123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- import abc
- import atexit
- import eventlet
- import threading
- # from concurrent.futures import ThreadPoolExecutor
- from pycs import app
- class GreenWorker(abc.ABC):
- STOP_QUEUE = True
- CONTINUE_QUEUE = False
- def __init__(self):
- super(GreenWorker, self).__init__()
- self.pool = eventlet.GreenPool()
- self.stop_event = eventlet.Event()
- self.pool_finished = eventlet.Event()
- self.queue = eventlet.Queue()
- # self.executor = ThreadPoolExecutor()
- self.__sleep_time = 0.1
- self.__running = False
- @property
- def ident(self):
- return threading.get_ident()
- @property
- def running(self):
- return self.__running
- def start(self):
- if self.running:
- return
- app.logger.info(f'Starting {self.__class__.__name__}... ')
- self._thread = self.pool.spawn_n(self.__run__)
- self.__running = True
- def stop(self):
- if self.stop_event.has_result():
- # do not re-send this event
- return
- # print(self, self.stop_event, "sending stop_event")
- self.stop_event.send(True)
- self.wait_for_empty_queue()
- # self.pool.waitall()
- pool_id = self.pool_finished.wait()
- self.pool_finished.reset()
- # print(f"pool_id #{pool_id} finished")
- self.__running = False
- def wait_for_empty_queue(self):
- while not self.queue.empty():
- eventlet.sleep(self.__sleep_time)
- continue
- def __log(self, log_func, msg):
- log_func(f"[0x{self.ident:x}] {self.__class__.__name__}: {msg}")
- def debug(self, msg):
- self.__log(app.logger.debug, msg)
- def info(self, msg):
- self.__log(app.logger.info, msg)
- def error(self, msg):
- self.__log(app.logger.error, msg)
- def warning(self, msg):
- self.__log(app.logger.warning, msg)
- def check_stop_event(self):
- if self.stop_event.ready() and \
- self.stop_event.wait(self.__sleep_time):
- self.debug("Stop event received.")
- self.stop_event.reset()
- return True
- eventlet.sleep(self.__sleep_time)
- self.debug(f"No stop event received. Waiting for {self.__sleep_time} seconds.")
- return False
- def check_queue(self):
- if self.queue.empty():
- self.debug("Queue was empty, checking for stop...")
- return self.STOP_QUEUE if self.check_stop_event() else self.CONTINUE_QUEUE
- return self.queue.get(block=True)
- def __run__(self):
- while True:
- res = self.check_queue()
- if res is self.STOP_QUEUE:
- break
- elif res is self.CONTINUE_QUEUE:
- continue
- self.info(f"Got a job from the cache queue")
- try:
- self.info("Starting work")
- self.start_work(*res)
- except Exception as e:
- self.error(f"Error occurred: {e}")
- finally:
- self.info(f"Work finished")
- self._finish()
- def _finish(self):
- self.info("Sending finish event")
- # if not self.pool_finished.has_result():
- self.pool_finished.send(threading.get_ident())
- def start_work(self, *args):
- return eventlet.tpool.execute(self.work, *args)
- def work(self, *args):
- pass
- if __name__ == '__main__':
- import _thread as thread
- class Foo(GreenWorker):
- def work(self, value):
- print(thread.get_ident(), value)
- worker = Foo()
- print("Main:", thread.get_ident())
- worker.start()
- worker.queue.put(("value1",))
- worker.queue.put(("value2",))
- worker.queue.put(("value3",))
- # eventlet.sleep(.01)
- worker.stop()
|