Skip to content

Commit 6a7d831

Browse files
Copilotsamtrion
andcommitted
feat: create NetEvolve.Pulse.AzureQueueStorage project with AzureQueueStorageMessageTransport
Agent-Logs-Url: https://github.com/dailydevops/pulse/sessions/46d27db2-a16d-482b-8d30-a7fa998b9eb8 Co-authored-by: samtrion <3283596+samtrion@users.noreply.github.com>
1 parent 2a22ea5 commit 6a7d831

13 files changed

Lines changed: 1099 additions & 0 deletions

Directory.Packages.props

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
<ItemGroup>
2121
<PackageVersion Include="Azure.Identity" Version="1.21.0" />
2222
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.20.1" />
23+
<PackageVersion Include="Azure.Storage.Queues" Version="12.23.0" />
2324
<PackageVersion Include="Confluent.Kafka" Version="2.14.0" />
2425
<PackageVersion Include="Dapr.Client" Version="1.17.9" />
2526
<PackageVersion Include="FluentValidation" Version="12.1.1" />
2627
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="5.3.0" />
2728
<PackageVersion Include="Microsoft.Data.SqlClient" Version="7.0.0" />
2829
<PackageVersion Include="Microsoft.Data.Sqlite" Version="10.0.6" />
30+
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="10.0.6" />
31+
<PackageVersion Include="Microsoft.Extensions.Configuration.Binder" Version="10.0.6" />
2932
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.6" />
3033
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.6" />
3134
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.6" />

Pulse.slnx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
<Project Path="src/NetEvolve.Pulse.RabbitMQ/NetEvolve.Pulse.RabbitMQ.csproj" />
3838
<Project Path="src/NetEvolve.Pulse.SourceGeneration/NetEvolve.Pulse.SourceGeneration.csproj" />
3939
<Project Path="src/NetEvolve.Pulse.AzureServiceBus/NetEvolve.Pulse.AzureServiceBus.csproj" />
40+
<Project Path="src/NetEvolve.Pulse.AzureQueueStorage/NetEvolve.Pulse.AzureQueueStorage.csproj" />
4041
<Project Path="src/NetEvolve.Pulse.SQLite/NetEvolve.Pulse.SQLite.csproj" />
4142
<Project Path="src/NetEvolve.Pulse.Kafka/NetEvolve.Pulse.Kafka.csproj" />
4243
<Project Path="src/NetEvolve.Pulse.Redis/NetEvolve.Pulse.Redis.csproj" />
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
namespace NetEvolve.Pulse;
2+
3+
using System.Linq;
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Microsoft.Extensions.DependencyInjection.Extensions;
6+
using Microsoft.Extensions.Options;
7+
using NetEvolve.Pulse.Extensibility;
8+
using NetEvolve.Pulse.Extensibility.Outbox;
9+
using NetEvolve.Pulse.Outbox;
10+
11+
/// <summary>
12+
/// Extension methods for registering the Azure Queue Storage transport with the Pulse mediator.
13+
/// </summary>
14+
public static class AzureQueueStorageExtensions
15+
{
16+
/// <summary>
17+
/// Configures the outbox to deliver messages to Azure Queue Storage using a connection string.
18+
/// </summary>
19+
/// <param name="configurator">The mediator configurator.</param>
20+
/// <param name="connectionString">The Azure Storage connection string.</param>
21+
/// <param name="configureOptions">Optional action to further configure <see cref="AzureQueueStorageTransportOptions"/>.</param>
22+
/// <returns>The configurator for chaining.</returns>
23+
/// <exception cref="ArgumentNullException">Thrown when <paramref name="configurator"/> is <see langword="null"/>.</exception>
24+
/// <exception cref="ArgumentException">Thrown when <paramref name="connectionString"/> is <see langword="null"/> or whitespace.</exception>
25+
public static IMediatorBuilder UseAzureQueueStorageTransport(
26+
this IMediatorBuilder configurator,
27+
string connectionString,
28+
Action<AzureQueueStorageTransportOptions>? configureOptions = null
29+
)
30+
{
31+
ArgumentNullException.ThrowIfNull(configurator);
32+
ArgumentException.ThrowIfNullOrWhiteSpace(connectionString);
33+
34+
return configurator.UseAzureQueueStorageTransportCore(
35+
options => options.ConnectionString = connectionString,
36+
configureOptions
37+
);
38+
}
39+
40+
/// <summary>
41+
/// Configures the outbox to deliver messages to Azure Queue Storage using a service URI and managed identity.
42+
/// </summary>
43+
/// <param name="configurator">The mediator configurator.</param>
44+
/// <param name="queueServiceUri">The Azure Queue Storage service URI (e.g., <c>https://account.queue.core.windows.net</c>).</param>
45+
/// <param name="configureOptions">Optional action to further configure <see cref="AzureQueueStorageTransportOptions"/>.</param>
46+
/// <returns>The configurator for chaining.</returns>
47+
/// <exception cref="ArgumentNullException">Thrown when <paramref name="configurator"/> or <paramref name="queueServiceUri"/> is <see langword="null"/>.</exception>
48+
public static IMediatorBuilder UseAzureQueueStorageTransport(
49+
this IMediatorBuilder configurator,
50+
Uri queueServiceUri,
51+
Action<AzureQueueStorageTransportOptions>? configureOptions = null
52+
)
53+
{
54+
ArgumentNullException.ThrowIfNull(configurator);
55+
ArgumentNullException.ThrowIfNull(queueServiceUri);
56+
57+
return configurator.UseAzureQueueStorageTransportCore(
58+
options => options.QueueServiceUri = queueServiceUri,
59+
configureOptions
60+
);
61+
}
62+
63+
private static IMediatorBuilder UseAzureQueueStorageTransportCore(
64+
this IMediatorBuilder configurator,
65+
Action<AzureQueueStorageTransportOptions> coreOptions,
66+
Action<AzureQueueStorageTransportOptions>? configureOptions
67+
)
68+
{
69+
var services = configurator.Services;
70+
71+
_ = services.AddOptions<AzureQueueStorageTransportOptions>().Configure(coreOptions);
72+
73+
if (configureOptions is not null)
74+
{
75+
_ = services.Configure(configureOptions);
76+
}
77+
78+
services.TryAddEnumerable(
79+
ServiceDescriptor.Singleton<
80+
IValidateOptions<AzureQueueStorageTransportOptions>,
81+
AzureQueueStorageTransportOptionsValidator
82+
>()
83+
);
84+
85+
_ = services.AddOptions<AzureQueueStorageTransportOptions>().ValidateOnStart();
86+
87+
var existing = services.FirstOrDefault(d => d.ServiceType == typeof(IMessageTransport));
88+
if (existing is not null)
89+
{
90+
_ = services.Remove(existing);
91+
}
92+
93+
_ = services.AddSingleton<IMessageTransport, AzureQueueStorageMessageTransport>();
94+
95+
return configurator;
96+
}
97+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFrameworks>$(_ProjectTargetFrameworks)</TargetFrameworks>
4+
<Description>Azure Queue Storage transport for the Pulse CQRS mediator outbox. Delivers outbox messages to Azure Queue Storage queues using the Azure SDK, supporting single and batched sends with managed identity or connection string authentication.</Description>
5+
<PackageTags>$(PackageTags);azure;queuestorage;</PackageTags>
6+
<RootNamespace>NetEvolve.Pulse</RootNamespace>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="Azure.Identity" />
11+
<PackageReference Include="Azure.Storage.Queues" />
12+
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" />
13+
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
14+
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
15+
<PackageReference Include="Microsoft.Extensions.Options" />
16+
</ItemGroup>
17+
18+
<ItemGroup>
19+
<ProjectReference Include="..\NetEvolve.Pulse.Extensibility\NetEvolve.Pulse.Extensibility.csproj" />
20+
</ItemGroup>
21+
</Project>
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
namespace NetEvolve.Pulse.Outbox;
2+
3+
using System.Diagnostics.CodeAnalysis;
4+
using System.Text;
5+
using System.Text.Json;
6+
using Azure.Identity;
7+
using Azure.Storage.Queues;
8+
using Microsoft.Extensions.Options;
9+
using NetEvolve.Pulse.Extensibility.Outbox;
10+
11+
/// <summary>
12+
/// Azure Queue Storage transport implementation for the outbox processor.
13+
/// </summary>
14+
/// <remarks>
15+
/// The <see cref="QueueClient"/> is lazily initialized on first use. If
16+
/// <see cref="AzureQueueStorageTransportOptions.CreateQueueIfNotExists"/> is <see langword="true"/>,
17+
/// the queue is created automatically during initialization.
18+
/// Messages are JSON-serialized and Base64-encoded before sending.
19+
/// Raw message size must not exceed 48 KB (the Azure Queue Storage Base64-encoded limit of 64 KB).
20+
/// </remarks>
21+
public sealed class AzureQueueStorageMessageTransport : IMessageTransport, IDisposable
22+
{
23+
internal const int MaxMessageSizeInBytes = 48 * 1024; // 48 KB
24+
25+
private readonly AzureQueueStorageTransportOptions _options;
26+
private readonly QueueClient? _queueClientOverride;
27+
private readonly SemaphoreSlim _initLock = new SemaphoreSlim(1, 1);
28+
29+
// Volatile ensures the double-checked locking pattern is correct across threads.
30+
private volatile QueueClient? _queueClient;
31+
32+
/// <summary>
33+
/// Initializes a new instance of the <see cref="AzureQueueStorageMessageTransport"/> class.
34+
/// </summary>
35+
/// <param name="options">The configured transport options.</param>
36+
internal AzureQueueStorageMessageTransport(IOptions<AzureQueueStorageTransportOptions> options)
37+
{
38+
ArgumentNullException.ThrowIfNull(options);
39+
_options = options.Value;
40+
}
41+
42+
/// <summary>
43+
/// Initializes a new instance of the <see cref="AzureQueueStorageMessageTransport"/> class
44+
/// with a pre-built queue client. Used for testing.
45+
/// </summary>
46+
/// <param name="options">The configured transport options.</param>
47+
/// <param name="queueClient">A pre-built queue client to use instead of creating one from options.</param>
48+
internal AzureQueueStorageMessageTransport(
49+
IOptions<AzureQueueStorageTransportOptions> options,
50+
QueueClient queueClient
51+
)
52+
{
53+
ArgumentNullException.ThrowIfNull(options);
54+
ArgumentNullException.ThrowIfNull(queueClient);
55+
_options = options.Value;
56+
_queueClientOverride = queueClient;
57+
}
58+
59+
/// <inheritdoc />
60+
public void Dispose() => _initLock.Dispose();
61+
62+
/// <inheritdoc />
63+
public async Task SendAsync(OutboxMessage message, CancellationToken cancellationToken = default)
64+
{
65+
ArgumentNullException.ThrowIfNull(message);
66+
67+
var json = SerializeMessage(message);
68+
var rawBytes = Encoding.UTF8.GetBytes(json);
69+
70+
if (rawBytes.Length > MaxMessageSizeInBytes)
71+
{
72+
throw new InvalidOperationException(
73+
$"Message size {rawBytes.Length} bytes exceeds the Azure Queue Storage limit of {MaxMessageSizeInBytes} bytes (48 KB raw / 64 KB Base64-encoded)."
74+
);
75+
}
76+
77+
var base64 = Convert.ToBase64String(rawBytes);
78+
var queueClient = await GetQueueClientAsync(cancellationToken).ConfigureAwait(false);
79+
_ = await queueClient
80+
.SendMessageAsync(
81+
base64,
82+
visibilityTimeout: _options.MessageVisibilityTimeout,
83+
cancellationToken: cancellationToken
84+
)
85+
.ConfigureAwait(false);
86+
}
87+
88+
/// <inheritdoc />
89+
public async Task SendBatchAsync(IEnumerable<OutboxMessage> messages, CancellationToken cancellationToken = default)
90+
{
91+
ArgumentNullException.ThrowIfNull(messages);
92+
93+
foreach (var message in messages)
94+
{
95+
await SendAsync(message, cancellationToken).ConfigureAwait(false);
96+
}
97+
}
98+
99+
private static string SerializeMessage(OutboxMessage message) =>
100+
JsonSerializer.Serialize(
101+
new
102+
{
103+
id = message.Id,
104+
eventType = message.EventType.ToOutboxEventTypeName(),
105+
payload = message.Payload,
106+
correlationId = message.CorrelationId,
107+
causationId = message.CausationId,
108+
createdAt = message.CreatedAt,
109+
}
110+
);
111+
112+
[SuppressMessage(
113+
"Maintainability",
114+
"CA1508:Avoid dead conditional code",
115+
Justification = "Double-checked locking: the inner null check guards against concurrent initialization after the semaphore is acquired."
116+
)]
117+
private async Task<QueueClient> GetQueueClientAsync(CancellationToken cancellationToken)
118+
{
119+
if (_queueClientOverride is not null)
120+
{
121+
return _queueClientOverride;
122+
}
123+
124+
if (_queueClient is not null)
125+
{
126+
return _queueClient;
127+
}
128+
129+
await _initLock.WaitAsync(cancellationToken).ConfigureAwait(false);
130+
try
131+
{
132+
// Re-check after acquiring the lock (double-checked locking pattern).
133+
if (_queueClient is not null)
134+
{
135+
return _queueClient;
136+
}
137+
138+
QueueClient client;
139+
140+
if (!string.IsNullOrWhiteSpace(_options.ConnectionString))
141+
{
142+
client = new QueueClient(_options.ConnectionString, _options.QueueName);
143+
}
144+
else
145+
{
146+
var queueUri = new Uri($"{_options.QueueServiceUri!.AbsoluteUri.TrimEnd('/')}/{_options.QueueName}");
147+
client = new QueueClient(queueUri, new DefaultAzureCredential());
148+
}
149+
150+
if (_options.CreateQueueIfNotExists)
151+
{
152+
_ = await client.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
153+
}
154+
155+
_queueClient = client;
156+
return _queueClient;
157+
}
158+
finally
159+
{
160+
_initLock.Release();
161+
}
162+
}
163+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
namespace NetEvolve.Pulse.Outbox;
2+
3+
/// <summary>
4+
/// Configuration options for <see cref="AzureQueueStorageMessageTransport"/>.
5+
/// </summary>
6+
public sealed class AzureQueueStorageTransportOptions
7+
{
8+
/// <summary>
9+
/// Gets or sets the connection string used to authenticate against Azure Queue Storage.
10+
/// </summary>
11+
/// <remarks>
12+
/// When not provided, <see cref="QueueServiceUri"/> must be specified to use managed identity
13+
/// through <c>DefaultAzureCredential</c>.
14+
/// </remarks>
15+
public string? ConnectionString { get; set; }
16+
17+
/// <summary>
18+
/// Gets or sets the URI of the Azure Queue Storage service endpoint (e.g., <c>https://account.queue.core.windows.net</c>).
19+
/// </summary>
20+
/// <remarks>Required when <see cref="ConnectionString"/> is not supplied.</remarks>
21+
public Uri? QueueServiceUri { get; set; }
22+
23+
/// <summary>
24+
/// Gets or sets the name of the queue to which outbox messages are sent.
25+
/// </summary>
26+
/// <remarks>Defaults to <c>pulse-outbox</c>.</remarks>
27+
public string QueueName { get; set; } = "pulse-outbox";
28+
29+
/// <summary>
30+
/// Gets or sets the visibility timeout applied to each message sent to the queue.
31+
/// </summary>
32+
/// <remarks>
33+
/// When <see langword="null"/>, the queue's default visibility timeout is used.
34+
/// </remarks>
35+
public TimeSpan? MessageVisibilityTimeout { get; set; }
36+
37+
/// <summary>
38+
/// Gets or sets a value indicating whether the queue should be created automatically if it does not exist.
39+
/// </summary>
40+
/// <remarks>Defaults to <see langword="true"/>.</remarks>
41+
public bool CreateQueueIfNotExists { get; set; } = true;
42+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
namespace NetEvolve.Pulse.Outbox;
2+
3+
using Microsoft.Extensions.Configuration;
4+
using Microsoft.Extensions.Options;
5+
6+
/// <summary>
7+
/// Binds the <c>Pulse:Transports:AzureQueueStorage</c> configuration section
8+
/// to <see cref="AzureQueueStorageTransportOptions"/>.
9+
/// </summary>
10+
internal sealed class AzureQueueStorageTransportOptionsConfiguration
11+
: IConfigureOptions<AzureQueueStorageTransportOptions>
12+
{
13+
private const string ConfigurationSection = "Pulse:Transports:AzureQueueStorage";
14+
15+
private readonly IConfiguration _configuration;
16+
17+
/// <summary>
18+
/// Initializes a new instance of the <see cref="AzureQueueStorageTransportOptionsConfiguration"/> class.
19+
/// </summary>
20+
/// <param name="configuration">The configuration root.</param>
21+
public AzureQueueStorageTransportOptionsConfiguration(IConfiguration configuration)
22+
{
23+
ArgumentNullException.ThrowIfNull(configuration);
24+
_configuration = configuration;
25+
}
26+
27+
/// <inheritdoc />
28+
public void Configure(AzureQueueStorageTransportOptions options) =>
29+
_configuration.GetSection(ConfigurationSection).Bind(options);
30+
}

0 commit comments

Comments
 (0)