Skip to content

Commit 5a88a28

Browse files
author
Omer Katz
authored
Add native delayed delivery API to kombu (#2128)
* Adjust flake8 configuration to match Celery's configuration. * Add native delayed delivery API. * Add documentation. * Fix linter errors. * Fix pydocstyle errors. * Fix apicheck linter. * Address code review.
1 parent 96a497e commit 5a88a28

14 files changed

+301
-13
lines changed

docs/reference/index.rst

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Kombu Core
2828
kombu.abstract
2929
kombu.resource
3030
kombu.serialization
31+
kombu.native_delayed_delivery
3132

3233
Kombu Transports
3334
================
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
==========================================================
2+
Native Delayed Delivery - ``native_delayed_delivery``
3+
==========================================================
4+
5+
.. versionadded:: 5.5
6+
7+
.. contents::
8+
:local:
9+
.. currentmodule:: kombu.transport.native_delayed_delivery
10+
11+
.. automodule:: kombu.transport.native_delayed_delivery
12+
:members:
13+
:undoc-members:

docs/userguide/examples.rst

+13
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,16 @@ for priorities using different queues.
5353
:file:`client.py`:
5454

5555
.. literalinclude:: ../../examples/simple_task_queue/client.py
56+
57+
.. _native-delayed-delivery-example:
58+
59+
Native Delayed Delivery
60+
=======================
61+
62+
This example demonstrates how to declare native delayed delivery queues and exchanges and publish a message using
63+
the native delayed delivery mechanism.
64+
65+
:file:`delayed_infra.py`:
66+
67+
.. literalinclude:: ../../examples/delayed_infra.py
68+
:language: python

examples/delayed_infra.py

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from __future__ import annotations
2+
3+
from examples.experimental.async_consume import queue
4+
from kombu import Connection, Exchange, Queue
5+
from kombu.transport.native_delayed_delivery import (
6+
bind_queue_to_native_delayed_delivery_exchange, calculate_routing_key,
7+
declare_native_delayed_delivery_exchanges_and_queues, level_name)
8+
9+
with Connection('amqp://guest:guest@localhost:5672//') as connection:
10+
declare_native_delayed_delivery_exchanges_and_queues(connection, 'quorum')
11+
12+
destination_exchange = Exchange(
13+
'destination', type='topic')
14+
destination_queue = Queue("destination", exchange=destination_exchange)
15+
bind_queue_to_native_delayed_delivery_exchange(connection, queue)
16+
17+
channel = connection.channel()
18+
with connection.Producer(channel=channel) as producer:
19+
routing_key = calculate_routing_key(30, 'destination')
20+
producer.publish(
21+
"delayed msg",
22+
routing_key=routing_key,
23+
exchange=level_name(27)
24+
)

kombu/asynchronous/aws/connection.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ def __init__(self, sqs_connection, http_client=None,
182182
super().__init__(sqs_connection, http_client,
183183
**http_client_params)
184184

185-
def make_request(self, operation, params_, path, verb, callback=None): # noqa
185+
def make_request(self, operation, params_, path, verb, callback=None):
186186
params = params_.copy()
187187
if operation:
188188
params['Action'] = operation
@@ -202,7 +202,7 @@ def make_request(self, operation, params_, path, verb, callback=None): # noqa
202202

203203
return self._mexe(prepared_request, callback=callback)
204204

205-
def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): # noqa
205+
def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None):
206206
return self.make_request(
207207
operation, params, path, verb,
208208
callback=transform(
@@ -211,15 +211,15 @@ def get_list(self, operation, params, markers, path='/', parent=None, verb='POST
211211
),
212212
)
213213

214-
def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa
214+
def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None):
215215
return self.make_request(
216216
operation, params, path, verb,
217217
callback=transform(
218218
self._on_obj_ready, callback, parent or self, operation
219219
),
220220
)
221221

222-
def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa
222+
def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None):
223223
return self.make_request(
224224
operation, params, path, verb,
225225
callback=transform(

kombu/asynchronous/aws/sqs/connection.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,8 @@ def _create_json_request(self, operation, params, queue_url):
7676
**param_payload
7777
)
7878

79-
def make_request(self, operation_name, params, queue_url, verb, callback=None): # noqa
80-
"""
81-
Override make_request to support different protocols.
79+
def make_request(self, operation_name, params, queue_url, verb, callback=None):
80+
"""Override make_request to support different protocols.
8281
8382
botocore is soon going to change the default protocol of communicating
8483
with SQS backend from 'query' to 'json', so we need a special

kombu/connection.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ def switch(self, conn_str):
249249
self.declared_entities.clear()
250250
self._closed = False
251251
conn_params = (
252-
parse_url(conn_str) if "://" in conn_str else {"hostname": conn_str} # noqa
252+
parse_url(conn_str) if "://" in conn_str else {"hostname": conn_str}
253253
)
254254
self._init_params(**dict(self._initial_params, **conn_params))
255255

kombu/transport/SQS.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@
127127
* Supports Fanout: Yes
128128
* Supports Priority: No
129129
* Supports TTL: No
130-
""" # noqa: E501
130+
"""
131131

132132

133133
from __future__ import annotations
+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""Native Delayed Delivery API.
2+
3+
Only relevant for RabbitMQ.
4+
"""
5+
from __future__ import annotations
6+
7+
from kombu import Connection, Exchange, Queue
8+
from kombu.log import get_logger
9+
10+
logger = get_logger(__name__)
11+
12+
MAX_NUMBER_OF_BITS_TO_USE = 28
13+
MAX_LEVEL = MAX_NUMBER_OF_BITS_TO_USE - 1
14+
CELERY_DELAYED_DELIVERY_EXCHANGE = "celery_delayed_delivery"
15+
16+
17+
def level_name(level: int) -> str:
18+
"""Generates the delayed queue/exchange name based on the level."""
19+
if level < 0:
20+
raise ValueError("level must be a non-negative number")
21+
22+
return f"celery_delayed_{level}"
23+
24+
25+
def declare_native_delayed_delivery_exchanges_and_queues(connection: Connection, queue_type: str) -> None:
26+
"""Declares all native delayed delivery exchanges and queues."""
27+
if queue_type != "classic" and queue_type != "quorum":
28+
raise ValueError("queue_type must be either classic or quorum")
29+
30+
channel = connection.channel()
31+
32+
routing_key: str = "1.#"
33+
34+
for level in range(27, -1, - 1):
35+
current_level = level_name(level)
36+
next_level = level_name(level - 1) if level > 0 else None
37+
38+
delayed_exchange: Exchange = Exchange(
39+
current_level, type="topic").bind(channel)
40+
delayed_exchange.declare()
41+
42+
queue_arguments = {
43+
"x-queue-type": queue_type,
44+
"x-overflow": "reject-publish",
45+
"x-message-ttl": pow(2, level) * 1000,
46+
"x-dead-letter-exchange": next_level if level > 0 else CELERY_DELAYED_DELIVERY_EXCHANGE,
47+
}
48+
49+
if queue_type == 'quorum':
50+
queue_arguments["x-dead-letter-strategy"] = "at-least-once"
51+
52+
delayed_queue: Queue = Queue(
53+
current_level,
54+
queue_arguments=queue_arguments
55+
).bind(channel)
56+
delayed_queue.declare()
57+
delayed_queue.bind_to(current_level, routing_key)
58+
59+
routing_key = "*." + routing_key
60+
61+
routing_key = "0.#"
62+
for level in range(27, 0, - 1):
63+
current_level = level_name(level)
64+
next_level = level_name(level - 1) if level > 0 else None
65+
66+
next_level_exchange: Exchange = Exchange(
67+
next_level, type="topic").bind(channel)
68+
69+
next_level_exchange.bind_to(current_level, routing_key)
70+
71+
routing_key = "*." + routing_key
72+
73+
delivery_exchange: Exchange = Exchange(
74+
CELERY_DELAYED_DELIVERY_EXCHANGE, type="topic").bind(channel)
75+
delivery_exchange.declare()
76+
delivery_exchange.bind_to(level_name(0), routing_key)
77+
78+
79+
def bind_queue_to_native_delayed_delivery_exchange(connection: Connection, queue: Queue) -> None:
80+
"""Binds a queue to the native delayed delivery exchange."""
81+
channel = connection.channel()
82+
queue = queue.bind(channel)
83+
exchange: Exchange = queue.exchange.bind(channel)
84+
85+
if exchange.type == 'direct':
86+
logger.warn(f"Exchange {exchange.name} is a direct exchange "
87+
f"and native delayed delivery do not support direct exchanges.\n"
88+
f"ETA tasks published to this exchange will block the worker until the ETA arrives.")
89+
return
90+
91+
routing_key = queue.routing_key if queue.routing_key.startswith(
92+
'#') else f"#.{queue.routing_key}"
93+
exchange.bind_to(CELERY_DELAYED_DELIVERY_EXCHANGE, routing_key=routing_key)
94+
queue.bind_to(exchange.name, routing_key=routing_key)
95+
96+
97+
def calculate_routing_key(countdown: int, routing_key: str) -> str:
98+
"""Calculate the routing key for publishing a delayed message based on the countdown."""
99+
if countdown < 1:
100+
raise ValueError("countdown must be a positive number")
101+
102+
if not routing_key:
103+
raise ValueError("routing_key must be non-empty")
104+
105+
return '.'.join(list(f'{countdown:028b}')) + f'.{routing_key}'

setup.cfg

+27-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,33 @@ all_files = 1
1010
[flake8]
1111
# classes can be lowercase, arguments and variables can be uppercase
1212
# whenever it makes the code more readable.
13-
extend-ignore = W504, N806, N802, N801, N803
13+
max-line-length = 117
14+
extend-ignore =
15+
# classes can be lowercase, arguments and variables can be uppercase
16+
# whenever it makes the code more readable.
17+
W504, N806, N802, N801, N803
18+
# incompatible with black https://github.com/psf/black/issues/315#issuecomment-395457972
19+
E203,
20+
# Missing docstring in public method
21+
D102,
22+
# Missing docstring in public package
23+
D104,
24+
# Missing docstring in magic method
25+
D105,
26+
# Missing docstring in __init__
27+
D107,
28+
# First line should be in imperative mood; try rephrasing
29+
D401,
30+
# No blank lines allowed between a section header and its content
31+
D412,
32+
# ambiguous variable name '...'
33+
E741,
34+
# ambiguous class definition '...'
35+
E742,
36+
per-file-ignores =
37+
t/*,setup.py,examples/*,docs/*,extra/*:
38+
# docstrings
39+
D,
1440

1541
[isort]
1642
add_imports =

t/unit/transport/test_SQS.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ def test_get_bulk_raises_empty(self):
358358

359359
def test_optional_b64_decode(self):
360360
raw = b'{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77","task": "celery.task.PingTask",' \
361-
b'"args": [],"kwargs": {},"retries": 0,"eta": "2009-11-17T12:30:56.527191"}' # noqa
361+
b'"args": [],"kwargs": {},"retries": 0,"eta": "2009-11-17T12:30:56.527191"}'
362362
b64_enc = base64.b64encode(raw)
363363
assert self.channel._optional_b64_decode(b64_enc) == raw
364364
assert self.channel._optional_b64_decode(raw) == raw

t/unit/transport/test_azureservicebus.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def get_queue_runtime_properties(self, queue_name):
103103

104104
URL_NOCREDS = 'azureservicebus://'
105105
URL_CREDS_SAS = 'azureservicebus://policyname:ke/y@hostname'
106-
URL_CREDS_SAS_FQ = 'azureservicebus://policyname:ke/[email protected]' # noqa
106+
URL_CREDS_SAS_FQ = 'azureservicebus://policyname:ke/[email protected]'
107107
URL_CREDS_DA = 'azureservicebus://DefaultAzureCredential@hostname'
108108
URL_CREDS_DA_FQ = 'azureservicebus://[email protected]' # noqa
109109
URL_CREDS_MI = 'azureservicebus://ManagedIdentityCredential@hostname'

t/unit/transport/test_mongodb.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def test_custom_port(self):
8787
assert hostname == 'mongodb://localhost:27018'
8888

8989
def test_replicaset_hosts(self):
90-
url = 'mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=test_rs' # noqa
90+
url = 'mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=test_rs'
9191
channel = _create_mock_connection(url).default_channel
9292
hostname, dbname, options = channel._parse_uri()
9393

0 commit comments

Comments
 (0)