Skip to content

Commit e032dae

Browse files
Copilotromanettmarcschier
authored
Fix thread pool exhaustion with many MQTT writer groups (#3395)
* Initial plan * Convert PubSub PublishNetworkMessage to async to prevent thread pool exhaustion Co-authored-by: romanett <[email protected]> * Fix test files to properly use async/await patterns Co-authored-by: romanett <[email protected]> * Add Async suffix to PublishNetworkMessage method Co-authored-by: marcschier <[email protected]> --------- Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: romanett <[email protected]> Co-authored-by: marcschier <[email protected]>
1 parent 965c5ec commit e032dae

File tree

10 files changed

+97
-90
lines changed

10 files changed

+97
-90
lines changed

Libraries/Opc.Ua.PubSub/IUaPubSubConnection.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
using System;
3131
using System.Collections.Generic;
32+
using System.Threading.Tasks;
3233

3334
namespace Opc.Ua.PubSub
3435
{
@@ -82,7 +83,7 @@ IList<UaNetworkMessage> CreateNetworkMessages(
8283
/// <summary>
8384
/// Publish the network message
8485
/// </summary>
85-
bool PublishNetworkMessage(UaNetworkMessage networkMessage);
86+
Task<bool> PublishNetworkMessageAsync(UaNetworkMessage networkMessage);
8687

8788
/// <summary>
8889
/// Get flag that indicates if all the network clients are connected

Libraries/Opc.Ua.PubSub/IntervalRunner.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,14 @@ public IntervalRunner(
5858
object id,
5959
double interval,
6060
Func<bool> canExecuteFunc,
61-
Action intervalAction,
61+
Func<Task> intervalActionAsync,
6262
ITelemetryContext telemetry)
6363
{
6464
m_logger = telemetry.CreateLogger<IntervalRunner>();
6565
Id = id;
6666
Interval = interval;
6767
CanExecuteFunc = canExecuteFunc;
68-
IntervalAction = intervalAction;
68+
IntervalActionAsync = intervalActionAsync;
6969
}
7070

7171
/// <summary>
@@ -99,9 +99,9 @@ public double Interval
9999
public Func<bool> CanExecuteFunc { get; }
100100

101101
/// <summary>
102-
/// Get the action that will be executed at each interval
102+
/// Get the async action that will be executed at each interval
103103
/// </summary>
104-
public Action IntervalAction { get; }
104+
public Func<Task> IntervalActionAsync { get; }
105105

106106
/// <summary>
107107
/// Starts the IntervalRunner and makes it ready to execute the code.
@@ -206,10 +206,10 @@ await Task.Delay(
206206
double nextCycle = (long)m_interval * HiResClock.TicksPerMillisecond;
207207
m_nextPublishTick += nextCycle;
208208

209-
if (IntervalAction != null && CanExecuteFunc != null && CanExecuteFunc())
209+
if (IntervalActionAsync != null && CanExecuteFunc != null && CanExecuteFunc())
210210
{
211-
// call on a new thread
212-
Task.Run(() => IntervalAction());
211+
// call on a new task without blocking the thread pool
212+
_ = Task.Run(IntervalActionAsync);
213213
}
214214
}
215215
}

Libraries/Opc.Ua.PubSub/Transport/MqttMetadataPublisher.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* ======================================================================*/
2929

3030
using System;
31+
using System.Threading.Tasks;
3132
using Microsoft.Extensions.Logging;
3233

3334
namespace Opc.Ua.PubSub.Transport
@@ -65,7 +66,7 @@ internal MqttMetadataPublisher(
6566
dataSetWriter.DataSetWriterId,
6667
metaDataUpdateTime,
6768
CanPublish,
68-
PublishMessage,
69+
PublishMessageAsync,
6970
telemetry);
7071
}
7172

@@ -103,7 +104,7 @@ private bool CanPublish()
103104
/// <summary>
104105
/// Generate and publish the dataset MetaData message
105106
/// </summary>
106-
private void PublishMessage()
107+
private async Task PublishMessageAsync()
107108
{
108109
try
109110
{
@@ -113,7 +114,7 @@ private void PublishMessage()
113114
m_dataSetWriter);
114115
if (metaDataNetworkMessage != null)
115116
{
116-
bool success = m_parentConnection.PublishNetworkMessage(metaDataNetworkMessage);
117+
bool success = await m_parentConnection.PublishNetworkMessageAsync(metaDataNetworkMessage).ConfigureAwait(false);
117118
m_logger.LogInformation(
118119
"MqttMetadataPublisher Publish DataSetMetaData, DataSetWriterId:{DataSetWriterId}; success = {Success}",
119120
m_dataSetWriter.DataSetWriterId,

Libraries/Opc.Ua.PubSub/Transport/MqttPubSubConnection.cs

Lines changed: 54 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public UaNetworkMessage CreateDataSetMetaDataNetworkMessage(
248248
/// <summary>
249249
/// Publish the network message
250250
/// </summary>
251-
public override bool PublishNetworkMessage(UaNetworkMessage networkMessage)
251+
public override async Task<bool> PublishNetworkMessageAsync(UaNetworkMessage networkMessage)
252252
{
253253
if (networkMessage == null || m_publisherMqttClient == null)
254254
{
@@ -257,76 +257,78 @@ public override bool PublishNetworkMessage(UaNetworkMessage networkMessage)
257257

258258
try
259259
{
260+
IMqttClient publisherClient;
260261
lock (Lock)
261262
{
262-
if (m_publisherMqttClient.IsConnected)
263+
publisherClient = m_publisherMqttClient;
264+
}
265+
266+
if (publisherClient != null && publisherClient.IsConnected)
267+
{
268+
// get the encoded bytes
269+
byte[] bytes = networkMessage.Encode(MessageContext);
270+
271+
try
263272
{
264-
// get the encoded bytes
265-
byte[] bytes = networkMessage.Encode(MessageContext);
273+
string queueName = null;
274+
BrokerTransportQualityOfService qos
275+
= BrokerTransportQualityOfService.AtLeastOnce;
266276

267-
try
277+
// the network messages that have DataSetWriterId are either metaData messages or SingleDataSet messages and
278+
if (networkMessage.DataSetWriterId != null)
268279
{
269-
string queueName = null;
270-
BrokerTransportQualityOfService qos
271-
= BrokerTransportQualityOfService.AtLeastOnce;
280+
DataSetWriterDataType dataSetWriter =
281+
networkMessage.WriterGroupConfiguration.DataSetWriters.Find(x =>
282+
x.DataSetWriterId == networkMessage.DataSetWriterId);
272283

273-
// the network messages that have DataSetWriterId are either metaData messages or SingleDataSet messages and
274-
if (networkMessage.DataSetWriterId != null)
284+
if (dataSetWriter != null &&
285+
ExtensionObject.ToEncodeable(dataSetWriter.TransportSettings)
286+
is BrokerDataSetWriterTransportDataType transportSettings)
275287
{
276-
DataSetWriterDataType dataSetWriter =
277-
networkMessage.WriterGroupConfiguration.DataSetWriters.Find(x =>
278-
x.DataSetWriterId == networkMessage.DataSetWriterId);
279-
280-
if (dataSetWriter != null &&
281-
ExtensionObject.ToEncodeable(dataSetWriter.TransportSettings)
282-
is BrokerDataSetWriterTransportDataType transportSettings)
283-
{
284-
qos = transportSettings.RequestedDeliveryGuarantee;
288+
qos = transportSettings.RequestedDeliveryGuarantee;
285289

286-
queueName = networkMessage.IsMetaDataMessage
287-
? transportSettings.MetaDataQueueName
288-
: transportSettings.QueueName;
289-
}
290+
queueName = networkMessage.IsMetaDataMessage
291+
? transportSettings.MetaDataQueueName
292+
: transportSettings.QueueName;
290293
}
294+
}
291295

292-
if (queueName == null ||
293-
qos == BrokerTransportQualityOfService.NotSpecified)
296+
if (queueName == null ||
297+
qos == BrokerTransportQualityOfService.NotSpecified)
298+
{
299+
if (ExtensionObject.ToEncodeable(
300+
networkMessage.WriterGroupConfiguration.TransportSettings)
301+
is BrokerWriterGroupTransportDataType transportSettings)
294302
{
295-
if (ExtensionObject.ToEncodeable(
296-
networkMessage.WriterGroupConfiguration.TransportSettings)
297-
is BrokerWriterGroupTransportDataType transportSettings)
303+
queueName ??= transportSettings.QueueName;
304+
// if the value is not specified and the value of the parent object shall be used
305+
if (qos == BrokerTransportQualityOfService.NotSpecified)
298306
{
299-
queueName ??= transportSettings.QueueName;
300-
// if the value is not specified and the value of the parent object shall be used
301-
if (qos == BrokerTransportQualityOfService.NotSpecified)
302-
{
303-
qos = transportSettings.RequestedDeliveryGuarantee;
304-
}
307+
qos = transportSettings.RequestedDeliveryGuarantee;
305308
}
306309
}
310+
}
307311

308-
if (!string.IsNullOrEmpty(queueName))
312+
if (!string.IsNullOrEmpty(queueName))
313+
{
314+
var message = new MqttApplicationMessage
309315
{
310-
var message = new MqttApplicationMessage
311-
{
312-
Topic = queueName,
313-
PayloadSegment = new ArraySegment<byte>(bytes),
314-
QualityOfServiceLevel = GetMqttQualityOfServiceLevel(qos),
315-
Retain = networkMessage.IsMetaDataMessage
316-
};
316+
Topic = queueName,
317+
PayloadSegment = new ArraySegment<byte>(bytes),
318+
QualityOfServiceLevel = GetMqttQualityOfServiceLevel(qos),
319+
Retain = networkMessage.IsMetaDataMessage
320+
};
317321

318-
m_publisherMqttClient.PublishAsync(message).GetAwaiter()
319-
.GetResult();
320-
}
321-
}
322-
catch (Exception ex)
323-
{
324-
m_logger.LogError(ex, "MqttPubSubConnection.PublishNetworkMessage");
325-
return false;
322+
await publisherClient.PublishAsync(message).ConfigureAwait(false);
326323
}
327-
328-
return true;
329324
}
325+
catch (Exception ex)
326+
{
327+
m_logger.LogError(ex, "MqttPubSubConnection.PublishNetworkMessage");
328+
return false;
329+
}
330+
331+
return true;
330332
}
331333
}
332334
catch (Exception ex)

Libraries/Opc.Ua.PubSub/Transport/UdpDiscoveryPublisher.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ UaNetworkMessage message in m_udpConnection
217217
"UdpDiscoveryPublisher.SendResponseDataSetMetaData before sending message for DataSetWriterId:{DataSetWriterId}",
218218
message.DataSetWriterId);
219219

220-
m_udpConnection.PublishNetworkMessage(message);
220+
m_udpConnection.PublishNetworkMessageAsync(message);
221221
}
222222
m_metadataWriterIdsToSend.Clear();
223223
}
@@ -251,7 +251,7 @@ private async Task SendResponseDataSetWriterConfigurationAsync()
251251
"UdpDiscoveryPublisher.SendResponseDataSetWriterConfiguration Before sending message for DataSetWriterId:{DataSetWriterId}",
252252
responsesMessage.DataSetWriterId);
253253

254-
m_udpConnection.PublishNetworkMessage(responsesMessage);
254+
m_udpConnection.PublishNetworkMessageAsync(responsesMessage);
255255
}
256256
}
257257
}
@@ -280,7 +280,7 @@ private async Task SendResponsePublisherEndpointsAsync()
280280
m_logger.LogInformation(
281281
"UdpDiscoveryPublisher.SendResponsePublisherEndpoints before sending message for PublisherEndpoints.");
282282

283-
m_udpConnection.PublishNetworkMessage(message);
283+
m_udpConnection.PublishNetworkMessageAsync(message);
284284
}
285285
}
286286

Libraries/Opc.Ua.PubSub/Transport/UdpDiscoverySubscriber.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public UdpDiscoverySubscriber(UdpPubSubConnection udpConnection, ITelemetryConte
6565
udpConnection.PubSubConnectionConfiguration.Name,
6666
kInitialRequestInterval,
6767
CanPublish,
68-
RequestDiscoveryMessages,
68+
RequestDiscoveryMessagesAsync,
6969
telemetry);
7070
}
7171

@@ -280,10 +280,11 @@ private bool CanPublish()
280280
/// <summary>
281281
/// Joint task to request discovery messages
282282
/// </summary>
283-
private void RequestDiscoveryMessages()
283+
private Task RequestDiscoveryMessagesAsync()
284284
{
285285
SendDiscoveryRequestDataSetMetaData();
286286
SendDiscoveryRequestDataSetWriterConfiguration();
287+
return Task.CompletedTask;
287288
}
288289
}
289290
}

Libraries/Opc.Ua.PubSub/Transport/UdpPubSubConnection.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -416,13 +416,13 @@ DataSetWriterConfigurationResponse response in GetDataSetWriterDiscoveryResponse
416416
/// <summary>
417417
/// Publish the network message
418418
/// </summary>
419-
public override bool PublishNetworkMessage(UaNetworkMessage networkMessage)
419+
public override Task<bool> PublishNetworkMessageAsync(UaNetworkMessage networkMessage)
420420
{
421421
if (networkMessage == null ||
422422
m_publisherUdpClients == null ||
423423
m_publisherUdpClients.Count == 0)
424424
{
425-
return false;
425+
return Task.FromResult(false);
426426
}
427427

428428
try
@@ -448,20 +448,20 @@ public override bool PublishNetworkMessage(UaNetworkMessage networkMessage)
448448
catch (Exception ex)
449449
{
450450
m_logger.LogError(ex, "UdpPubSubConnection.PublishNetworkMessage");
451-
return false;
451+
return Task.FromResult(false);
452452
}
453453
}
454-
return true;
454+
return Task.FromResult(true);
455455
}
456456
}
457457
}
458458
catch (Exception ex)
459459
{
460460
m_logger.LogError(ex, "UdpPubSubConnection.PublishNetworkMessage");
461-
return false;
461+
return Task.FromResult(false);
462462
}
463463

464-
return false;
464+
return Task.FromResult(false);
465465
}
466466

467467
/// <summary>

Libraries/Opc.Ua.PubSub/UaPubSubConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public abstract IList<UaNetworkMessage> CreateNetworkMessages(
226226
/// </summary>
227227
/// <param name="networkMessage">The network message that needs to be published.</param>
228228
/// <returns>True if send was successful.</returns>
229-
public abstract bool PublishNetworkMessage(UaNetworkMessage networkMessage);
229+
public abstract Task<bool> PublishNetworkMessageAsync(UaNetworkMessage networkMessage);
230230

231231
/// <summary>
232232
/// Get flag that indicates if all the network clients are connected

Libraries/Opc.Ua.PubSub/UaPublisher.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
using System;
3131
using System.Collections.Generic;
3232
using System.Threading;
33+
using System.Threading.Tasks;
3334
using Microsoft.Extensions.Logging;
3435

3536
namespace Opc.Ua.PubSub
@@ -68,7 +69,7 @@ internal UaPublisher(
6869
WriterGroupConfiguration.Name,
6970
WriterGroupConfiguration.PublishingInterval,
7071
CanPublish,
71-
PublishMessages,
72+
PublishMessagesAsync,
7273
telemetry);
7374
}
7475

@@ -143,7 +144,7 @@ private bool CanPublish()
143144
/// <summary>
144145
/// Generate and publish the messages
145146
/// </summary>
146-
private void PublishMessages()
147+
private async Task PublishMessagesAsync()
147148
{
148149
try
149150
{
@@ -156,7 +157,7 @@ private void PublishMessages()
156157
{
157158
if (uaNetworkMessage != null)
158159
{
159-
bool success = PubSubConnection.PublishNetworkMessage(uaNetworkMessage);
160+
bool success = await PubSubConnection.PublishNetworkMessageAsync(uaNetworkMessage).ConfigureAwait(false);
160161
m_logger.LogDebug(
161162
"UaPublisher - PublishNetworkMessage, WriterGroupId:{WriterGroupId}; success = {Success}",
162163
WriterGroupConfiguration.WriterGroupId,

0 commit comments

Comments
 (0)