Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions docs/provider_azure_servicebus.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ Please read the [Introduction](intro.md) before reading this provider documentat

- [Configuration](#configuration)
- [Producing Messages](#producing-messages)
- [Publishing Messages](#publishing-messages)
- [Using Default Destinations](#using-default-destinations)
- [Notes](#notes)
- [Message modifier](#message-modifier)
- [Global message modifier](#global-message-modifier)
- [Consuming Messages](#consuming-messages)
Expand Down Expand Up @@ -46,54 +49,64 @@ This determination is set as part of the bus builder configuration.

## Producing Messages

To produce a given `TMessage` to a Azure Service Bus queue (or topic) use:
To send a message of type `TMessage` to an Azure Service Bus **queue** or **topic**, configure the producer as follows:

```cs
// send TMessage to Azure SB queues
```csharp
// Send TMessage to an Azure Service Bus queue
mbb.Produce<TMessage>(x => x.UseQueue());

// OR

// send TMessage to Azure SB topics
// Send TMessage to an Azure Service Bus topic
mbb.Produce<TMessage>(x => x.UseTopic());
```

The above example configures the runtime to deliver all message types of `TMessage` to an Azure Service Bus queue (or topic) respectively.
This tells the runtime where messages of type `TMessage` should be delivered - either to a queue or a topic.

Then anytime you produce a message like this:
### Publishing Messages

```cs
Once configured, you can publish a message with:

```csharp
TMessage msg;

// msg will go to the "some-queue" queue
// Send msg to the "some-queue" queue
bus.Publish(msg, "some-queue");

// OR

// msg will go to the "some-topic" topic
// Send msg to the "some-topic" topic
bus.Publish(msg, "some-topic");
```

The second (`path`) parameter indicates a queue or topic name - depending on the bus configuration.
The second parameter (`path`) specifies the queue or topic name, depending on the producer configuration.

When the default queue (or topic) path is configured for a message type:
#### Using Default Destinations

```cs
You can also configure a **default queue or topic** for a message type:

```csharp
// Set a default queue for TMessage
mbb.Produce<TMessage>(x => x.DefaultQueue("some-queue"));

// OR

// Set a default topic for TMessage
mbb.Produce<TMessage>(x => x.DefaultTopic("some-topic"));
```

and the second (`path`) parameter is omitted in `bus.Publish()`, then that default queue (or topic) name is going to be used:
When a default is set, you can omit the `path` parameter:

```cs
// msg will go to the "some-queue" queue (or "some-topic")
```csharp
// msg will be sent to "some-queue" (or "some-topic")
bus.Publish(msg);
```

Setting the default queue name `DefaultQueue()` for a message type will implicitly configure `UseQueue()` for that message type. By default, if configuration is not provided then runtime will assume a message needs to be sent on a topic (and works as if `UseTopic()` was configured).
#### Notes

- Calling `.DefaultQueue("...")` automatically applies `.UseQueue()` for that message type, and respectively for `.DefaultTopic()` implies `.UseTopic()`.
- When none of the `.UseQueue()`, `.UseTopic()`, `.DefaultQueue()`, nor `.DefaultTopic()` is configured, then runtime implies the producer path represents a topic entity.
- If no `.DefaultQueue()` or `.DefaultTopic()` is set, [topology provisioning](#topology-provisioning) will **not** create the corresponding Azure Service Bus entity.

### Message modifier

Expand Down
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>3.3.1-rc100</Version>
<Version>3.3.1-rc103</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public static TConsumerBuilder CreateQueueOptions<TConsumerBuilder>(this TConsum
if (builder is null) throw new ArgumentNullException(nameof(builder));
if (action is null) throw new ArgumentNullException(nameof(action));

builder.ConsumerSettings.SetQueueOptions(action);
AsbProperties.CreateQueueOptions.Set(builder.ConsumerSettings, action);
return builder;
}

Expand All @@ -253,7 +253,7 @@ public static TConsumerBuilder CreateTopicOptions<TConsumerBuilder>(this TConsum
if (builder is null) throw new ArgumentNullException(nameof(builder));
if (action is null) throw new ArgumentNullException(nameof(action));

builder.ConsumerSettings.SetTopicOptions(action);
AsbProperties.CreateTopicOptions.Set(builder.ConsumerSettings, action);
return builder;
}

Expand All @@ -270,7 +270,7 @@ public static TConsumerBuilder CreateSubscriptionOptions<TConsumerBuilder>(this
if (builder is null) throw new ArgumentNullException(nameof(builder));
if (action is null) throw new ArgumentNullException(nameof(action));

builder.ConsumerSettings.SetSubscriptionOptions(action);
AsbProperties.CreateSubscriptionOptions.Set(builder.ConsumerSettings, action);
return builder;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static ProducerBuilder<T> WithModifier<T>(this ProducerBuilder<T> produce
if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder));
if (modifier is null) throw new ArgumentNullException(nameof(modifier));

producerBuilder.Settings.SetMessageModifier((e, m) => modifier((T)e, m));
AsbProperties.MessageModifier.Set(producerBuilder.Settings, (e, m) => modifier((T)e, m));
return producerBuilder;
}

Expand All @@ -69,7 +69,7 @@ public static ProducerBuilder<T> CreateQueueOptions<T>(this ProducerBuilder<T> p
if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder));
if (action is null) throw new ArgumentNullException(nameof(action));

producerBuilder.Settings.SetQueueOptions(action);
AsbProperties.CreateQueueOptions.Set(producerBuilder.Settings, action);
return producerBuilder;
}

Expand All @@ -84,7 +84,7 @@ public static ProducerBuilder<T> CreateTopicOptions<T>(this ProducerBuilder<T> p
if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder));
if (action is null) throw new ArgumentNullException(nameof(action));

producerBuilder.Settings.SetTopicOptions(action);
AsbProperties.CreateTopicOptions.Set(producerBuilder.Settings, action);
return producerBuilder;
}
}
10 changes: 10 additions & 0 deletions src/SlimMessageBus.Host.AzureServiceBus/Config/AsbProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,14 @@ public static class AsbProperties
public static readonly string SessionIdleTimeoutKey = "Asb_SessionIdleTimeout";
public static readonly string MaxConcurrentSessionsKey = "Asb_MaxConcurrentSessions";
public static readonly string RulesKey = "Asb_Rules";

// Producer
static readonly internal ProviderExtensionProperty<AsbMessageModifier<object>> MessageModifier = new("Asb_MessageModifier");

// Producer and Consumer
static readonly internal ProviderExtensionProperty<Action<CreateQueueOptions>> CreateQueueOptions = new("Asb_CreateQueueOptions");
static readonly internal ProviderExtensionProperty<Action<CreateTopicOptions>> CreateTopicOptions = new("Asb_CreateTopicOptions");

// Consumer
static readonly internal ProviderExtensionProperty<Action<CreateSubscriptionOptions>> CreateSubscriptionOptions = new("Asb_CreateSubscriptionOptions");
}
107 changes: 81 additions & 26 deletions src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ public class ServiceBusMessageBus : MessageBusBase<ServiceBusMessageBusSettings>
private readonly ILogger _logger;
private ServiceBusClient _client;
private SafeDictionaryWrapper<string, ServiceBusSender> _producerByPath;
private ServiceBusTopologyService _topologyService;

public ServiceBusMessageBus(MessageBusSettings settings, ServiceBusMessageBusSettings providerSettings)
: base(settings, providerSettings)
{
_logger = LoggerFactory.CreateLogger<ServiceBusMessageBus>();

OnBuildProvider();
}

Expand Down Expand Up @@ -46,9 +46,10 @@ protected override async ValueTask DisposeAsyncCore()
public override async Task ProvisionTopology()
{
await base.ProvisionTopology();

var provisioningService = new ServiceBusTopologyService(LoggerFactory.CreateLogger<ServiceBusTopologyService>(), Settings, ProviderSettings);
await provisioningService.ProvisionTopology(); // provisioning happens asynchronously
if (_topologyService != null)
{
await _topologyService.ProvisionTopology(); // provisioning happens asynchronously
}
}

#region Overrides of MessageBusBase
Expand All @@ -59,6 +60,7 @@ protected override void Build()

if (ProviderSettings.TopologyProvisioning?.Enabled ?? false)
{
_topologyService = new ServiceBusTopologyService(LoggerFactory.CreateLogger<ServiceBusTopologyService>(), Settings, ProviderSettings);
InitTaskList.Add(ProvisionTopology, CancellationToken);
}

Expand Down Expand Up @@ -133,40 +135,93 @@ public override async Task ProduceToTransport(object message, Type messageType,
{
var transportMessage = GetTransportMessage(message, messageType, messageHeaders, path);
var senderClient = _producerByPath.GetOrAdd(path);
await senderClient.SendMessageAsync(transportMessage, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Delivered item {Message} of type {MessageType} to {Path}", message, messageType?.Name, path);

try
{
await senderClient.SendMessageAsync(transportMessage, cancellationToken).ConfigureAwait(false);
}
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound && _topologyService != null)
{
await EnsurePathExists(messageType, path);
// Resend messages after the path has been created
await senderClient.SendMessageAsync(transportMessage, cancellationToken).ConfigureAwait(false);
}

_logger.LogDebug("Delivered message {Message} of type {MessageType} to {Path}", message, messageType?.Name, path);
}
catch (Exception ex) when (ex is not ProducerMessageBusException && ex is not TaskCanceledException)
{
throw new ProducerMessageBusException(GetProducerErrorMessage(path, message, messageType, ex), ex);
}
}

public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
/// <summary>
/// When the topic or queue does not exist, we can try to create it.
/// This happens in cases where the path is dynamically set upon publish/send (different from default path)
/// </summary>
/// <param name="messageType"></param>
/// <param name="path">topic or queue</param>
/// <returns></returns>
private async Task EnsurePathExists(Type messageType, string path)
{
AssertActive();
var producerSettings = GetProducerSettings(messageType);
if (producerSettings.PathKind == PathKind.Topic)
{
_logger.LogInformation("Topic {Path} does not exist, trying to create it", path);
await _topologyService.TryCreateTopic(path, ProviderSettings.TopologyProvisioning.CanProducerCreateTopic);
}
else
{
_logger.LogInformation("Queue {Path} does not exist, trying to create it", path);
await _topologyService.TryCreateQueue(path, ProviderSettings.TopologyProvisioning.CanProducerCreateQueue);
}
}

Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch, CancellationToken cancellationToken) =>
Retry.WithDelay(
operation: async cancellationToken =>
internal Task SendBatchAsync<T>(string path,
ServiceBusSender senderClient,
IReadOnlyCollection<T> envelopes,
ServiceBusMessageBatch batch,
CancellationToken cancellationToken)
where T : BulkMessageEnvelope
=> Retry.WithDelay(
operation: async cancellationToken =>
{
try
{
await senderClient.SendMessagesAsync(batch, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {Path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes);
},
shouldRetry: (exception, attempt) =>
}
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound && _topologyService != null)
{
if (attempt < 3
&& exception is ServiceBusException ex
&& ex.Reason == ServiceBusFailureReason.ServiceBusy)
var messageType = envelopes.FirstOrDefault()?.MessageType;
if (messageType != null)
{
_logger.LogWarning("Service bus throttled. Backing off (Attempt: {Attempt}).", attempt);
return true;
await EnsurePathExists(messageType, path);
// Resend messages after the path has been created
await senderClient.SendMessagesAsync(batch, cancellationToken).ConfigureAwait(false);
}
return false;
},
delay: TimeSpan.FromSeconds(2),
jitter: TimeSpan.FromSeconds(1),
cancellationToken);
}

_logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {Path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes);
},
shouldRetry: (exception, attempt) =>
{
if (attempt < 3
&& exception is ServiceBusException ex
&& ex.Reason == ServiceBusFailureReason.ServiceBusy)
{
_logger.LogWarning("Service bus throttled. Backing off (Attempt: {Attempt}).", attempt);
return true;
}
return false;
},
delay: TimeSpan.FromSeconds(2),
jitter: TimeSpan.FromSeconds(1),
cancellationToken);


public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
AssertActive();

var messages = envelopes
.Select(envelope =>
Expand Down Expand Up @@ -208,7 +263,7 @@ Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch,
}

advance = false;
await SendBatchAsync(senderClient, batch, cancellationToken).ConfigureAwait(false);
await SendBatchAsync(path, senderClient, envelopes, batch, cancellationToken).ConfigureAwait(false);
dispatched.AddRange(inBatch);
inBatch.Clear();

Expand Down Expand Up @@ -265,7 +320,7 @@ private void InvokeMessageModifier(object message, Type messageType, ServiceBusM
{
try
{
var messageModifier = settings.GetMessageModifier();
var messageModifier = settings.GetOrDefault(AsbProperties.MessageModifier);
messageModifier?.Invoke(message, m);
}
catch (Exception e)
Expand Down
Loading
Loading