Skip to content

Commit eb9f9f4

Browse files
committed
update jobq
1 parent 60ba84d commit eb9f9f4

File tree

4 files changed

+118
-22
lines changed

4 files changed

+118
-22
lines changed

jobq/jobq.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ class Finish(object):
2222
pass
2323

2424

25+
class NotMatch(object):
26+
pass
27+
28+
2529
class JobWorkerError(Exception):
2630
pass
2731

@@ -176,18 +180,18 @@ def _exec(self, input_q, output_q, thread_index):
176180
with self.probe['probe_lock']:
177181
self.probe['out'] += 1
178182

179-
if self.partial_order:
183+
# If rst is an iterator, it procures more than one args to next job.
184+
# In order to be accurate, we only count an iterator as one.
185+
186+
_put_rst(output_q, rst)
187+
188+
if self.partial_order and args is not None:
180189
k = str(args[0])
181190
with self.buffer_lock:
182191
if k in self.in_working:
183192
del self.in_working[k]
184193
self.buffer_queue.not_match.set()
185194

186-
# If rst is an iterator, it procures more than one args to next job.
187-
# In order to be accurate, we only count an iterator as one.
188-
189-
_put_rst(output_q, rst)
190-
191195
def _exec_in_order(self, input_q, output_q, thread_index):
192196

193197
while self.running:
@@ -270,16 +274,22 @@ def check_expired():
270274
args = self.buffer_queue.get(exclude=self.in_working)
271275
if args is Finish:
272276
return
273-
elif args is None:
277+
elif args is NotMatch:
274278
self.buffer_queue.not_match.wait()
279+
self.buffer_queue.not_match.clear()
275280
else:
276-
with self.buffer_lock:
277-
self.in_working[str(args[0])] = time.time()
281+
if args is not None:
282+
with self.buffer_lock:
283+
self.in_working[str(args[0])] = time.time()
284+
278285
_put_rst(self.input_queue, args)
279286

280287
class JobManager(object):
281288

282-
def __init__(self, workers, queue_size=1024, expire=3000, probe=None, keep_order=False, partial_order=False):
289+
def __init__(self, workers, queue_size=1024, expire=None, probe=None, keep_order=False, partial_order=False):
290+
291+
if expire is None:
292+
expire = 60 * 5
283293

284294
if probe is None:
285295
probe = {}
@@ -489,10 +499,8 @@ def get(self, block=True, timeout=None, exclude=None):
489499
raise Queue.Empty
490500
self.not_empty.wait(remaining)
491501
item = self._get(exclude)
492-
if item is not None:
502+
if item is not NotMatch:
493503
self.not_full.notify()
494-
else:
495-
self.not_match.clear()
496504
return item
497505

498506
def _qsize(self):
@@ -502,8 +510,10 @@ def _put(self, item):
502510
self.queue.append(item)
503511

504512
def _get(self, exclude):
505-
item = None
513+
item = NotMatch
506514
index = None
515+
selected = {}
516+
507517
for i, v in enumerate(self.queue):
508518
if v is Finish:
509519
if i == 0:
@@ -514,9 +524,13 @@ def _get(self, exclude):
514524
break
515525
else:
516526
key = str(v[0])
527+
if key in selected:
528+
continue
517529
if not exclude or key not in exclude:
518530
index, item = i, v
519531
break
532+
selected[key] = True
533+
520534
if index is not None:
521535
del self.queue[index]
522536
return item

jobq/t.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/sh
22

3-
python2 -m unittest -v discover
3+
python2 -m unittest discover -v
44

55
# python2 -m unittest -v test.test_jobq
66
# python2 -m unittest -v test.test_jobq.TestProbe

jobq/test/test_jobq.py

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1+
#!/usr/bin/env python2
2+
# coding: utf-8
3+
14
import logging
25
import random
36
import threading
47
import time
58
import unittest
69

10+
import sys
11+
12+
sys.path.insert(0,'/Users/连长/Code/Python')
13+
714
from pykit import jobq
815
from pykit import threadutil
916
from pykit import ututil
@@ -38,7 +45,7 @@ def sleep_5(args):
3845

3946
class TestProbe(unittest.TestCase):
4047

41-
def _start_jobq_in_thread(self, n_items, n_worker, keep_order=False):
48+
def _start_jobq_in_thread(self, items, n_worker, keep_order=False, partial_order=False):
4249

4350
def _sleep_1(args):
4451
time.sleep(0.1)
@@ -48,11 +55,12 @@ def _nothing(args):
4855
return args
4956

5057
probe = {}
51-
th = threading.Thread(target=lambda: jobq.run(range(n_items),
58+
th = threading.Thread(target=lambda: jobq.run(items,
5259
[(_sleep_1, n_worker),
5360
_nothing],
5461
probe=probe,
5562
keep_order=keep_order,
63+
partial_order=partial_order,
5664
))
5765
th.daemon = True
5866
th.start()
@@ -67,13 +75,13 @@ def test_probe_single_thread(self):
6775
(0.2, 0, 'all done'),
6876
)
6977

70-
th, probe = self._start_jobq_in_thread(3, 1)
78+
th, probe = self._start_jobq_in_thread(range(3), 1)
7179

7280
for sleep_time, doing, case_mes in cases:
7381

7482
time.sleep(sleep_time)
7583
stat = jobq.stat(probe)
76-
84+
print(stat)
7785
self.assertEqual(doing, stat['doing'], case_mes)
7886

7987
# qsize() is not reliable. do not test the value of it.
@@ -100,7 +108,7 @@ def test_probe_3_thread(self):
100108
(0.4, 0, 'all done'),
101109
)
102110

103-
th, probe = self._start_jobq_in_thread(10, 3)
111+
th, probe = self._start_jobq_in_thread(range(10), 3)
104112

105113
for sleep_time, doing, case_mes in cases:
106114

@@ -124,7 +132,7 @@ def test_probe_3_thread_keep_order(self):
124132
(0.4, 0, 'all done'),
125133
)
126134

127-
th, probe = self._start_jobq_in_thread(10, 3, keep_order=True)
135+
th, probe = self._start_jobq_in_thread(range(10), 3, keep_order=True)
128136

129137
for sleep_time, doing, case_mes in cases:
130138

@@ -140,6 +148,28 @@ def test_probe_3_thread_keep_order(self):
140148

141149
th.join()
142150

151+
def test_probe_3_thread_partial_order(self):
152+
cases = (
153+
(0.05, 3, '_sleep_1 is working on 1st 3 items'),
154+
(0.1, 3, '_sleep_1 is working on 2nd 3 items'),
155+
(0.4, 0, 'all done'),
156+
)
157+
158+
th, probe = self._start_jobq_in_thread(([i, 0] for i in range(10)), 3, partial_order=True)
159+
160+
for sleep_time, doing, case_mes in cases:
161+
time.sleep(sleep_time)
162+
stat = jobq.stat(probe)
163+
164+
self.assertEqual(doing, stat['doing'], case_mes)
165+
166+
# use the last stat
167+
168+
workers = stat['workers']
169+
self.assertEqual(2, len(workers))
170+
171+
th.join()
172+
143173

144174
class TestDispatcher(unittest.TestCase):
145175

@@ -434,6 +464,47 @@ def _change_thread_nr():
434464
for th in ths:
435465
th.join()
436466

467+
def test_set_thread_num_partial_order(self):
468+
469+
def _pass(args):
470+
print(args)
471+
return args
472+
473+
rst = []
474+
475+
jm = jobq.JobManager([_pass, (rst.append,5)], partial_order=True)
476+
# jm = jobq.JobManager([_pass, rst.append])
477+
478+
# setter = {'running': True}
479+
480+
# def _change_thread_nr():
481+
# while setter['running']:
482+
# jm.set_thread_num(_pass, random.randint(1, 4))
483+
# time.sleep(0.5)
484+
#
485+
# ths = []
486+
# for ii in range(3):
487+
# th = threadutil.start_daemon_thread(_change_thread_nr)
488+
# ths.append(th)
489+
490+
n = 60
491+
for i in range(n):
492+
jm.put([i % 2, i])
493+
494+
jm.join()
495+
496+
# rst.sort()
497+
print('')
498+
print(rst)
499+
for i in range(n):
500+
self.assertEqual(i % 2, rst[i][0])
501+
self.assertEqual(i, rst[i][1])
502+
#
503+
# setter['running'] = False
504+
#
505+
# for th in ths:
506+
# th.join()
507+
437508

438509
class TestJobQ(unittest.TestCase):
439510

threadutil/threadutil.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class AsyncRaiseError(Exception):
1414
class InvalidThreadIdError(AsyncRaiseError):
1515
pass
1616

17+
COUNT = 0
1718

1819
def start_thread(target, name=None, args=None, kwargs=None, daemon=False, after=None):
1920
args = args or ()
@@ -26,7 +27,17 @@ def _target(*args, **kwargs):
2627
time.sleep(after)
2728
target(*args, **kwargs)
2829

29-
t = threading.Thread(target=_target, name=name, args=args, kwargs=kwargs)
30+
import cProfile
31+
32+
def worker(*args, **kwargs):
33+
profiler = cProfile.Profile()
34+
t = threading.currentThread()
35+
global COUNT
36+
cProfile.runctx('_target(*args, **kwargs)', globals(), locals(), '{}-{}.prof'.format(str(_target.__name__),t.ident))
37+
COUNT += 1
38+
# print(COUNT)
39+
40+
t = threading.Thread(target=worker, name=name, args=args, kwargs=kwargs)
3041
t.daemon = daemon
3142
t.start()
3243

0 commit comments

Comments
 (0)