@@ -180,6 +180,7 @@ def _exec(self, input_q, output_q, thread_index):
180180 with self .buffer_lock :
181181 if k in self .in_working :
182182 del self .in_working [k ]
183+ self .buffer_queue .not_match .set ()
183184
184185 # If rst is an iterator, it procures more than one args to next job.
185186 # In order to be accurate, we only count an iterator as one.
@@ -234,6 +235,7 @@ def _coordinate(self):
234235 _put_rst (self .output_queue , outq .get ())
235236
236237 def _dispatch (self ):
238+
237239 while self .running :
238240
239241 args = self .input_queue .get ()
@@ -251,20 +253,27 @@ def _dispatch(self):
251253
252254 def _handle_buffer (self ):
253255
256+ def check_expired ():
257+ while self .running :
258+ now = time .time ()
259+ with self .buffer_lock :
260+ for k in self .in_working .keys ():
261+ if now - self .in_working [k ] > 60 * 2 :
262+ del self .in_working [k ]
263+ self .buffer_queue .not_match .set ()
264+ time .sleep (0.1 )
265+
266+ start_thread (check_expired )
267+
254268 while self .running :
255- now = time .time ()
256- with self .buffer_lock :
257- for k in self .in_working .keys ():
258- if now - self .in_working [k ] > 60 * 2 :
259- del self .in_working [k ]
260- exclude = copy .copy (self .in_working )
261- args = self .buffer_queue .get (exclude = exclude )
269+ args = self .buffer_queue .get (exclude = self .in_working )
262270 if args is Finish :
263271 return
264272 elif args is None :
265- time . sleep ( 0.001 )
273+ self . buffer_queue . not_match . wait ( )
266274 else :
267- self .in_working [str (args [0 ])] = time .time ()
275+ with self .buffer_lock :
276+ self .in_working [str (args [0 ])] = time .time ()
268277 _put_rst (self .input_queue , args )
269278
270279class JobManager (object ):
@@ -432,6 +441,7 @@ def __init__(self, maxsize):
432441 self .mutex = threading .Lock ()
433442 self .not_empty = threading .Condition (self .mutex )
434443 self .not_full = threading .Condition (self .mutex )
444+ self .not_match = threading .Event ()
435445
436446 def put (self , item , block = True , timeout = None ):
437447 if item not in [Finish , None ] and (type (item ) not in [tuple , list ] or len (item ) == 0 ):
@@ -456,6 +466,7 @@ def put(self, item, block=True, timeout=None):
456466 self .not_full .wait (remaining )
457467 self ._put (item )
458468 self .not_empty .notify ()
469+ self .not_match .set ()
459470
460471 def get (self , block = True , timeout = None , exclude = None ):
461472 with self .not_empty :
@@ -477,6 +488,8 @@ def get(self, block=True, timeout=None, exclude=None):
477488 item = self ._get (exclude )
478489 if item is not None :
479490 self .not_full .notify ()
491+ else :
492+ self .not_match .clear ()
480493 return item
481494
482495 def _qsize (self ):
0 commit comments