Skip to content

Commit

Permalink
Update queue mode
Browse files Browse the repository at this point in the history
  • Loading branch information
pomianowski committed Feb 25, 2025
1 parent 7612d2c commit 820a855
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 61 deletions.
3 changes: 2 additions & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>4.0.0</Version>
<Version>4.1.0</Version>
<AssemblyVersion>4.0.0</AssemblyVersion>
</PropertyGroup>

Expand All @@ -29,6 +29,7 @@
<NuGetAuditLevel>moderate</NuGetAuditLevel>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<CentralPackageTransitivePinningEnabled>false</CentralPackageTransitivePinningEnabled>
<_SilenceIsAotCompatibleUnsupportedWarning>true</_SilenceIsAotCompatibleUnsupportedWarning>
</PropertyGroup>

<PropertyGroup>
Expand Down
39 changes: 29 additions & 10 deletions Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
<Project>

<PropertyGroup>
<!-- TODO: Dynamically generate Title if one wasn't set -->
<Title Condition="'$(Title)' == ''">$(Product) Asset</Title>
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
</PropertyGroup>

<PropertyGroup>
Expand All @@ -11,7 +10,17 @@
<PackageTags Condition="'$(PackageTags)' == ''">$(CommonTags)</PackageTags>
</PropertyGroup>

<ItemGroup Condition="$(IsPackable)">
<PropertyGroup Condition="'$(GeneratePackageOnBuild)' == 'true'">
<GenerateLibraryLayout>true</GenerateLibraryLayout>
<PackageReadmeFile>README.md</PackageReadmeFile>
<DeterministicSourcePaths Condition="'$(SourceLinkEnabled)' == 'true'">true</DeterministicSourcePaths>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
</PropertyGroup>

<ItemGroup Condition="'$(GeneratePackageOnBuild)' == 'true'">
<None Include="$(BuildToolsDirectory)nuget.png" Pack="true" PackagePath="\Icon.png" Visible="False" />
<None Include="$(RepositoryDirectory)ThirdPartyNotices.txt" Pack="true" PackagePath="\" Visible="False" />
<None Include="$(RepositoryDirectory)LICENSE.md" Pack="true" PackagePath="\LICENSE.md" Visible="False" />
Expand All @@ -27,13 +36,8 @@
</ItemGroup>
</Target>

<!-- Define NETSTANDARD2_1_OR_GREATER for .NET Standard 2.1 targets and above -->
<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net7.0'">
<DefineConstants>NETSTANDARD2_1_OR_GREATER</DefineConstants>
</PropertyGroup>

<!-- Configure trimming for projects on .NET 6 and above -->
<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net8.0'">
<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net8.0' OR '$(TargetFramework)' == 'net9.0'">
<IsTrimmable>true</IsTrimmable>
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
<EnableAotAnalyzer>true</EnableAotAnalyzer>
Expand All @@ -44,4 +48,19 @@
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>$(RepositoryDirectory)\src\lepo.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<Target Name="WpfSourceLinkWorkaround" BeforeTargets="InitializeSourceRootMappedPaths" Condition="'$(UseWPF)' == 'true'">
<!-- WPF causes an error with SourceLink because its build targets create a temporary project without a PackageReference to SourceLink, see https://github.com/dotnet/sourcelink/issues/91,
causing the @SourceRoot property to be unexpectedly empty for the MapSourceRoot task
For context, see https://github.com/dotnet/roslyn/blob/main/src/Compilers/Core/MSBuildTask/Microsoft.Managed.Core.targets
and https://github.com/dotnet/roslyn/blob/main/src/Compilers/Core/MSBuildTask/MapSourceRoots.cs
This workaround sets the SourceRoot manually to some deterministic value to keep the promise given by having DeterministicSourcePaths set to true -->
<Message Text="using deterministic source path workaround for WPF project instead of SourceLink" />
<ItemGroup>
<!-- There needs to be at least one SourceRoot defined, its value does not seem to matter as long as it ends with a directory separator -->
<SourceRoot Include="\" />
</ItemGroup>
</Target>
</Project>
6 changes: 5 additions & 1 deletion docs/codesnippet/Rtf/Hyperlink/RtfDocumentProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ from attr in doc.Descendants().Attributes()
#endregion

#region FixLink
private static void FixLink(XAttribute link, RelativePath filePath, HashSet<string> linkToFiles)
private static void FixLink(
XAttribute link,
RelativePath filePath,
HashSet<string> linkToFiles
)
{
string linkFile;
string anchor = null;
Expand Down
4 changes: 3 additions & 1 deletion docs/codesnippet/Rtf/RtfBuildStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ public class RtfBuildStep : IDocumentBuildStep
public void Build(FileModel model, IHostService host)
{
string content = (string)((Dictionary<string, object>)model.Content)["conceptual"];
content = _taskFactory.StartNew(() => RtfToHtmlConverter.ConvertRtfToHtml(content)).Result;
content = _taskFactory
.StartNew(() => RtfToHtmlConverter.ConvertRtfToHtml(content))
.Result;
((Dictionary<string, object>)model.Content)["conceptual"] = content;
}
#endregion
Expand Down
6 changes: 5 additions & 1 deletion src/ReflectionEventing.Demo.Wpf/App.xaml.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using ReflectionEventing.Demo.Wpf.Services;
using ReflectionEventing.Demo.Wpf.ViewModels;
using ReflectionEventing.DependencyInjection;
using ReflectionEventing.DependencyInjection.Services;

namespace ReflectionEventing.Demo.Wpf;

Expand Down Expand Up @@ -33,8 +34,11 @@ public partial class App : Application
_ = services.AddEventBus(e =>
{
e.Options.UseEventPolymorphism = true;
e.Options.UseEventsQueue = true;
e.Options.QueueMode = ProcessingMode.Parallel;

_ = e.AddAllConsumers(Assembly.GetExecutingAssembly());
e.UseBackgroundService<DependencyInjectionQueueProcessor>();
e.AddAllConsumers(Assembly.GetExecutingAssembly());
});
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// Copyright (C) Leszek Pomianowski and ReflectionEventing Contributors.
// All Rights Reserved.

using ReflectionEventing.DependencyInjection.Services;

namespace ReflectionEventing.DependencyInjection;

/// <summary>
Expand All @@ -11,6 +13,8 @@ namespace ReflectionEventing.DependencyInjection;
// ReSharper disable once ClassWithVirtualMembersNeverInherited.Global
public class DependencyInjectionEventBusBuilder(IServiceCollection services) : EventBusBuilder
{
internal Type QueueBackgroundService { get; set; } = typeof(DependencyInjectionQueueProcessor);

/// <summary>
/// Adds a consumer to the event bus and <see cref="IServiceCollection"/> with a specified service lifetime.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@ namespace ReflectionEventing.DependencyInjection;

public static class EventBusBuilderExtensions
{
/// <summary>
/// Configures the event bus to use a custom background service for processing events.
/// </summary>
/// <typeparam name="TQueueBackgroundService">The type of the background service to use. This type must implement <see cref="IHostedService"/>.</typeparam>
/// <returns>The current instance of <see cref="EventBusBuilder"/>.</returns>
public static EventBusBuilder UseBackgroundService<TQueueBackgroundService>(
this EventBusBuilder builder
)
where TQueueBackgroundService : class, IHostedService
{
if (builder is not DependencyInjectionEventBusBuilder dependencyInjectionEventBusBuilder)
{
throw new InvalidOperationException(
$"The event bus builder must be of type {nameof(DependencyInjectionEventBusBuilder)} to define background service."
);
}

dependencyInjectionEventBusBuilder.QueueBackgroundService = typeof(TQueueBackgroundService);

return dependencyInjectionEventBusBuilder;
}

/// <summary>
/// Adds a consumer to the event bus builder and <see cref="IServiceCollection"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// All Rights Reserved.

using ReflectionEventing.DependencyInjection.Configuration;
using ReflectionEventing.DependencyInjection.Services;
using ReflectionEventing.Queues;

namespace ReflectionEventing.DependencyInjection;
Expand Down Expand Up @@ -47,7 +46,7 @@ public static IServiceCollection AddEventBus(

if (builder.Options.UseEventsQueue)
{
_ = services.AddHostedService<DependencyInjectionQueueProcessor>();
_ = services.AddSingleton(typeof(IHostedService), builder.QueueBackgroundService);
}

return services;
Expand Down Expand Up @@ -82,7 +81,7 @@ Action<EventBusBuilder> configure

if (builder.Options.UseEventsQueue)
{
_ = services.AddHostedService<DependencyInjectionQueueProcessor>();
_ = services.AddSingleton(typeof(IHostedService), builder.QueueBackgroundService);
}

return services;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ ILogger<DependencyInjectionQueueProcessor> logger

private readonly TimeSpan errorTickRate = options.Value.ErrorTickRate;

private readonly SemaphoreSlim semaphore = new(options.Value.ConcurrentTaskLimit);

protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
await BackgroundProcessing(cancellationToken);
Expand Down Expand Up @@ -114,60 +116,118 @@ CancellationToken cancellationToken
return;
}

MethodInfo? consumeMethod = consumerType.GetMethod(
"ConsumeAsync",
[@event.GetType(), typeof(CancellationToken)]
);

if (consumeMethod != null)
if (options.Value.QueueMode == ProcessingMode.Sequential)
{
try
{
await (Task)consumeMethod.Invoke(consumer, [@event, cancellationToken])!;
}
catch (Exception e)
{
//activity?.AddException(e);
activity?.SetStatus(ActivityStatusCode.Error);

logger.LogError(
new EventId(75001, "ReflectionEventingQueueProcessingFailed"),
e,
"Error processing event of type {EventName}",
@event.GetType().Name
);

if (options.Value.UseErrorQueue)
{
queue.EnqueueError(
new FailedEvent
{
Data = @event,
Exception = e,
Timestamp = DateTimeOffset.UtcNow,
FailedConsumer = consumerType,
}
);
}
await ExecuteConsumerAsync(
@event,
consumerType,
eventType,
consumer,
activity,
cancellationToken
);
}
else if (options.Value.QueueMode == ProcessingMode.Parallel)
{
await semaphore.WaitAsync(cancellationToken);

EventsFailed.Add(
1,
new KeyValuePair<string, object?>("message_type", eventType.Name)
);
}
_ = Task.Run(
async () =>
{
try
{
await ExecuteConsumerAsync(
@event,
consumerType,
eventType,
consumer,
activity,
cancellationToken
);
}
catch (Exception e)
{
logger.LogError(e, "Error occurred during consumer execution");
}
finally
{
semaphore.Release();
}
},
cancellationToken
);
}
else
{
logger.LogError(
new EventId(75002, "ReflectionEventingConsumerMissing"),
"ConsumeAsync method not found on consumer {ConsumerType} for event type {EventName}",
consumerType.Name,
@event.GetType().Name
throw new InvalidOperationException(
"Invalid queue processing mode. Must be either Sequential or Parallel."
);
}
}
}

EventsProcessed.Add(1, new KeyValuePair<string, object?>("message_type", eventType.Name));
}

private async Task ExecuteConsumerAsync(
object @event,
Type consumerType,
Type eventType,
object consumer,
Activity? activity,
CancellationToken cancellationToken
)
{
MethodInfo? consumeMethod = consumerType.GetMethod(
"ConsumeAsync",
[@event.GetType(), typeof(CancellationToken)]
);

if (consumeMethod != null)
{
try
{
await (Task)consumeMethod.Invoke(consumer, [@event, cancellationToken])!;
}
catch (Exception e)
{
//activity?.AddException(e);
activity?.SetStatus(ActivityStatusCode.Error);

logger.LogError(
new EventId(75001, "ReflectionEventingQueueProcessingFailed"),
e,
"Error processing event of type {EventName}",
@event.GetType().Name
);

if (options.Value.UseErrorQueue)
{
queue.EnqueueError(
new FailedEvent
{
Data = @event,
Exception = e,
Timestamp = DateTimeOffset.UtcNow,
FailedConsumer = consumerType,
}
);
}

EventsFailed.Add(
1,
new KeyValuePair<string, object?>("message_type", eventType.Name)
);
}
}
else
{
logger.LogError(
new EventId(75002, "ReflectionEventingConsumerMissing"),
"ConsumeAsync method not found on consumer {ConsumerType} for event type {EventName}",
consumerType.Name,
@event.GetType().Name
);
}
}
}
15 changes: 15 additions & 0 deletions src/ReflectionEventing/EventBusBuilderOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ public class EventBusBuilderOptions
/// </summary>
public bool UseErrorQueue { get; set; } = false;

/// <summary>
/// Gets or sets the mode in which events in the queue are processed.
/// If set to <see cref="ProcessingMode.Sequential"/>, events are processed one at a time in the order they are received.
/// If set to <see cref="ProcessingMode.Parallel"/>, events are processed concurrently, allowing multiple events to be handled at the same time.
/// The default value is <see cref="ProcessingMode.Sequential"/>.
/// </summary>
public ProcessingMode QueueMode { get; set; } = ProcessingMode.Sequential;

/// <summary>
/// Gets or sets the maximum number of concurrent tasks that can be processed.
/// This value is used to limit the number of tasks running in parallel when <see cref="QueueMode"/> is set to <see cref="ProcessingMode.Parallel"/>.
/// The default value is 100.
/// </summary>
public int ConcurrentTaskLimit { get; set; } = 100;

/// <summary>
/// Gets or sets the rate at which the event queue is processed.
/// The default value is 20ms.
Expand Down
Loading

0 comments on commit 820a855

Please sign in to comment.