Skip to content

Commit 1667afc

Browse files
committed
re implementing 'reusable'
1 parent a6c59dc commit 1667afc

File tree

4 files changed

+46
-21
lines changed

4 files changed

+46
-21
lines changed

arq/worker.py

+14-6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ class BaseWorker(RedisMixin):
6060
health_check_interval = 60
6161
health_check_key = b'arq:health-check'
6262

63+
#: Mostly used in tests; if true actors and the redis pool will not be closed at the end of run()
64+
# allowing reuse of the worker, eg. ``worker.run()`` can be called multiple times.
65+
reusable = False
66+
6367
def __init__(self, *,
6468
burst: bool=False,
6569
shadows: list=None,
@@ -92,7 +96,7 @@ def __init__(self, *,
9296
signal.signal(signal.SIGTERM, self.handle_sig)
9397
signal.signal(SIG_PROXY, self.handle_proxy_signal)
9498
super().__init__(**kwargs) # type: ignore # TODO
95-
self._closing_lock = asyncio.Lock(loop=self.loop)
99+
self._shutdown_lock = asyncio.Lock(loop=self.loop)
96100

97101
async def shadow_factory(self) -> list:
98102
"""
@@ -142,6 +146,7 @@ async def run(self):
142146
Main entry point for the worker which initialises shadows, checks they look ok then runs ``work`` to
143147
perform jobs.
144148
"""
149+
self._stopped = False
145150
work_logger.info('Initialising work manager, burst mode: %s', self._burst_mode)
146151

147152
shadows = await self.shadow_factory()
@@ -170,7 +175,7 @@ async def run(self):
170175
try:
171176
await self.work()
172177
finally:
173-
await self.close()
178+
await self.shutdown()
174179
if self._task_exception:
175180
work_logger.error('Found task exception "%s"', self._task_exception)
176181
raise self._task_exception
@@ -241,6 +246,7 @@ async def _check_health(self):
241246
def check_health(cls, **kwargs):
242247
"""
243248
Run a health check on the worker and return the appropriate exit code.
249+
244250
:return: 0 if successful, 1 if not
245251
"""
246252
self = cls(**kwargs)
@@ -333,17 +339,19 @@ def handle_execute_exc(cls, started_at, exc, j):
333339
exc_type = exc.__class__.__name__
334340
jobs_logger.exception('%-4s ran in%7.3fs ! %s: %s', j.queue, timestamp() - started_at, j, exc_type)
335341

336-
async def close(self):
337-
with await self._closing_lock:
338-
if self._closed:
339-
return
342+
async def shutdown(self):
343+
with await self._shutdown_lock:
340344
if self._pending_tasks:
341345
work_logger.info('shutting down worker, waiting for %d jobs to finish', len(self._pending_tasks))
342346
await asyncio.wait(self._pending_tasks, loop=self.loop)
343347
t = (timestamp() - self.start) if self.start else 0
344348
work_logger.info('shutting down worker after %0.3fs ◆ %d jobs done ◆ %d failed ◆ %d timed out',
345349
t, self.jobs_complete, self.jobs_failed, self.jobs_timed_out)
350+
if not self.reusable:
351+
await self.close()
346352

353+
async def close(self):
354+
if not self._closed:
347355
if self._shadow_lookup:
348356
await asyncio.gather(*[s.close(True) for s in self._shadow_lookup.values()], loop=self.loop)
349357
await super().close()

docs/index.rst

+4-4
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,14 @@ The old computer science proverb/joke goes:
6767

6868
There are only two challenges in computer science: cache invalidation, naming things and the n + 1 problem.
6969

70-
*arq* tries to use generally accepted terminology for as much as possible, however "actors" and "shadows" are not so
71-
standard and bear describing:
70+
*arq* tries to avoid confusion over what's named what by using generally accepted terminology as much as possible,
71+
however a few terms (like "actors" and "shadows") are not so standard and bear describing:
7272

7373
An **Actor** is a class with some concurrent methods, you can define and use multiple actors. Actors hold a
74-
reference to a redis pool for enqueuing are generally singletons.
74+
reference to a redis pool for enqueuing jobs and are generally singletons.
7575

7676
The **Worker** is the class which is responsible for running jobs for one or more actors. Workers should inherit
77-
from ``BaseWorker``, your application will generally only have one worker.
77+
from ``BaseWorker``, your application will generally have just one worker.
7878

7979
Actors are therefore used in two distinctly different modes:
8080

docs/usage.rst

+11-11
Original file line numberDiff line numberDiff line change
@@ -25,41 +25,41 @@ For details on the *arq* CLI::
2525
Startup & Shutdown coroutines
2626
.............................
2727

28-
The ``startup`` and ``shutdown`` are provided as a convenient way to run logic as actors start and finish,
28+
The ``startup`` and ``shutdown`` coroutines are provided as a convenient way to run logic as actors start and finish,
2929
however it's important to note that these methods **are not called by default when actors are initialised or closed**.
30-
They are however called when the actor started and closed on the worker, eg. in "shadow" mode, see above.
31-
In other words: if you need these coroutines to be called when using an actor in your code, that's your responsibility.
30+
They are however called when the actor was started and closed on the worker, eg. in "shadow" mode, see above.
31+
In other words: if you need these coroutines to be called when using an actor in your code; that's your responsibility.
3232

33-
For example, in the above code there's no need for ``self.session`` when using the actor in "default" mode, eg. called
34-
with ``python demo.py``, so neither ``startup`` or ``shutdown`` are called.
33+
For example, in the above example there's no need for ``self.session`` when using the actor in "default" mode,
34+
eg. called with ``python demo.py``, so neither ``startup`` or ``shutdown`` are called.
3535

3636
Health checks
3737
.............
3838

3939
*arq* will automatically record some info about its current state in redis every ``health_check_interval`` seconds,
4040
see :attr:`arq.worker.BaseWorker.health_check_interval`. That key/value will expire after ``health_check_interval + 1``
41-
so you can be sure if the variable exists you can be sure *arq* is alive and kicking (technically you can be sure it
41+
seconds so you can be sure if the variable exists *arq* is alive and kicking (technically you can be sure it
4242
was alive and kicking ``health_check_interval`` seconds ago).
4343

44-
You can run a health check with the CLI using (assuming you're using the above example)::
44+
You can run a health check with the CLI (assuming you're using the above example)::
4545

4646
arq --check demo.py
4747

48-
The command will output the value of the health check if found,
48+
The command will output the value of the health check if found;
4949
then exit ``0`` if the key was found and ``1`` if it was not.
5050

5151
A health check value takes the following form::
5252

5353
Feb-20_11:02:40 j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=0 q_low=0
5454

55-
Where the values have the following meaning:
55+
Where the items have the following meaning:
5656

5757
* ``j_complete`` the number of jobs completed
5858
* ``j_failed`` the number of jobs which have failed eg. raised an exception
5959
* ``j_timedout`` the number of jobs which have timed out, eg. exceeded :attr:`arq.worker.BaseWorker.timeout_seconds`
6060
and been cancelled
61-
* ``j_ongoing`` the number of jobs currently being performed.
62-
* ``q_*`` the number of pending jobs in each queue.
61+
* ``j_ongoing`` the number of jobs currently being performed
62+
* ``q_*`` the number of pending jobs in each queue
6363

6464
Multiple Queues
6565
...............

tests/test_worker.py

+17
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,23 @@ async def test_raise_worker_prepare(redis_conn, actor):
284284
await worker.close()
285285

286286

287+
async def test_reusable_worker(tmpworkdir, redis_conn, actor):
288+
worker = Worker(burst=True, loop=actor.loop)
289+
worker.reusable = True
290+
291+
await actor.add_numbers(1, 2)
292+
assert not tmpworkdir.join('add_numbers').exists()
293+
await worker.run()
294+
assert tmpworkdir.join('add_numbers').read() == '3'
295+
assert worker.jobs_failed == 0
296+
297+
await actor.add_numbers(3, 4)
298+
await worker.run()
299+
assert tmpworkdir.join('add_numbers').read() == '7'
300+
assert worker.jobs_failed == 0
301+
await worker.close()
302+
303+
287304
async def test_startup_shutdown(tmpworkdir, redis_conn, loop):
288305
worker = StartupWorker(burst=True, loop=loop)
289306

0 commit comments

Comments
 (0)