
Commit a6c59dc

improving docs
1 parent 05ceabc

File tree

5 files changed, +116 −36 lines changed

HISTORY.rst  +1 −1

@@ -3,7 +3,7 @@
 History
 -------
 
-v0.5.0 (TBD)
+v0.5.0 (2017-02-20)
 ...................
 * use ``gather`` rather than ``wait`` for startup and shutdown so exceptions propagate.
 * add ``--check`` option to confirm arq worker is running.
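For context on the first changelog entry: ``asyncio.wait`` merely stores exceptions on the finished tasks, whereas ``asyncio.gather`` re-raises them to the caller, so startup and shutdown failures can no longer pass silently. A minimal sketch of the difference, independent of *arq*::

    import asyncio

    async def fail():
        raise RuntimeError('startup failed')

    async def main():
        # wait() completes "successfully"; the exception is only stored on the task
        done, _ = await asyncio.wait([asyncio.ensure_future(fail())])
        print('wait stored:', [t.exception() for t in done])

        # gather() propagates the exception straight to the caller
        try:
            await asyncio.gather(fail())
        except RuntimeError as e:
            print('gather raised:', e)

    asyncio.get_event_loop().run_until_complete(main())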

arq/main.py  +12 −0

@@ -73,9 +73,15 @@ def __init__(self, *args, is_shadow=False, concurrency_enabled=True, **kwargs):
         super().__init__(*args, **kwargs)
 
     async def startup(self):
+        """
+        Override to setup objects you'll need while running the worker, eg. sessions and database connections.
+        """
         pass
 
     async def shutdown(self):
+        """
+        Override to gracefully close or delete any objects you setup in ``startup``.
+        """
         pass
 
     def _bind_concurrent(self):
@@ -112,6 +118,12 @@ async def enqueue_job(self, func_name: str, *args, queue: str=None, **kwargs):
         await getattr(self, j.func_name).direct(*j.args, **j.kwargs)
 
     async def close(self, shutdown=False):
+        """
+        Close down the actor, eg. close the associated redis pool, optionally also calling shutdown.
+
+        :param shutdown: whether or not to also call the shutdown coroutine; you probably only want to set this
+            to ``True`` if you called startup previously
+        """
         if shutdown:
             await self.shutdown()
         await super().close()
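Judging by the context line in ``enqueue_job`` above, each ``@concurrent`` method also exposes a ``.direct`` coroutine which runs the underlying function immediately rather than enqueuing it. A hedged sketch of direct use together with the lifecycle methods documented here, reusing the ``Downloader`` actor from ``docs/demo.py`` below::

    import asyncio
    from demo import Downloader  # the actor defined in docs/demo.py

    async def run_directly():
        d = Downloader()
        await d.startup()  # not called automatically outside the worker, see docs/usage.rst
        # run the job in-process, bypassing the redis queue
        length = await d.download_content.direct('https://github.com')
        print('downloaded {} bytes'.format(length))
        await d.close(shutdown=True)  # shutdown=True also runs shutdown(), per the docstring

    asyncio.get_event_loop().run_until_complete(run_directly())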

docs/demo.py  +34 −0

@@ -0,0 +1,34 @@
+import asyncio
+from aiohttp import ClientSession
+from arq import Actor, BaseWorker, concurrent
+
+
+class Downloader(Actor):
+    async def startup(self):
+        self.session = ClientSession(loop=self.loop)
+
+    @concurrent
+    async def download_content(self, url):
+        async with self.session.get(url) as response:
+            content = await response.read()
+        print('{}: {:.80}...'.format(url, content.decode()))
+        return len(content)
+
+    async def shutdown(self):
+        self.session.close()
+
+
+class Worker(BaseWorker):
+    shadows = [Downloader]
+
+
+async def download_lots():
+    d = Downloader()
+    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
+        await d.download_content(url)
+    await d.close()
+
+
+if __name__ == '__main__':
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(download_lots())

docs/index.rst  +26 −1

@@ -36,7 +36,7 @@ Why use *arq*?
     to the request? or thread local? or truly global? where am I, hell, what does global even mean?).
 
 **small**
-    and easy to reason with - currently *arq* is only about 600 lines, that won't change significantly.
+    and easy to reason with - currently *arq* is only about 700 lines, that won't change significantly.
 
 Dependencies
 ------------
@@ -60,6 +60,31 @@ Just::
 
     pip install arq
 
+Terminology
+-----------
+
+The old computer science proverb/joke goes:
+
+    There are only two challenges in computer science: cache invalidation, naming things and the n + 1 problem.
+
+*arq* tries to use generally accepted terminology wherever possible, however "actors" and "shadows" are not so
+standard and bear describing:
+
+An **Actor** is a class with some concurrent methods; you can define and use multiple actors. Actors hold a
+reference to a redis pool for enqueuing jobs and are generally singletons.
+
+The **Worker** is the class responsible for running jobs for one or more actors. Workers should inherit
+from ``BaseWorker``; your application will generally have only one worker.
+
+Actors are therefore used in two distinctly different modes:
+
+* **default** mode, where you initialise then use and abuse the actor, including calling concurrent methods and
+  thereby enqueuing jobs
+* **shadow** mode, where the actor was initialised by the worker in order to perform jobs enqueued by the actor in
+  default (or even shadow) mode.
+
+You can check which mode an actor is in via the ``is_shadow`` attribute.
+
 .. include:: usage.rst
 
 API Reference
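A hedged sketch of that check (assuming, from the ``is_shadow`` keyword argument in ``arq/main.py`` above, that the flag is stored on the actor as an attribute)::

    from arq import Actor

    class ModeAwareActor(Actor):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            # True when the worker instantiated this actor to perform jobs,
            # False when application code instantiated it to enqueue them
            mode = 'shadow' if self.is_shadow else 'default'
            print('actor running in {} mode'.format(mode))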

docs/usage.rst  +43 −34

@@ -6,40 +6,7 @@ Usage is best described by example.
 Simple Usage
 ............
 
-.. code:: python
-
-    import asyncio
-    from aiohttp import ClientSession
-    from arq import Actor, BaseWorker, concurrent
-
-    class Downloader(Actor):
-        def __init__(self, **kwargs):
-            super().__init__(**kwargs)
-            self.session = ClientSession(loop=self.loop)
-
-        @concurrent
-        async def download_content(self, url):
-            async with self.session.get(url) as response:
-                content = await response.read()
-            print('{}: {:.80}...'.format(url, content.decode()))
-            return len(content)
-
-        async def close(self):
-            await super().close()
-            self.session.close()
-
-    class Worker(BaseWorker):
-        shadows = [Downloader]
-
-    async def download_lots():
-        d = Downloader()
-        for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
-            await d.download_content(url)
-        await d.close()
-
-    if __name__ == '__main__':
-        loop = asyncio.get_event_loop()
-        loop.run_until_complete(download_lots())
+.. literalinclude:: demo.py
 
 (This script is complete, it should run "as is" both to enqueue jobs and run them)

@@ -55,6 +22,45 @@ For details on the *arq* CLI::
 
     arq --help
 
+Startup & Shutdown coroutines
+.............................
+
+The ``startup`` and ``shutdown`` coroutines are provided as a convenient way to run logic as actors start and finish.
+However, it's important to note that these methods **are not called by default when actors are initialised or closed**;
+they are only called when the actor is started and closed on the worker, eg. in "shadow" mode, see above.
+In other words: if you need these coroutines to be called when using an actor in your own code, that's your
+responsibility (see the sketch below).
+
+For example, in the above code there's no need for ``self.session`` when using the actor in "default" mode, eg. when
+called with ``python demo.py``, so neither ``startup`` nor ``shutdown`` is called.
+
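A hedged sketch of taking that responsibility, should you want a live ``session`` on a default-mode ``Downloader`` (hypothetical code, not part of the demo)::

    import asyncio
    from demo import Downloader

    async def main():
        d = Downloader()
        await d.startup()  # our responsibility in "default" mode: creates d.session
        try:
            # anything needing d.session may now run, eg. a direct (non-queued) call:
            await d.download_content.direct('https://github.com')
        finally:
            await d.close(shutdown=True)  # also runs shutdown(), closing the session

    asyncio.get_event_loop().run_until_complete(main())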
+Health checks
+.............
+
+*arq* will automatically record some info about its current state in redis every ``health_check_interval`` seconds,
+see :attr:`arq.worker.BaseWorker.health_check_interval`. That key/value will expire after ``health_check_interval + 1``
+seconds, so if the key exists you can be sure *arq* is alive and kicking (or, strictly, that it was alive and kicking
+within the last ``health_check_interval`` seconds).
+
+You can run a health check with the CLI (assuming you're using the above example)::
+
+    arq --check demo.py
+
+The command will output the value of the health check if found,
+then exit ``0`` if the key was found and ``1`` if it was not.
+
+A health check value takes the following form::
+
+    Feb-20_11:02:40 j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=0 q_low=0
+
+Where the values have the following meaning:
+
+* ``j_complete`` the number of jobs completed
+* ``j_failed`` the number of jobs which have failed, eg. raised an exception
+* ``j_timedout`` the number of jobs which have timed out, eg. exceeded :attr:`arq.worker.BaseWorker.timeout_seconds`
+  and been cancelled
+* ``j_ongoing`` the number of jobs currently being performed
+* ``q_*`` the number of pending jobs in each queue
+
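Since the check communicates through exit codes, a liveness probe can simply wrap the CLI; a minimal sketch (the ``demo.py`` path matching the example above)::

    import subprocess

    # exit status 0 means the health-check key was found in redis, 1 means it wasn't
    result = subprocess.run(['arq', '--check', 'demo.py'])
    print('worker healthy' if result.returncode == 0 else 'worker down or stalled')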
 
 Multiple Queues
 ...............

@@ -123,6 +129,9 @@ document and record.
     # jobs may not take more than 10 seconds, default 60
     timeout_seconds = 10
 
+    # number of seconds between health checks, default 60
+    health_check_interval = 30
+
     def logging_config(self, verbose):
         conf = super().logging_config(verbose)
         # alter logging setup to set arq.jobs level to WARNING
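Pulling the settings from this hunk together, a hedged sketch of a customised worker (the class shape is inferred from the earlier examples; the attribute values are just those shown above)::

    from arq import BaseWorker
    from demo import Downloader

    class CustomisedWorker(BaseWorker):
        shadows = [Downloader]
        # jobs may not take more than 10 seconds, default 60
        timeout_seconds = 10
        # number of seconds between health checks, default 60
        health_check_interval = 30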

0 commit comments