What is the cleanest way to stop a python multiprocessing worker attached to a queue in an infinite loop? -
i'm implementing producer-consumer pattern in python using multiprocessing.pool , multiprocessing.queue. consumers pre-forked processes uses gevent spawn multiple tasks.
here trimmed down version of code:
import gevent queue import empty queueempty multiprocessing import process, queue, pool import signal import time # task queue queue = queue() def init_worker (): # ignore signals in worker signal.signal( signal.sigterm, signal.sig_ign ) signal.signal( signal.sigint, signal.sig_ign ) signal.signal( signal.sigquit, signal.sig_ign ) # 1 of worker task def worker_task1( ): while true: try: m = queue.get( timeout = 2 ) # break out if producer says quit if m == 'quit': print 'time quit' break except queueempty: pass # worker def work( ): gevent.joinall([ gevent.spawn( worker_task1 ), ]) pool = pool( 2, init_worker ) in xrange( 2 ): pool.apply_async( work ) try: while true: queue.put( 'some task' ) time.sleep( 2 ) except keyboardinterrupt e: print 'stopping' # signal workers quit in xrange( 2 ): queue.put( 'quit' ) pool.join() now when try quit it, following state:
- parent process waiting 1 of children join.
- one of children in defunct state. finished parent waiting other child finish.
- other child showing:
futex(0x7f99d9188000, futex_wait, 0, null ....
so correct way end such process cleanly?
i figured out problem. according documentation multiprocessing.pool.join(), pool needs close()ed before can join()ed. adding pool.close() before pool.join() solved problem.
Comments
Post a Comment