Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,6 @@ ModelManifest.xml
/ServiceBusPerfSample/Properties/PublishProfiles/cvtest6 - Web Deploy.pubxml
/ServiceBusPerfSample/Properties/Settings.settings
/ServiceBusPerfSample/Properties/webjob-publish-settings.json

# Rider IDE
.idea/
2 changes: 0 additions & 2 deletions ThroughputTest/PerformanceTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,5 @@ public Task CloseAsync()
protected abstract Task OnOpenAsync();

protected abstract Task OnStartAsync();


}
}
28 changes: 7 additions & 21 deletions ThroughputTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
namespace ThroughputTest
{
using CommandLine;
using Microsoft.Azure.ServiceBus;
using System;
using System.Linq;

Expand All @@ -25,28 +24,16 @@ static int Main(params string[] args)

static void RunOptionsAndReturnExitCode(Settings settings)
{
ServiceBusConnectionStringBuilder cb = new ServiceBusConnectionStringBuilder(settings.ConnectionString);
if (string.IsNullOrWhiteSpace(cb.EntityPath))
if (string.IsNullOrWhiteSpace(settings.ConnectionString) || string.IsNullOrWhiteSpace(settings.SendPath))
{
if (string.IsNullOrWhiteSpace(settings.SendPath))
{
Console.WriteLine("--send-path option must be specified if there's no EntityPath in the connection string.");
result = 1;
return;
}
Console.WriteLine("--send-path option must be specified if there's no EntityPath in the connection string.");
result = 1;
return;
}
else
{
if (string.IsNullOrWhiteSpace(settings.SendPath))
{
settings.SendPath = cb.EntityPath;
}
cb.EntityPath = String.Empty;
settings.ConnectionString = cb.ToString();
}
if (settings.ReceivePaths == null || settings.ReceivePaths.Count() == 0)

if (settings.ReceivePaths == null || !settings.ReceivePaths.Any())
{
settings.ReceivePaths = new string[] { settings.SendPath };
settings.ReceivePaths = new[] { settings.SendPath };
}
Console.WriteLine("\n\nPress <ENTER> to STOP at anytime\n");
Metrics metrics = new Metrics(settings);
Expand All @@ -58,7 +45,6 @@ static void RunOptionsAndReturnExitCode(Settings settings)
};
app.Run(experiments).Wait();
Console.WriteLine("Complete");

}
}
}
40 changes: 23 additions & 17 deletions ThroughputTest/ReceiverTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
// OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
//---------------------------------------------------------------------------------

using Azure.Messaging.ServiceBus;

namespace ThroughputTest
{
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand Down Expand Up @@ -47,10 +47,11 @@ protected override Task OnStartAsync()

async Task ReceiveTask(string path)
{
var receiver = new MessageReceiver(this.Settings.ConnectionString, path, this.Settings.ReceiveMode);
receiver.PrefetchCount = Settings.PrefetchCount;
var client = new ServiceBusClient(Settings.ConnectionString);
var receiver = client.CreateReceiver(path, new ServiceBusReceiverOptions { ReceiveMode = Settings.ReceiveMode, PrefetchCount = Settings.PrefetchCount });
var semaphore = new DynamicSemaphoreSlim(this.Settings.MaxInflightReceives.Value + 1);
var done = new SemaphoreSlim(1); done.Wait();
var done = new SemaphoreSlim(1);
done.Wait();
var sw = Stopwatch.StartNew();
long totalReceives = 0;
await Task.Delay(TimeSpan.FromMilliseconds(Settings.WorkDuration));
Expand All @@ -68,12 +69,12 @@ async Task ReceiveTask(string path)
{
nsec = sw.ElapsedTicks;
// we're going to unblock the receives after 10 seconds if there's no pending message
receiver.ReceiveAsync(TimeSpan.FromSeconds(10)).ContinueWith(async (t) =>
receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(10)).ContinueWith(async t =>
{
receiveMetrics.ReceiveDuration100ns = sw.ElapsedTicks - nsec;
if (t.IsFaulted || t.IsCanceled || t.Result == null)
{
if (t.Exception?.GetType() == typeof(ServerBusyException))
if(t.Exception?.InnerException is ServiceBusException exception && exception.Reason == ServiceBusException.FailureReason.ServiceBusy)
{
receiveMetrics.BusyErrors = 1;
if (!this.CancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -102,12 +103,12 @@ async Task ReceiveTask(string path)
{
await Task.Delay(TimeSpan.FromMilliseconds(Settings.WorkDuration));
}
receiver.CompleteAsync(t.Result.SystemProperties.LockToken).ContinueWith(async (t1) =>
receiver.CompleteMessageAsync(t.Result.LockToken).ContinueWith(async t1 =>
{
receiveMetrics.CompleteDuration100ns = sw.ElapsedTicks - nsec;
if (t1.IsFaulted)
{
if (t1.Exception?.GetType() == typeof(ServerBusyException))
if(t1.Exception?.InnerException is ServiceBusException exception && exception.Reason == ServiceBusException.FailureReason.ServiceBusy)
{
receiveMetrics.BusyErrors = 1;
if (!this.CancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -139,12 +140,12 @@ async Task ReceiveTask(string path)
{
nsec = sw.ElapsedTicks;
// we're going to unblock the receives after 10 seconds if there's no pending message
receiver.ReceiveAsync(Settings.ReceiveBatchCount, TimeSpan.FromSeconds(10)).ContinueWith(async (t) =>
receiver.ReceiveMessagesAsync(Settings.ReceiveBatchCount, TimeSpan.FromSeconds(10)).ContinueWith(async t =>
{
receiveMetrics.ReceiveDuration100ns = sw.ElapsedTicks - nsec;
if (t.IsFaulted || t.IsCanceled || t.Result == null)
{
if (t.Exception?.GetType() == typeof(ServerBusyException))
if(t.Exception?.InnerException is ServiceBusException exception && exception.Reason == ServiceBusException.FailureReason.ServiceBusy)
{
receiveMetrics.BusyErrors = 1;
if (!this.CancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -176,12 +177,12 @@ async Task ReceiveTask(string path)
for (int i = 0; i < t.Result.Count; i++)
{
await Task.Delay(TimeSpan.FromMilliseconds(Settings.WorkDuration));
await receiver.CompleteAsync(t.Result[i].SystemProperties.LockToken).ContinueWith(async (t1) =>
await receiver.CompleteMessageAsync(t.Result[i].LockToken).ContinueWith(async t1 =>
{
receiveMetrics.CompleteDuration100ns = sw.ElapsedTicks - nsec;
if (t1.IsFaulted)
{
if (t1.Exception?.GetType() == typeof(ServerBusyException))
if(t1.Exception?.InnerException is ServiceBusException exception && exception.Reason == ServiceBusException.FailureReason.ServiceBusy)
{
receiveMetrics.BusyErrors = 1;
if (!this.CancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -210,13 +211,12 @@ await receiver.CompleteAsync(t.Result[i].SystemProperties.LockToken).ContinueWit
}
else
{
// batch complete
await receiver.CompleteAsync(t.Result.Select((m) => { return m.SystemProperties.LockToken; })).ContinueWith(async (t1) =>
Func<Task, Task> batchContinueTask = async t1 =>
{
receiveMetrics.CompleteDuration100ns = sw.ElapsedTicks - nsec;
if (t1.IsFaulted)
{
if (t1.Exception?.GetType() == typeof(ServerBusyException))
if(t1.Exception?.InnerException is ServiceBusException exception && exception.Reason == ServiceBusException.FailureReason.ServiceBusy)
{
receiveMetrics.BusyErrors = 1;
if (!this.CancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -245,7 +245,12 @@ await receiver.CompleteAsync(t.Result[i].SystemProperties.LockToken).ContinueWit
done.Release();
}
}
});
};
// batch complete
foreach (var receivedMessage in t.Result)
{
await receiver.CompleteMessageAsync(receivedMessage).ContinueWith(batchContinueTask);
}
}
}
else
Expand All @@ -266,6 +271,7 @@ await receiver.CompleteAsync(t.Result[i].SystemProperties.LockToken).ContinueWit
}
}
await done.WaitAsync();
await client.DisposeAsync();
}

static void AdjustSemaphore(Observable<int>.ChangingEventArgs e, DynamicSemaphoreSlim semaphore)
Expand Down
42 changes: 22 additions & 20 deletions ThroughputTest/SenderTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
// OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
//---------------------------------------------------------------------------------

using Azure.Messaging.ServiceBus;

namespace ThroughputTest
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.ServiceBus;
using System.Net.Sockets;

sealed class SenderTask : PerformanceTask
Expand Down Expand Up @@ -43,7 +43,8 @@ protected override Task OnStartAsync()

async Task SendTask()
{
var sender = new MessageSender(this.Settings.ConnectionString, this.Settings.SendPath, NoRetry.Default);
var client = new ServiceBusClient(Settings.ConnectionString, new ServiceBusClientOptions { RetryOptions = new ServiceBusRetryOptions { MaxRetries = 0 } });
var sender = client.CreateSender(Settings.SendPath);
var payload = new byte[this.Settings.MessageSizeInBytes];
var semaphore = new DynamicSemaphoreSlim(this.Settings.MaxInflightSends.Value);
var done = new SemaphoreSlim(1);
Expand All @@ -54,7 +55,7 @@ async Task SendTask()
var sw = Stopwatch.StartNew();

// first send will fail out if the cxn string is bad
await sender.SendAsync(new Message(payload) { TimeToLive = TimeSpan.FromMinutes(5) });
await sender.SendMessageAsync(new ServiceBusMessage(payload) { TimeToLive = TimeSpan.FromMinutes(5) });

for (int j = 0; j < Settings.MessageCount && !this.CancellationToken.IsCancellationRequested; j++)
{
Expand All @@ -72,7 +73,7 @@ async Task SendTask()
}
if (Settings.SendBatchCount <= 1)
{
sender.SendAsync(new Message(payload) { TimeToLive = TimeSpan.FromMinutes(5) })
sender.SendMessageAsync(new ServiceBusMessage(payload) { TimeToLive = TimeSpan.FromMinutes(5) })
.ContinueWith(async (t) =>
{
if (t.IsFaulted || t.IsCanceled)
Expand All @@ -95,12 +96,12 @@ async Task SendTask()
}
else
{
List<Message> batch = new List<Message>();
List<ServiceBusMessage> batch = new List<ServiceBusMessage>();
for (int i = 0; i < Settings.SendBatchCount && j < Settings.MessageCount && !this.CancellationToken.IsCancellationRequested; i++, j++)
{
batch.Add(new Message(payload) { TimeToLive = TimeSpan.FromMinutes(5) });
batch.Add(new ServiceBusMessage(payload) { TimeToLive = TimeSpan.FromMinutes(5) });
}
sender.SendAsync(batch)
sender.SendMessagesAsync(batch)
.ContinueWith(async (t) =>
{
if (t.IsFaulted || t.IsCanceled)
Expand All @@ -123,6 +124,7 @@ async Task SendTask()
}
}
await done.WaitAsync();
await client.DisposeAsync();
}

static void AdjustSemaphore(Observable<int>.ChangingEventArgs e, DynamicSemaphoreSlim semaphore)
Expand All @@ -145,24 +147,25 @@ static void AdjustSemaphore(Observable<int>.ChangingEventArgs e, DynamicSemaphor

private async Task HandleExceptions(DynamicSemaphoreSlim semaphore, SendMetrics sendMetrics, AggregateException ex)
{
bool wait = false;
var wait = false;
ex.Handle((x) =>
{
if (x is ServiceBusCommunicationException)
if (x is ServiceBusException exception)
{
if (((ServiceBusCommunicationException)x).InnerException is SocketException &&
((SocketException)((ServiceBusCommunicationException)x).InnerException).SocketErrorCode == SocketError.HostNotFound)
if (exception.Reason == ServiceBusException.FailureReason.ServiceCommunicationProblem
&& exception.InnerException is SocketException socketException
&& socketException.SocketErrorCode == SocketError.HostNotFound)
{
return false;
}
}

if (x is ServerBusyException)
{
sendMetrics.BusyErrors = 1;
if (!this.CancellationToken.IsCancellationRequested)

if (exception.Reason == ServiceBusException.FailureReason.ServiceBusy)
{
wait = true;
sendMetrics.BusyErrors = 1;
if (!this.CancellationToken.IsCancellationRequested)
{
wait = true;
}
}
}
else
Expand All @@ -178,7 +181,6 @@ private async Task HandleExceptions(DynamicSemaphoreSlim semaphore, SendMetrics
}
semaphore.Release();
Metrics.PushSendMetrics(sendMetrics);

}
}
}
3 changes: 2 additions & 1 deletion ThroughputTest/Settings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
// OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
//---------------------------------------------------------------------------------

using Azure.Messaging.ServiceBus;

namespace ThroughputTest
{
using System;
using System.Collections.Generic;
using CommandLine;
using CommandLine.Text;
using Microsoft.Azure.ServiceBus;

class Settings
{
Expand Down
2 changes: 1 addition & 1 deletion ThroughputTest/ThroughputTest.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.0.0-preview.4" />
<PackageReference Include="CommandLineParser" Version="2.5.0" />
<PackageReference Include="LinqStatistics" Version="2.2.1" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.4.0" />
<PackageReference Include="Microsoft.Win32.Primitives" Version="4.3.0" />
<PackageReference Include="System.Reactive" Version="4.1.5" />
<PackageReference Include="System.Reactive.Interfaces" Version="4.1.5" />
Expand Down