Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update queue mode #3

Merged
merged 2 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/reflection-events-pr-validator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ name: reflection-events-pr-validator

on:
pull_request:
branches: [development]
push:
branches: [development]
branches: [main]

workflow_dispatch:

Expand Down
3 changes: 2 additions & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>4.0.0</Version>
<Version>4.1.0</Version>
<AssemblyVersion>4.0.0</AssemblyVersion>
</PropertyGroup>

Expand All @@ -29,6 +29,7 @@
<NuGetAuditLevel>moderate</NuGetAuditLevel>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<CentralPackageTransitivePinningEnabled>false</CentralPackageTransitivePinningEnabled>
<_SilenceIsAotCompatibleUnsupportedWarning>true</_SilenceIsAotCompatibleUnsupportedWarning>
</PropertyGroup>

<PropertyGroup>
Expand Down
39 changes: 29 additions & 10 deletions Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
<Project>

<PropertyGroup>
<!-- TODO: Dynamically generate Title if one wasn't set -->
<Title Condition="'$(Title)' == ''">$(Product) Asset</Title>
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
</PropertyGroup>

<PropertyGroup>
Expand All @@ -11,7 +10,17 @@
<PackageTags Condition="'$(PackageTags)' == ''">$(CommonTags)</PackageTags>
</PropertyGroup>

<ItemGroup Condition="$(IsPackable)">
<PropertyGroup Condition="'$(GeneratePackageOnBuild)' == 'true'">
<GenerateLibraryLayout>true</GenerateLibraryLayout>
<PackageReadmeFile>README.md</PackageReadmeFile>
<DeterministicSourcePaths Condition="'$(SourceLinkEnabled)' == 'true'">true</DeterministicSourcePaths>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
</PropertyGroup>

<ItemGroup Condition="'$(GeneratePackageOnBuild)' == 'true'">
<None Include="$(BuildToolsDirectory)nuget.png" Pack="true" PackagePath="\Icon.png" Visible="False" />
<None Include="$(RepositoryDirectory)ThirdPartyNotices.txt" Pack="true" PackagePath="\" Visible="False" />
<None Include="$(RepositoryDirectory)LICENSE.md" Pack="true" PackagePath="\LICENSE.md" Visible="False" />
Expand All @@ -27,13 +36,8 @@
</ItemGroup>
</Target>

<!-- Define NETSTANDARD2_1_OR_GREATER for .NET Standard 2.1 targets and above -->
<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net7.0'">
<DefineConstants>NETSTANDARD2_1_OR_GREATER</DefineConstants>
</PropertyGroup>

<!-- Configure trimming for projects on .NET 6 and above -->
<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net8.0'">
<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net8.0' OR '$(TargetFramework)' == 'net9.0'">
<IsTrimmable>true</IsTrimmable>
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
<EnableAotAnalyzer>true</EnableAotAnalyzer>
Expand All @@ -44,4 +48,19 @@
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>$(RepositoryDirectory)\src\lepo.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<Target Name="WpfSourceLinkWorkaround" BeforeTargets="InitializeSourceRootMappedPaths" Condition="'$(UseWPF)' == 'true'">
<!-- WPF causes an error with SourceLink because its build targets create a temporary project without a PackageReference to SourceLink, see https://github.com/dotnet/sourcelink/issues/91,
causing the @SourceRoot property to be unexpectedly empty for the MapSourceRoot task

For context, see https://github.com/dotnet/roslyn/blob/main/src/Compilers/Core/MSBuildTask/Microsoft.Managed.Core.targets
and https://github.com/dotnet/roslyn/blob/main/src/Compilers/Core/MSBuildTask/MapSourceRoots.cs

This workaround sets the SourceRoot manually to some deterministic value to keep the promise given by having DeterministicSourcePaths set to true -->
<Message Text="using deterministic source path workaround for WPF project instead of SourceLink" />
<ItemGroup>
<!-- There needs to be at least one SourceRoot defined, its value does not seem to matter as long as it ends with a directory separator -->
<SourceRoot Include="\" />
</ItemGroup>
</Target>
</Project>
6 changes: 5 additions & 1 deletion docs/codesnippet/Rtf/Hyperlink/RtfDocumentProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ from attr in doc.Descendants().Attributes()
#endregion

#region FixLink
private static void FixLink(XAttribute link, RelativePath filePath, HashSet<string> linkToFiles)
private static void FixLink(
XAttribute link,
RelativePath filePath,
HashSet<string> linkToFiles
)
{
string linkFile;
string anchor = null;
Expand Down
4 changes: 3 additions & 1 deletion docs/codesnippet/Rtf/RtfBuildStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ public class RtfBuildStep : IDocumentBuildStep
public void Build(FileModel model, IHostService host)
{
string content = (string)((Dictionary<string, object>)model.Content)["conceptual"];
content = _taskFactory.StartNew(() => RtfToHtmlConverter.ConvertRtfToHtml(content)).Result;
content = _taskFactory
.StartNew(() => RtfToHtmlConverter.ConvertRtfToHtml(content))
.Result;
((Dictionary<string, object>)model.Content)["conceptual"] = content;
}
#endregion
Expand Down
6 changes: 5 additions & 1 deletion src/ReflectionEventing.Demo.Wpf/App.xaml.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using ReflectionEventing.Demo.Wpf.Services;
using ReflectionEventing.Demo.Wpf.ViewModels;
using ReflectionEventing.DependencyInjection;
using ReflectionEventing.DependencyInjection.Services;

namespace ReflectionEventing.Demo.Wpf;

Expand Down Expand Up @@ -33,8 +34,11 @@ public partial class App : Application
_ = services.AddEventBus(e =>
{
e.Options.UseEventPolymorphism = true;
e.Options.UseEventsQueue = true;
e.Options.QueueMode = ProcessingMode.Parallel;

_ = e.AddAllConsumers(Assembly.GetExecutingAssembly());
e.UseBackgroundService<DependencyInjectionQueueProcessor>();
e.AddAllConsumers(Assembly.GetExecutingAssembly());
});
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// Copyright (C) Leszek Pomianowski and ReflectionEventing Contributors.
// All Rights Reserved.

using ReflectionEventing.DependencyInjection.Services;

namespace ReflectionEventing.DependencyInjection;

/// <summary>
Expand All @@ -11,6 +13,8 @@ namespace ReflectionEventing.DependencyInjection;
// ReSharper disable once ClassWithVirtualMembersNeverInherited.Global
public class DependencyInjectionEventBusBuilder(IServiceCollection services) : EventBusBuilder
{
internal Type QueueBackgroundService { get; set; } = typeof(DependencyInjectionQueueProcessor);

/// <summary>
/// Adds a consumer to the event bus and <see cref="IServiceCollection"/> with a specified service lifetime.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@ namespace ReflectionEventing.DependencyInjection;

public static class EventBusBuilderExtensions
{
/// <summary>
/// Configures the event bus to use a custom background service for processing events.
/// </summary>
/// <typeparam name="TQueueBackgroundService">The type of the background service to use. This type must implement <see cref="IHostedService"/>.</typeparam>
/// <returns>The current instance of <see cref="EventBusBuilder"/>.</returns>
public static EventBusBuilder UseBackgroundService<TQueueBackgroundService>(
this EventBusBuilder builder
)
where TQueueBackgroundService : class, IHostedService
{
if (builder is not DependencyInjectionEventBusBuilder dependencyInjectionEventBusBuilder)
{
throw new InvalidOperationException(
$"The event bus builder must be of type {nameof(DependencyInjectionEventBusBuilder)} to define background service."
);
}

dependencyInjectionEventBusBuilder.QueueBackgroundService = typeof(TQueueBackgroundService);

return dependencyInjectionEventBusBuilder;
}

/// <summary>
/// Adds a consumer to the event bus builder and <see cref="IServiceCollection"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// All Rights Reserved.

using ReflectionEventing.DependencyInjection.Configuration;
using ReflectionEventing.DependencyInjection.Services;
using ReflectionEventing.Queues;

namespace ReflectionEventing.DependencyInjection;
Expand Down Expand Up @@ -47,7 +46,7 @@ public static IServiceCollection AddEventBus(

if (builder.Options.UseEventsQueue)
{
_ = services.AddHostedService<DependencyInjectionQueueProcessor>();
_ = services.AddSingleton(typeof(IHostedService), builder.QueueBackgroundService);
}

return services;
Expand Down Expand Up @@ -82,7 +81,7 @@ Action<EventBusBuilder> configure

if (builder.Options.UseEventsQueue)
{
_ = services.AddHostedService<DependencyInjectionQueueProcessor>();
_ = services.AddSingleton(typeof(IHostedService), builder.QueueBackgroundService);
}

return services;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ ILogger<DependencyInjectionQueueProcessor> logger

private readonly TimeSpan errorTickRate = options.Value.ErrorTickRate;

private readonly SemaphoreSlim semaphore = new(options.Value.ConcurrentTaskLimit);

protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
await BackgroundProcessing(cancellationToken);
Expand Down Expand Up @@ -114,60 +116,118 @@ CancellationToken cancellationToken
return;
}

MethodInfo? consumeMethod = consumerType.GetMethod(
"ConsumeAsync",
[@event.GetType(), typeof(CancellationToken)]
);

if (consumeMethod != null)
if (options.Value.QueueMode == ProcessingMode.Sequential)
{
try
{
await (Task)consumeMethod.Invoke(consumer, [@event, cancellationToken])!;
}
catch (Exception e)
{
//activity?.AddException(e);
activity?.SetStatus(ActivityStatusCode.Error);

logger.LogError(
new EventId(75001, "ReflectionEventingQueueProcessingFailed"),
e,
"Error processing event of type {EventName}",
@event.GetType().Name
);

if (options.Value.UseErrorQueue)
{
queue.EnqueueError(
new FailedEvent
{
Data = @event,
Exception = e,
Timestamp = DateTimeOffset.UtcNow,
FailedConsumer = consumerType,
}
);
}
await ExecuteConsumerAsync(
@event,
consumerType,
eventType,
consumer,
activity,
cancellationToken
);
}
else if (options.Value.QueueMode == ProcessingMode.Parallel)
{
await semaphore.WaitAsync(cancellationToken);

EventsFailed.Add(
1,
new KeyValuePair<string, object?>("message_type", eventType.Name)
);
}
_ = Task.Run(
async () =>
{
try
{
await ExecuteConsumerAsync(
@event,
consumerType,
eventType,
consumer,
activity,
cancellationToken
);
}
catch (Exception e)
{
logger.LogError(e, "Error occurred during consumer execution");
}
finally
{
semaphore.Release();
}
},
cancellationToken
);
}
else
{
logger.LogError(
new EventId(75002, "ReflectionEventingConsumerMissing"),
"ConsumeAsync method not found on consumer {ConsumerType} for event type {EventName}",
consumerType.Name,
@event.GetType().Name
throw new InvalidOperationException(
"Invalid queue processing mode. Must be either Sequential or Parallel."
);
}
}
}

EventsProcessed.Add(1, new KeyValuePair<string, object?>("message_type", eventType.Name));
}

private async Task ExecuteConsumerAsync(
object @event,
Type consumerType,
Type eventType,
object consumer,
Activity? activity,
CancellationToken cancellationToken
)
{
MethodInfo? consumeMethod = consumerType.GetMethod(
"ConsumeAsync",
[@event.GetType(), typeof(CancellationToken)]
);

if (consumeMethod != null)
{
try
{
await (Task)consumeMethod.Invoke(consumer, [@event, cancellationToken])!;
}
catch (Exception e)
{
//activity?.AddException(e);
activity?.SetStatus(ActivityStatusCode.Error);

logger.LogError(
new EventId(75001, "ReflectionEventingQueueProcessingFailed"),
e,
"Error processing event of type {EventName}",
@event.GetType().Name
);

if (options.Value.UseErrorQueue)
{
queue.EnqueueError(
new FailedEvent
{
Data = @event,
Exception = e,
Timestamp = DateTimeOffset.UtcNow,
FailedConsumer = consumerType,
}
);
}

EventsFailed.Add(
1,
new KeyValuePair<string, object?>("message_type", eventType.Name)
);
}
}
else
{
logger.LogError(
new EventId(75002, "ReflectionEventingConsumerMissing"),
"ConsumeAsync method not found on consumer {ConsumerType} for event type {EventName}",
consumerType.Name,
@event.GetType().Name
);
}
}
}
Loading
Loading