Skip to content

Commit 13955b9

Browse files
committed
add unittest for partial order
1 parent 60ba84d commit 13955b9

File tree

3 files changed

+182
-21
lines changed

3 files changed

+182
-21
lines changed

jobq/jobq.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ class Finish(object):
2222
pass
2323

2424

25+
class NotMatch(object):
26+
pass
27+
28+
2529
class JobWorkerError(Exception):
2630
pass
2731

@@ -176,18 +180,18 @@ def _exec(self, input_q, output_q, thread_index):
176180
with self.probe['probe_lock']:
177181
self.probe['out'] += 1
178182

179-
if self.partial_order:
183+
# If rst is an iterator, it procures more than one args to next job.
184+
# In order to be accurate, we only count an iterator as one.
185+
186+
_put_rst(output_q, rst)
187+
188+
if self.partial_order and args is not None:
180189
k = str(args[0])
181190
with self.buffer_lock:
182191
if k in self.in_working:
183192
del self.in_working[k]
184193
self.buffer_queue.not_match.set()
185194

186-
# If rst is an iterator, it procures more than one args to next job.
187-
# In order to be accurate, we only count an iterator as one.
188-
189-
_put_rst(output_q, rst)
190-
191195
def _exec_in_order(self, input_q, output_q, thread_index):
192196

193197
while self.running:
@@ -270,16 +274,22 @@ def check_expired():
270274
args = self.buffer_queue.get(exclude=self.in_working)
271275
if args is Finish:
272276
return
273-
elif args is None:
277+
elif args is NotMatch:
274278
self.buffer_queue.not_match.wait()
279+
self.buffer_queue.not_match.clear()
275280
else:
276-
with self.buffer_lock:
277-
self.in_working[str(args[0])] = time.time()
281+
if args is not None:
282+
with self.buffer_lock:
283+
self.in_working[str(args[0])] = time.time()
284+
278285
_put_rst(self.input_queue, args)
279286

280287
class JobManager(object):
281288

282-
def __init__(self, workers, queue_size=1024, expire=3000, probe=None, keep_order=False, partial_order=False):
289+
def __init__(self, workers, queue_size=1024, expire=None, probe=None, keep_order=False, partial_order=False):
290+
291+
if expire is None:
292+
expire = 60 * 5
283293

284294
if probe is None:
285295
probe = {}
@@ -489,10 +499,8 @@ def get(self, block=True, timeout=None, exclude=None):
489499
raise Queue.Empty
490500
self.not_empty.wait(remaining)
491501
item = self._get(exclude)
492-
if item is not None:
502+
if item is not NotMatch:
493503
self.not_full.notify()
494-
else:
495-
self.not_match.clear()
496504
return item
497505

498506
def _qsize(self):
@@ -502,8 +510,10 @@ def _put(self, item):
502510
self.queue.append(item)
503511

504512
def _get(self, exclude):
505-
item = None
513+
item = NotMatch
506514
index = None
515+
selected = {}
516+
507517
for i, v in enumerate(self.queue):
508518
if v is Finish:
509519
if i == 0:
@@ -514,9 +524,13 @@ def _get(self, exclude):
514524
break
515525
else:
516526
key = str(v[0])
527+
if key in selected:
528+
continue
517529
if not exclude or key not in exclude:
518530
index, item = i, v
519531
break
532+
selected[key] = True
533+
520534
if index is not None:
521535
del self.queue[index]
522536
return item

jobq/t.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/sh
22

3-
python2 -m unittest -v discover
3+
python2 -m unittest discover -v
44

55
# python2 -m unittest -v test.test_jobq
66
# python2 -m unittest -v test.test_jobq.TestProbe

jobq/test/test_jobq.py

Lines changed: 153 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,25 @@ def sleep_5(args):
3636
return args
3737

3838

39+
def is_partial_order(rst):
40+
cache = {}
41+
42+
for item in rst:
43+
k = item[0]
44+
if k not in cache:
45+
cache[k] = item
46+
47+
last = cache[k]
48+
if last[1] > item[1]:
49+
return False
50+
cache[k] = item
51+
52+
return True
53+
54+
3955
class TestProbe(unittest.TestCase):
4056

41-
def _start_jobq_in_thread(self, n_items, n_worker, keep_order=False):
57+
def _start_jobq_in_thread(self, items, n_worker, keep_order=False, partial_order=False):
4258

4359
def _sleep_1(args):
4460
time.sleep(0.1)
@@ -48,11 +64,12 @@ def _nothing(args):
4864
return args
4965

5066
probe = {}
51-
th = threading.Thread(target=lambda: jobq.run(range(n_items),
67+
th = threading.Thread(target=lambda: jobq.run(items,
5268
[(_sleep_1, n_worker),
5369
_nothing],
5470
probe=probe,
5571
keep_order=keep_order,
72+
partial_order=partial_order,
5673
))
5774
th.daemon = True
5875
th.start()
@@ -67,13 +84,12 @@ def test_probe_single_thread(self):
6784
(0.2, 0, 'all done'),
6885
)
6986

70-
th, probe = self._start_jobq_in_thread(3, 1)
87+
th, probe = self._start_jobq_in_thread(range(3), 1)
7188

7289
for sleep_time, doing, case_mes in cases:
7390

7491
time.sleep(sleep_time)
7592
stat = jobq.stat(probe)
76-
7793
self.assertEqual(doing, stat['doing'], case_mes)
7894

7995
# qsize() is not reliable. do not test the value of it.
@@ -100,7 +116,7 @@ def test_probe_3_thread(self):
100116
(0.4, 0, 'all done'),
101117
)
102118

103-
th, probe = self._start_jobq_in_thread(10, 3)
119+
th, probe = self._start_jobq_in_thread(range(10), 3)
104120

105121
for sleep_time, doing, case_mes in cases:
106122

@@ -124,7 +140,7 @@ def test_probe_3_thread_keep_order(self):
124140
(0.4, 0, 'all done'),
125141
)
126142

127-
th, probe = self._start_jobq_in_thread(10, 3, keep_order=True)
143+
th, probe = self._start_jobq_in_thread(range(10), 3, keep_order=True)
128144

129145
for sleep_time, doing, case_mes in cases:
130146

@@ -140,6 +156,28 @@ def test_probe_3_thread_keep_order(self):
140156

141157
th.join()
142158

159+
def test_probe_3_thread_partial_order(self):
160+
cases = (
161+
(0.05, 3, '_sleep_1 is working on 1st 3 items'),
162+
(0.1, 3, '_sleep_1 is working on 2nd 3 items'),
163+
(0.4, 0, 'all done'),
164+
)
165+
166+
th, probe = self._start_jobq_in_thread(([i, 0] for i in range(10)), 3, partial_order=True)
167+
168+
for sleep_time, doing, case_mes in cases:
169+
time.sleep(sleep_time)
170+
stat = jobq.stat(probe)
171+
172+
self.assertEqual(doing, stat['doing'], case_mes)
173+
174+
# use the last stat
175+
176+
workers = stat['workers']
177+
self.assertEqual(2, len(workers))
178+
179+
th.join()
180+
143181

144182
class TestDispatcher(unittest.TestCase):
145183

@@ -264,6 +302,37 @@ def collect(args):
264302
self.assertEqual([0], rst)
265303

266304

305+
class TestExpire(unittest.TestCase):
306+
307+
def test_expire(self):
308+
309+
def _sleep_1(args):
310+
sleep_got.append(args)
311+
time.sleep(0.5)
312+
return args
313+
314+
def collect(args):
315+
rst.append(args)
316+
317+
rst = []
318+
sleep_got = []
319+
320+
jm = jobq.JobManager([(_sleep_1, 3), collect], expire=0.2, partial_order=True)
321+
322+
n = 6
323+
for i in range(n):
324+
jm.put((i % 2, i))
325+
326+
time.sleep(0.1)
327+
self.assertEqual(set([(0, 0), (1, 1)]), set(sleep_got))
328+
329+
time.sleep(0.3)
330+
self.assertEqual(3, len(sleep_got))
331+
332+
jm.join()
333+
self.assertEqual(set([(i % 2, i) for i in range(n)]), set(rst))
334+
335+
267336
class TestJobManager(unittest.TestCase):
268337

269338
def test_manager(self):
@@ -434,6 +503,42 @@ def _change_thread_nr():
434503
for th in ths:
435504
th.join()
436505

506+
def test_set_thread_num_partial_order(self):
507+
508+
def _pass(args):
509+
return args
510+
511+
rst = []
512+
513+
jm = jobq.JobManager([_pass, rst.append], partial_order=True)
514+
515+
setter = {'running': True}
516+
517+
def _change_thread_nr():
518+
while setter['running']:
519+
jm.set_thread_num(_pass, random.randint(1, 4))
520+
time.sleep(0.5)
521+
522+
ths = []
523+
for ii in range(3):
524+
th = threadutil.start_daemon_thread(_change_thread_nr)
525+
ths.append(th)
526+
527+
n = 10240
528+
for i in range(n):
529+
jm.put([i % 2, i])
530+
531+
jm.join()
532+
533+
self.assertEqual(n, len(rst))
534+
535+
self.assertTrue(is_partial_order(rst))
536+
537+
setter['running'] = False
538+
539+
for th in ths:
540+
th.join()
541+
437542

438543
class TestJobQ(unittest.TestCase):
439544

@@ -506,6 +611,36 @@ def collect(args):
506611
jobq.run(inp, workers + [collect], keep_order=True)
507612
self.assertEqual(out, rst)
508613

614+
def add_list(args):
615+
args[1] += 1
616+
return args
617+
618+
def multi2_list(args):
619+
args[1] *= 2
620+
return args
621+
622+
def multi2_list_sleep(args):
623+
time.sleep(0.02)
624+
args[1] *= 2
625+
return args
626+
627+
cases = (
628+
(list([i % 2, i] for i in range(100)), [add_list, (multi2_list_sleep, 10)],
629+
list([i % 2, (i + 1) * 2] for i in range(100))
630+
),
631+
(list([i % 50, i] for i in range(1024 * 10)), [add_list, (multi2_list, 10)],
632+
list([i % 50, (i + 1) * 2] for i in range(1024 * 10))
633+
),
634+
)
635+
for inp, workers, out in cases:
636+
rst = []
637+
jobq.run(inp, workers + [collect], partial_order=True)
638+
self.assertTrue(is_partial_order(rst))
639+
640+
out.sort()
641+
rst.sort()
642+
self.assertEqual(out, rst)
643+
509644
def test_generator(self):
510645

511646
def gen(args):
@@ -514,6 +649,7 @@ def gen(args):
514649
time.sleep(0.1)
515650

516651
def collect(args):
652+
time.sleep(random.uniform(0.005, 0.02))
517653
rst.append(args)
518654

519655
rst = []
@@ -528,6 +664,17 @@ def collect(args):
528664

529665
self.assertEqual(9, len(rst), 'nr of elts')
530666

667+
def _gen(args):
668+
k = args[0]
669+
for i in range(3):
670+
yield (k, i)
671+
672+
rst = []
673+
jobq.run([(k, 0) for k in range(3)], [(_gen, 3), (collect, 3)], partial_order=True)
674+
self.assertEqual(set([(k, v) for k in range(3) for v in range(3)]), set(rst),
675+
"generator should get all")
676+
self.assertTrue(is_partial_order(rst))
677+
531678

532679
class TestDefaultTimeout(unittest.TestCase):
533680

0 commit comments

Comments
 (0)