Skip to content

Commit a15ee0e

Browse files
committed
adding async-timeout
1 parent 13e50d4 commit a15ee0e

8 files changed

+27
-15
lines changed

HISTORY.rst

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
History
44
-------
55

6+
v0.8.0 (TBC)
7+
............
8+
* add ``async-timeout`` dependency and use async timeout around ``shadow_factory``
9+
* change logger name for control process log messages
10+
611
v0.7.0 (2017-06-01)
712
...................
813
* implementing reusable ``Drain`` which takes tasks from a redis list and allows them to be execute asynchronously.

Makefile

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
.DEFAULT_GOAL := all
2+
13
.PHONY: install
24
install:
35
pip install -U pip setuptools

arq/version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
__all__ = ['VERSION']
44

5-
VERSION = StrictVersion('0.7.0')
5+
VERSION = StrictVersion('0.8.0')

arq/worker.py

+14-10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
from signal import Signals
1717
from typing import Dict, List, Type # noqa
1818

19+
from async_timeout import timeout
20+
1921
from .drain import Drain
2022
from .jobs import ArqError, Job
2123
from .logs import default_log_config
@@ -25,6 +27,7 @@
2527
__all__ = ['BaseWorker', 'RunWorkerProcess', 'StopJob', 'import_string']
2628

2729
work_logger = logging.getLogger('arq.work')
30+
ctrl_logger = logging.getLogger('arq.control')
2831
jobs_logger = logging.getLogger('arq.jobs')
2932

3033

@@ -173,9 +176,10 @@ async def run(self):
173176
perform jobs.
174177
"""
175178
self._stopped = False
176-
work_logger.info('Initialising work manager, burst mode: %s', self._burst_mode)
179+
work_logger.info('Initialising work manager, burst mode: %s, creating shadows...', self._burst_mode)
177180

178-
shadows = await self.shadow_factory()
181+
with timeout(10):
182+
shadows = await self.shadow_factory()
179183
assert isinstance(shadows, list), 'shadow_factory should return a list not %s' % type(shadows)
180184
self.job_class = shadows[0].job_class
181185
work_logger.debug('Using first shadows job class "%s"', self.job_class.__name__)
@@ -416,8 +420,8 @@ def start_worker(worker_path: str, worker_class: str, burst: bool, loop: asyncio
416420
:param loop: asyncio loop use to or None
417421
"""
418422
worker_cls = import_string(worker_path, worker_class)
419-
worker = worker_cls(burst=burst, loop=loop)
420423
work_logger.info('Starting "%s" on pid=%d', worker_cls.__name__, os.getpid())
424+
worker = worker_cls(burst=burst, loop=loop)
421425
try:
422426
worker.run_until_complete()
423427
except HandledExit:
@@ -444,32 +448,32 @@ def __init__(self, worker_path, worker_class, burst=False):
444448

445449
def run_worker(self, worker_path, worker_class, burst):
446450
name = 'WorkProcess'
447-
work_logger.info('starting work process "%s"', name)
451+
ctrl_logger.info('starting work process "%s"', name)
448452
self.process = Process(target=start_worker, args=(worker_path, worker_class, burst), name=name)
449453
self.process.start()
450454
self.process.join()
451455
if self.process.exitcode == 0:
452-
work_logger.info('worker process exited ok')
456+
ctrl_logger.info('worker process exited ok')
453457
return
454-
work_logger.critical('worker process %s exited badly with exit code %s',
458+
ctrl_logger.critical('worker process %s exited badly with exit code %s',
455459
self.process.pid, self.process.exitcode)
456460
sys.exit(3)
457461
# could restart worker here, but better to leave it up to the real manager eg. docker restart: always
458462

459463
def handle_sig(self, signum, frame):
460464
signal.signal(signal.SIGINT, self.handle_sig_force)
461465
signal.signal(signal.SIGTERM, self.handle_sig_force)
462-
work_logger.info('got signal: %s, waiting for worker pid=%s to finish...', Signals(signum).name,
466+
ctrl_logger.info('got signal: %s, waiting for worker pid=%s to finish...', Signals(signum).name,
463467
self.process and self.process.pid)
464468
# sleep to make sure worker.handle_sig above has executed if it's going to and detached handle_proxy_signal
465469
time.sleep(0.01)
466470
if self.process and self.process.is_alive():
467-
work_logger.debug("sending custom shutdown signal to worker in case it didn't receive the signal")
471+
ctrl_logger.debug("sending custom shutdown signal to worker in case it didn't receive the signal")
468472
os.kill(self.process.pid, SIG_PROXY)
469473

470474
def handle_sig_force(self, signum, frame):
471-
work_logger.warning('got signal: %s again, forcing exit', Signals(signum).name)
475+
ctrl_logger.warning('got signal: %s again, forcing exit', Signals(signum).name)
472476
if self.process and self.process.is_alive():
473-
work_logger.error('sending worker %d SIGTERM', self.process.pid)
477+
ctrl_logger.error('sending worker %d SIGTERM', self.process.pid)
474478
os.kill(self.process.pid, signal.SIGTERM)
475479
raise ImmediateExit('force exit')

setup.py

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
arq=arq.cli:cli
4646
""",
4747
install_requires=[
48+
'async-timeout==1.1.0',
4849
'aioredis>=0.2.9',
4950
'click>=6.6',
5051
'msgpack-python>=0.4.8',

tests/conftest.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,5 @@ def mock_actor_worker(mock_actor):
7070

7171
@pytest.fixture
7272
def caplog(caplog):
73-
caplog.set_loggers(log_names=('arq.main', 'arq.work', 'arq.jobs'), fmt='%(name)s: %(message)s')
73+
caplog.set_loggers(log_names=('arq.control', 'arq.main', 'arq.work', 'arq.jobs'), fmt='%(name)s: %(message)s')
7474
return caplog

tests/test_main.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async def test_dispatch_work(tmpworkdir, loop, caplog, redis_conn):
6868
print(log)
6969
assert ('MockRedisDemoActor.add_numbers ▶ dft\n'
7070
'MockRedisDemoActor.high_add_numbers ▶ high\n'
71-
'Initialising work manager, burst mode: True\n'
71+
'Initialising work manager, burst mode: True, creating shadows...\n'
7272
'Using first shadows job class "Job"\n'
7373
'Running worker with 1 shadow listening to 3 queues\n'
7474
'shadows: MockRedisDemoActor | queues: high, dft, low\n'
@@ -102,7 +102,7 @@ async def test_handle_exception(loop, caplog):
102102
log = re.sub(r'\d{4}-\d+-\d+ \d+:\d+:\d+', '<date time>', log)
103103
log = re.sub(r'\w{3}-\d+ \d+:\d+:\d+', '<date time2>', log)
104104
print(log)
105-
assert ('Initialising work manager, burst mode: True\n'
105+
assert ('Initialising work manager, burst mode: True, creating shadows...\n'
106106
'Running worker with 1 shadow listening to 3 queues\n'
107107
'shadows: MockRedisDemoActor | queues: high, dft, low\n'
108108
'recording health: <date time2> j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=1 q_low=0\n'

tests/test_worker.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ async def test_separate_log_levels(mock_actor_worker, caplog):
7474
await actor.concat(a='1', b='2')
7575
await worker.run()
7676
log = caplog(('0.0\d\ds', '0.0XXs'))
77-
assert ('arq.work: Initialising work manager, burst mode: True\n'
77+
assert ('arq.work: Initialising work manager, burst mode: True, creating shadows...\n'
7878
'arq.work: Running worker with 1 shadow listening to 3 queues\n'
7979
'arq.work: shadows: MockRedisDemoActor | queues: high, dft, low\n'
8080
'arq.work: drain waiting 5.0s for 1 tasks to finish\n'

0 commit comments

Comments
 (0)