diff --git a/.gitignore b/.gitignore index 9558622..2bdfeb5 100644 --- a/.gitignore +++ b/.gitignore @@ -246,3 +246,6 @@ ModelManifest.xml /ServiceBusPerfSample/Properties/PublishProfiles/cvtest6 - Web Deploy.pubxml /ServiceBusPerfSample/Properties/Settings.settings /ServiceBusPerfSample/Properties/webjob-publish-settings.json + +# Rider IDE +.idea/ \ No newline at end of file diff --git a/ThroughputTest/PerformanceTask.cs b/ThroughputTest/PerformanceTask.cs index e888dc3..2db3915 100644 --- a/ThroughputTest/PerformanceTask.cs +++ b/ThroughputTest/PerformanceTask.cs @@ -51,7 +51,5 @@ public Task CloseAsync() protected abstract Task OnOpenAsync(); protected abstract Task OnStartAsync(); - - } } diff --git a/ThroughputTest/Program.cs b/ThroughputTest/Program.cs index 78cfadf..20f8cb8 100644 --- a/ThroughputTest/Program.cs +++ b/ThroughputTest/Program.cs @@ -9,7 +9,6 @@ namespace ThroughputTest { using CommandLine; - using Microsoft.Azure.ServiceBus; using System; using System.Linq; @@ -25,28 +24,16 @@ static int Main(params string[] args) static void RunOptionsAndReturnExitCode(Settings settings) { - ServiceBusConnectionStringBuilder cb = new ServiceBusConnectionStringBuilder(settings.ConnectionString); - if (string.IsNullOrWhiteSpace(cb.EntityPath)) + if (string.IsNullOrWhiteSpace(settings.ConnectionString) || string.IsNullOrWhiteSpace(settings.SendPath)) { - if (string.IsNullOrWhiteSpace(settings.SendPath)) - { - Console.WriteLine("--send-path option must be specified if there's no EntityPath in the connection string."); - result = 1; - return; - } + Console.WriteLine("--send-path option must be specified if there's no EntityPath in the connection string."); + result = 1; + return; } - else - { - if (string.IsNullOrWhiteSpace(settings.SendPath)) - { - settings.SendPath = cb.EntityPath; - } - cb.EntityPath = String.Empty; - settings.ConnectionString = cb.ToString(); - } - if (settings.ReceivePaths == null || settings.ReceivePaths.Count() == 0) + + if (settings.ReceivePaths == null || !settings.ReceivePaths.Any()) { - settings.ReceivePaths = new string[] { settings.SendPath }; + settings.ReceivePaths = new[] { settings.SendPath }; } Console.WriteLine("\n\nPress to STOP at anytime\n"); Metrics metrics = new Metrics(settings); @@ -58,7 +45,6 @@ static void RunOptionsAndReturnExitCode(Settings settings) }; app.Run(experiments).Wait(); Console.WriteLine("Complete"); - } } } diff --git a/ThroughputTest/ReceiverTask.cs b/ThroughputTest/ReceiverTask.cs index 92723bd..81e9299 100644 --- a/ThroughputTest/ReceiverTask.cs +++ b/ThroughputTest/ReceiverTask.cs @@ -6,10 +6,10 @@ // OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE. //--------------------------------------------------------------------------------- +using Azure.Messaging.ServiceBus; + namespace ThroughputTest { - using Microsoft.Azure.ServiceBus; - using Microsoft.Azure.ServiceBus.Core; using System; using System.Collections.Generic; using System.Diagnostics; @@ -47,10 +47,11 @@ protected override Task OnStartAsync() async Task ReceiveTask(string path) { - var receiver = new MessageReceiver(this.Settings.ConnectionString, path, this.Settings.ReceiveMode); - receiver.PrefetchCount = Settings.PrefetchCount; + var client = new ServiceBusClient(Settings.ConnectionString); + var receiver = client.CreateReceiver(path, new ServiceBusReceiverOptions { ReceiveMode = Settings.ReceiveMode, PrefetchCount = Settings.PrefetchCount }); var semaphore = new DynamicSemaphoreSlim(this.Settings.MaxInflightReceives.Value + 1); - var done = new SemaphoreSlim(1); done.Wait(); + var done = new SemaphoreSlim(1); + done.Wait(); var sw = Stopwatch.StartNew(); long totalReceives = 0; await Task.Delay(TimeSpan.FromMilliseconds(Settings.WorkDuration)); @@ -68,12 +69,12 @@ async Task ReceiveTask(string path) { nsec = sw.ElapsedTicks; // we're going to unblock the receives after 10 seconds if there's no pending message - receiver.ReceiveAsync(TimeSpan.FromSeconds(10)).ContinueWith(async (t) => + receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(10)).ContinueWith(async t => { receiveMetrics.ReceiveDuration100ns = sw.ElapsedTicks - nsec; if (t.IsFaulted || t.IsCanceled || t.Result == null) { - if (t.Exception?.GetType() == typeof(ServerBusyException)) + if(t.Exception?.InnerException is ServiceBusException exception && exception.Reason == ServiceBusException.FailureReason.ServiceBusy) { receiveMetrics.BusyErrors = 1; if (!this.CancellationToken.IsCancellationRequested) @@ -102,12 +103,12 @@ async Task ReceiveTask(string path) { await Task.Delay(TimeSpan.FromMilliseconds(Settings.WorkDuration)); } - receiver.CompleteAsync(t.Result.SystemProperties.LockToken).ContinueWith(async (t1) => + receiver.CompleteMessageAsync(t.Result.LockToken).ContinueWith(async t1 => { receiveMetrics.CompleteDuration100ns = sw.ElapsedTicks - nsec; if (t1.IsFaulted) { - if (t1.Exception?.GetType() == typeof(ServerBusyException)) + if(t1.Exception?.InnerException is ServiceBusException exception && exception.Reason == ServiceBusException.FailureReason.ServiceBusy) { receiveMetrics.BusyErrors = 1; if (!this.CancellationToken.IsCancellationRequested) @@ -139,12 +140,12 @@ async Task ReceiveTask(string path) { nsec = sw.ElapsedTicks; // we're going to unblock the receives after 10 seconds if there's no pending message - receiver.ReceiveAsync(Settings.ReceiveBatchCount, TimeSpan.FromSeconds(10)).ContinueWith(async (t) => + receiver.ReceiveMessagesAsync(Settings.ReceiveBatchCount, TimeSpan.FromSeconds(10)).ContinueWith(async t => { receiveMetrics.ReceiveDuration100ns = sw.ElapsedTicks - nsec; if (t.IsFaulted || t.IsCanceled || t.Result == null) { - if (t.Exception?.GetType() == typeof(ServerBusyException)) + if(t.Exception?.InnerException is ServiceBusException exception && exception.Reason == ServiceBusException.FailureReason.ServiceBusy) { receiveMetrics.BusyErrors = 1; if (!this.CancellationToken.IsCancellationRequested) @@ -176,12 +177,12 @@ async Task ReceiveTask(string path) for (int i = 0; i < t.Result.Count; i++) { await Task.Delay(TimeSpan.FromMilliseconds(Settings.WorkDuration)); - await receiver.CompleteAsync(t.Result[i].SystemProperties.LockToken).ContinueWith(async (t1) => + await receiver.CompleteMessageAsync(t.Result[i].LockToken).ContinueWith(async t1 => { receiveMetrics.CompleteDuration100ns = sw.ElapsedTicks - nsec; if (t1.IsFaulted) { - if (t1.Exception?.GetType() == typeof(ServerBusyException)) + if(t1.Exception?.InnerException is ServiceBusException exception && exception.Reason == ServiceBusException.FailureReason.ServiceBusy) { receiveMetrics.BusyErrors = 1; if (!this.CancellationToken.IsCancellationRequested) @@ -210,13 +211,12 @@ await receiver.CompleteAsync(t.Result[i].SystemProperties.LockToken).ContinueWit } else { - // batch complete - await receiver.CompleteAsync(t.Result.Select((m) => { return m.SystemProperties.LockToken; })).ContinueWith(async (t1) => + Func batchContinueTask = async t1 => { receiveMetrics.CompleteDuration100ns = sw.ElapsedTicks - nsec; if (t1.IsFaulted) { - if (t1.Exception?.GetType() == typeof(ServerBusyException)) + if(t1.Exception?.InnerException is ServiceBusException exception && exception.Reason == ServiceBusException.FailureReason.ServiceBusy) { receiveMetrics.BusyErrors = 1; if (!this.CancellationToken.IsCancellationRequested) @@ -245,7 +245,12 @@ await receiver.CompleteAsync(t.Result[i].SystemProperties.LockToken).ContinueWit done.Release(); } } - }); + }; + // batch complete + foreach (var receivedMessage in t.Result) + { + await receiver.CompleteMessageAsync(receivedMessage).ContinueWith(batchContinueTask); + } } } else @@ -266,6 +271,7 @@ await receiver.CompleteAsync(t.Result[i].SystemProperties.LockToken).ContinueWit } } await done.WaitAsync(); + await client.DisposeAsync(); } static void AdjustSemaphore(Observable.ChangingEventArgs e, DynamicSemaphoreSlim semaphore) diff --git a/ThroughputTest/SenderTask.cs b/ThroughputTest/SenderTask.cs index 4a928f1..cf43dc0 100644 --- a/ThroughputTest/SenderTask.cs +++ b/ThroughputTest/SenderTask.cs @@ -6,6 +6,8 @@ // OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE. //--------------------------------------------------------------------------------- +using Azure.Messaging.ServiceBus; + namespace ThroughputTest { using System; @@ -13,8 +15,6 @@ namespace ThroughputTest using System.Diagnostics; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.ServiceBus.Core; - using Microsoft.Azure.ServiceBus; using System.Net.Sockets; sealed class SenderTask : PerformanceTask @@ -43,7 +43,8 @@ protected override Task OnStartAsync() async Task SendTask() { - var sender = new MessageSender(this.Settings.ConnectionString, this.Settings.SendPath, NoRetry.Default); + var client = new ServiceBusClient(Settings.ConnectionString, new ServiceBusClientOptions { RetryOptions = new ServiceBusRetryOptions { MaxRetries = 0 } }); + var sender = client.CreateSender(Settings.SendPath); var payload = new byte[this.Settings.MessageSizeInBytes]; var semaphore = new DynamicSemaphoreSlim(this.Settings.MaxInflightSends.Value); var done = new SemaphoreSlim(1); @@ -54,7 +55,7 @@ async Task SendTask() var sw = Stopwatch.StartNew(); // first send will fail out if the cxn string is bad - await sender.SendAsync(new Message(payload) { TimeToLive = TimeSpan.FromMinutes(5) }); + await sender.SendMessageAsync(new ServiceBusMessage(payload) { TimeToLive = TimeSpan.FromMinutes(5) }); for (int j = 0; j < Settings.MessageCount && !this.CancellationToken.IsCancellationRequested; j++) { @@ -72,7 +73,7 @@ async Task SendTask() } if (Settings.SendBatchCount <= 1) { - sender.SendAsync(new Message(payload) { TimeToLive = TimeSpan.FromMinutes(5) }) + sender.SendMessageAsync(new ServiceBusMessage(payload) { TimeToLive = TimeSpan.FromMinutes(5) }) .ContinueWith(async (t) => { if (t.IsFaulted || t.IsCanceled) @@ -95,12 +96,12 @@ async Task SendTask() } else { - List batch = new List(); + List batch = new List(); for (int i = 0; i < Settings.SendBatchCount && j < Settings.MessageCount && !this.CancellationToken.IsCancellationRequested; i++, j++) { - batch.Add(new Message(payload) { TimeToLive = TimeSpan.FromMinutes(5) }); + batch.Add(new ServiceBusMessage(payload) { TimeToLive = TimeSpan.FromMinutes(5) }); } - sender.SendAsync(batch) + sender.SendMessagesAsync(batch) .ContinueWith(async (t) => { if (t.IsFaulted || t.IsCanceled) @@ -123,6 +124,7 @@ async Task SendTask() } } await done.WaitAsync(); + await client.DisposeAsync(); } static void AdjustSemaphore(Observable.ChangingEventArgs e, DynamicSemaphoreSlim semaphore) @@ -145,24 +147,25 @@ static void AdjustSemaphore(Observable.ChangingEventArgs e, DynamicSemaphor private async Task HandleExceptions(DynamicSemaphoreSlim semaphore, SendMetrics sendMetrics, AggregateException ex) { - bool wait = false; + var wait = false; ex.Handle((x) => { - if (x is ServiceBusCommunicationException) + if (x is ServiceBusException exception) { - if (((ServiceBusCommunicationException)x).InnerException is SocketException && - ((SocketException)((ServiceBusCommunicationException)x).InnerException).SocketErrorCode == SocketError.HostNotFound) + if (exception.Reason == ServiceBusException.FailureReason.ServiceCommunicationProblem + && exception.InnerException is SocketException socketException + && socketException.SocketErrorCode == SocketError.HostNotFound) { return false; } - } - - if (x is ServerBusyException) - { - sendMetrics.BusyErrors = 1; - if (!this.CancellationToken.IsCancellationRequested) + + if (exception.Reason == ServiceBusException.FailureReason.ServiceBusy) { - wait = true; + sendMetrics.BusyErrors = 1; + if (!this.CancellationToken.IsCancellationRequested) + { + wait = true; + } } } else @@ -178,7 +181,6 @@ private async Task HandleExceptions(DynamicSemaphoreSlim semaphore, SendMetrics } semaphore.Release(); Metrics.PushSendMetrics(sendMetrics); - } } } diff --git a/ThroughputTest/Settings.cs b/ThroughputTest/Settings.cs index 21d9717..12f6a30 100644 --- a/ThroughputTest/Settings.cs +++ b/ThroughputTest/Settings.cs @@ -6,13 +6,14 @@ // OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE. //--------------------------------------------------------------------------------- +using Azure.Messaging.ServiceBus; + namespace ThroughputTest { using System; using System.Collections.Generic; using CommandLine; using CommandLine.Text; - using Microsoft.Azure.ServiceBus; class Settings { diff --git a/ThroughputTest/ThroughputTest.csproj b/ThroughputTest/ThroughputTest.csproj index 6100f99..64cfea1 100644 --- a/ThroughputTest/ThroughputTest.csproj +++ b/ThroughputTest/ThroughputTest.csproj @@ -7,9 +7,9 @@ + -