123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- import abc
- import atexit
- import eventlet
- import threading
- # from concurrent.futures import ThreadPoolExecutor
- from pycs import app
- from eventlet import tpool
- class GreenWorker(abc.ABC):
- def __init__(self):
- super(GreenWorker, self).__init__()
- self.pool = eventlet.GreenPool()
- self.stop_event = eventlet.Event()
- self.pool_finished = eventlet.Event()
- self.queue = eventlet.Queue()
- # self.executor = ThreadPoolExecutor()
- self.__sleep_time = 0.1
- self.__running = False
- def start(self):
- if self.__running:
- return
- # self._thread = self.pool.
- eventlet.spawn(self.__run__)
- self.__running = True
- def stop(self):
- if self.stop_event.has_result():
- # do not re-send this event
- return
- # print(self, self.stop_event, "sending stop_event")
- self.stop_event.send(True)
- self.wait_for_empty_queue()
- # self.pool.waitall()
- pool_id = self.pool_finished.wait()
- self.pool_finished.reset()
- # print(f"pool_id #{pool_id} finished")
- self.__running = False
- def wait_for_empty_queue(self):
- while not self.queue.empty():
- eventlet.sleep(self.__sleep_time)
- continue
- def __run__(self):
- while True:
- if self.queue.empty():
- # print("Queue was empty, checking for stop")
- if self.stop_event.ready() and \
- self.stop_event.wait(self.__sleep_time):
- # print("Stop event received")
- self.stop_event.reset()
- break
- else:
- eventlet.sleep(self.__sleep_time)
- # print("no stop event received")
- continue
- app.logger.info(f"starting job in thread #{self.ident}...")
- args = self.queue.get(block=True)
- self.start_work(*args)
- app.logger.info(f"pool #{self.ident} sending finish event...")
- # if not self.pool_finished.has_result():
- self.pool_finished.send(threading.get_ident())
- def start_work(self, *args):
- app.logger.info(f"[{self.__class__.__name__} - {self.ident}] starting work")
- res = tpool.execute(self.work, *args)
- app.logger.info(f"[{self.__class__.__name__} - {self.ident}] work finished")
- return res
- @property
- def ident(self):
- return threading.get_ident()
- @abc.abstractmethod
- def work(self, *args):
- pass
- if __name__ == '__main__':
- import _thread as thread
- class Foo(GreenWorker):
- def work(self, value):
- print(thread.get_ident(), value)
- worker = Foo()
- print("Main:", thread.get_ident())
- worker.start()
- worker.queue.put(("value1",))
- worker.queue.put(("value2",))
- worker.queue.put(("value3",))
- # eventlet.sleep(.01)
- worker.stop()
|