Skip to content

Commit 8ef934c

Browse files
Copilotsamtrion
andauthored
feat: Azure Cosmos DB outbox repository (NetEvolve.Pulse.CosmosDb) (#422)
* feat: add Azure Cosmos DB outbox repository (NetEvolve.Pulse.CosmosDb) Agent-Logs-Url: https://github.com/dailydevops/pulse/sessions/8cc0eb88-c126-47a6-9ff3-96f416746c0b Co-authored-by: samtrion <3283596+samtrion@users.noreply.github.com> * feat: Azure Cosmos DB outbox repository (NetEvolve.Pulse.CosmosDb) Agent-Logs-Url: https://github.com/dailydevops/pulse/sessions/8cc0eb88-c126-47a6-9ff3-96f416746c0b Co-authored-by: samtrion <3283596+samtrion@users.noreply.github.com> * fix: Suppress Newtonsoft.Json check in test projects Add <AzureCosmosDisableNewtonsoftJsonCheck>true</AzureCosmosDisableNewtonsoftJsonCheck> to both integration and unit test .csproj files. This disables the explicit-reference check for Newtonsoft.Json in Azure Cosmos DB SDK, as these projects use System.Text.Json via CosmosSystemTextJsonSerializer. * refactor: CosmosDb outbox namespaces and cleanup usings Consolidate CosmosDb outbox classes under NetEvolve.Pulse.Outbox namespace. Update XML doc references to match new namespace. Refactor CosmosDbOutboxRepository to iterate over item IDs for deletion. Remove unused using directives for cleaner code. * test: add unit and integration tests for NetEvolve.Pulse.CosmosDb Agent-Logs-Url: https://github.com/dailydevops/pulse/sessions/d31a8c5a-325d-4a30-be5b-45c7b12889dc Co-authored-by: samtrion <3283596+samtrion@users.noreply.github.com> * fix: mark CosmosDb emulator docker image with /*dockerimage*/ comment Agent-Logs-Url: https://github.com/dailydevops/pulse/sessions/6d32b9f0-7ff3-4f18-934d-989dd7532723 Co-authored-by: samtrion <3283596+samtrion@users.noreply.github.com> * fix: We don't need `Newtonsoft.Json` * fix: Adjusted namespace * test: Recompiled verified files with new namespace * fix: Gnarf, we need `Newtonsoft.Json` * fix: Added missing Annotations * style: Reformatted solution --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: samtrion <3283596+samtrion@users.noreply.github.com> Co-authored-by: Martin Stühmer <me@samtrion.net>
1 parent 6fc0bd0 commit 8ef934c

28 files changed

Lines changed: 1672 additions & 2 deletions

File tree

Directory.Packages.props

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
</ItemGroup>
2020
<ItemGroup>
2121
<PackageVersion Include="Azure.Identity" Version="1.21.0" />
22+
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="3.58.0" />
2223
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.20.1" />
2324
<PackageVersion Include="Confluent.Kafka" Version="2.14.0" />
2425
<PackageVersion Include="Dapr.Client" Version="1.17.9" />
@@ -39,10 +40,12 @@
3940
<PackageVersion Include="NetEvolve.Http.Correlation.Abstractions" Version="3.1.64" />
4041
<PackageVersion Include="NetEvolve.Http.Correlation.AspNetCore" Version="3.1.64" />
4142
<PackageVersion Include="NetEvolve.Http.Correlation.TestGenerator" Version="3.1.64" />
43+
<PackageVersion Include="Newtonsoft.Json" Version="13.0.4" />
4244
<PackageVersion Include="Polly.Core" Version="8.6.6" />
4345
<PackageVersion Include="RabbitMQ.Client" Version="7.2.1" />
4446
<PackageVersion Include="StackExchange.Redis" Version="2.12.14" />
4547
<PackageVersion Include="Testcontainers" Version="4.11.0" />
48+
<PackageVersion Include="Testcontainers.CosmosDb" Version="4.11.0" />
4649
<PackageVersion Include="Testcontainers.Kafka" Version="4.11.0" />
4750
<PackageVersion Include="Testcontainers.MongoDb" Version="4.11.0" />
4851
<PackageVersion Include="Testcontainers.MsSql" Version="4.11.0" />

Pulse.slnx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
<Project Path="src/NetEvolve.Pulse.SourceGeneration/NetEvolve.Pulse.SourceGeneration.csproj" />
3939
<Project Path="src/NetEvolve.Pulse.AzureServiceBus/NetEvolve.Pulse.AzureServiceBus.csproj" />
4040
<Project Path="src/NetEvolve.Pulse.SQLite/NetEvolve.Pulse.SQLite.csproj" />
41+
<Project Path="src/NetEvolve.Pulse.CosmosDb/NetEvolve.Pulse.CosmosDb.csproj" />
4142
<Project Path="src/NetEvolve.Pulse.MongoDB/NetEvolve.Pulse.MongoDB.csproj" />
4243
<Project Path="src/NetEvolve.Pulse.Kafka/NetEvolve.Pulse.Kafka.csproj" />
4344
<Project Path="src/NetEvolve.Pulse.Redis/NetEvolve.Pulse.Redis.csproj" />
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
namespace NetEvolve.Pulse;
2+
3+
using Microsoft.Azure.Cosmos;
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Microsoft.Extensions.DependencyInjection.Extensions;
6+
using NetEvolve.Pulse.Extensibility;
7+
using NetEvolve.Pulse.Extensibility.Outbox;
8+
using NetEvolve.Pulse.Outbox;
9+
10+
/// <summary>
11+
/// Extension methods for configuring Azure Cosmos DB outbox services on <see cref="IMediatorBuilder"/>.
12+
/// </summary>
13+
public static class CosmosDbExtensions
14+
{
15+
/// <summary>
16+
/// Adds Azure Cosmos DB outbox persistence and registers all core outbox services.
17+
/// </summary>
18+
/// <param name="configurator">The mediator configurator.</param>
19+
/// <param name="configureOptions">Action to configure <see cref="CosmosDbOutboxOptions"/>.</param>
20+
/// <returns>The configurator for chaining.</returns>
21+
/// <remarks>
22+
/// <para><strong>Prerequisites:</strong></para>
23+
/// <list type="number">
24+
/// <item><description>Register a <see cref="CosmosClient"/> in the DI container before calling this method.</description></item>
25+
/// <item><description>Create the Cosmos DB database and container before using this provider — automatic schema creation is out of scope.</description></item>
26+
/// </list>
27+
/// <para><strong>Registered Services:</strong></para>
28+
/// <list type="bullet">
29+
/// <item><description><see cref="IEventOutbox"/> as <see cref="OutboxEventStore"/> (Scoped)</description></item>
30+
/// <item><description><see cref="IOutboxRepository"/> as <see cref="CosmosDbOutboxRepository"/> (Scoped)</description></item>
31+
/// <item><description><see cref="IOutboxManagement"/> as <see cref="CosmosDbOutboxManagement"/> (Scoped)</description></item>
32+
/// <item><description><see cref="TimeProvider"/> (Singleton, if not already registered)</description></item>
33+
/// </list>
34+
/// <para><strong>Note:</strong></para>
35+
/// Core outbox services are registered automatically; calling
36+
/// <see cref="OutboxExtensions.AddOutbox"/> before this method is optional but harmless.
37+
/// </remarks>
38+
/// <example>
39+
/// <code>
40+
/// services.AddSingleton(new CosmosClient(connectionString));
41+
/// services.AddPulse(config => config
42+
/// .AddCosmosDbOutbox(opts =>
43+
/// {
44+
/// opts.DatabaseName = "MyDatabase";
45+
/// opts.ContainerName = "outbox_messages";
46+
/// })
47+
/// );
48+
/// </code>
49+
/// </example>
50+
public static IMediatorBuilder AddCosmosDbOutbox(
51+
this IMediatorBuilder configurator,
52+
Action<CosmosDbOutboxOptions> configureOptions
53+
)
54+
{
55+
ArgumentNullException.ThrowIfNull(configurator);
56+
ArgumentNullException.ThrowIfNull(configureOptions);
57+
58+
_ = configurator.Services.Configure(configureOptions);
59+
60+
return configurator.RegisterCosmosDbOutboxServices();
61+
}
62+
63+
/// <summary>
64+
/// Registers a Cosmos DB-backed <see cref="IOutboxRepository"/> and <see cref="IOutboxManagement"/>
65+
/// without registering core outbox services.
66+
/// </summary>
67+
/// <param name="configurator">The mediator configurator.</param>
68+
/// <param name="configureOptions">Action to configure <see cref="CosmosDbOutboxOptions"/>.</param>
69+
/// <returns>The configurator for chaining.</returns>
70+
/// <remarks>
71+
/// <para><strong>Prerequisites:</strong></para>
72+
/// <list type="number">
73+
/// <item><description>Call <see cref="OutboxExtensions.AddOutbox"/> first to register core outbox services.</description></item>
74+
/// <item><description>Register a <see cref="CosmosClient"/> in the DI container before calling this method.</description></item>
75+
/// <item><description>Create the Cosmos DB database and container before using this provider.</description></item>
76+
/// </list>
77+
/// <para><strong>Registered Services:</strong></para>
78+
/// <list type="bullet">
79+
/// <item><description><see cref="IOutboxRepository"/> as <see cref="CosmosDbOutboxRepository"/> (Scoped)</description></item>
80+
/// <item><description><see cref="IOutboxManagement"/> as <see cref="CosmosDbOutboxManagement"/> (Scoped)</description></item>
81+
/// <item><description><see cref="TimeProvider"/> (Singleton, if not already registered)</description></item>
82+
/// </list>
83+
/// </remarks>
84+
/// <example>
85+
/// <code>
86+
/// services.AddSingleton(new CosmosClient(connectionString));
87+
/// services.AddPulse(config => config
88+
/// .AddOutbox()
89+
/// .UseCosmosDbOutbox(opts =>
90+
/// {
91+
/// opts.DatabaseName = "MyDatabase";
92+
/// })
93+
/// );
94+
/// </code>
95+
/// </example>
96+
public static IMediatorBuilder UseCosmosDbOutbox(
97+
this IMediatorBuilder configurator,
98+
Action<CosmosDbOutboxOptions> configureOptions
99+
)
100+
{
101+
ArgumentNullException.ThrowIfNull(configurator);
102+
ArgumentNullException.ThrowIfNull(configureOptions);
103+
104+
var services = configurator.Services;
105+
106+
_ = services.Configure(configureOptions);
107+
108+
// Ensure TimeProvider is registered.
109+
services.TryAddSingleton(TimeProvider.System);
110+
111+
// Register the repository.
112+
services.TryAddScoped<IOutboxRepository, CosmosDbOutboxRepository>();
113+
114+
// Register the management API.
115+
services.TryAddScoped<IOutboxManagement, CosmosDbOutboxManagement>();
116+
117+
return configurator;
118+
}
119+
120+
private static IMediatorBuilder RegisterCosmosDbOutboxServices(this IMediatorBuilder configurator)
121+
{
122+
// AddOutbox() uses TryAdd* internally, so this call is safe even when AddOutbox() was already invoked.
123+
_ = configurator
124+
.AddOutbox()
125+
.Services.RemoveAll<IOutboxRepository>()
126+
.AddScoped<IOutboxRepository, CosmosDbOutboxRepository>()
127+
.RemoveAll<IOutboxManagement>()
128+
.AddScoped<IOutboxManagement, CosmosDbOutboxManagement>();
129+
130+
return configurator;
131+
}
132+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFrameworks>$(_ProjectTargetFrameworks)</TargetFrameworks>
4+
<Description>Azure Cosmos DB persistence provider for the Pulse outbox pattern using the official Microsoft.Azure.Cosmos SDK. Provides CosmosDbOutboxRepository implementing IOutboxRepository with optimistic concurrency via ETag-based conditional patch operations. Supports configurable database name, container name, and TTL-based automatic cleanup for completed and dead-letter documents. Designed for document-oriented architectures on Azure Cosmos DB that require the outbox pattern without introducing a relational database dependency.</Description>
5+
<PackageTags>$(PackageTags);outbox;cosmosdb;azure;cosmos;</PackageTags>
6+
<RootNamespace>NetEvolve.Pulse</RootNamespace>
7+
<!-- Suppress the Newtonsoft.Json explicit-reference check; this package uses System.Text.Json via CosmosSystemTextJsonSerializer. -->
8+
<AzureCosmosDisableNewtonsoftJsonCheck>true</AzureCosmosDisableNewtonsoftJsonCheck>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<PackageReference Include="Microsoft.Azure.Cosmos" />
13+
<PackageReference Include="Newtonsoft.Json" />
14+
</ItemGroup>
15+
16+
<ItemGroup>
17+
<ProjectReference Include="..\NetEvolve.Pulse\NetEvolve.Pulse.csproj" />
18+
</ItemGroup>
19+
</Project>
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
namespace NetEvolve.Pulse.Outbox;
2+
3+
using System.Text.Json.Serialization;
4+
using NetEvolve.Pulse.Extensibility.Outbox;
5+
using Newtonsoft.Json;
6+
7+
/// <summary>
8+
/// Represents an <see cref="OutboxMessage"/> persisted as a Cosmos DB document.
9+
/// </summary>
10+
/// <remarks>
11+
/// Maps all <see cref="OutboxMessage"/> fields to their Cosmos DB document equivalents.
12+
/// The Cosmos DB document <c>id</c> property is mapped from <see cref="OutboxMessage.Id"/>.
13+
/// The optional <c>ttl</c> property enables automatic cleanup via the Cosmos DB TTL engine
14+
/// when <see cref="CosmosDbOutboxOptions.EnableTimeToLive"/> is <see langword="true"/>.
15+
/// </remarks>
16+
internal sealed class CosmosDbOutboxDocument
17+
{
18+
/// <summary>
19+
/// Gets or sets the Cosmos DB document identifier, mapped from <see cref="OutboxMessage.Id"/>.
20+
/// </summary>
21+
[JsonPropertyName("id")]
22+
[JsonProperty("id")]
23+
public string Id { get; set; } = string.Empty;
24+
25+
/// <summary>
26+
/// Gets or sets the assembly-qualified event type name.
27+
/// </summary>
28+
[JsonPropertyName("eventType")]
29+
[JsonProperty("eventType")]
30+
public string EventType { get; set; } = string.Empty;
31+
32+
/// <summary>
33+
/// Gets or sets the JSON serialized event payload.
34+
/// </summary>
35+
[JsonPropertyName("payload")]
36+
[JsonProperty("payload")]
37+
public string Payload { get; set; } = string.Empty;
38+
39+
/// <summary>
40+
/// Gets or sets the correlation identifier for distributed tracing.
41+
/// </summary>
42+
[JsonPropertyName("correlationId")]
43+
[JsonProperty("correlationId")]
44+
public string? CorrelationId { get; set; }
45+
46+
/// <summary>
47+
/// Gets or sets the causation identifier.
48+
/// </summary>
49+
[JsonPropertyName("causationId")]
50+
[JsonProperty("causationId")]
51+
public string? CausationId { get; set; }
52+
53+
/// <summary>
54+
/// Gets or sets the message creation timestamp in ISO 8601 format.
55+
/// </summary>
56+
[JsonPropertyName("createdAt")]
57+
[JsonProperty("createdAt")]
58+
public DateTimeOffset CreatedAt { get; set; }
59+
60+
/// <summary>
61+
/// Gets or sets the last update timestamp in ISO 8601 format.
62+
/// </summary>
63+
[JsonPropertyName("updatedAt")]
64+
[JsonProperty("updatedAt")]
65+
public DateTimeOffset UpdatedAt { get; set; }
66+
67+
/// <summary>
68+
/// Gets or sets the timestamp when this message was successfully processed.
69+
/// </summary>
70+
[JsonPropertyName("processedAt")]
71+
[JsonProperty("processedAt")]
72+
public DateTimeOffset? ProcessedAt { get; set; }
73+
74+
/// <summary>
75+
/// Gets or sets the scheduled timestamp for the next retry attempt.
76+
/// </summary>
77+
[JsonPropertyName("nextRetryAt")]
78+
[JsonProperty("nextRetryAt")]
79+
public DateTimeOffset? NextRetryAt { get; set; }
80+
81+
/// <summary>
82+
/// Gets or sets the number of processing attempts.
83+
/// </summary>
84+
[JsonPropertyName("retryCount")]
85+
[JsonProperty("retryCount")]
86+
public int RetryCount { get; set; }
87+
88+
/// <summary>
89+
/// Gets or sets the last error message from a failed processing attempt.
90+
/// </summary>
91+
[JsonPropertyName("error")]
92+
[JsonProperty("error")]
93+
public string? Error { get; set; }
94+
95+
/// <summary>
96+
/// Gets or sets the current processing status as an integer.
97+
/// </summary>
98+
[JsonPropertyName("status")]
99+
[JsonProperty("status")]
100+
public int Status { get; set; }
101+
102+
/// <summary>
103+
/// Gets or sets the TTL override in seconds for this document.
104+
/// A value of <c>-1</c> disables TTL; <see langword="null"/> inherits the container default.
105+
/// </summary>
106+
[JsonPropertyName("ttl")]
107+
[System.Text.Json.Serialization.JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
108+
[JsonProperty("ttl", NullValueHandling = NullValueHandling.Ignore)]
109+
public int? Ttl { get; set; }
110+
111+
/// <summary>
112+
/// Converts this Cosmos DB document to an <see cref="OutboxMessage"/>.
113+
/// </summary>
114+
/// <returns>The corresponding <see cref="OutboxMessage"/>.</returns>
115+
public OutboxMessage ToOutboxMessage() =>
116+
new OutboxMessage
117+
{
118+
Id = Guid.Parse(Id),
119+
EventType = Type.GetType(EventType, throwOnError: false) ?? typeof(object),
120+
Payload = Payload,
121+
CorrelationId = CorrelationId,
122+
CausationId = CausationId,
123+
CreatedAt = CreatedAt,
124+
UpdatedAt = UpdatedAt,
125+
ProcessedAt = ProcessedAt,
126+
NextRetryAt = NextRetryAt,
127+
RetryCount = RetryCount,
128+
Error = Error,
129+
Status = (OutboxMessageStatus)Status,
130+
};
131+
132+
/// <summary>
133+
/// Creates a <see cref="CosmosDbOutboxDocument"/> from an <see cref="OutboxMessage"/>.
134+
/// </summary>
135+
/// <param name="message">The source outbox message.</param>
136+
/// <returns>A new <see cref="CosmosDbOutboxDocument"/> populated from the message.</returns>
137+
public static CosmosDbOutboxDocument FromOutboxMessage(OutboxMessage message)
138+
{
139+
ArgumentNullException.ThrowIfNull(message);
140+
141+
return new CosmosDbOutboxDocument
142+
{
143+
Id = message.Id.ToString(),
144+
EventType = message.EventType.ToOutboxEventTypeName(),
145+
Payload = message.Payload,
146+
CorrelationId = message.CorrelationId,
147+
CausationId = message.CausationId,
148+
CreatedAt = message.CreatedAt,
149+
UpdatedAt = message.UpdatedAt,
150+
ProcessedAt = message.ProcessedAt,
151+
NextRetryAt = message.NextRetryAt,
152+
RetryCount = message.RetryCount,
153+
Error = message.Error,
154+
Status = (int)message.Status,
155+
};
156+
}
157+
}

0 commit comments

Comments
 (0)