Skip to content

Commit 40aa1d4

Browse files
committed
update
1 parent 13955b9 commit 40aa1d4

File tree

1 file changed

+25
-20
lines changed

1 file changed

+25
-20
lines changed

jobq/jobq.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import threading
44
import time
55
import types
6+
from collections import deque
67

78
from pykit import threadutil
89

@@ -68,21 +69,21 @@ def __init__(self, index, worker, n_thread,
6869
# protect reading/writing worker_group info
6970
self.worker_group_lock = threading.RLock()
7071

71-
need_handle_buffer = False
72+
need_coordinator_partial = False
7273
need_coordinator = False
7374
if self.partial_order:
74-
need_handle_buffer = True
75+
need_coordinator_partial = True
7576
# dispatcher mode implies keep_order
7677
elif self.dispatcher is not None or self.keep_order:
7778
need_coordinator = True
7879

79-
if need_handle_buffer:
80-
self.buffer_queue = input_queue
80+
if need_coordinator_partial:
81+
self.partial_queue = input_queue
8182
self.input_queue = _make_q()
8283
self.output_queue = _make_q(1024, partial_order)
8384
self.dispatcher = None
8485
else:
85-
self.buffer_queue = None
86+
self.partial_queue = None
8687
self.input_queue = input_queue
8788
self.output_queue = _make_q()
8889

@@ -100,9 +101,9 @@ def __init__(self, index, worker, n_thread,
100101

101102
if need_coordinator:
102103
self.coordinator_thread = start_thread(self._coordinate)
103-
elif need_handle_buffer:
104-
self.buffer_lock = threading.RLock()
105-
self.buffer_thread = start_thread(self._handle_buffer)
104+
elif need_coordinator_partial:
105+
self.partial_order_lock = threading.RLock()
106+
self.coordinator_partial_thread = start_thread(self._coordinate_partial)
106107

107108
# `dispatcher` is a user-defined function to distribute args to workers.
108109
# It accepts the same args passed to worker and returns a number to
@@ -187,10 +188,10 @@ def _exec(self, input_q, output_q, thread_index):
187188

188189
if self.partial_order and args is not None:
189190
k = str(args[0])
190-
with self.buffer_lock:
191+
with self.partial_order_lock:
191192
if k in self.in_working:
192193
del self.in_working[k]
193-
self.buffer_queue.not_match.set()
194+
self.partial_queue.not_match.set()
194195

195196
def _exec_in_order(self, input_q, output_q, thread_index):
196197

@@ -256,30 +257,30 @@ def _dispatch(self):
256257
self.queue_of_output_q.put(outq)
257258
inq.put(args)
258259

259-
def _handle_buffer(self):
260+
def _coordinate_partial(self):
260261

261262
def check_expired():
262263
while self.running:
263264
now = time.time()
264-
with self.buffer_lock:
265+
with self.partial_order_lock:
265266
for k in self.in_working.keys():
266267
if now - self.in_working[k] > self.expire:
267268
del self.in_working[k]
268-
self.buffer_queue.not_match.set()
269+
self.partial_queue.not_match.set()
269270
time.sleep(0.1)
270271

271272
start_thread(check_expired)
272273

273274
while self.running:
274-
args = self.buffer_queue.get(exclude=self.in_working)
275+
args = self.partial_queue.get(exclude=self.in_working)
275276
if args is Finish:
276277
return
277278
elif args is NotMatch:
278-
self.buffer_queue.not_match.wait()
279-
self.buffer_queue.not_match.clear()
279+
self.partial_queue.not_match.wait()
280+
self.partial_queue.not_match.clear()
280281
else:
281282
if args is not None:
282-
with self.buffer_lock:
283+
with self.partial_order_lock:
283284
self.in_working[str(args[0])] = time.time()
284285

285286
_put_rst(self.input_queue, args)
@@ -417,8 +418,8 @@ def join(self, timeout=None):
417418

418419
if wg.partial_order:
419420
for th in ths:
420-
wg.buffer_queue.put(Finish)
421-
wg.buffer_thread.join(endtime - time.time())
421+
wg.partial_queue.put(Finish)
422+
wg.coordinator_partial_thread.join(endtime - time.time())
422423

423424
if wg.dispatcher is None:
424425
# put nr = len(threads) Finish
@@ -450,7 +451,7 @@ class PartialOrderQueue(object):
450451

451452
def __init__(self, maxsize):
452453
self.maxsize = maxsize
453-
self.queue = []
454+
self._init(maxsize)
454455
self.mutex = threading.Lock()
455456
self.not_empty = threading.Condition(self.mutex)
456457
self.not_full = threading.Condition(self.mutex)
@@ -503,6 +504,9 @@ def get(self, block=True, timeout=None, exclude=None):
503504
self.not_full.notify()
504505
return item
505506

507+
def _init(self, maxsize):
508+
self.queue = deque()
509+
506510
def _qsize(self):
507511
return len(self.queue)
508512

@@ -535,6 +539,7 @@ def _get(self, exclude):
535539
del self.queue[index]
536540
return item
537541

542+
538543
def run(input_it, workers, keep_order=False, partial_order=False, timeout=None, probe=None):
539544

540545
mgr = JobManager(workers, probe=probe, keep_order=keep_order, partial_order=partial_order)

0 commit comments

Comments
 (0)