Skip to content

Commit a267025

Browse files
authored
Merge pull request #1819 from rabbitmq/rabbitmq-dotnet-client-1818
Investigate & fix `SemaphoreFullException`
2 parents 35317bb + 6098e44 commit a267025

File tree

5 files changed

+123
-80
lines changed

5 files changed

+123
-80
lines changed

projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Diagnostics;
3433
using System.Runtime.CompilerServices;
3534
using System.Threading;
3635
using System.Threading.Tasks;
@@ -280,10 +279,20 @@ protected override async Task DoHandleCommandAsync(IncomingCommand cmd)
280279
{
281280
if (cmd.CommandId == ProtocolCommandId.BasicCancelOk)
282281
{
283-
Debug.Assert(_consumerTag == new BasicCancelOk(cmd.MethodSpan)._consumerTag);
284-
await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken)
285-
.ConfigureAwait(false);
286-
_tcs.SetResult(true);
282+
var result = new BasicCancelOk(cmd.MethodSpan);
283+
if (_consumerTag == result._consumerTag)
284+
{
285+
await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken)
286+
.ConfigureAwait(false);
287+
_tcs.SetResult(true);
288+
}
289+
else
290+
{
291+
string msg = string.Format("Consumer tag '{0}' does not match expected consumer tag for basic.cancel operation {1}",
292+
result._consumerTag, _consumerTag);
293+
var ex = new InvalidOperationException(msg);
294+
_tcs.SetException(ex);
295+
}
287296
}
288297
else
289298
{

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
5252
try
5353
{
5454
publisherConfirmationInfo =
55-
await MaybeStartPublisherConfirmationTracking(cancellationToken)
55+
await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
5656
.ConfigureAwait(false);
5757

5858
await MaybeEnforceFlowControlAsync(cancellationToken)
@@ -93,7 +93,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
9393
}
9494
finally
9595
{
96-
await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken)
96+
await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken)
9797
.ConfigureAwait(false);
9898
}
9999
}
@@ -107,7 +107,7 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
107107
try
108108
{
109109
publisherConfirmationInfo =
110-
await MaybeStartPublisherConfirmationTracking(cancellationToken)
110+
await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
111111
.ConfigureAwait(false);
112112

113113
await MaybeEnforceFlowControlAsync(cancellationToken)
@@ -148,7 +148,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
148148
}
149149
finally
150150
{
151-
await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken)
151+
await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken)
152152
.ConfigureAwait(false);
153153
}
154154
}

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
using System;
3333
using System.Collections.Concurrent;
3434
using System.Collections.Generic;
35-
using System.Diagnostics;
3635
using System.Runtime.CompilerServices;
3736
using System.Text;
3837
using System.Threading;
@@ -126,7 +125,7 @@ private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
126125
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
127126
}
128127

129-
private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
128+
private async Task MaybeConfirmSelectAsync(CancellationToken cancellationToken)
130129
{
131130
if (_publisherConfirmationsEnabled)
132131
{
@@ -148,13 +147,14 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
148147
enqueued = Enqueue(k);
149148

150149
var method = new ConfirmSelect(false);
150+
151151
await ModelSendAsync(in method, k.CancellationToken)
152152
.ConfigureAwait(false);
153153

154-
bool result = await k;
155-
Debug.Assert(result);
156-
157-
return;
154+
if (false == await k)
155+
{
156+
throw new InvalidOperationException(InternalConstants.BugFound);
157+
}
158158
}
159159
finally
160160
{
@@ -180,12 +180,14 @@ private void HandleAck(ulong deliveryTag, bool multiple)
180180
{
181181
if (multiple)
182182
{
183-
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
183+
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources.ToArray())
184184
{
185185
if (pair.Key <= deliveryTag)
186186
{
187-
pair.Value.SetResult(true);
188-
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
187+
if (_confirmsTaskCompletionSources.TryRemove(pair.Key, out TaskCompletionSource<bool>? tcs))
188+
{
189+
tcs.SetResult(true);
190+
}
189191
}
190192
}
191193
}
@@ -208,20 +210,22 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn,
208210
{
209211
if (multiple)
210212
{
211-
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
213+
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources.ToArray())
212214
{
213215
if (pair.Key <= deliveryTag)
214216
{
215-
PublishException ex = PublishExceptionFactory.Create(isReturn, pair.Key,
216-
exchange, routingKey, replyCode, replyText);
217-
pair.Value.SetException(ex);
218-
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
217+
if (_confirmsTaskCompletionSources.TryRemove(pair.Key, out TaskCompletionSource<bool>? tcs))
218+
{
219+
PublishException ex = PublishExceptionFactory.Create(isReturn, pair.Key,
220+
exchange, routingKey, replyCode, replyText);
221+
tcs.SetException(ex);
222+
}
219223
}
220224
}
221225
}
222226
else
223227
{
224-
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
228+
if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource<bool>? tcs))
225229
{
226230
PublishException ex = PublishExceptionFactory.Create(isReturn, deliveryTag,
227231
exchange, routingKey, replyCode, replyText);
@@ -289,7 +293,7 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
289293
}
290294

291295
[MethodImpl(MethodImplOptions.AggressiveInlining)]
292-
private async Task<PublisherConfirmationInfo?> MaybeStartPublisherConfirmationTracking(CancellationToken cancellationToken)
296+
private async Task<PublisherConfirmationInfo?> MaybeStartPublisherConfirmationTrackingAsync(CancellationToken cancellationToken)
293297
{
294298
if (_publisherConfirmationsEnabled)
295299
{
@@ -357,7 +361,7 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf
357361
}
358362

359363
[MethodImpl(MethodImplOptions.AggressiveInlining)]
360-
private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationInfo? publisherConfirmationInfo,
364+
private async Task MaybeEndPublisherConfirmationTrackingAsync(PublisherConfirmationInfo? publisherConfirmationInfo,
361365
CancellationToken cancellationToken)
362366
{
363367
if (_publisherConfirmationsEnabled)
@@ -388,6 +392,11 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
388392
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
389393
.ConfigureAwait(false);
390394
}
395+
catch (OperationCanceledException)
396+
{
397+
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
398+
throw;
399+
}
391400
finally
392401
{
393402
publisherConfirmationInfo.Dispose();

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,7 @@ await ModelSendAsync(in method, k.CancellationToken)
234234
.ConfigureAwait(false);
235235
}
236236

237-
bool result = await k;
238-
Debug.Assert(result);
237+
AssertResultIsTrue(await k);
239238

240239
await ConsumerDispatcher.WaitForShutdownAsync()
241240
.ConfigureAwait(false);
@@ -387,10 +386,9 @@ await ModelSendAsync(in method, k.CancellationToken)
387386

388387
try
389388
{
390-
bool result = await k;
391-
Debug.Assert(result);
389+
AssertResultIsTrue(await k);
392390

393-
await MaybeConfirmSelect(cancellationToken)
391+
await MaybeConfirmSelectAsync(cancellationToken)
394392
.ConfigureAwait(false);
395393
}
396394
catch (OperationCanceledException)
@@ -465,6 +463,14 @@ await c.HandleCommandAsync(cmd)
465463
}
466464
}
467465

466+
private static void AssertResultIsTrue(bool result)
467+
{
468+
if (false == result)
469+
{
470+
throw new InvalidOperationException(InternalConstants.BugFound);
471+
}
472+
}
473+
468474
[MethodImpl(MethodImplOptions.AggressiveInlining)]
469475
protected ValueTask ModelSendAsync<T>(in T method, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod
470476
{
@@ -978,8 +984,7 @@ await ModelSendAsync(in method, k.CancellationToken)
978984
await ModelSendAsync(in method, k.CancellationToken)
979985
.ConfigureAwait(false);
980986

981-
bool result = await k;
982-
Debug.Assert(result);
987+
AssertResultIsTrue(await k);
983988
}
984989
catch
985990
{
@@ -1108,9 +1113,7 @@ await ModelSendAsync(in method, k.CancellationToken)
11081113

11091114
try
11101115
{
1111-
bool result = await k;
1112-
Debug.Assert(result);
1113-
return;
1116+
AssertResultIsTrue(await k);
11141117
}
11151118
catch (OperationCanceledException)
11161119
{
@@ -1143,9 +1146,7 @@ await ModelSendAsync(in method, k.CancellationToken)
11431146

11441147
try
11451148
{
1146-
bool result = await k;
1147-
Debug.Assert(result);
1148-
return;
1149+
AssertResultIsTrue(await k);
11491150
}
11501151
catch (OperationCanceledException)
11511152
{
@@ -1187,9 +1188,7 @@ await ModelSendAsync(in method, k.CancellationToken)
11871188

11881189
try
11891190
{
1190-
bool result = await k;
1191-
Debug.Assert(result);
1192-
return;
1191+
AssertResultIsTrue(await k);
11931192
}
11941193
catch (OperationCanceledException)
11951194
{
@@ -1240,9 +1239,7 @@ await ModelSendAsync(in method, k.CancellationToken)
12401239

12411240
try
12421241
{
1243-
bool result = await k;
1244-
Debug.Assert(result);
1245-
return;
1242+
AssertResultIsTrue(await k);
12461243
}
12471244
catch (OperationCanceledException)
12481245
{
@@ -1286,8 +1283,7 @@ await ModelSendAsync(in method, k.CancellationToken)
12861283

12871284
try
12881285
{
1289-
bool result = await k;
1290-
Debug.Assert(result);
1286+
AssertResultIsTrue(await k);
12911287
}
12921288
catch (OperationCanceledException)
12931289
{
@@ -1332,8 +1328,7 @@ await ModelSendAsync(in method, k.CancellationToken)
13321328

13331329
try
13341330
{
1335-
bool result = await k;
1336-
Debug.Assert(result);
1331+
AssertResultIsTrue(await k);
13371332
}
13381333
catch (OperationCanceledException)
13391334
{
@@ -1456,8 +1451,7 @@ await ModelSendAsync(in method, k.CancellationToken)
14561451

14571452
try
14581453
{
1459-
bool result = await k;
1460-
Debug.Assert(result);
1454+
AssertResultIsTrue(await k);
14611455
}
14621456
catch (OperationCanceledException)
14631457
{
@@ -1587,9 +1581,7 @@ await ModelSendAsync(in method, k.CancellationToken)
15871581

15881582
try
15891583
{
1590-
bool result = await k;
1591-
Debug.Assert(result);
1592-
return;
1584+
AssertResultIsTrue(await k);
15931585
}
15941586
catch (OperationCanceledException)
15951587
{
@@ -1621,9 +1613,7 @@ await ModelSendAsync(in method, k.CancellationToken)
16211613

16221614
try
16231615
{
1624-
bool result = await k;
1625-
Debug.Assert(result);
1626-
return;
1616+
AssertResultIsTrue(await k);
16271617
}
16281618
catch (OperationCanceledException)
16291619
{
@@ -1655,9 +1645,7 @@ await ModelSendAsync(in method, k.CancellationToken)
16551645

16561646
try
16571647
{
1658-
bool result = await k;
1659-
Debug.Assert(result);
1660-
return;
1648+
AssertResultIsTrue(await k);
16611649
}
16621650
catch (OperationCanceledException)
16631651
{
@@ -1689,9 +1677,7 @@ await ModelSendAsync(in method, k.CancellationToken)
16891677

16901678
try
16911679
{
1692-
bool result = await k;
1693-
Debug.Assert(result);
1694-
return;
1680+
AssertResultIsTrue(await k);
16951681
}
16961682
catch (OperationCanceledException)
16971683
{

0 commit comments

Comments
 (0)