Commit cf80536: docker/aiohttp demo (#75)

* adding docker/aiohttp demo
* adding aiohttp docs, fix #69
* add history

1 parent b9f1537

12 files changed: +319 -14 lines
.gitignore (+8 -8)

@@ -1,15 +1,15 @@
-diff-match-patch
-env
-.idea
+/env
+/.idea
 __pycache__/
 *.py[cod]
 *.cache
 .coverage.*
-.coverage
-htmlcov/
+/.coverage
+/htmlcov/
 /build
 /dist
-test.py
+/test.py
 *.egg-info
-docs/_build/
-.mypy_cache/
+/docs/_build/
+/.mypy_cache/
+/demo/tmp/

HISTORY.rst (+2 -1)

@@ -3,10 +3,11 @@
 History
 -------
 
-v0.12.0 (2017-11-XX)
+v0.12.0 (2017-11-16)
 ....................
 * better signal handling, support ``uvloop`` #73
 * drain pending tasks and drain task cancellation #74
+* add aiohttp and docker demo ``/demo`` #75
 
 v0.11.0 (2017-08-25)
 ....................

arq/drain.py (+8 -2)

@@ -172,12 +172,18 @@ def _job_callback(self, task):
             self.task_exception = task_exception
         elif task.result():
             self.jobs_failed += 1
-        self.pending_tasks.remove(task)
+        self._remove_task(task)
         jobs_logger.debug('task complete, %d jobs done, %d failed', self.jobs_complete, self.jobs_failed)
 
     def _cancel_job(self, task, job):
         if not task.cancel():
             return
         self.jobs_timed_out += 1
         jobs_logger.error('task timed out %r', job)
-        self.pending_tasks.remove(task)
+        self._remove_task(task)
+
+    def _remove_task(self, task):
+        try:
+            self.pending_tasks.remove(task)
+        except KeyError:
+            pass
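
Why the new ``_remove_task`` helper: both ``_job_callback`` and ``_cancel_job`` may end up trying to remove the same task from the ``pending_tasks`` set (a cancelled task still fires its done callback), and ``set.remove`` raises ``KeyError`` when the element is already gone. A minimal sketch (not part of the commit) of the failure mode and the idempotent removal:

    # set.remove raises if the element is already gone; the try/except in
    # _remove_task turns a second removal into a no-op instead of an error
    pending_tasks = {'task-a'}

    pending_tasks.remove('task-a')      # first removal succeeds
    try:
        pending_tasks.remove('task-a')  # second removal raises KeyError...
    except KeyError:
        pass                            # ...which _remove_task swallows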

demo/Dockerfile (new file, +21)

# docker image to run the arq demo
# image must be built using build.sh
FROM python:3.6-alpine

LABEL maintainer "[email protected]"

RUN apk --update --no-cache add gcc g++ musl-dev libuv make \
    && rm -rf /var/cache/apk/*

RUN pip install -U pip setuptools
ADD ./docker-requirements.txt /home/root/
RUN pip install -r /home/root/docker-requirements.txt

ADD ./setup.py /home/root/src/
ADD ./arq /home/root/src/arq
RUN pip install -e /home/root/src/
ADD ./app.py /home/root/

ENV PYTHONUNBUFFERED 1
WORKDIR /home/root
ENTRYPOINT ["./app.py"]

demo/README.md (new file, +26)

# arq demo with docker compose and aiohttp

This directory contains the files required to build and run the docker/aiohttp demo.

## Usage

**(all from the project root directory)**

To build:

    ./demo/build.sh

To run the compose example:

    export COMPOSE_FILE='demo/docker-compose.yml'
    export COMPOSE_PROJECT_NAME='arq'
    docker-compose up -d

You can then connect to logspout to view the logs with something like:

    curl -q -s http://localhost:5001/logs

From there you can stop and start the worker and watch jobs get cancelled and re-enqueued:

    docker-compose stop worker
    docker-compose start worker
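
A quick way to exercise the demo once the stack is up (a sketch, not part of the commit): post the form that ``demo/app.py`` below renders, assuming the ``web`` service is published on ``localhost:8000`` as in ``demo/docker-compose.yml``:

    # enqueue a download job via the demo's /start-job/ form (stdlib only);
    # the field names 'url' and 'count' match the form in demo/app.py
    from urllib.parse import urlencode
    from urllib.request import urlopen

    body = urlencode({'url': 'https://httpbin.org/get', 'count': 5}).encode()
    with urlopen('http://localhost:8000/start-job/', data=body) as r:
        # urlopen follows the 302 redirect back to '/', so expect 200 here
        print(r.status, r.geturl())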

demo/app.py (new file, +133)

#!/usr/bin/env python3.6
import os
import asyncio
from time import time

import chevron
import uvloop
from aiohttp import web, ClientError, ClientSession
from aiohttp_session import SimpleCookieStorage, get_session
from aiohttp_session import setup as session_setup
from arq import Actor, BaseWorker, RedisSettings, concurrent

R_OUTPUT = 'output'

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class Downloader(Actor):
    re_enqueue_jobs = True

    async def startup(self):
        self.session = ClientSession(loop=self.loop)

    @concurrent
    async def download_content(self, url, count):
        total_size = 0
        errors = []
        start = time()
        for _ in range(count):
            try:
                async with self.session.get(url) as r:
                    content = await r.read()
                    total_size += len(content)
                    if r.status != 200:
                        errors.append(f'{r.status} length: {len(content)}')
            except ClientError as e:
                errors.append(f'{e.__class__.__name__}: {e}')
        output = f'{time() - start:0.2f}s, {count} downloads, total size: {total_size}'
        if errors:
            output += ', errors: ' + ', '.join(errors)
        async with self.redis_pool.get() as redis:
            await redis.rpush(R_OUTPUT, output.encode())
        return total_size

    async def shutdown(self):
        self.session.close()


html_template = """
<h1>arq demo</h1>

{{#message}}
  <div>{{ message }}</div>
{{/message}}

<form method="post" action="/start-job/">
  <p>
    <label for="url">Url to download</label>
    <input type="url" name="url" id="url" value="https://httpbin.org/get" required/>
  </p>
  <p>
    <label for="count">Download count</label>
    <input type="number" step="1" name="count" id="count" value="10" required/>
  </p>
  <p>
    <input type="submit" value="Download"/>
  </p>
</form>

<h2>Results:</h2>
{{#results}}
  <p>{{ . }}</p>
{{/results}}
"""


async def index(request):
    async with await request.app['downloader'].get_redis_conn() as redis:
        data = await redis.lrange(R_OUTPUT, 0, -1)
        results = [r.decode() for r in data]

    session = await get_session(request)
    html = chevron.render(html_template, {'message': session.get('message'), 'results': results})
    session.invalidate()
    return web.Response(text=html, content_type='text/html')


async def start_job(request):
    data = await request.post()
    session = await get_session(request)
    try:
        url = data['url']
        count = int(data['count'])
    except (KeyError, ValueError) as e:
        session['message'] = f'Invalid input, {e.__class__.__name__}: {e}'
    else:
        await request.app['downloader'].download_content(url, count)
        session['message'] = f'Downloading "{url}" ' + (f'{count} times.' if count > 1 else 'once.')
    raise web.HTTPFound(location='/')


redis_settings = RedisSettings(host=os.getenv('REDIS_HOST', 'localhost'))


async def shutdown(app):
    await app['downloader'].close()


def create_app():
    app = web.Application()
    app.router.add_get('/', index)
    app.router.add_post('/start-job/', start_job)
    app['downloader'] = Downloader(redis_settings=redis_settings)
    app.on_shutdown.append(shutdown)
    session_setup(app, SimpleCookieStorage())
    return app


class Worker(BaseWorker):
    # used by `arq app.py` command
    shadows = [Downloader]
    # set to small value so we can play with timeouts
    timeout_seconds = 10

    def __init__(self, *args, **kwargs):
        kwargs['redis_settings'] = redis_settings
        super().__init__(*args, **kwargs)


if __name__ == '__main__':
    # when called directly run the webserver
    app = create_app()
    web.run_app(app, port=8000)
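
Note on ``@concurrent``: calling ``downloader.download_content(url, count)`` does not run the coroutine inline; it enqueues a job on redis for the worker (started with ``arq app.py``) to execute, and ``re_enqueue_jobs = True`` makes the worker re-enqueue jobs it hasn't finished when it shuts down, which is what the README's stop/start exercise demonstrates. A minimal sketch (not part of the commit; assumes the ``Downloader`` and ``redis_settings`` above and a reachable redis) of enqueuing a job from a plain script:

    # enqueue one job and exit; `arq app.py` must be running to execute it
    import asyncio

    from app import Downloader, redis_settings


    async def main():
        downloader = Downloader(redis_settings=redis_settings)
        # this pushes a job onto the queue rather than downloading here
        await downloader.download_content('https://httpbin.org/get', 3)
        await downloader.close()


    asyncio.get_event_loop().run_until_complete(main())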

demo/build.sh (new file, +25)

#!/usr/bin/env bash

set -e

THIS_DIR=`dirname "$0"`

cd ${THIS_DIR}
if [[ ! -d tmp ]]; then
    echo "creating tmp directory..."
    mkdir tmp
else
    echo "tmp directory already exists"
fi

echo "copying necessary files into place..."
rsync -i -a requirements.txt tmp/docker-requirements.txt
rsync -i -a --delete --exclude=*.pyc --exclude=__pycache__ ../arq tmp/
rsync -i -a ../setup.py tmp/
rsync -i -a app.py tmp/
rsync -i -a Dockerfile tmp/

echo "building docker image..."
docker build tmp -t arq-demo
echo "done."

demo/docker-compose.yml (new file, +46)

version: '3.2'

volumes:
  redis: {}

services:
  logs:
    image: gliderlabs/logspout
    environment:
      SYSLOG_HOSTNAME: 'arq'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - 5001:80

  redis:
    image: redis:4.0-alpine
    volumes:
      - redis:/data
    ports:
      # redis-cli -p 63790
      - 63790:6379
    restart: always
    depends_on:
      - logs

  web:
    image: arq-demo
    restart: always
    ports:
      - 8000:8000
    environment:
      REDIS_HOST: redis
    depends_on:
      - redis
      - logs

  worker:
    image: arq-demo
    entrypoint: ['arq', 'app.py']
    environment:
      REDIS_HOST: redis
    restart: always
    depends_on:
      - redis
      - logs
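
To peek at the results the worker writes to redis from the host (a sketch, not part of the commit; assumes the ``63790:6379`` port mapping above and the ``redis`` Python package installed):

    # read the demo's output list via the host-mapped redis port
    import redis

    r = redis.StrictRedis(port=63790)
    for item in r.lrange('output', 0, -1):  # 'output' is R_OUTPUT in demo/app.py
        print(item.decode())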

demo/requirements.txt (new file, +4)

aiohttp==2.3.2
aiohttp-session==1.2.0
chevron==0.11.1
uvloop==0.8.1

docs/examples/aiohttp.py (new file, +27)

from aiohttp import web


async def start_job(request):
    data = await request.post()
    # this will enqueue the download_content job
    await request.app['downloader'].download_content(data['url'])
    raise web.HTTPFound(location='/wherever/')


async def shutdown(app):
    await app['downloader'].close()


def create_app():
    app = web.Application()
    ...
    app.router.add_post('/start-job/', start_job)
    app['downloader'] = Downloader()
    # use aiohttp's on_shutdown trigger to close downloader
    app.on_shutdown.append(shutdown)
    return app


if __name__ == '__main__':
    app = create_app()
    web.run_app(app, port=8000)
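
As the docs below note, this example won't run because ``Downloader`` is not defined. A minimal stand-in (a sketch, assuming the ``Actor``/``@concurrent`` API used in ``demo/app.py``, not part of this commit) would be:

    from arq import Actor, concurrent


    class Downloader(Actor):
        @concurrent
        async def download_content(self, url):
            ...  # fetch the url, as in demo/app.py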

docs/usage.rst (+13)

@@ -33,6 +33,17 @@ In other words: if you need these coroutines to be called when using an actor in
 For example, in the above example there's no need for ``self.session`` when using the actor in "default" mode,
 eg. called with ``python demo.py``, so neither ``startup`` or ``shutdown`` are called.
 
+Usage with aiohttp
+..................
+
+Assuming you have ``Downloader`` already defined as per above.
+
+.. literalinclude:: examples/aiohttp.py
+
+(Won’t run as ``Downloader`` is not defined)
+
+For a full example of *arq* usage with aiohttp and docker see the `demo`_ app.
+
 Health checks
 .............
 
@@ -114,3 +125,5 @@ provided ``Downloader`` and ``FooBar`` are defined and imported it should run "a
 See :meth:`arq.worker.BaseWorker` for more customisation options.
 
 For more information on logging see :meth:`arq.logs.default_log_config`.
+
+.. _demo: https://github.com/samuelcolvin/arq/tree/master/arq/demo

setup.py (+6 -3)

@@ -3,9 +3,12 @@
 from importlib.machinery import SourceFileLoader
 from setuptools import setup
 
-with Path(__file__).resolve().parent.joinpath('README.rst').open() as f:
-    long_description = f.read()
-
+readme = Path(__file__).parent.joinpath('README.rst')
+if readme.exists():
+    with readme.open() as f:
+        long_description = f.read()
+else:
+    long_description = '-'
 # avoid loading the package before requirements are installed:
 version = SourceFileLoader('version', 'arq/version.py').load_module()

(The fallback matters because ``demo/build.sh`` copies only ``setup.py``, ``arq/``, ``app.py``, the requirements file and the ``Dockerfile`` into the docker build context, so ``README.rst`` is absent when pip installs arq inside the image.)
