Skip to content

Commit fd369c2

Browse files
committed
implement job ids, fix #53
1 parent f1a6629 commit fd369c2

9 files changed

+97
-57
lines changed

HISTORY.rst

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
History
44
-------
55

6-
v0.9.1 (2017-08-16)
7-
...................
6+
v0.10.0 (2017-08-16)
7+
....................
88
* log redis version when starting worker, fix #64
99
* log "connection success" when connecting to redis after connection failures, fix #67
10+
* add job ids, for now they're just used in logging, fix #53
1011

1112
v0.9.0 (2017-06-23)
1213
...................

arq/jobs.py

+26-7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
55
Defines the ``Job`` class and descendants which deal with encoding and decoding job data.
66
"""
7+
import base64
8+
import os
79
from datetime import datetime
810

911
import msgpack
@@ -21,6 +23,15 @@ class JobSerialisationError(ArqError):
2123
pass
2224

2325

26+
def gen_random():
27+
"""
28+
generate a lowercase alpha-numeric random string of length 24.
29+
30+
Should have more randomness for its size than uuid
31+
"""
32+
return base64.b32encode(os.urandom(10))[:16].decode().lower()
33+
34+
2435
# "device control one" should be fairly unique as a dict key and only one byte
2536
DEVICE_CONTROL_ONE = '\x11'
2637

@@ -30,7 +41,7 @@ class Job:
3041
Main Job class responsible for encoding and decoding jobs as they go
3142
into and come out of redis.
3243
"""
33-
__slots__ = ('queue', 'queued_at', 'class_name', 'func_name', 'args', 'kwargs', 'raw_queue', 'raw_data')
44+
__slots__ = 'id', 'queue', 'queued_at', 'class_name', 'func_name', 'args', 'kwargs', 'raw_queue', 'raw_data'
3445

3546
def __init__(self, raw_data: bytes, *, queue_name: str=None, raw_queue: bytes=None) -> None:
3647
"""
@@ -45,16 +56,17 @@ def __init__(self, raw_data: bytes, *, queue_name: str=None, raw_queue: bytes=No
4556
raise ArqError('either queue_name or raw_queue are required')
4657
self.queue = queue_name or raw_queue.decode()
4758
self.raw_queue = raw_queue or queue_name.encode()
48-
self.queued_at, self.class_name, self.func_name, self.args, self.kwargs = self._decode(raw_data)
59+
self.queued_at, self.class_name, self.func_name, self.args, self.kwargs, self.id = self.decode_raw(raw_data)
4960
self.queued_at /= 1000
5061

5162
@classmethod
52-
def encode(cls, *, queued_at: int=None, class_name: str, func_name: str,
63+
def encode(cls, *, job_id: str=None, queued_at: int=None, class_name: str, func_name: str,
5364
args: tuple, kwargs: dict) -> bytes:
5465
"""
5566
Create a byte string suitable for pushing into redis which contains all
5667
required information about a job to be performed.
5768
69+
:param job_id: id to use for the job, leave blank to generate a uuid
5870
:param queued_at: time in ms (unix time) when the job was queued; if None, now is used
5971
:param class_name: name (see :attr:`arq.main.Actor.name`) of the actor class where the job is defined
6072
:param func_name: name of the function to be called
@@ -63,10 +75,14 @@ def encode(cls, *, queued_at: int=None, class_name: str, func_name: str,
6375
"""
6476
queued_at = queued_at or int(timestamp() * 1000)
6577
try:
66-
return cls._encode([queued_at, class_name, func_name, args, kwargs])
78+
return cls.encode_raw([queued_at, class_name, func_name, args, kwargs, cls.generate_id(job_id)])
6779
except TypeError as e:
6880
raise JobSerialisationError(str(e)) from e
6981

82+
@classmethod
83+
def generate_id(cls, given_id):
84+
return given_id or gen_random()
85+
7086
@classmethod
7187
def msgpack_encoder(cls, obj):
7288
"""
@@ -84,11 +100,11 @@ def msgpack_object_hook(cls, obj):
84100
return obj
85101

86102
@classmethod
87-
def _encode(cls, data) -> bytes:
103+
def encode_raw(cls, data) -> bytes:
88104
return msgpack.packb(data, default=cls.msgpack_encoder, use_bin_type=True)
89105

90106
@classmethod
91-
def _decode(cls, data: bytes):
107+
def decode_raw(cls, data: bytes):
92108
return msgpack.unpackb(data, object_hook=cls.msgpack_object_hook, encoding='utf8')
93109

94110
def to_string(self, args_curtail=DEFAULT_CURTAIL):
@@ -100,7 +116,10 @@ def to_string(self, args_curtail=DEFAULT_CURTAIL):
100116
arguments += ', '
101117
arguments += ', '.join(f'{k}={v!r}' for k, v in sorted(self.kwargs.items()))
102118

103-
return '{s.class_name}.{s.func_name}({args})'.format(s=self, args=truncate(arguments, args_curtail))
119+
return '{s.id:.6} {s.class_name}.{s.func_name}({args})'.format(s=self, args=truncate(arguments, args_curtail))
120+
121+
def short_ref(self):
122+
return '{s.id:.6} {s.class_name}.{s.func_name}'.format(s=self)
104123

105124
def __str__(self):
106125
return self.to_string()

arq/version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
__all__ = ['VERSION']
44

5-
VERSION = StrictVersion('0.9.1')
5+
VERSION = StrictVersion('0.10.0')

arq/worker.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ def log_job_result(self, started_at: float, result, j: Job):
330330
return
331331
job_time = timestamp() - started_at
332332
sr = '' if result is None else truncate(repr(result), self.log_curtail)
333-
jobs_logger.info('%-4s ran in%7.3fs ← %s.%s ● %s', j.queue, job_time, j.class_name, j.func_name, sr)
333+
jobs_logger.info('%-4s ran in%7.3fs ← %s ● %s', j.queue, job_time, j.short_ref(), sr)
334334

335335
def handle_prepare_exc(self, msg: str):
336336
self.drain.jobs_failed += 1
@@ -341,10 +341,10 @@ def handle_prepare_exc(self, msg: str):
341341
@classmethod
342342
def handle_stop_job(cls, started_at: float, exc: StopJob, j: Job):
343343
if exc.warning:
344-
msg, logger = '%-4s ran in%7.3fs ■ %s.%s ● Stopped Warning %s', jobs_logger.warning
344+
msg, logger = '%-4s ran in%7.3fs ■ %s ● Stopped Warning %s', jobs_logger.warning
345345
else:
346-
msg, logger = '%-4s ran in%7.3fs ■ %s.%s ● Stopped %s', jobs_logger.info
347-
logger(msg, j.queue, timestamp() - started_at, j.class_name, j.func_name, exc)
346+
msg, logger = '%-4s ran in%7.3fs ■ %s ● Stopped %s', jobs_logger.info
347+
logger(msg, j.queue, timestamp() - started_at, j.short_ref(), exc)
348348

349349
@classmethod
350350
def handle_execute_exc(cls, started_at: float, exc: BaseException, j: Job):

tests/fixtures.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,20 @@
66
import time
77
from pathlib import Path
88

9-
from arq import Actor, BaseWorker, StopJob, concurrent, cron
9+
from arq import Actor, BaseWorker, Job, StopJob, concurrent, cron
1010
from arq.drain import Drain
1111
from arq.testing import MockRedisMixin
1212

1313

14+
class JobConstID(Job):
15+
@classmethod
16+
def generate_id(cls, given_id):
17+
return '__id__'
18+
19+
1420
class DemoActor(Actor):
21+
job_class = JobConstID
22+
1523
@concurrent
1624
async def add_numbers(self, a, b):
1725
"""add_number docs"""
@@ -88,6 +96,10 @@ async def stop_job_warning(self):
8896
raise StopJob('stopping job with warning', warning=True)
8997

9098

99+
class RealJobActor(DemoActor):
100+
job_class = Job
101+
102+
91103
class MockRedisDemoActor(MockRedisMixin, DemoActor):
92104
pass
93105

@@ -97,6 +109,8 @@ class Worker(BaseWorker):
97109

98110

99111
class StartupActor(Actor):
112+
job_class = JobConstID
113+
100114
async def startup(self):
101115
with open('events', 'a') as f:
102116
f.write('startup[{}],'.format(self.is_shadow))
@@ -190,6 +204,7 @@ class ReEnqueueActor(DemoActor):
190204

191205
class CronActor(Actor):
192206
# using 3:0:0 makes it very unlikely the job will be triggered by hitting the right time
207+
job_class = JobConstID
193208

194209
@cron(hour=3, minute=0, second=0, run_at_startup=True)
195210
async def save_foobar(self):

tests/test_customisation.py

+15-15
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from arq.jobs import DatetimeJob, Job, JobSerialisationError
88
from arq.worker import BaseWorker
99

10-
from .fixtures import DemoActor
10+
from .fixtures import DemoActor, RealJobActor
1111

1212

1313
class DatetimeActor(DemoActor):
@@ -77,7 +77,7 @@ async def test_encode_non_datetimes(tmpworkdir, loop, redis_conn):
7777

7878

7979
async def test_wrong_job_class(loop):
80-
worker = DatetimeWorker(loop=loop, burst=True, shadows=[DemoActor, DemoActor, DatetimeActor])
80+
worker = DatetimeWorker(loop=loop, burst=True, shadows=[RealJobActor, RealJobActor, DatetimeActor])
8181
with pytest.raises(TypeError) as excinfo:
8282
await worker.run()
8383
assert excinfo.value.args[0].endswith("has a different job class to the first shadow, "
@@ -99,7 +99,7 @@ class DifferentQueuesActor(DemoActor):
9999

100100

101101
async def test_switch_job_class(loop):
102-
worker = DatetimeWorker(loop=loop, burst=True, shadows=[DemoActor])
102+
worker = DatetimeWorker(loop=loop, burst=True, shadows=[RealJobActor])
103103
assert worker.job_class is None
104104
await worker.run()
105105
assert worker.job_class == Job
@@ -109,26 +109,26 @@ async def test_switch_job_class(loop):
109109
def test_naïve_dt_encoding():
110110
t = datetime(2000, 1, 1)
111111
assert str(t) == '2000-01-01 00:00:00'
112-
p = DatetimeJob._encode(t)
113-
t2 = DatetimeJob._decode(p)
112+
p = DatetimeJob.encode_raw(t)
113+
t2 = DatetimeJob.decode_raw(p)
114114
assert t == t2
115115
assert str(t2) == '2000-01-01 00:00:00'
116116

117117

118118
def test_utc_dt_encoding():
119119
t = datetime(2000, 1, 1, tzinfo=timezone.utc)
120120
assert str(t) == '2000-01-01 00:00:00+00:00'
121-
p = DatetimeJob._encode(t)
122-
t2 = DatetimeJob._decode(p)
121+
p = DatetimeJob.encode_raw(t)
122+
t2 = DatetimeJob.decode_raw(p)
123123
assert t == t2
124124
assert str(t2) == '2000-01-01 00:00:00+00:00'
125125

126126

127127
def test_new_york_dt_encoding():
128128
t = datetime(2000, 1, 1, tzinfo=timezone(timedelta(hours=-5)))
129129
assert str(t) == '2000-01-01 00:00:00-05:00'
130-
p = DatetimeJob._encode(t)
131-
t2 = DatetimeJob._decode(p)
130+
p = DatetimeJob.encode_raw(t)
131+
t2 = DatetimeJob.decode_raw(p)
132132
assert t == t2
133133
assert str(t2) == '2000-01-01 00:00:00-05:00'
134134

@@ -137,8 +137,8 @@ def test_pytz_new_york_dt_encoding():
137137
ny = pytz.timezone('America/New_York')
138138
t = ny.localize(datetime(2000, 1, 1))
139139
assert str(t) == '2000-01-01 00:00:00-05:00'
140-
p = DatetimeJob._encode(t)
141-
t2 = DatetimeJob._decode(p)
140+
p = DatetimeJob.encode_raw(t)
141+
t2 = DatetimeJob.decode_raw(p)
142142
assert t == t2
143143
assert datetime(2000, 1, 1, tzinfo=timezone(timedelta(hours=-5))) == t2
144144
assert str(t2) == '2000-01-01 00:00:00-05:00'
@@ -147,17 +147,17 @@ def test_pytz_new_york_dt_encoding():
147147
def test_dt_encoding_with_ms():
148148
t = datetime(2000, 1, 1, 0, 0, 0, 123000)
149149
assert str(t) == '2000-01-01 00:00:00.123000'
150-
p = DatetimeJob._encode(t)
151-
t2 = DatetimeJob._decode(p)
150+
p = DatetimeJob.encode_raw(t)
151+
t2 = DatetimeJob.decode_raw(p)
152152
assert t == t2
153153
assert str(t2) == '2000-01-01 00:00:00.123000'
154154

155155

156156
def test_dt_encoding_with_μs():
157157
t = datetime(2000, 1, 1, 0, 0, 0, 123456)
158158
assert str(t) == '2000-01-01 00:00:00.123456'
159-
p = DatetimeJob._encode(t)
160-
t2 = DatetimeJob._decode(p)
159+
p = DatetimeJob.encode_raw(t)
160+
t2 = DatetimeJob.decode_raw(p)
161161
assert t != t2
162162
assert (t - t2) == timedelta(microseconds=456)
163163
assert str(t2) == '2000-01-01 00:00:00.123000'

tests/test_doc_example.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,9 @@ async def test_run_job_burst(redis_conn, loop, caplog):
1313
await downloader.download_content('http://example.com')
1414
await worker.run()
1515
await downloader.close()
16-
assert 's → Downloader.download_content(http://example.com)' in caplog
17-
assert 's ← Downloader.download_content ● 1' in caplog
16+
log = caplog(
17+
(' [0-9a-z]{6} ', ' __id__ ')
18+
)
19+
print(log)
20+
assert 's → __id__ Downloader.download_content(http://example.com)' in log
21+
assert 's ← __id__ Downloader.download_content ● 1' in log

tests/test_main.py

+13-13
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async def test_simple_job_dispatch(tmpworkdir, loop, debug):
2424
data = msgpack.unpackb(v[0], encoding='utf8')
2525
# timestamp
2626
assert 1e12 < data.pop(0) < 3e12
27-
assert data == ['MockRedisDemoActor', 'add_numbers', [1, 2], {}]
27+
assert data == ['MockRedisDemoActor', 'add_numbers', [1, 2], {}, '__id__']
2828

2929

3030
async def test_concurrency_disabled_job_dispatch(tmpworkdir, loop):
@@ -44,7 +44,7 @@ async def test_enqueue_redis_job(actor, redis_conn):
4444
data = msgpack.unpackb(dft_queue[0], encoding='utf8')
4545
# timestamp
4646
assert 1e12 < data.pop(0) < 3e12
47-
assert data == ['DemoActor', 'add_numbers', [1, 2], {}]
47+
assert data == ['DemoActor', 'add_numbers', [1, 2], {}, '__id__']
4848

4949

5050
async def test_dispatch_work(tmpworkdir, loop, caplog, redis_conn):
@@ -69,22 +69,22 @@ async def test_dispatch_work(tmpworkdir, loop, caplog, redis_conn):
6969
assert ('MockRedisDemoActor.add_numbers → dft\n'
7070
'MockRedisDemoActor.high_add_numbers → high\n'
7171
'Initialising work manager, burst mode: True, creating shadows...\n'
72-
'Using first shadows job class "Job"\n'
72+
'Using first shadows job class "JobConstID"\n'
7373
'Running worker with 1 shadow listening to 3 queues\n'
7474
'shadows: MockRedisDemoActor | queues: high, dft, low\n'
7575
'recording health: <date time2> j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=1 q_dft=1 q_low=0\n'
7676
'starting main blpop loop\n'
7777
'populating quit queue to prompt exit: arq:quit-<random>\n'
7878
'jobs in progress 1\n'
79-
'scheduling job <Job MockRedisDemoActor.high_add_numbers(3, 4, c=5) on high>, re-enqueue: False\n'
79+
'scheduling job <Job __id__ MockRedisDemoActor.high_add_numbers(3, 4, c=5) on high>, re-enqueue: False\n'
8080
'jobs in progress 2\n'
81-
'scheduling job <Job MockRedisDemoActor.add_numbers(1, 2) on dft>, re-enqueue: False\n'
81+
'scheduling job <Job __id__ MockRedisDemoActor.add_numbers(1, 2) on dft>, re-enqueue: False\n'
8282
'got job from the quit queue, stopping\n'
8383
'drain waiting 5.0s for 2 tasks to finish\n'
84-
'high queued 0.0XXs → MockRedisDemoActor.high_add_numbers(3, 4, c=5)\n'
85-
'high ran in 0.0XXs ← MockRedisDemoActor.high_add_numbers ● 12\n'
86-
'dft queued 0.0XXs → MockRedisDemoActor.add_numbers(1, 2)\n'
87-
'dft ran in 0.0XXs ← MockRedisDemoActor.add_numbers ● \n'
84+
'high queued 0.0XXs → __id__ MockRedisDemoActor.high_add_numbers(3, 4, c=5)\n'
85+
'high ran in 0.0XXs ← __id__ MockRedisDemoActor.high_add_numbers ● 12\n'
86+
'dft queued 0.0XXs → __id__ MockRedisDemoActor.add_numbers(1, 2)\n'
87+
'dft ran in 0.0XXs ← __id__ MockRedisDemoActor.add_numbers ● \n'
8888
'task complete, 1 jobs done, 0 failed\n'
8989
'task complete, 2 jobs done, 0 failed\n'
9090
'shutting down worker after 0.0XXs ◆ 2 jobs done ◆ 0 failed ◆ 0 timed out\n') == log
@@ -109,8 +109,8 @@ async def test_handle_exception(loop, caplog):
109109
'shadows: MockRedisDemoActor | queues: high, dft, low\n'
110110
'recording health: <date time2> j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=1 q_low=0\n'
111111
'drain waiting 5.0s for 1 tasks to finish\n'
112-
'dft queued 0.0XXs → MockRedisDemoActor.boom()\n'
113-
'dft ran in 0.0XXs ! MockRedisDemoActor.boom(): RuntimeError\n'
112+
'dft queued 0.0XXs → __id__ MockRedisDemoActor.boom()\n'
113+
'dft ran in 0.0XXs ! __id__ MockRedisDemoActor.boom(): RuntimeError\n'
114114
'Traceback (most recent call last):\n'
115115
' File "/path/to/arq/worker.py", line <no>, in run_job\n'
116116
' result = await func(*j.args, **j.kwargs)\n'
@@ -157,8 +157,8 @@ async def test_call_direct(mock_actor_worker, caplog):
157157
assert worker.jobs_failed == 0
158158
assert worker.jobs_complete == 1
159159
log = re.sub('0.0\d\ds', '0.0XXs', caplog.log)
160-
assert ('arq.jobs: dft queued 0.0XXs → MockRedisDemoActor.direct_method(1, 2)\n'
161-
'arq.jobs: dft ran in 0.0XXs ← MockRedisDemoActor.direct_method ● 3') in log
160+
assert ('arq.jobs: dft queued 0.0XXs → __id__ MockRedisDemoActor.direct_method(1, 2)\n'
161+
'arq.jobs: dft ran in 0.0XXs ← __id__ MockRedisDemoActor.direct_method ● 3') in log
162162

163163

164164
async def test_direct_binding(mock_actor_worker, caplog):

0 commit comments

Comments
 (0)