From 053438ab21b48d2a1cb47cf6aba9d4e0304eb331 Mon Sep 17 00:00:00 2001
From: Leszek Pomianowski <13592821+pomianowski@users.noreply.github.com>
Date: Fri, 21 Feb 2025 17:05:14 +0100
Subject: [PATCH] Add new background queue
---
Directory.Build.props | 18 +-
Directory.Packages.props | 9 +-
LICENSE | 2 +-
LICENSE.md | 2 +-
.../ContainerBuilderExtensions.cs | 3 +
.../ReflectionEventing.Autofac.csproj | 4 +-
.../EventBusInstaller.cs | 2 +
.../ReflectionEventing.Castle.Windsor.csproj | 4 +-
.../Events/AsyncQueuedEvent.cs | 8 +
.../MainWindow.xaml | 34 +++-
.../ReflectionEventing.Demo.Wpf.csproj | 2 +-
.../Services/BackgroundTickService.cs | 4 +-
.../ViewModels/MainWindowViewModel.cs | 27 ++-
.../QueueProcessorOptionsProvider.cs | 28 +++
.../DependencyInjectionQueueProcessor.cs | 160 +++++++++++++++++
.../GlobalUsings.cs | 8 +
...lectionEventing.DependencyInjection.csproj | 20 ++-
.../ServiceCollectionExtensions.cs | 45 +++++
.../EventBusModule.cs | 3 +
.../ReflectionEventing.Ninject.csproj | 8 +-
.../ReflectionEventing.Unity.csproj | 4 +-
.../UnityContainerExtensions.cs | 5 +
src/ReflectionEventing/EventBus.cs | 45 ++++-
.../EventBusBuilderOptions.cs | 18 ++
src/ReflectionEventing/EventBusExtensions.cs | 30 +++-
src/ReflectionEventing/GlobalUsings.cs | 4 +
src/ReflectionEventing/IEventBus.cs | 21 ++-
src/ReflectionEventing/Queues/EventsQueue.cs | 41 +++++
src/ReflectionEventing/Queues/FailedEvent.cs | 13 ++
src/ReflectionEventing/Queues/IEventsQueue.cs | 40 +++++
.../ReflectionEventing.csproj | 14 +-
...eflectionEventing.Autofac.UnitTests.csproj | 4 +-
.../EventBusInstallerTests.cs | 9 +-
.../EventBusTests.cs | 13 +-
...onEventing.Castle.Windsor.UnitTests.csproj | 4 +-
.../DependencyInjectionQueueProcessorTests.cs | 163 ++++++++++++++++++
.../GlobalUsings.cs | 2 +
...nting.DependencyInjection.UnitTests.csproj | 7 +-
.../ServiceCollectionExtensionsTests.cs | 2 +-
...eflectionEventing.Ninject.UnitTests.csproj | 4 +-
.../EventBusTests.cs | 14 +-
.../HashedConsumerTypesProviderTests.cs | 17 +-
.../ReflectionEventing.UnitTests.csproj | 4 +-
.../ReflectionEventing.Unity.UnitTests.csproj | 6 +-
44 files changed, 794 insertions(+), 81 deletions(-)
create mode 100644 src/ReflectionEventing.Demo.Wpf/Events/AsyncQueuedEvent.cs
create mode 100644 src/ReflectionEventing.DependencyInjection/Configuration/QueueProcessorOptionsProvider.cs
create mode 100644 src/ReflectionEventing.DependencyInjection/DependencyInjectionQueueProcessor.cs
create mode 100644 src/ReflectionEventing/Queues/EventsQueue.cs
create mode 100644 src/ReflectionEventing/Queues/FailedEvent.cs
create mode 100644 src/ReflectionEventing/Queues/IEventsQueue.cs
create mode 100644 tests/ReflectionEventing.DependencyInjection.UnitTests/DependencyInjectionQueueProcessorTests.cs
diff --git a/Directory.Build.props b/Directory.Build.props
index a3b2cd8..fc880f6 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -6,8 +6,8 @@
- 3.1.0
- 3.0.0
+ 4.0.0
+ 4.0.0
@@ -37,7 +37,7 @@
true
- 12.0
+ 13.0enable
diff --git a/src/ReflectionEventing.Unity/ReflectionEventing.Unity.csproj b/src/ReflectionEventing.Unity/ReflectionEventing.Unity.csproj
index dbe03fc..7ff6966 100644
--- a/src/ReflectionEventing.Unity/ReflectionEventing.Unity.csproj
+++ b/src/ReflectionEventing.Unity/ReflectionEventing.Unity.csproj
@@ -2,7 +2,7 @@
ReflectionEventing.Unity
- netstandard2.0;netstandard2.1;net462;net6.0;net8.0
+ net9.0;net8.0;net6.0;netstandard2.0;net462;net472truetruetrue
@@ -10,7 +10,7 @@
Unity container extensions with ReflectionEventing, which promotes better Inversion of Control (IoC), reducing coupling and enhancing the modularity and flexibility of your applications.
-
+ truetrueSpeed
diff --git a/src/ReflectionEventing.Unity/UnityContainerExtensions.cs b/src/ReflectionEventing.Unity/UnityContainerExtensions.cs
index b205a3b..3ca58e0 100644
--- a/src/ReflectionEventing.Unity/UnityContainerExtensions.cs
+++ b/src/ReflectionEventing.Unity/UnityContainerExtensions.cs
@@ -3,6 +3,7 @@
// Copyright (C) Leszek Pomianowski and ReflectionEventing Contributors.
// All Rights Reserved.
+using ReflectionEventing.Queues;
using Unity;
using Unity.Lifetime;
@@ -37,6 +38,10 @@ Action configure
new ContainerControlledLifetimeManager()
);
+ _ = container.RegisterType(
+ new ContainerControlledLifetimeManager()
+ );
+
_ = container.RegisterType(
new HierarchicalLifetimeManager()
);
diff --git a/src/ReflectionEventing/EventBus.cs b/src/ReflectionEventing/EventBus.cs
index a12dc17..5c62f7d 100644
--- a/src/ReflectionEventing/EventBus.cs
+++ b/src/ReflectionEventing/EventBus.cs
@@ -3,6 +3,8 @@
// Copyright (C) Leszek Pomianowski and ReflectionEventing Contributors.
// All Rights Reserved.
+using ReflectionEventing.Queues;
+
namespace ReflectionEventing;
///
@@ -13,13 +15,31 @@ namespace ReflectionEventing;
///
public class EventBus(
IConsumerProvider consumerProviders,
- IConsumerTypesProvider consumerTypesProvider
+ IConsumerTypesProvider consumerTypesProvider,
+ IEventsQueue queue
) : IEventBus
{
+ private static readonly ActivitySource ActivitySource = new("ReflectionEventing.EventBus");
+
+ private static readonly Meter Meter = new("ReflectionEventing.EventBus");
+
+ private static readonly Counter SentCounter = Meter.CreateCounter("bus.sent");
+
+ private static readonly Counter PublishedCounter = Meter.CreateCounter(
+ "bus.published"
+ );
+
///
- public async Task PublishAsync(TEvent eventItem, CancellationToken cancellationToken)
+ public async Task SendAsync(
+ TEvent eventItem,
+ CancellationToken cancellationToken = default
+ )
where TEvent : class
{
+ using Activity? activity = ActivitySource.StartActivity(ActivityKind.Producer);
+
+ activity?.AddTag("co.lepo.reflection.eventing.message", typeof(TEvent).Name);
+
if (eventItem is null)
{
throw new ArgumentNullException(nameof(eventItem));
@@ -38,5 +58,26 @@ public async Task PublishAsync(TEvent eventItem, CancellationToken cance
}
await Task.WhenAll(tasks).ConfigureAwait(false);
+
+ SentCounter.Add(1, new KeyValuePair("message_type", eventType.Name));
+ }
+
+ ///
+ public async Task PublishAsync(
+ TEvent eventItem,
+ CancellationToken cancellationToken = default
+ )
+ where TEvent : class
+ {
+ using Activity? activity = ActivitySource.StartActivity(ActivityKind.Producer);
+
+ activity?.AddTag("co.lepo.reflection.eventing.message", typeof(TEvent).Name);
+
+ await queue.EnqueueAsync(eventItem, cancellationToken);
+
+ PublishedCounter.Add(
+ 1,
+ new KeyValuePair("message_type", typeof(TEvent).Name)
+ );
}
}
diff --git a/src/ReflectionEventing/EventBusBuilderOptions.cs b/src/ReflectionEventing/EventBusBuilderOptions.cs
index 58aa90f..65d47dd 100644
--- a/src/ReflectionEventing/EventBusBuilderOptions.cs
+++ b/src/ReflectionEventing/EventBusBuilderOptions.cs
@@ -18,4 +18,22 @@ public class EventBusBuilderOptions
/// The default value is false.
///
public bool UseEventPolymorphism { get; set; } = false;
+
+ ///
+ /// Gets or sets the rate at which the event queue is processed.
+ /// The default value is 20ms.
+ ///
+ ///
+ /// Adjust this value to control how frequently the event queue is processed.
+ ///
+ public TimeSpan QueueTickRate { get; set; } = TimeSpan.FromMilliseconds(20); // NOTE: There are 10,000 ticks in a millisecond.
+
+ ///
+ /// Gets or sets the rate at which the error queue is processed when default queue consumption fails.
+ /// The default value is 20ms.
+ ///
+ ///
+ /// Adjust this value to control how frequently the error queue is processed.
+ ///
+ public TimeSpan ErrorTickRate { get; set; } = TimeSpan.FromMilliseconds(20); // NOTE: There are 10,000 ticks in a millisecond.
}
diff --git a/src/ReflectionEventing/EventBusExtensions.cs b/src/ReflectionEventing/EventBusExtensions.cs
index a2678c4..0dedc07 100644
--- a/src/ReflectionEventing/EventBusExtensions.cs
+++ b/src/ReflectionEventing/EventBusExtensions.cs
@@ -10,13 +10,41 @@ namespace ReflectionEventing;
///
public static class EventBusExtensions
{
+ ///
+ /// Sends the specified event synchronously.
+ ///
+ /// The type of the event to publish.
+ /// The event bus to extend.
+ /// The event to publish.
+ [Obsolete($"May cause deadlock on UI threads, use {nameof(IEventBus.SendAsync)} instead.")]
+ public static void Send(this IEventBus eventBus, TEvent eventItem)
+ where TEvent : class
+ {
+ using CancellationTokenSource cancellationSource = new();
+
+ Task.Run(
+ () =>
+ {
+ // ReSharper disable once AccessToDisposedClosure
+ eventBus
+ .SendAsync(eventItem, cancellationSource.Token)
+ .ConfigureAwait(false)
+ .GetAwaiter()
+ .GetResult();
+ },
+ cancellationSource.Token
+ )
+ .GetAwaiter()
+ .GetResult();
+ }
+
///
/// Publishes the specified event synchronously.
///
/// The type of the event to publish.
/// The event bus to extend.
/// The event to publish.
- [Obsolete($"May cause deadlock on UI threads, use {nameof(IEventBus.PublishAsync)} instead.")]
+ [Obsolete($"May cause deadlock on UI threads, use {nameof(IEventBus.SendAsync)} instead.")]
public static void Publish(this IEventBus eventBus, TEvent eventItem)
where TEvent : class
{
diff --git a/src/ReflectionEventing/GlobalUsings.cs b/src/ReflectionEventing/GlobalUsings.cs
index b59a06c..f1a166a 100644
--- a/src/ReflectionEventing/GlobalUsings.cs
+++ b/src/ReflectionEventing/GlobalUsings.cs
@@ -4,9 +4,13 @@
// All Rights Reserved.
global using System;
+global using System.Collections.Concurrent;
global using System.Collections.Generic;
+global using System.Diagnostics;
global using System.Diagnostics.CodeAnalysis;
+global using System.Diagnostics.Metrics;
global using System.Linq;
global using System.Reflection;
global using System.Threading;
+global using System.Threading.Channels;
global using System.Threading.Tasks;
diff --git a/src/ReflectionEventing/IEventBus.cs b/src/ReflectionEventing/IEventBus.cs
index 2dea8e3..5b55841 100644
--- a/src/ReflectionEventing/IEventBus.cs
+++ b/src/ReflectionEventing/IEventBus.cs
@@ -11,16 +11,29 @@ namespace ReflectionEventing;
public interface IEventBus
{
///
- /// Publishes the specified event asynchronously.
+ /// Sends the specified event asynchronously within the current scope, waiting for its execution.
///
- /// The type of the event to publish.
- /// The event to publish.
+ /// The type of the event to send.
+ /// The event to send.
/// A cancellation token that can be used to cancel the operation.
/// A task that represents the asynchronous operation.
///
/// This method gets the consumers for the specified event type from the consumer provider and then uses the service provider to get the required service for each consumer.
/// Each consumer is then used to consume the event asynchronously.
///
- Task PublishAsync(TEvent eventItem, CancellationToken cancellationToken)
+ Task SendAsync(TEvent eventItem, CancellationToken cancellationToken = default)
+ where TEvent : class;
+
+ ///
+ /// Adds the specified event to the queue.Another scope will take over execution as configured.
+ ///
+ /// The type of the event to publish.
+ /// The event to publish.
+ /// A cancellation token that can be used to cancel the operation.
+ /// A task that represents the asynchronous operation.
+ ///
+ /// The method only adds the event to the execution queue, it does not wait for its successful execution.
+ ///
+ Task PublishAsync(TEvent eventItem, CancellationToken cancellationToken = default)
where TEvent : class;
}
diff --git a/src/ReflectionEventing/Queues/EventsQueue.cs b/src/ReflectionEventing/Queues/EventsQueue.cs
new file mode 100644
index 0000000..a511fb6
--- /dev/null
+++ b/src/ReflectionEventing/Queues/EventsQueue.cs
@@ -0,0 +1,41 @@
+// This Source Code Form is subject to the terms of the MIT License.
+// If a copy of the MIT was not distributed with this file, You can obtain one at https://opensource.org/licenses/MIT.
+// Copyright (C) Leszek Pomianowski and ReflectionEventing Contributors.
+// All Rights Reserved.
+
+namespace ReflectionEventing.Queues;
+
+public class EventsQueue : IEventsQueue
+{
+ private readonly Channel