From 8d3f1e79609c9b1d146fd49b6805099da63701fd Mon Sep 17 00:00:00 2001
From: pomian <13592821+pomianowski@users.noreply.github.com>
Date: Tue, 25 Feb 2025 16:57:54 +0100
Subject: [PATCH] Update queue mode (#3)
* Update queue mode
* Fix validator
---
.../reflection-events-pr-validator.yaml | 4 +-
Directory.Build.props | 3 +-
Directory.Build.targets | 39 +++--
.../Rtf/Hyperlink/RtfDocumentProcessor.cs | 6 +-
docs/codesnippet/Rtf/RtfBuildStep.cs | 4 +-
src/ReflectionEventing.Demo.Wpf/App.xaml.cs | 6 +-
.../DependencyInjectionEventBusBuilder.cs | 4 +
.../EventBusBuilderExtensions.cs | 22 +++
.../ServiceCollectionExtensions.cs | 5 +-
.../DependencyInjectionQueueProcessor.cs | 148 ++++++++++++------
.../EventBusBuilderOptions.cs | 15 ++
src/ReflectionEventing/ProcessingMode.cs | 22 +++
12 files changed, 214 insertions(+), 64 deletions(-)
create mode 100644 src/ReflectionEventing/ProcessingMode.cs
diff --git a/.github/workflows/reflection-events-pr-validator.yaml b/.github/workflows/reflection-events-pr-validator.yaml
index 18a0a00..ea4a87b 100644
--- a/.github/workflows/reflection-events-pr-validator.yaml
+++ b/.github/workflows/reflection-events-pr-validator.yaml
@@ -2,9 +2,7 @@ name: reflection-events-pr-validator
on:
pull_request:
- branches: [development]
- push:
- branches: [development]
+ branches: [main]
workflow_dispatch:
diff --git a/Directory.Build.props b/Directory.Build.props
index fc880f6..7af207f 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -6,7 +6,7 @@
- 4.0.0
+ 4.1.0
4.0.0
@@ -29,6 +29,7 @@
moderate
true
false
+ <_SilenceIsAotCompatibleUnsupportedWarning>true
diff --git a/Directory.Build.targets b/Directory.Build.targets
index 67657ac..4876660 100644
--- a/Directory.Build.targets
+++ b/Directory.Build.targets
@@ -1,8 +1,7 @@
-
-
- $(Product) Asset
+
+ true
@@ -11,7 +10,17 @@
$(CommonTags)
-
+
+ true
+ README.md
+ true
+ true
+ snupkg
+ true
+ true
+
+
+
@@ -27,13 +36,8 @@
-
-
- NETSTANDARD2_1_OR_GREATER
-
-
-
+
true
true
true
@@ -44,4 +48,19 @@
true
$(RepositoryDirectory)\src\lepo.snk
+
+
+
+
+
+
+
+
+
diff --git a/docs/codesnippet/Rtf/Hyperlink/RtfDocumentProcessor.cs b/docs/codesnippet/Rtf/Hyperlink/RtfDocumentProcessor.cs
index 3f24253..9f54611 100644
--- a/docs/codesnippet/Rtf/Hyperlink/RtfDocumentProcessor.cs
+++ b/docs/codesnippet/Rtf/Hyperlink/RtfDocumentProcessor.cs
@@ -86,7 +86,11 @@ from attr in doc.Descendants().Attributes()
#endregion
#region FixLink
- private static void FixLink(XAttribute link, RelativePath filePath, HashSet linkToFiles)
+ private static void FixLink(
+ XAttribute link,
+ RelativePath filePath,
+ HashSet linkToFiles
+ )
{
string linkFile;
string anchor = null;
diff --git a/docs/codesnippet/Rtf/RtfBuildStep.cs b/docs/codesnippet/Rtf/RtfBuildStep.cs
index 28b21eb..781fa7d 100644
--- a/docs/codesnippet/Rtf/RtfBuildStep.cs
+++ b/docs/codesnippet/Rtf/RtfBuildStep.cs
@@ -21,7 +21,9 @@ public class RtfBuildStep : IDocumentBuildStep
public void Build(FileModel model, IHostService host)
{
string content = (string)((Dictionary)model.Content)["conceptual"];
- content = _taskFactory.StartNew(() => RtfToHtmlConverter.ConvertRtfToHtml(content)).Result;
+ content = _taskFactory
+ .StartNew(() => RtfToHtmlConverter.ConvertRtfToHtml(content))
+ .Result;
((Dictionary)model.Content)["conceptual"] = content;
}
#endregion
diff --git a/src/ReflectionEventing.Demo.Wpf/App.xaml.cs b/src/ReflectionEventing.Demo.Wpf/App.xaml.cs
index d097e22..bab575d 100644
--- a/src/ReflectionEventing.Demo.Wpf/App.xaml.cs
+++ b/src/ReflectionEventing.Demo.Wpf/App.xaml.cs
@@ -6,6 +6,7 @@
using ReflectionEventing.Demo.Wpf.Services;
using ReflectionEventing.Demo.Wpf.ViewModels;
using ReflectionEventing.DependencyInjection;
+using ReflectionEventing.DependencyInjection.Services;
namespace ReflectionEventing.Demo.Wpf;
@@ -33,8 +34,11 @@ public partial class App : Application
_ = services.AddEventBus(e =>
{
e.Options.UseEventPolymorphism = true;
+ e.Options.UseEventsQueue = true;
+ e.Options.QueueMode = ProcessingMode.Parallel;
- _ = e.AddAllConsumers(Assembly.GetExecutingAssembly());
+ e.UseBackgroundService();
+ e.AddAllConsumers(Assembly.GetExecutingAssembly());
});
}
)
diff --git a/src/ReflectionEventing.DependencyInjection/DependencyInjectionEventBusBuilder.cs b/src/ReflectionEventing.DependencyInjection/DependencyInjectionEventBusBuilder.cs
index a1bfc63..f2c9164 100644
--- a/src/ReflectionEventing.DependencyInjection/DependencyInjectionEventBusBuilder.cs
+++ b/src/ReflectionEventing.DependencyInjection/DependencyInjectionEventBusBuilder.cs
@@ -3,6 +3,8 @@
// Copyright (C) Leszek Pomianowski and ReflectionEventing Contributors.
// All Rights Reserved.
+using ReflectionEventing.DependencyInjection.Services;
+
namespace ReflectionEventing.DependencyInjection;
///
@@ -11,6 +13,8 @@ namespace ReflectionEventing.DependencyInjection;
// ReSharper disable once ClassWithVirtualMembersNeverInherited.Global
public class DependencyInjectionEventBusBuilder(IServiceCollection services) : EventBusBuilder
{
+ internal Type QueueBackgroundService { get; set; } = typeof(DependencyInjectionQueueProcessor);
+
///
/// Adds a consumer to the event bus and with a specified service lifetime.
///
diff --git a/src/ReflectionEventing.DependencyInjection/EventBusBuilderExtensions.cs b/src/ReflectionEventing.DependencyInjection/EventBusBuilderExtensions.cs
index 384ba0e..64398ed 100644
--- a/src/ReflectionEventing.DependencyInjection/EventBusBuilderExtensions.cs
+++ b/src/ReflectionEventing.DependencyInjection/EventBusBuilderExtensions.cs
@@ -7,6 +7,28 @@ namespace ReflectionEventing.DependencyInjection;
public static class EventBusBuilderExtensions
{
+ ///
+ /// Configures the event bus to use a custom background service for processing events.
+ ///
+ /// The type of the background service to use. This type must implement .
+ /// The current instance of .
+ public static EventBusBuilder UseBackgroundService(
+ this EventBusBuilder builder
+ )
+ where TQueueBackgroundService : class, IHostedService
+ {
+ if (builder is not DependencyInjectionEventBusBuilder dependencyInjectionEventBusBuilder)
+ {
+ throw new InvalidOperationException(
+ $"The event bus builder must be of type {nameof(DependencyInjectionEventBusBuilder)} to define background service."
+ );
+ }
+
+ dependencyInjectionEventBusBuilder.QueueBackgroundService = typeof(TQueueBackgroundService);
+
+ return dependencyInjectionEventBusBuilder;
+ }
+
///
/// Adds a consumer to the event bus builder and .
///
diff --git a/src/ReflectionEventing.DependencyInjection/ServiceCollectionExtensions.cs b/src/ReflectionEventing.DependencyInjection/ServiceCollectionExtensions.cs
index 0176623..9bb8859 100644
--- a/src/ReflectionEventing.DependencyInjection/ServiceCollectionExtensions.cs
+++ b/src/ReflectionEventing.DependencyInjection/ServiceCollectionExtensions.cs
@@ -4,7 +4,6 @@
// All Rights Reserved.
using ReflectionEventing.DependencyInjection.Configuration;
-using ReflectionEventing.DependencyInjection.Services;
using ReflectionEventing.Queues;
namespace ReflectionEventing.DependencyInjection;
@@ -47,7 +46,7 @@ public static IServiceCollection AddEventBus(
if (builder.Options.UseEventsQueue)
{
- _ = services.AddHostedService();
+ _ = services.AddSingleton(typeof(IHostedService), builder.QueueBackgroundService);
}
return services;
@@ -82,7 +81,7 @@ Action configure
if (builder.Options.UseEventsQueue)
{
- _ = services.AddHostedService();
+ _ = services.AddSingleton(typeof(IHostedService), builder.QueueBackgroundService);
}
return services;
diff --git a/src/ReflectionEventing.DependencyInjection/Services/DependencyInjectionQueueProcessor.cs b/src/ReflectionEventing.DependencyInjection/Services/DependencyInjectionQueueProcessor.cs
index 390ac4b..8b8ef3b 100644
--- a/src/ReflectionEventing.DependencyInjection/Services/DependencyInjectionQueueProcessor.cs
+++ b/src/ReflectionEventing.DependencyInjection/Services/DependencyInjectionQueueProcessor.cs
@@ -31,6 +31,8 @@ ILogger logger
private readonly TimeSpan errorTickRate = options.Value.ErrorTickRate;
+ private readonly SemaphoreSlim semaphore = new(options.Value.ConcurrentTaskLimit);
+
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
await BackgroundProcessing(cancellationToken);
@@ -114,55 +116,51 @@ CancellationToken cancellationToken
return;
}
- MethodInfo? consumeMethod = consumerType.GetMethod(
- "ConsumeAsync",
- [@event.GetType(), typeof(CancellationToken)]
- );
-
- if (consumeMethod != null)
+ if (options.Value.QueueMode == ProcessingMode.Sequential)
{
- try
- {
- await (Task)consumeMethod.Invoke(consumer, [@event, cancellationToken])!;
- }
- catch (Exception e)
- {
- //activity?.AddException(e);
- activity?.SetStatus(ActivityStatusCode.Error);
-
- logger.LogError(
- new EventId(75001, "ReflectionEventingQueueProcessingFailed"),
- e,
- "Error processing event of type {EventName}",
- @event.GetType().Name
- );
-
- if (options.Value.UseErrorQueue)
- {
- queue.EnqueueError(
- new FailedEvent
- {
- Data = @event,
- Exception = e,
- Timestamp = DateTimeOffset.UtcNow,
- FailedConsumer = consumerType,
- }
- );
- }
+ await ExecuteConsumerAsync(
+ @event,
+ consumerType,
+ eventType,
+ consumer,
+ activity,
+ cancellationToken
+ );
+ }
+ else if (options.Value.QueueMode == ProcessingMode.Parallel)
+ {
+ await semaphore.WaitAsync(cancellationToken);
- EventsFailed.Add(
- 1,
- new KeyValuePair("message_type", eventType.Name)
- );
- }
+ _ = Task.Run(
+ async () =>
+ {
+ try
+ {
+ await ExecuteConsumerAsync(
+ @event,
+ consumerType,
+ eventType,
+ consumer,
+ activity,
+ cancellationToken
+ );
+ }
+ catch (Exception e)
+ {
+ logger.LogError(e, "Error occurred during consumer execution");
+ }
+ finally
+ {
+ semaphore.Release();
+ }
+ },
+ cancellationToken
+ );
}
else
{
- logger.LogError(
- new EventId(75002, "ReflectionEventingConsumerMissing"),
- "ConsumeAsync method not found on consumer {ConsumerType} for event type {EventName}",
- consumerType.Name,
- @event.GetType().Name
+ throw new InvalidOperationException(
+ "Invalid queue processing mode. Must be either Sequential or Parallel."
);
}
}
@@ -170,4 +168,66 @@ CancellationToken cancellationToken
EventsProcessed.Add(1, new KeyValuePair("message_type", eventType.Name));
}
+
+ private async Task ExecuteConsumerAsync(
+ object @event,
+ Type consumerType,
+ Type eventType,
+ object consumer,
+ Activity? activity,
+ CancellationToken cancellationToken
+ )
+ {
+ MethodInfo? consumeMethod = consumerType.GetMethod(
+ "ConsumeAsync",
+ [@event.GetType(), typeof(CancellationToken)]
+ );
+
+ if (consumeMethod != null)
+ {
+ try
+ {
+ await (Task)consumeMethod.Invoke(consumer, [@event, cancellationToken])!;
+ }
+ catch (Exception e)
+ {
+ //activity?.AddException(e);
+ activity?.SetStatus(ActivityStatusCode.Error);
+
+ logger.LogError(
+ new EventId(75001, "ReflectionEventingQueueProcessingFailed"),
+ e,
+ "Error processing event of type {EventName}",
+ @event.GetType().Name
+ );
+
+ if (options.Value.UseErrorQueue)
+ {
+ queue.EnqueueError(
+ new FailedEvent
+ {
+ Data = @event,
+ Exception = e,
+ Timestamp = DateTimeOffset.UtcNow,
+ FailedConsumer = consumerType,
+ }
+ );
+ }
+
+ EventsFailed.Add(
+ 1,
+ new KeyValuePair("message_type", eventType.Name)
+ );
+ }
+ }
+ else
+ {
+ logger.LogError(
+ new EventId(75002, "ReflectionEventingConsumerMissing"),
+ "ConsumeAsync method not found on consumer {ConsumerType} for event type {EventName}",
+ consumerType.Name,
+ @event.GetType().Name
+ );
+ }
+ }
}
diff --git a/src/ReflectionEventing/EventBusBuilderOptions.cs b/src/ReflectionEventing/EventBusBuilderOptions.cs
index 584e77a..48ac876 100644
--- a/src/ReflectionEventing/EventBusBuilderOptions.cs
+++ b/src/ReflectionEventing/EventBusBuilderOptions.cs
@@ -35,6 +35,21 @@ public class EventBusBuilderOptions
///
public bool UseErrorQueue { get; set; } = false;
+ ///
+ /// Gets or sets the mode in which events in the queue are processed.
+ /// If set to , events are processed one at a time in the order they are received.
+ /// If set to , events are processed concurrently, allowing multiple events to be handled at the same time.
+ /// The default value is .
+ ///
+ public ProcessingMode QueueMode { get; set; } = ProcessingMode.Sequential;
+
+ ///
+ /// Gets or sets the maximum number of concurrent tasks that can be processed.
+ /// This value is used to limit the number of tasks running in parallel when is set to .
+ /// The default value is 100.
+ ///
+ public int ConcurrentTaskLimit { get; set; } = 100;
+
///
/// Gets or sets the rate at which the event queue is processed.
/// The default value is 20ms.
diff --git a/src/ReflectionEventing/ProcessingMode.cs b/src/ReflectionEventing/ProcessingMode.cs
new file mode 100644
index 0000000..5bbfd92
--- /dev/null
+++ b/src/ReflectionEventing/ProcessingMode.cs
@@ -0,0 +1,22 @@
+// This Source Code Form is subject to the terms of the MIT License.
+// If a copy of the MIT was not distributed with this file, You can obtain one at https://opensource.org/licenses/MIT.
+// Copyright (C) Leszek Pomianowski and ReflectionEventing Contributors.
+// All Rights Reserved.
+
+namespace ReflectionEventing;
+
+///
+/// Specifies the mode in which events in the queue are processed.
+///
+public enum ProcessingMode
+{
+ ///
+ /// Events are processed one at a time in the order they are received.
+ ///
+ Sequential = 0,
+
+ ///
+ /// Events are processed concurrently, allowing multiple events to be handled at the same time.
+ ///
+ Parallel = 1,
+}