Skip to content

Commit 89c4a15

Browse files
Support Azure Service Bus v5 and topic per event topology (#640)
* Upgrade base transport version * First tweaks to acceptance tests * Support metadata * More metadata mapping * Fix one of the publisher tests * Fix remaining acceptance tests * Fix analyzer tests * Small cleanup * Json test * Better options support implementation * Remove startup validation for now --------- Co-authored-by: Daniel Marbach <[email protected]>
1 parent 0237449 commit 89c4a15

18 files changed

+260
-87
lines changed
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
11
using NServiceBus;
22

3-
public class SomeEvent : IEvent
4-
{
5-
}
3+
public class SomeEvent : IEvent;
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
11
using NServiceBus;
22

3-
public class SomeOtherMessage : ICommand
4-
{
5-
}
3+
public class SomeOtherMessage : ICommand;
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
11
using NServiceBus;
22

3-
public class TriggerMessage : IMessage
4-
{
5-
}
3+
public class TriggerMessage : IMessage;

src/NServiceBus.AzureFunctions.Worker.Analyzer.Tests/AnalyzerTestFixture.cs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.Text.RegularExpressions;
1010
using System.Threading;
1111
using System.Threading.Tasks;
12+
using Azure.Core;
1213
using Microsoft.CodeAnalysis;
1314
using Microsoft.CodeAnalysis.CSharp;
1415
using Microsoft.CodeAnalysis.Diagnostics;
@@ -20,10 +21,10 @@
2021
protected virtual LanguageVersion AnalyzerLanguageVersion => LanguageVersion.CSharp7;
2122

2223
protected Task Assert(string markupCode, CancellationToken cancellationToken = default) =>
23-
Assert(Array.Empty<string>(), markupCode, Array.Empty<string>(), cancellationToken);
24+
Assert([], markupCode, [], cancellationToken);
2425

2526
protected Task Assert(string expectedDiagnosticId, string markupCode, CancellationToken cancellationToken = default) =>
26-
Assert(new[] { expectedDiagnosticId }, markupCode, Array.Empty<string>(), cancellationToken);
27+
Assert([expectedDiagnosticId], markupCode, [], cancellationToken);
2728

2829
protected async Task Assert(string[] expectedDiagnosticIds, string markupCode, string[] ignoreDiagnosticIds, CancellationToken cancellationToken = default)
2930
{
@@ -102,18 +103,17 @@ protected Project CreateProject(string[] code)
102103
return project;
103104
}
104105

105-
static AnalyzerTestFixture()
106-
{
106+
static AnalyzerTestFixture() =>
107107
ProjectReferences = ImmutableList.Create(
108108
MetadataReference.CreateFromFile(typeof(object).GetTypeInfo().Assembly.Location),
109109
MetadataReference.CreateFromFile(typeof(Enumerable).GetTypeInfo().Assembly.Location),
110110
MetadataReference.CreateFromFile(typeof(System.Linq.Expressions.Expression).GetTypeInfo().Assembly
111111
.Location),
112112
MetadataReference.CreateFromFile(Assembly.Load("System.Runtime").Location),
113113
MetadataReference.CreateFromFile(typeof(IFunctionEndpoint).GetTypeInfo().Assembly.Location),
114+
MetadataReference.CreateFromFile(typeof(TokenCredential).GetTypeInfo().Assembly.Location),
114115
MetadataReference.CreateFromFile(typeof(EndpointConfiguration).GetTypeInfo().Assembly.Location),
115116
MetadataReference.CreateFromFile(typeof(AzureServiceBusTransport).GetTypeInfo().Assembly.Location));
116-
}
117117

118118
static readonly ImmutableList<PortableExecutableReference> ProjectReferences;
119119

@@ -149,18 +149,16 @@ protected static void WriteAnalyzerDiagnostics(IEnumerable<Diagnostic> diagnosti
149149
}
150150
}
151151

152-
protected static string[] SplitMarkupCodeIntoFiles(string markupCode)
153-
{
154-
return DocumentSplittingRegex.Split(markupCode)
152+
protected static string[] SplitMarkupCodeIntoFiles(string markupCode) =>
153+
DocumentSplittingRegex.Split(markupCode)
155154
.Where(docCode => !string.IsNullOrWhiteSpace(docCode))
156155
.ToArray();
157-
}
158156

159157
static (string[] code, List<(string file, TextSpan span)>) Parse(string markupCode)
160158
{
161159
if (markupCode == null)
162160
{
163-
return (Array.Empty<string>(), new List<(string, TextSpan)>());
161+
return ([], []);
164162
}
165163

166164
var documents = SplitMarkupCodeIntoFiles(markupCode);
@@ -177,15 +175,15 @@ protected static string[] SplitMarkupCodeIntoFiles(string markupCode)
177175

178176
while (remainingCode.Length > 0)
179177
{
180-
var beforeAndAfterOpening = remainingCode.Split(new[] { "[|" }, 2, StringSplitOptions.None);
178+
var beforeAndAfterOpening = remainingCode.Split(["[|"], 2, StringSplitOptions.None);
181179

182180
if (beforeAndAfterOpening.Length == 1)
183181
{
184182
_ = code.Append(beforeAndAfterOpening[0]);
185183
break;
186184
}
187185

188-
var midAndAfterClosing = beforeAndAfterOpening[1].Split(new[] { "|]" }, 2, StringSplitOptions.None);
186+
var midAndAfterClosing = beforeAndAfterOpening[1].Split(["|]"], 2, StringSplitOptions.None);
189187

190188
if (midAndAfterClosing.Length == 1)
191189
{

src/NServiceBus.AzureFunctions.Worker.Analyzer.Tests/ConfigurationAnalyzerTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class ConfigurationAnalyzerTests : AnalyzerTestFixture<ConfigurationAnaly
1313
[TestCase("OverrideLocalAddress(null)", OverrideLocalAddressNotAllowedId)]
1414
[TestCase("PurgeOnStartup(true)", PurgeOnStartupNotAllowedId)]
1515
[TestCase("SetDiagnosticsPath(null)", SetDiagnosticsPathNotAllowedId)]
16-
[TestCase("UseTransport(new AzureServiceBusTransport(null))", UseTransportNotAllowedId)]
16+
[TestCase("UseTransport(new AzureServiceBusTransport(null, default(TopicTopology)))", UseTransportNotAllowedId)]
1717
public Task DiagnosticIsReportedForEndpointConfiguration(string configuration, string diagnosticId)
1818
{
1919
var source =
@@ -40,7 +40,7 @@ void Bar(ServiceBusTriggeredEndpointConfiguration endpointConfig)
4040
[TestCase("OverrideLocalAddress(null)", OverrideLocalAddressNotAllowedId)]
4141
[TestCase("PurgeOnStartup(true)", PurgeOnStartupNotAllowedId)]
4242
[TestCase("SetDiagnosticsPath(null)", SetDiagnosticsPathNotAllowedId)]
43-
[TestCase("UseTransport(new AzureServiceBusTransport(null))", UseTransportNotAllowedId)]
43+
[TestCase("UseTransport(new AzureServiceBusTransport(null, default(TopicTopology)))", UseTransportNotAllowedId)]
4444
public Task DiagnosticIsNotReportedForOtherEndpointConfiguration(string configuration, string diagnosticId)
4545
{
4646
var source =

src/NServiceBus.AzureFunctions.Worker.Analyzer.Tests/ConfigurationAnalyzerTestsCSharp8.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
public class ConfigurationAnalyzerTestsCSharp8 : AnalyzerTestFixture<ConfigurationAnalyzer>
1010
{
1111
// HINT: In C# 7 this call is ambiguous with the LearningTransport version as the compiler cannot differentiate method calls via generic type constraints
12-
[TestCase("UseTransport<AzureServiceBusTransport>()", UseTransportNotAllowedId)]
12+
[TestCase("UseTransport<AzureServiceBusTransport>(null)", UseTransportNotAllowedId)]
1313
public Task DiagnosticIsReportedForEndpointConfiguration(string configuration, string diagnosticId)
1414
{
1515
var source =
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
namespace NServiceBus.AzureFunctions.Worker.ServiceBus.Tests;
2+
3+
using System;
4+
using System.IO.Hashing;
5+
using System.Text;
6+
7+
public static class AcceptanceTestExtensions
8+
{
9+
public static string ToTopicName(this Type eventType) =>
10+
eventType.FullName.Replace("+", ".").Shorten(maxLength: 260);
11+
12+
// The idea here is to preserve part of the text and append a non-cryptographic hash to it.
13+
// This way, we can have a deterministic and unique names without harming much the readability.
14+
// The chance of collisions should be very low but definitely not zero. We can always switch to
15+
// using more bits in the hash or even back to a cryptographic hash if needed.
16+
public static string Shorten(this string name, int maxLength = 50)
17+
{
18+
if (name.Length <= maxLength)
19+
{
20+
return name;
21+
}
22+
23+
var nameBytes = Encoding.UTF8.GetBytes(name);
24+
var hashValue = XxHash32.Hash(nameBytes);
25+
string hashHex = Convert.ToHexString(hashValue);
26+
27+
int prefixLength = maxLength - hashHex.Length;
28+
29+
if (prefixLength < 0)
30+
{
31+
return hashHex.Length > maxLength
32+
? hashHex[..maxLength] // in case even the hash is too long
33+
: hashHex;
34+
}
35+
36+
string prefix = name[..Math.Min(prefixLength, name.Length)];
37+
return $"{prefix}{hashHex}";
38+
}
39+
}

src/NServiceBus.AzureFunctions.Worker.ServiceBus.Tests/DefaultEndpoint.cs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace NServiceBus.AzureFunctions.Worker.ServiceBus.Tests
22
{
33
using System;
4+
using System.Linq;
45
using System.Threading.Tasks;
56
using Microsoft.Extensions.DependencyInjection;
67
using NServiceBus;
@@ -28,23 +29,21 @@ public async Task<EndpointConfiguration> GetConfiguration(
2829
recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
2930
configuration.SendFailedMessagesTo("error");
3031

32+
configuration.EnforcePublisherMetadataRegistration(endpointConfiguration.EndpointName, endpointConfiguration.PublisherMetadata);
33+
3134
var connectionString = Environment.GetEnvironmentVariable(ServerlessTransport.DefaultServiceBusConnectionName);
32-
var azureServiceBusTransport = new AzureServiceBusTransport(connectionString)
35+
var topology = TopicTopology.Default;
36+
topology.OverrideSubscriptionNameFor(endpointConfiguration.EndpointName, endpointConfiguration.EndpointName.Shorten());
37+
foreach (var eventType in endpointConfiguration.PublisherMetadata.Publishers.SelectMany(p => p.Events))
3338
{
34-
SubscriptionRuleNamingConvention = type =>
35-
{
36-
if (type.FullName.Length <= 50)
37-
{
38-
return type.FullName;
39-
}
40-
41-
return type.Name;
42-
}
43-
};
39+
topology.PublishTo(eventType, eventType.ToTopicName());
40+
topology.SubscribeTo(eventType, eventType.ToTopicName());
41+
}
42+
var azureServiceBusTransport = new AzureServiceBusTransport(connectionString, topology);
4443

45-
var transport = configuration.UseTransport(azureServiceBusTransport);
44+
_ = configuration.UseTransport(azureServiceBusTransport);
4645

47-
configuration.UseSerialization<SystemJsonSerializer>();
46+
_ = configuration.UseSerialization<SystemJsonSerializer>();
4847

4948
await configurationBuilderCustomization(configuration);
5049

src/NServiceBus.AzureFunctions.Worker.ServiceBus.Tests/FunctionEndpointComponent.cs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using NServiceBus.AcceptanceTesting;
1515
using NServiceBus.AcceptanceTesting.Customization;
1616
using NServiceBus.AcceptanceTesting.Support;
17+
using Transport.AzureServiceBus;
1718
using Conventions = AcceptanceTesting.Customization.Conventions;
1819

1920
abstract class FunctionEndpointComponent : IComponentBehavior
@@ -23,11 +24,16 @@ public Task<ComponentRunner> CreateRunner(RunDescriptor runDescriptor) =>
2324
new FunctionRunner(
2425
testMessages,
2526
CustomizeConfiguration,
27+
CustomizeHostBuilder,
2628
OnStartCore,
2729
runDescriptor.ScenarioContext,
30+
PublisherMetadata,
2831
GetType()));
2932

30-
public Action<ServiceBusTriggeredEndpointConfiguration> CustomizeConfiguration { private get; set; } = _ => { };
33+
public Action<ServiceBusTriggeredEndpointConfiguration> CustomizeConfiguration { private get; init; } = _ => { };
34+
public Action<IHostBuilder> CustomizeHostBuilder { private get; init; } = _ => { };
35+
36+
public PublisherMetadata PublisherMetadata { get; } = new PublisherMetadata();
3137

3238
public void AddTestMessage(object body, IDictionary<string, object> userProperties = null) =>
3339
testMessages.Add(new TestMessage
@@ -40,13 +46,15 @@ public void AddTestMessage(object body, IDictionary<string, object> userProperti
4046

4147
Task OnStartCore(IFunctionEndpoint functionEndpoint, FunctionContext functionContext) => OnStart(functionEndpoint, functionContext);
4248

43-
IList<TestMessage> testMessages = [];
49+
readonly IList<TestMessage> testMessages = [];
4450

4551
class FunctionRunner(
4652
IList<TestMessage> messages,
4753
Action<ServiceBusTriggeredEndpointConfiguration> configurationCustomization,
54+
Action<IHostBuilder> hostBuilderCustomization,
4855
Func<IFunctionEndpoint, FunctionContext, Task> onStart,
4956
ScenarioContext scenarioContext,
57+
PublisherMetadata publisherMetadata,
5058
Type functionComponentType)
5159
: ComponentRunner
5260
{
@@ -55,32 +63,48 @@ class FunctionRunner(
5563
public override async Task Start(CancellationToken cancellationToken = default)
5664
{
5765
var hostBuilder = Host.CreateDefaultBuilder();
58-
hostBuilder.ConfigureServices(services =>
66+
_ = hostBuilder.ConfigureServices(services =>
5967
{
6068
// TODO Think about using a real logger or the NServiceBus.Testing logging infrastructure?
6169
services.AddSingleton<ILoggerFactory>(new TestLoggingFactory());
6270
});
71+
72+
hostBuilderCustomization(hostBuilder);
73+
6374
hostBuilder.UseNServiceBus(Name, (configuration, triggerConfiguration) =>
6475
{
6576
var endpointConfiguration = triggerConfiguration.AdvancedConfiguration;
6677

6778
endpointConfiguration.TypesToIncludeInScan(functionComponentType.GetTypesScopedByTestClass());
6879

80+
if (triggerConfiguration.Transport.Topology is TopicPerEventTopology topology)
81+
{
82+
topology.OverrideSubscriptionNameFor(Name, Name.Shorten());
83+
84+
foreach (var eventType in publisherMetadata.Publishers.SelectMany(p => p.Events))
85+
{
86+
topology.PublishTo(eventType, eventType.ToTopicName());
87+
topology.SubscribeTo(eventType, eventType.ToTopicName());
88+
}
89+
}
90+
91+
endpointConfiguration.EnforcePublisherMetadataRegistration(Name, publisherMetadata);
92+
6993
endpointConfiguration.Recoverability()
7094
.Immediate(i => i.NumberOfRetries(0))
7195
.Delayed(d => d.NumberOfRetries(0))
7296
.Failed(c => c
7397
// track messages sent to the error queue to fail the test
7498
.OnMessageSentToErrorQueue((failedMessage, ct) =>
7599
{
76-
scenarioContext.FailedMessages.AddOrUpdate(
100+
_ = scenarioContext.FailedMessages.AddOrUpdate(
77101
Name,
78-
new[] { failedMessage },
102+
[failedMessage],
79103
(_, fm) =>
80104
{
81-
var messages = fm.ToList();
82-
messages.Add(failedMessage);
83-
return messages;
105+
var failedMessages = fm.ToList();
106+
failedMessages.Add(failedMessage);
107+
return failedMessages;
84108
});
85109
return Task.CompletedTask;
86110
}));

src/NServiceBus.AzureFunctions.Worker.ServiceBus.Tests/NServiceBus.AzureFunctions.Worker.ServiceBus.Tests.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
</ItemGroup>
2020

2121
<ItemGroup>
22-
<PackageReference Include="NServiceBus.AcceptanceTesting" Version="9.2.3" />
22+
<PackageReference Include="NServiceBus.AcceptanceTesting" Version="9.2.6" />
2323
<PackageReference Include="Particular.Approvals" Version="2.0.1" />
2424
<PackageReference Include="PublicApiGenerator" Version="11.4.5" />
25+
<PackageReference Include="System.IO.Hashing" Version="9.0.2" />
2526
</ItemGroup>
2627

2728
</Project>

0 commit comments

Comments
 (0)