Skip to content

Commit

Permalink
Merge pull request #36 from iheartradio/routing-key-support
Browse files Browse the repository at this point in the history
Add support for specifying routing key when sending
  • Loading branch information
jonafato committed Apr 27, 2016
2 parents 7a31fe8 + c9a357e commit 40b4fe0
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Version 0.4.0

Release TBD

- Allow callers of ``Producer.send`` to specify a routing key


Version 0.3.0
=============
Expand Down
7 changes: 4 additions & 3 deletions docs/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ Producer Settings
| ``AMQP_OUTBOUND_EXCHANGE_TYPE`` | The type of the outbound exchange. |
| | Defaults to ``'direct'``. |
+------------------------------------+------------------------------------------------+
| ``AMQP_OUTBOUND_ROUTING_KEY`` | The routing key used when sending |
| | messages to the outbound exchange. |
| | Defaults to ``''``. |
| ``AMQP_OUTBOUND_ROUTING_KEY`` | The default routing key used when |
| | sending messages to the outbound |
| | exchange if the ``routing_key`` argument |
| | is not provided. Defaults to ``''``. |
+------------------------------------+------------------------------------------------+
| ``AMQP_PREFETCH_LIMIT`` | The maximum number of messages to keep |
| | in the internal queue waiting to be |
Expand Down
11 changes: 9 additions & 2 deletions henson_amqp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,24 @@ def _teardown(self, app):
self._transport.close()

@asyncio.coroutine
def send(self, message):
def send(self, message, *, routing_key=None):
"""Send a message to the configured AMQP broker and exchange.
Args:
message (str): The body of the message to send.
routing_key (str): The routing key that should be used to
send the message. If set to ``None``, the
``AMQP_OUTBOUND_ROUTING_KEY`` application setting will
be used. Defaults to ``None``.
"""
properties = {
'delivery_mode': self.app.settings['AMQP_DELIVERY_MODE'].value,
}
if not self._channel:
yield from self._connect()
if routing_key is None:
routing_key = self.app.settings['AMQP_OUTBOUND_ROUTING_KEY']

yield from self._channel.exchange_declare(
durable=self.app.settings['AMQP_OUTBOUND_EXCHANGE_DURABLE'],
exchange_name=self.app.settings['AMQP_OUTBOUND_EXCHANGE'],
Expand All @@ -265,7 +272,7 @@ def send(self, message):
yield from self._channel.publish(
payload=message,
exchange_name=self.app.settings['AMQP_OUTBOUND_EXCHANGE'],
routing_key=self.app.settings['AMQP_OUTBOUND_ROUTING_KEY'],
routing_key=routing_key,
properties=properties,
)

Expand Down
31 changes: 31 additions & 0 deletions tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,37 @@ def test_retry(test_consumer):
)


@pytest.mark.asyncio
def test_produce_routing_key(test_producer):
"""Test that providing a routing key when sending works."""
test_producer._channel = mock.MagicMock()
message = 'spam and eggs'
routing_key = 'parrot'

yield from test_producer.send(message, routing_key=routing_key)
test_producer._channel.publish.assert_called_with(
payload=message,
routing_key=routing_key,
exchange_name=mock.ANY,
properties=mock.ANY,
)


@pytest.mark.asyncio
def test_produce_no_routing_key(test_producer):
"""Test that a default routing key is used when none is provided."""
test_producer._channel = mock.MagicMock()
message = 'spam and eggs'
yield from test_producer.send(message)

test_producer._channel.publish.assert_called_with(
payload=message,
routing_key=test_producer.app.settings['AMQP_OUTBOUND_ROUTING_KEY'],
exchange_name=mock.ANY,
properties=mock.ANY,
)


def test_producer_factory(test_amqp):
"""Test that ``AMQP.producer`` caches its result."""
producer1 = test_amqp.producer()
Expand Down

0 comments on commit 40b4fe0

Please sign in to comment.