diff --git a/.gitignore b/.gitignore index e25422f..009742c 100644 --- a/.gitignore +++ b/.gitignore @@ -58,5 +58,12 @@ _ReSharper.*/ *.rej scratch.txt +# Local scratch notes +/scratch.md + +# FileScrivener toy outputs — keep scope tight +# Only ignore the toy's artifacts; do not blanket-ignore any /data folders +/src/toys/Coven.Toys.FileScrivenerConsole/data/ + # Codex CLI generated state **/.codex/ diff --git a/.vscode/launch.json b/.vscode/launch.json index efefc94..c9baf90 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -67,6 +67,17 @@ "justMyCode": true, "preLaunchTask": "build" }, + { + "name": "Launch: Coven.Toys.FileScrivenerConsole", + "type": "coreclr", + "request": "launch", + "program": "${workspaceFolder}/src/toys/Coven.Toys.FileScrivenerConsole/bin/Debug/net10.0/Coven.Toys.FileScrivenerConsole.dll", + "cwd": "${workspaceFolder}/src/toys/Coven.Toys.FileScrivenerConsole", + "console": "integratedTerminal", + "stopAtEntry": false, + "justMyCode": true, + "preLaunchTask": "build" + }, { "name": "Attach: .NET Core", "type": "coreclr", diff --git a/INDEX.md b/INDEX.md index a373e53..4f2f7f3 100644 --- a/INDEX.md +++ b/INDEX.md @@ -25,6 +25,8 @@ Project overview: see [README](/README.md). - Agents (branch): [/src/Coven.Agents](/src/Coven.Agents/) ([README](/src/Coven.Agents/README.md)) - Agents OpenAI (leaf): [/src/Coven.Agents.OpenAI](/src/Coven.Agents.OpenAI/) ([README](/src/Coven.Agents.OpenAI/README.md)) +- Scriveners: [/src/Coven.Scriveners.FileScrivener](/src/Coven.Scriveners.FileScrivener/) ([README](/src/Coven.Scriveners.FileScrivener/README.md)) + - Tests: [/src/Coven.Core.Tests](/src/Coven.Core.Tests/), [/src/Coven.Daemonology.Tests](/src/Coven.Daemonology.Tests/) ## Samples @@ -36,3 +38,4 @@ Project overview: see [README](/README.md). - [/src/toys/Coven.Toys.ConsoleOpenAIStreaming](/src/toys/Coven.Toys.ConsoleOpenAIStreaming/) - [/src/toys/Coven.Toys.DiscordChat](/src/toys/Coven.Toys.DiscordChat/) - [/src/toys/Coven.Toys.DiscordStreaming](/src/toys/Coven.Toys.DiscordStreaming/) +- [/src/toys/Coven.Toys.FileScrivenerConsole](/src/toys/Coven.Toys.FileScrivenerConsole/) diff --git a/architecture/Journaling-and-Scriveners.md b/architecture/Journaling-and-Scriveners.md index 83b9019..ec123ac 100644 --- a/architecture/Journaling-and-Scriveners.md +++ b/architecture/Journaling-and-Scriveners.md @@ -35,6 +35,12 @@ Scriveners are append‑only journals that record typed entries. They decouple p - Avoid long‑running synchronous work in consumers; prefer daemons that tail asynchronously. - Treat journals as the source of truth for cross‑component communication. +## Persistence +- In‑memory by default: `InMemoryScrivener` (fast, replayable within process). +- File‑backed option: `Coven.Scriveners.FileScrivener` provides a background flusher that appends NDJSON snapshots to disk while your app continues to use the in‑memory journal. + - See package README: `/src/Coven.Scriveners.FileScrivener/README.md`. + - Try the toy: `/src/toys/Coven.Toys.FileScrivenerConsole/`. + ## Related - Windowing/Shattering: see “Windowing and Shattering”. - Daemon lifecycle: see `src/Coven.Daemonology/README.md`. diff --git a/architecture/README.md b/architecture/README.md index 700aab5..6972bbc 100644 --- a/architecture/README.md +++ b/architecture/README.md @@ -43,6 +43,7 @@ Concrete examples help ground the vocabulary above. The Sample 01 app wires Disc - Journals (Scriveners): append‑only logs connecting router↔branches↔leaves. - Chat journal: `IScrivener` (Discord adapter implements its own scrivener; router reads/writes entries). - Agent journal: `IScrivener` (OpenAI adapter scrivener; router reads/writes entries). + - Persistence: use `Coven.Scriveners.FileScrivener` to append NDJSON snapshots to disk while keeping in‑process journals in memory. - Directionality in the router logic: - Afferent (inbound): Discord adapter writes `ChatAfferent` when a user posts in the channel; router reads and forwards as an `AgentPrompt`. diff --git a/src/Coven.Agents.OpenAI/OpenAIAgentSession.cs b/src/Coven.Agents.OpenAI/OpenAIAgentSession.cs index 17ad4c6..e920caa 100644 --- a/src/Coven.Agents.OpenAI/OpenAIAgentSession.cs +++ b/src/Coven.Agents.OpenAI/OpenAIAgentSession.cs @@ -7,12 +7,17 @@ namespace Coven.Agents.OpenAI; +/// +/// Coordinates an OpenAI agent session bridging the OpenAI and Agent journals. +/// Uses imbuing transmuters to carry the source journal position as a reagent for position-based ACKs. +/// internal sealed class OpenAIAgentSession( IOpenAIGatewayConnection gateway, IScrivener openAIJournal, IScrivener agentJournal, IShatterPolicy shatterPolicy, - IBiDirectionalTransmuter transmuter, + IImbuingTransmuter afferentTransmuter, + IImbuingTransmuter efferentTransmuter, ILogger logger, CancellationToken sessionToken) : IAsyncDisposable { @@ -20,7 +25,8 @@ internal sealed class OpenAIAgentSession( private readonly IScrivener _openAIJournal = openAIJournal ?? throw new ArgumentNullException(nameof(openAIJournal)); private readonly IScrivener _agentJournal = agentJournal ?? throw new ArgumentNullException(nameof(agentJournal)); private readonly IShatterPolicy _shatterPolicy = shatterPolicy ?? throw new ArgumentNullException(nameof(shatterPolicy)); - private readonly IBiDirectionalTransmuter _transmuter = transmuter ?? throw new ArgumentNullException(nameof(transmuter)); + private readonly IImbuingTransmuter _afferentTransmuter = afferentTransmuter ?? throw new ArgumentNullException(nameof(afferentTransmuter)); + private readonly IImbuingTransmuter _efferentTransmuter = efferentTransmuter ?? throw new ArgumentNullException(nameof(efferentTransmuter)); private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); private readonly CancellationToken _sessionToken = sessionToken; @@ -55,7 +61,7 @@ public async Task StartAsync() if (openAIEntry is OpenAIAfferentThoughtChunk) { produced = true; - AgentEntry agentChunk = await _transmuter.TransmuteAfferent(openAIEntry, ct).ConfigureAwait(false); + AgentEntry agentChunk = await _afferentTransmuter.Transmute(openAIEntry, position, ct).ConfigureAwait(false); long pos = await _agentJournal.WriteAsync(agentChunk, ct).ConfigureAwait(false); OpenAILog.OpenAIToAgentsAppended(_logger, agentChunk.GetType().Name, pos); } @@ -67,7 +73,7 @@ public async Task StartAsync() } } - AgentEntry agent = await _transmuter.TransmuteAfferent(entry, ct).ConfigureAwait(false); + AgentEntry agent = await _afferentTransmuter.Transmute(entry, position, ct).ConfigureAwait(false); OpenAILog.OpenAIToAgentsTransmuted(_logger, entry.GetType().Name, agent.GetType().Name); long agentPos = await _agentJournal.WriteAsync(agent, ct).ConfigureAwait(false); OpenAILog.OpenAIToAgentsAppended(_logger, agent.GetType().Name, agentPos); @@ -98,7 +104,7 @@ public async Task StartAsync() } OpenAILog.AgentsToOpenAIObserved(_logger, entry.GetType().Name, position); - OpenAIEntry openAI = await _transmuter.TransmuteEfferent(entry, ct).ConfigureAwait(false); + OpenAIEntry openAI = await _efferentTransmuter.Transmute(entry, position, ct).ConfigureAwait(false); OpenAILog.AgentsToOpenAITransmuted(_logger, entry.GetType().Name, openAI.GetType().Name); long aiPos = await _openAIJournal.WriteAsync(openAI, ct).ConfigureAwait(false); OpenAILog.AgentsToOpenAIAppended(_logger, openAI.GetType().Name, aiPos); diff --git a/src/Coven.Agents.OpenAI/OpenAIAgentSessionFactory.cs b/src/Coven.Agents.OpenAI/OpenAIAgentSessionFactory.cs index a387524..d70a9bd 100644 --- a/src/Coven.Agents.OpenAI/OpenAIAgentSessionFactory.cs +++ b/src/Coven.Agents.OpenAI/OpenAIAgentSessionFactory.cs @@ -7,21 +7,26 @@ namespace Coven.Agents.OpenAI; +/// +/// Constructs instances with imbuing transmuters for position-based acknowledgements. +/// internal sealed class OpenAIAgentSessionFactory( IOpenAIGatewayConnection gatewayConnection, IScrivener openAIJournal, IScrivener agentJournal, - IBiDirectionalTransmuter transmuter, + IImbuingTransmuter afferentTransmuter, + IImbuingTransmuter efferentTransmuter, IShatterPolicy shatterPolicy, ILogger logger) { private readonly IOpenAIGatewayConnection _gatewayConnection = gatewayConnection ?? throw new ArgumentNullException(nameof(gatewayConnection)); private readonly IScrivener _openAIJournal = openAIJournal ?? throw new ArgumentNullException(nameof(openAIJournal)); private readonly IScrivener _agentJournal = agentJournal ?? throw new ArgumentNullException(nameof(agentJournal)); - private readonly IBiDirectionalTransmuter _transmuter = transmuter ?? throw new ArgumentNullException(nameof(transmuter)); + private readonly IImbuingTransmuter _afferentTransmuter = afferentTransmuter ?? throw new ArgumentNullException(nameof(afferentTransmuter)); + private readonly IImbuingTransmuter _efferentTransmuter = efferentTransmuter ?? throw new ArgumentNullException(nameof(efferentTransmuter)); private readonly IShatterPolicy _shatterPolicy = shatterPolicy ?? throw new ArgumentNullException(nameof(shatterPolicy)); private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); public OpenAIAgentSession Create(CancellationToken sessionToken) - => new(_gatewayConnection, _openAIJournal, _agentJournal, _shatterPolicy, _transmuter, _logger, sessionToken); + => new(_gatewayConnection, _openAIJournal, _agentJournal, _shatterPolicy, _afferentTransmuter, _efferentTransmuter, _logger, sessionToken); } diff --git a/src/Coven.Agents.OpenAI/OpenAIAgentsServiceCollectionExtensions.cs b/src/Coven.Agents.OpenAI/OpenAIAgentsServiceCollectionExtensions.cs index cb42167..cd04f64 100644 --- a/src/Coven.Agents.OpenAI/OpenAIAgentsServiceCollectionExtensions.cs +++ b/src/Coven.Agents.OpenAI/OpenAIAgentsServiceCollectionExtensions.cs @@ -14,7 +14,7 @@ namespace Coven.Agents.OpenAI; /// /// Dependency Injection helpers for wiring the OpenAI agent integration. -/// Registers journals, gateway connection, transmuters, windowing daemons, and the official OpenAI client. +/// Registers journals, gateway connection, imbuing transmuters (position-based ACKs), windowing daemons, and the official OpenAI client. /// public static class OpenAIAgentsServiceCollectionExtensions { @@ -87,8 +87,9 @@ public static IServiceCollection AddOpenAIAgents(this IServiceCollection service services.TryAddScoped(); } - // Transmuter and daemon - services.AddScoped, OpenAITransmuter>(); + // Transmuters and daemon + services.AddScoped, OpenAITransmuter>(); + services.AddScoped, OpenAITransmuter>(); services.TryAddScoped, OpenAIEntryToResponseItemTransmuter>(); services.TryAddScoped(); // Session-local shattering for OpenAI chunks: split on paragraph boundary diff --git a/src/Coven.Agents.OpenAI/OpenAIEntry.cs b/src/Coven.Agents.OpenAI/OpenAIEntry.cs index d03526e..502651f 100644 --- a/src/Coven.Agents.OpenAI/OpenAIEntry.cs +++ b/src/Coven.Agents.OpenAI/OpenAIEntry.cs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: BUSL-1.1 +using System.Text.Json.Serialization; using Coven.Core; namespace Coven.Agents.OpenAI; @@ -7,6 +8,15 @@ namespace Coven.Agents.OpenAI; /// /// Base entry type for OpenAI agent journals (requests, responses, thoughts, chunks, acknowledgements). /// +[JsonPolymorphic(TypeDiscriminatorPropertyName = "$type")] +[JsonDerivedType(typeof(OpenAIEfferent), nameof(OpenAIEfferent))] +[JsonDerivedType(typeof(OpenAIAfferent), nameof(OpenAIAfferent))] +[JsonDerivedType(typeof(OpenAIAfferentChunk), nameof(OpenAIAfferentChunk))] +[JsonDerivedType(typeof(OpenAIAfferentThoughtChunk), nameof(OpenAIAfferentThoughtChunk))] +[JsonDerivedType(typeof(OpenAIThought), nameof(OpenAIThought))] +[JsonDerivedType(typeof(OpenAIAck), nameof(OpenAIAck))] +[JsonDerivedType(typeof(OpenAIEfferentThoughtChunk), nameof(OpenAIEfferentThoughtChunk))] +[JsonDerivedType(typeof(OpenAIStreamCompleted), nameof(OpenAIStreamCompleted))] public abstract record OpenAIEntry( string Sender ) : Entry; @@ -57,7 +67,7 @@ string Model /// OpenAI acknowledgement used for synchronization. public sealed record OpenAIAck( string Sender, - string Text + long Position ) : OpenAIEntry(Sender); // Streaming thought chunks (efferent): agent streams thoughts out diff --git a/src/Coven.Agents.OpenAI/OpenAITransmuter.cs b/src/Coven.Agents.OpenAI/OpenAITransmuter.cs index f9c2210..edf5597 100644 --- a/src/Coven.Agents.OpenAI/OpenAITransmuter.cs +++ b/src/Coven.Agents.OpenAI/OpenAITransmuter.cs @@ -5,47 +5,63 @@ namespace Coven.Agents.OpenAI; /// -/// Maps between OpenAI-specific entries and generic Agent entries. +/// Maps between OpenAI-specific entries and generic Agent entries using position-imbued ACKs. /// Afferent: OpenAI → Agent; Efferent: Agent → OpenAI. /// -internal sealed class OpenAITransmuter : IBiDirectionalTransmuter +internal sealed class OpenAITransmuter + : IImbuingTransmuter, + IImbuingTransmuter { - public Task TransmuteAfferent(OpenAIEntry Input, CancellationToken cancellationToken) + /// + /// Transmutes OpenAI-afferent entries to Agent entries. + /// + /// The source OpenAI entry. + /// The source journal position used for position-based acknowledgements. + /// Cancellation token. + /// The mapped Agent entry. + public Task Transmute(OpenAIEntry Input, long Reagent, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); return Input switch { OpenAIAfferent incoming => Task.FromResult(new AgentResponse(incoming.Sender, incoming.Text)), - // Today, Afferent chunks include thoughts, tomorrow who knows. OpenAIAfferentChunk chunk => Task.FromResult(new AgentAfferentChunk(chunk.Sender, chunk.Text)), - // Streaming thought chunks from OpenAI surface as afferent thought drafts OpenAIAfferentThoughtChunk tChunk => Task.FromResult(new AgentAfferentThoughtChunk(tChunk.Sender, tChunk.Text)), - // A full OpenAIThought should surface as a fixed AgentThought OpenAIThought thought => Task.FromResult(new AgentThought(thought.Sender, thought.Text)), OpenAIStreamCompleted done => Task.FromResult(new AgentStreamCompleted(done.Sender)), - OpenAIEfferent outgoing => Task.FromResult(new AgentAck(outgoing.Sender)), + // For efferent records observed on the OpenAI journal or explicit OpenAI acks, emit an AgentAck with the source position + OpenAIEfferent outgoing => Task.FromResult(new AgentAck(outgoing.Sender, Reagent)), + OpenAIAck => Task.FromResult(new AgentAck(Input.Sender, Reagent)), _ => throw new ArgumentOutOfRangeException(nameof(Input)) }; } - public Task TransmuteEfferent(AgentEntry Output, CancellationToken cancellationToken) + /// + /// Transmutes Agent-efferent entries to OpenAI entries. + /// + /// The source Agent entry. + /// The source journal position used for position-based acknowledgements. + /// Cancellation token. + /// The mapped OpenAI entry. + public Task Transmute(AgentEntry Input, long Reagent, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); - return Output switch + return Input switch { AgentPrompt prompt => Task.FromResult(new OpenAIEfferent(prompt.Sender, prompt.Text)), - AgentResponse response => Task.FromResult(new OpenAIAck(response.Sender, response.Text)), - AgentThought thought => Task.FromResult(new OpenAIAck(thought.Sender, thought.Text)), - AgentEfferentChunk efferentChunk => Task.FromResult(new OpenAIAck(efferentChunk.Sender, efferentChunk.Text)), - AgentAfferentChunk afferentChunk => Task.FromResult(new OpenAIAck(afferentChunk.Sender, afferentChunk.Text)), - // Streaming efferent thought drafts map to OpenAI efferent thought chunk (not forwarded by gateway today) + + // All other Agent entries (including drafts and fixed) acknowledge with the position being processed + AgentResponse response => Task.FromResult(new OpenAIAck(response.Sender, Reagent)), + AgentThought thought => Task.FromResult(new OpenAIAck(thought.Sender, Reagent)), + AgentEfferentChunk efferentChunk => Task.FromResult(new OpenAIAck(efferentChunk.Sender, Reagent)), + AgentAfferentChunk afferentChunk => Task.FromResult(new OpenAIAck(afferentChunk.Sender, Reagent)), AgentEfferentThoughtChunk etChunk => Task.FromResult(new OpenAIEfferentThoughtChunk(etChunk.Sender, etChunk.Text)), - // Afferent thought drafts are not sent outward; ack for completeness - AgentAfferentThoughtChunk atChunk => Task.FromResult(new OpenAIAck(atChunk.Sender, atChunk.Text)), - AgentStreamCompleted done => Task.FromResult(new OpenAIAck(done.Sender, string.Empty)), - _ => throw new ArgumentOutOfRangeException(nameof(Output)) + AgentAfferentThoughtChunk atChunk => Task.FromResult(new OpenAIAck(atChunk.Sender, Reagent)), + AgentStreamCompleted done => Task.FromResult(new OpenAIAck(done.Sender, Reagent)), + AgentAck ack => Task.FromResult(new OpenAIAck(ack.Sender, Reagent)), + _ => throw new ArgumentOutOfRangeException(nameof(Input)) }; } } diff --git a/src/Coven.Agents/AgentEntry.cs b/src/Coven.Agents/AgentEntry.cs index dbe368c..4076346 100644 --- a/src/Coven.Agents/AgentEntry.cs +++ b/src/Coven.Agents/AgentEntry.cs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: BUSL-1.1 +using System.Text.Json.Serialization; using Coven.Core; namespace Coven.Agents; @@ -7,6 +8,16 @@ namespace Coven.Agents; /// /// Base entry type for agent journals (prompts, responses, thoughts, acks, and streaming chunks). /// +[JsonPolymorphic(TypeDiscriminatorPropertyName = "$type")] +[JsonDerivedType(typeof(AgentPrompt), nameof(AgentPrompt))] +[JsonDerivedType(typeof(AgentResponse), nameof(AgentResponse))] +[JsonDerivedType(typeof(AgentThought), nameof(AgentThought))] +[JsonDerivedType(typeof(AgentAck), nameof(AgentAck))] +[JsonDerivedType(typeof(AgentEfferentChunk), nameof(AgentEfferentChunk))] +[JsonDerivedType(typeof(AgentAfferentChunk), nameof(AgentAfferentChunk))] +[JsonDerivedType(typeof(AgentEfferentThoughtChunk), nameof(AgentEfferentThoughtChunk))] +[JsonDerivedType(typeof(AgentAfferentThoughtChunk), nameof(AgentAfferentThoughtChunk))] +[JsonDerivedType(typeof(AgentStreamCompleted), nameof(AgentStreamCompleted))] public abstract record AgentEntry(string Sender) : Entry; /// @@ -23,8 +34,8 @@ public sealed record AgentResponse(string Sender, string Text) : AgentEntry(Send /// Represents an agent's introspective thought (not typically user-visible). public sealed record AgentThought(string Sender, string Text) : AgentEntry(Sender); -/// Represents an acknowledgement for internal synchronization. -public sealed record AgentAck(string Sender) : AgentEntry(Sender); +/// Represents an acknowledgement for internal synchronization. Carries the position being acknowledged. +public sealed record AgentAck(string Sender, long Position) : AgentEntry(Sender); // Streaming additions /// Outgoing (efferent) response chunk prior to finalization. diff --git a/src/Coven.Chat.Console/ConsoleChatServiceCollectionExtensions.cs b/src/Coven.Chat.Console/ConsoleChatServiceCollectionExtensions.cs index d231fc5..2ec6540 100644 --- a/src/Coven.Chat.Console/ConsoleChatServiceCollectionExtensions.cs +++ b/src/Coven.Chat.Console/ConsoleChatServiceCollectionExtensions.cs @@ -34,7 +34,9 @@ public static IServiceCollection AddConsoleChat(this IServiceCollection services services.AddScoped, ConsoleScrivener>(); services.AddKeyedScoped, InMemoryScrivener>("Coven.InternalConsoleScrivener"); - services.AddScoped, ConsoleTransmuter>(); + // Imbuing transmuters (position-aware) for ack correctness + services.AddScoped, ConsoleTransmuter>(); + services.AddScoped, ConsoleTransmuter>(); services.AddScoped, InMemoryScrivener>(); services.AddScoped(); return services; diff --git a/src/Coven.Chat.Console/ConsoleChatSession.cs b/src/Coven.Chat.Console/ConsoleChatSession.cs index 4143950..a98b1b6 100644 --- a/src/Coven.Chat.Console/ConsoleChatSession.cs +++ b/src/Coven.Chat.Console/ConsoleChatSession.cs @@ -10,14 +10,16 @@ internal sealed class ConsoleChatSession( ConsoleGatewayConnection gateway, IScrivener consoleJournal, IScrivener chatJournal, - IBiDirectionalTransmuter transmuter, + IImbuingTransmuter afferentTransmuter, + IImbuingTransmuter efferentTransmuter, ILogger logger, CancellationToken sessionToken) : IAsyncDisposable { private readonly ConsoleGatewayConnection _gateway = gateway ?? throw new ArgumentNullException(nameof(gateway)); private readonly IScrivener _consoleJournal = consoleJournal ?? throw new ArgumentNullException(nameof(consoleJournal)); private readonly IScrivener _chatJournal = chatJournal ?? throw new ArgumentNullException(nameof(chatJournal)); - private readonly IBiDirectionalTransmuter _transmuter = transmuter ?? throw new ArgumentNullException(nameof(transmuter)); + private readonly IImbuingTransmuter _afferentTransmuter = afferentTransmuter ?? throw new ArgumentNullException(nameof(afferentTransmuter)); + private readonly IImbuingTransmuter _efferentTransmuter = efferentTransmuter ?? throw new ArgumentNullException(nameof(efferentTransmuter)); private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); private readonly CancellationToken _sessionToken = sessionToken; @@ -41,7 +43,7 @@ public async Task StartAsync() } ConsoleLog.ConsoleToChatObserved(_logger, entry.GetType().Name, position); - ChatEntry chat = await _transmuter.TransmuteAfferent(entry, ct).ConfigureAwait(false); + ChatEntry chat = await _afferentTransmuter.Transmute(entry, position, ct).ConfigureAwait(false); ConsoleLog.ConsoleToChatTransmuted(_logger, entry.GetType().Name, chat.GetType().Name); long chatPos = await _chatJournal.WriteAsync(chat, ct).ConfigureAwait(false); ConsoleLog.ConsoleToChatAppended(_logger, chat.GetType().Name, chatPos); @@ -72,7 +74,7 @@ public async Task StartAsync() } ConsoleLog.ChatToConsoleObserved(_logger, entry.GetType().Name, position); - ConsoleEntry console = await _transmuter.TransmuteEfferent(entry, ct).ConfigureAwait(false); + ConsoleEntry console = await _efferentTransmuter.Transmute(entry, position, ct).ConfigureAwait(false); ConsoleLog.ChatToConsoleTransmuted(_logger, entry.GetType().Name, console.GetType().Name); long consolePos = await _consoleJournal.WriteAsync(console, ct).ConfigureAwait(false); ConsoleLog.ChatToConsoleAppended(_logger, console.GetType().Name, consolePos); diff --git a/src/Coven.Chat.Console/ConsoleChatSessionFactory.cs b/src/Coven.Chat.Console/ConsoleChatSessionFactory.cs index e2297f3..8fe947d 100644 --- a/src/Coven.Chat.Console/ConsoleChatSessionFactory.cs +++ b/src/Coven.Chat.Console/ConsoleChatSessionFactory.cs @@ -10,15 +10,17 @@ internal sealed class ConsoleChatSessionFactory( ConsoleGatewayConnection gatewayConnection, IScrivener consoleJournal, IScrivener chatJournal, - IBiDirectionalTransmuter transmuter, + IImbuingTransmuter afferentTransmuter, + IImbuingTransmuter efferentTransmuter, ILogger logger) { private readonly ConsoleGatewayConnection _gatewayConnection = gatewayConnection ?? throw new ArgumentNullException(nameof(gatewayConnection)); private readonly IScrivener _consoleJournal = consoleJournal ?? throw new ArgumentNullException(nameof(consoleJournal)); private readonly IScrivener _chatJournal = chatJournal ?? throw new ArgumentNullException(nameof(chatJournal)); - private readonly IBiDirectionalTransmuter _transmuter = transmuter ?? throw new ArgumentNullException(nameof(transmuter)); + private readonly IImbuingTransmuter _afferentTransmuter = afferentTransmuter ?? throw new ArgumentNullException(nameof(afferentTransmuter)); + private readonly IImbuingTransmuter _efferentTransmuter = efferentTransmuter ?? throw new ArgumentNullException(nameof(efferentTransmuter)); private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); public ConsoleChatSession Create(CancellationToken sessionToken) - => new(_gatewayConnection, _consoleJournal, _chatJournal, _transmuter, _logger, sessionToken); + => new(_gatewayConnection, _consoleJournal, _chatJournal, _afferentTransmuter, _efferentTransmuter, _logger, sessionToken); } diff --git a/src/Coven.Chat.Console/ConsoleEntry.cs b/src/Coven.Chat.Console/ConsoleEntry.cs index 2aa6012..8723358 100644 --- a/src/Coven.Chat.Console/ConsoleEntry.cs +++ b/src/Coven.Chat.Console/ConsoleEntry.cs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: BUSL-1.1 +using System.Text.Json.Serialization; using Coven.Core; namespace Coven.Chat.Console; @@ -7,25 +8,28 @@ namespace Coven.Chat.Console; /// /// Base entry type for the Console chat journal. /// +[JsonPolymorphic(TypeDiscriminatorPropertyName = "$type")] +[JsonDerivedType(typeof(ConsoleAck), nameof(ConsoleAck))] +[JsonDerivedType(typeof(ConsoleAfferent), nameof(ConsoleAfferent))] +[JsonDerivedType(typeof(ConsoleEfferent), nameof(ConsoleEfferent))] public abstract record ConsoleEntry( - string Sender, - string Text + string Sender ) : Entry; /// Acknowledgement entry for internal synchronization. public sealed record ConsoleAck( string Sender, - string Text -) : ConsoleEntry(Sender, Text); + long Position +) : ConsoleEntry(Sender); /// Incoming line read from stdin. public sealed record ConsoleAfferent( string Sender, string Text -) : ConsoleEntry(Sender, Text); +) : ConsoleEntry(Sender); /// Outgoing line written to stdout. public sealed record ConsoleEfferent( string Sender, string Text -) : ConsoleEntry(Sender, Text); +) : ConsoleEntry(Sender); diff --git a/src/Coven.Chat.Console/ConsoleScrivener.cs b/src/Coven.Chat.Console/ConsoleScrivener.cs index 6ed0d98..64973aa 100644 --- a/src/Coven.Chat.Console/ConsoleScrivener.cs +++ b/src/Coven.Chat.Console/ConsoleScrivener.cs @@ -33,9 +33,9 @@ public ConsoleScrivener( /// public override async Task WriteAsync(ConsoleEntry entry, CancellationToken cancellationToken = default) { - if (entry is ConsoleEfferent) + if (entry is ConsoleEfferent efferent) { - await _gateway.SendAsync(entry.Text, cancellationToken).ConfigureAwait(false); + await _gateway.SendAsync(efferent.Text, cancellationToken).ConfigureAwait(false); } long pos = await WriteInnerAsync(entry, cancellationToken).ConfigureAwait(false); diff --git a/src/Coven.Chat.Console/ConsoleTransmuter.cs b/src/Coven.Chat.Console/ConsoleTransmuter.cs index 4244518..0649add 100644 --- a/src/Coven.Chat.Console/ConsoleTransmuter.cs +++ b/src/Coven.Chat.Console/ConsoleTransmuter.cs @@ -6,39 +6,47 @@ namespace Coven.Chat.Console; /// /// Maps between Console-specific entries and generic Chat entries. -/// Afferent: Console → Chat; Efferent: Chat → Console. +/// Supports imbuing with the source record position for position-based ACKs. /// -internal sealed class ConsoleTransmuter(ConsoleClientConfig config) : IBiDirectionalTransmuter +internal sealed class ConsoleTransmuter(ConsoleClientConfig config) + : IImbuingTransmuter, + IImbuingTransmuter { private readonly ConsoleClientConfig _config = config ?? throw new ArgumentNullException(nameof(config)); - public Task TransmuteAfferent(ConsoleEntry Input, CancellationToken cancellationToken) + // Console → Chat (afferent): + // - ConsoleAfferent -> ChatAfferent (position ignored) + // - ConsoleEfferent -> ChatAck(position) + public Task Transmute(ConsoleEntry Input, long Reagent, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); return Input switch { ConsoleAfferent incoming => Task.FromResult(new ChatAfferent(incoming.Sender, incoming.Text)), - ConsoleEfferent outgoing => Task.FromResult(new ChatAck(outgoing.Sender, outgoing.Text)), + ConsoleEfferent outgoing => Task.FromResult(new ChatAck(outgoing.Sender, Reagent)), + ConsoleAck => Task.FromResult(new ChatAck(Input.Sender, Reagent)), _ => throw new ArgumentOutOfRangeException(nameof(Input)) }; } - public Task TransmuteEfferent(ChatEntry Output, CancellationToken cancellationToken) + // Chat → Console (efferent): + // - ChatEfferent -> ConsoleEfferent + // - All others -> ConsoleAck(position) + public Task Transmute(ChatEntry Input, long Reagent, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); - return Output switch + return Input switch { ChatEfferent outgoing => Task.FromResult(new ConsoleEfferent(_config.OutputSender, outgoing.Text)), - // Internal/unfixed artifacts or inbound: acknowledge only - ChatEfferentDraft draft => Task.FromResult(new ConsoleAck(draft.Sender, draft.Text)), - ChatChunk chunk => Task.FromResult(new ConsoleAck(chunk.Sender, chunk.Text)), - ChatStreamCompleted done => Task.FromResult(new ConsoleAck(done.Sender, string.Empty)), - ChatAfferent incoming => Task.FromResult(new ConsoleAck(incoming.Sender, incoming.Text)), - ChatAfferentDraft incomingDraft => Task.FromResult(new ConsoleAck(incomingDraft.Sender, incomingDraft.Text)), - _ => throw new ArgumentOutOfRangeException(nameof(Output)) + ChatEfferentDraft draft => Task.FromResult(new ConsoleAck(draft.Sender, Reagent)), + ChatChunk chunk => Task.FromResult(new ConsoleAck(chunk.Sender, Reagent)), + ChatStreamCompleted done => Task.FromResult(new ConsoleAck(done.Sender, Reagent)), + ChatAfferent incoming => Task.FromResult(new ConsoleAck(incoming.Sender, Reagent)), + ChatAfferentDraft incomingDraft => Task.FromResult(new ConsoleAck(incomingDraft.Sender, Reagent)), + _ => throw new ArgumentOutOfRangeException(nameof(Input)) }; } } diff --git a/src/Coven.Chat.Discord/DiscordChatServiceCollectionExtensions.cs b/src/Coven.Chat.Discord/DiscordChatServiceCollectionExtensions.cs index a705208..7f245f4 100644 --- a/src/Coven.Chat.Discord/DiscordChatServiceCollectionExtensions.cs +++ b/src/Coven.Chat.Discord/DiscordChatServiceCollectionExtensions.cs @@ -50,7 +50,8 @@ public static IServiceCollection AddDiscordChat(this IServiceCollection services services.AddScoped, DiscordScrivener>(); services.AddKeyedScoped, InMemoryScrivener>("Coven.InternalDiscordScrivener"); - services.AddScoped, DiscordTransmuter>(); + services.AddScoped, DiscordTransmuter>(); + services.AddScoped, DiscordTransmuter>(); services.AddScoped, InMemoryScrivener>(); services.AddScoped(); diff --git a/src/Coven.Chat.Discord/DiscordChatSession.cs b/src/Coven.Chat.Discord/DiscordChatSession.cs index 018060c..d56c9c0 100644 --- a/src/Coven.Chat.Discord/DiscordChatSession.cs +++ b/src/Coven.Chat.Discord/DiscordChatSession.cs @@ -11,7 +11,8 @@ internal sealed class DiscordChatSession( DiscordGatewayConnection gateway, IScrivener discordJournal, IScrivener chatJournal, - IBiDirectionalTransmuter transmuter, + IImbuingTransmuter afferentTransmuter, + IImbuingTransmuter efferentTransmuter, IShatterPolicy shatterPolicy, ILogger logger, CancellationToken sessionToken) : IAsyncDisposable @@ -19,7 +20,8 @@ internal sealed class DiscordChatSession( private readonly DiscordGatewayConnection _gateway = gateway ?? throw new ArgumentNullException(nameof(gateway)); private readonly IScrivener _discordJournal = discordJournal ?? throw new ArgumentNullException(nameof(discordJournal)); private readonly IScrivener _chatJournal = chatJournal ?? throw new ArgumentNullException(nameof(chatJournal)); - private readonly IBiDirectionalTransmuter _transmuter = transmuter ?? throw new ArgumentNullException(nameof(transmuter)); + private readonly IImbuingTransmuter _afferentTransmuter = afferentTransmuter ?? throw new ArgumentNullException(nameof(afferentTransmuter)); + private readonly IImbuingTransmuter _efferentTransmuter = efferentTransmuter ?? throw new ArgumentNullException(nameof(efferentTransmuter)); private readonly IShatterPolicy _shatterPolicy = shatterPolicy ?? throw new ArgumentNullException(nameof(shatterPolicy)); private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); private readonly CancellationToken _sessionToken = sessionToken; @@ -43,7 +45,7 @@ public async Task StartAsync() } DiscordLog.DiscordToChatObserved(_logger, entry.GetType().Name, position); - ChatEntry chat = await _transmuter.TransmuteAfferent(entry, ct).ConfigureAwait(false); + ChatEntry chat = await _afferentTransmuter.Transmute(entry, position, ct).ConfigureAwait(false); DiscordLog.DiscordToChatTransmuted(_logger, entry.GetType().Name, chat.GetType().Name); long chatPos = await _chatJournal.WriteAsync(chat, ct).ConfigureAwait(false); DiscordLog.DiscordToChatAppended(_logger, chat.GetType().Name, chatPos); @@ -103,7 +105,7 @@ public async Task StartAsync() continue; } - DiscordEntry discord = await _transmuter.TransmuteEfferent(entry, ct).ConfigureAwait(false); + DiscordEntry discord = await _efferentTransmuter.Transmute(entry, position, ct).ConfigureAwait(false); DiscordLog.ChatToDiscordTransmuted(_logger, entry.GetType().Name, discord.GetType().Name); long discPos = await _discordJournal.WriteAsync(discord, ct).ConfigureAwait(false); DiscordLog.ChatToDiscordAppended(_logger, discord.GetType().Name, discPos); diff --git a/src/Coven.Chat.Discord/DiscordChatSessionFactory.cs b/src/Coven.Chat.Discord/DiscordChatSessionFactory.cs index 877a9a1..491636b 100644 --- a/src/Coven.Chat.Discord/DiscordChatSessionFactory.cs +++ b/src/Coven.Chat.Discord/DiscordChatSessionFactory.cs @@ -11,19 +11,21 @@ internal sealed class DiscordChatSessionFactory( DiscordGatewayConnection discordGatewayConnection, IScrivener discordJournal, IScrivener chatJournal, - IBiDirectionalTransmuter transmuter, + IImbuingTransmuter afferentTransmuter, + IImbuingTransmuter efferentTransmuter, IShatterPolicy shatterPolicy, ILogger logger) { private readonly DiscordGatewayConnection _discordGatewayConnection = discordGatewayConnection ?? throw new ArgumentNullException(nameof(discordGatewayConnection)); private readonly IScrivener _discordJournal = discordJournal ?? throw new ArgumentNullException(nameof(discordJournal)); private readonly IScrivener _chatJournal = chatJournal ?? throw new ArgumentNullException(nameof(chatJournal)); - private readonly IBiDirectionalTransmuter _transmuter = transmuter ?? throw new ArgumentNullException(nameof(transmuter)); + private readonly IImbuingTransmuter _afferentTransmuter = afferentTransmuter ?? throw new ArgumentNullException(nameof(afferentTransmuter)); + private readonly IImbuingTransmuter _efferentTransmuter = efferentTransmuter ?? throw new ArgumentNullException(nameof(efferentTransmuter)); private readonly IShatterPolicy _shatterPolicy = shatterPolicy ?? throw new ArgumentNullException(nameof(shatterPolicy)); private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); public DiscordChatSession Create(CancellationToken sessionToken) { - return new DiscordChatSession(_discordGatewayConnection, _discordJournal, _chatJournal, _transmuter, _shatterPolicy, _logger, sessionToken); + return new DiscordChatSession(_discordGatewayConnection, _discordJournal, _chatJournal, _afferentTransmuter, _efferentTransmuter, _shatterPolicy, _logger, sessionToken); } } diff --git a/src/Coven.Chat.Discord/DiscordEntry.cs b/src/Coven.Chat.Discord/DiscordEntry.cs index bcd3541..a562891 100644 --- a/src/Coven.Chat.Discord/DiscordEntry.cs +++ b/src/Coven.Chat.Discord/DiscordEntry.cs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: BUSL-1.1 +using System.Text.Json.Serialization; using Coven.Core; namespace Coven.Chat.Discord; @@ -7,25 +8,28 @@ namespace Coven.Chat.Discord; /// /// Base entry type for the Discord chat journal. /// +[JsonPolymorphic(TypeDiscriminatorPropertyName = "$type")] +[JsonDerivedType(typeof(DiscordAck), nameof(DiscordAck))] +[JsonDerivedType(typeof(DiscordAfferent), nameof(DiscordAfferent))] +[JsonDerivedType(typeof(DiscordEfferent), nameof(DiscordEfferent))] public abstract record DiscordEntry( - string Sender, - string Text + string Sender ) : Entry; /// Acknowledgement entry for internal synchronization. public sealed record DiscordAck( string Sender, - string Text -) : DiscordEntry(Sender, Text); + long Position +) : DiscordEntry(Sender); /// Incoming Discord message received from a channel or DM. public sealed record DiscordAfferent( string Sender, string Text, string MessageId, - DateTimeOffset Timestamp) : DiscordEntry(Sender, Text); + DateTimeOffset Timestamp) : DiscordEntry(Sender); /// Outgoing Discord message to be sent. public sealed record DiscordEfferent( string Sender, - string Text) : DiscordEntry(Sender, Text); + string Text) : DiscordEntry(Sender); diff --git a/src/Coven.Chat.Discord/DiscordGatewayConnection.cs b/src/Coven.Chat.Discord/DiscordGatewayConnection.cs index e2fd73f..697d731 100644 --- a/src/Coven.Chat.Discord/DiscordGatewayConnection.cs +++ b/src/Coven.Chat.Discord/DiscordGatewayConnection.cs @@ -9,6 +9,10 @@ namespace Coven.Chat.Discord; +/// +/// Manages Discord connectivity and bridges inbound/outbound messages to a Discord journal. +/// Bot-authored messages are ignored (no ACK is written here); position-based ACKs are emitted later by the session/transmuter pipeline. +/// internal sealed class DiscordGatewayConnection( DiscordClientConfig configuration, DiscordSocketClient socketClient, @@ -121,14 +125,12 @@ private async Task OnMessageReceivedAsync(SocketMessage message) // Determine the sender identity. For Discord.Net, Author should be present on normal messages; string sender = message.Author.Username; - // Bots don't get incoming, they only get ACK + // Ignore bot-authored messages to prevent loops; session will generate position-based ACKs + // when it observes our own DiscordEfferent entries via the journal. if (message.Author.IsBot) { string text = message.Content ?? string.Empty; DiscordLog.InboundBotMessageObserved(_logger, sender, text.Length); - DiscordAck ack = new(sender, text); - long pos = await _scrivener.WriteAsync(ack).ConfigureAwait(false); - DiscordLog.InboundAppendedToJournal(_logger, nameof(DiscordAck), pos); return; } diff --git a/src/Coven.Chat.Discord/DiscordScrivener.cs b/src/Coven.Chat.Discord/DiscordScrivener.cs index ab8aec3..1af42e6 100644 --- a/src/Coven.Chat.Discord/DiscordScrivener.cs +++ b/src/Coven.Chat.Discord/DiscordScrivener.cs @@ -38,9 +38,9 @@ public DiscordScrivener([FromKeyedServices("Coven.InternalDiscordScrivener")] IS public override async Task WriteAsync(DiscordEntry entry, CancellationToken cancellationToken = default) { // Only send actual outbound messages to Discord. ACKs and inbound entries must not be sent. - if (entry is DiscordEfferent) + if (entry is DiscordEfferent efferent) { - await _discordClient.SendAsync(entry.Text, cancellationToken).ConfigureAwait(false); + await _discordClient.SendAsync(efferent.Text, cancellationToken).ConfigureAwait(false); } // Always persist to the underlying scrivener so pumps and tests can observe ordering. diff --git a/src/Coven.Chat.Discord/DiscordTransmuter.cs b/src/Coven.Chat.Discord/DiscordTransmuter.cs index 4ec33d0..45e3b36 100644 --- a/src/Coven.Chat.Discord/DiscordTransmuter.cs +++ b/src/Coven.Chat.Discord/DiscordTransmuter.cs @@ -5,25 +5,28 @@ namespace Coven.Chat.Discord; /// -/// Maps between Discord-specific entries and generic Chat entries. +/// Maps between Discord-specific entries and generic Chat entries using position-imbued ACKs. /// Afferent: Discord → Chat; Efferent: Chat → Discord. /// -public class DiscordTransmuter : IBiDirectionalTransmuter +public class DiscordTransmuter : IImbuingTransmuter, IImbuingTransmuter { /// /// Transmutes Discord-afferent entries into chat entries. /// /// The source Discord entry. + /// The source journal position used for position-based acknowledgements. /// A cancellation token. /// The mapped . - public Task TransmuteAfferent(DiscordEntry Input, CancellationToken cancellationToken) + // Discord → Chat (afferent) + public Task Transmute(DiscordEntry Input, long Reagent, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); return Input switch { DiscordAfferent incoming => Task.FromResult(new ChatAfferent(incoming.Sender, incoming.Text)), - DiscordEfferent outgoing => Task.FromResult(new ChatAck(outgoing.Sender, outgoing.Text)), + DiscordEfferent outgoing => Task.FromResult(new ChatAck(outgoing.Sender, Reagent)), + DiscordAck => Task.FromResult(new ChatAck(Input.Sender, Reagent)), _ => throw new ArgumentOutOfRangeException(nameof(Input)) }; } @@ -31,24 +34,26 @@ public Task TransmuteAfferent(DiscordEntry Input, CancellationToken c /// /// Transmutes chat-efferent entries into Discord entries. /// - /// The source chat entry. + /// The source chat entry. + /// The source journal position used for position-based acknowledgements. /// A cancellation token. /// The mapped . - public Task TransmuteEfferent(ChatEntry Output, CancellationToken cancellationToken) + // Chat → Discord (efferent) + public Task Transmute(ChatEntry Input, long Reagent, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); - return Output switch + return Input switch { ChatEfferent outgoing => Task.FromResult(new DiscordEfferent(outgoing.Sender, outgoing.Text)), // Internal/unfixed artifacts: acknowledge only to prevent loops - ChatEfferentDraft draft => Task.FromResult(new DiscordAck(draft.Sender, draft.Text)), - ChatChunk chunk => Task.FromResult(new DiscordAck(chunk.Sender, chunk.Text)), - ChatStreamCompleted done => Task.FromResult(new DiscordAck(done.Sender, string.Empty)), - ChatAfferent incoming => Task.FromResult(new DiscordAck(incoming.Sender, incoming.Text)), - ChatAfferentDraft incomingDraft => Task.FromResult(new DiscordAck(incomingDraft.Sender, incomingDraft.Text)), - _ => throw new ArgumentOutOfRangeException(nameof(Output)) + ChatEfferentDraft draft => Task.FromResult(new DiscordAck(draft.Sender, Reagent)), + ChatChunk chunk => Task.FromResult(new DiscordAck(chunk.Sender, Reagent)), + ChatStreamCompleted done => Task.FromResult(new DiscordAck(done.Sender, Reagent)), + ChatAfferent incoming => Task.FromResult(new DiscordAck(incoming.Sender, Reagent)), + ChatAfferentDraft incomingDraft => Task.FromResult(new DiscordAck(incomingDraft.Sender, Reagent)), + _ => throw new ArgumentOutOfRangeException(nameof(Input)) }; } } diff --git a/src/Coven.Chat/ChatEntry.cs b/src/Coven.Chat/ChatEntry.cs index d0efd29..86fd285 100644 --- a/src/Coven.Chat/ChatEntry.cs +++ b/src/Coven.Chat/ChatEntry.cs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: BUSL-1.1 +using System.Text.Json.Serialization; using Coven.Core; namespace Coven.Chat; @@ -7,6 +8,14 @@ namespace Coven.Chat; /// /// Base entry type for chat journals (incoming/outgoing messages, acks, and streaming drafts/chunks). /// +[JsonPolymorphic(TypeDiscriminatorPropertyName = "$type")] +[JsonDerivedType(typeof(ChatEfferent), nameof(ChatEfferent))] +[JsonDerivedType(typeof(ChatAfferent), nameof(ChatAfferent))] +[JsonDerivedType(typeof(ChatAck), nameof(ChatAck))] +[JsonDerivedType(typeof(ChatEfferentDraft), nameof(ChatEfferentDraft))] +[JsonDerivedType(typeof(ChatAfferentDraft), nameof(ChatAfferentDraft))] +[JsonDerivedType(typeof(ChatChunk), nameof(ChatChunk))] +[JsonDerivedType(typeof(ChatStreamCompleted), nameof(ChatStreamCompleted))] public abstract record ChatEntry(string Sender) : Entry; /// @@ -20,8 +29,8 @@ public sealed record ChatEfferent(string Sender, string Text) : ChatEntry(Sender /// Incoming chat message from users or external sources. public sealed record ChatAfferent(string Sender, string Text) : ChatEntry(Sender); -/// Local acknowledgement to avoid feedback loops between journals. -public sealed record ChatAck(string Sender, string Text) : ChatEntry(Sender); +/// Local acknowledgement to avoid feedback loops between journals. Carries the position being acknowledged. +public sealed record ChatAck(string Sender, long Position) : ChatEntry(Sender); // Streaming additions /// Outgoing draft message prior to finalization. diff --git a/src/Coven.Core.Debug/README.md b/src/Coven.Core.Debug/README.md deleted file mode 100644 index 5f923dc..0000000 --- a/src/Coven.Core.Debug/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# Coven.Core.Debug — Scrivener Taps - -Minimal, zero‑ceremony way to observe journal writes without changing any branch/leaf code: wrap an existing `IScrivener` with a tiny decorator that calls an observer delegate after each write. - -## Goal -- Add an observer delegate to an existing scrivener. -- Do not change `IScrivener`. -- Do not write back to the observed journal. -- Preserve normal tail/read behavior and positions. - -## Concept -- `TappedScrivener` has an inner `IScrivener`. -- On `WriteAsync(entry)`, it awaits the inner write to get the assigned `position`, then invokes an observer delegate: `Action`. -- All read APIs (`TailAsync`, `ReadBackwardAsync`, `WaitForAsync`) simply delegate to the inner scrivener. - -## Behavior -- Observation happens after the inner write completes, using the actual assigned position. -- Observer exceptions are ignored (best‑effort, non‑interfering). -- No writes back to the observed journal; tap is read‑only aside from the delegated call. -- Overhead is a single delegate invocation per write. - -## Notes -- Keep observers cheap; they run inline after writes. If you need heavy work, queue it yourself inside the delegate. -- Because it wraps a specific IScrivener instance, it only observes entries written through that instance. -- Because we need to register the TappedScrivener as the final implementation for IScrivener we will need some way to disambiguate them. diff --git a/src/Coven.Core/README.md b/src/Coven.Core/README.md index af28c26..a0c6197 100644 --- a/src/Coven.Core/README.md +++ b/src/Coven.Core/README.md @@ -54,4 +54,5 @@ await coven.Ritual(new Empty()); ## See Also - Architecture: Journaling and Scriveners; Windowing and Shattering. +- Persistence: `/src/Coven.Scriveners.FileScrivener/` (file‑backed snapshots for journals). - Samples: `src/samples/01.DiscordAgent` for end‑to‑end orchestration. diff --git a/src/Coven.Daemonology/DaemonEvent.cs b/src/Coven.Daemonology/DaemonEvent.cs index eb86f96..72dd5f2 100644 --- a/src/Coven.Daemonology/DaemonEvent.cs +++ b/src/Coven.Daemonology/DaemonEvent.cs @@ -1,10 +1,17 @@ // SPDX-License-Identifier: BUSL-1.1 +using System.Text.Json.Serialization; + namespace Coven.Daemonology; /// /// Base record for daemon lifecycle events emitted to journals. /// +[ + JsonPolymorphic(TypeDiscriminatorPropertyName = "$type"), + JsonDerivedType(typeof(StatusChanged), nameof(StatusChanged)), + JsonDerivedType(typeof(FailureOccurred), nameof(FailureOccurred)) +] public abstract record DaemonEvent; internal sealed record StatusChanged(Status NewStatus) : DaemonEvent; diff --git a/src/Coven.Scriveners.FileScrivener/CountThresholdFlushPredicate.cs b/src/Coven.Scriveners.FileScrivener/CountThresholdFlushPredicate.cs new file mode 100644 index 0000000..64fba45 --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/CountThresholdFlushPredicate.cs @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: BUSL-1.1 + +namespace Coven.Scriveners.FileScrivener; + +/// +/// Flush predicate that triggers when the in-memory snapshot reaches a minimum entry count. +/// Useful as a simple batching policy for throughput. +/// +/// The journal entry type. +public sealed class CountThresholdFlushPredicate : IFlushPredicate +{ + private readonly int _threshold; + + /// + /// Creates a predicate with the specified minimum snapshot size. + /// + /// Minimum number of entries required to flush. Must be >= 1. + public CountThresholdFlushPredicate(int threshold) + { + ArgumentOutOfRangeException.ThrowIfLessThan(threshold, 1); + _threshold = threshold; + } + + /// + /// Returns true when meets or exceeds the configured threshold. + /// + /// The current in-memory snapshot buffer. + public bool ShouldFlush(IReadOnlyList<(long position, TEntry entry)> snapshot) + => snapshot.Count >= _threshold; +} diff --git a/src/Coven.Core.Debug/Coven.Core.Debug.csproj b/src/Coven.Scriveners.FileScrivener/Coven.Scriveners.FileScrivener.csproj similarity index 52% rename from src/Coven.Core.Debug/Coven.Core.Debug.csproj rename to src/Coven.Scriveners.FileScrivener/Coven.Scriveners.FileScrivener.csproj index 2fcd78b..8295a96 100644 --- a/src/Coven.Core.Debug/Coven.Core.Debug.csproj +++ b/src/Coven.Scriveners.FileScrivener/Coven.Scriveners.FileScrivener.csproj @@ -4,16 +4,13 @@ true true $(MSBuildProjectName) - Diagnostics helpers for Coven.Core (e.g., scrivener dump finalizer). + File-backed scrivener with snapshot flushing daemon. Coven - coven;debug;diagnostics;journal;dump + coven;journal;scrivener;file;persistence;daemon - - - - + diff --git a/src/Coven.Scriveners.FileScrivener/FileAppendFlushSink.cs b/src/Coven.Scriveners.FileScrivener/FileAppendFlushSink.cs new file mode 100644 index 0000000..f8bfe67 --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/FileAppendFlushSink.cs @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: BUSL-1.1 + +using System.Text; + +namespace Coven.Scriveners.FileScrivener; + +/// +/// Append-only file sink for flushed snapshots. +/// Each record is serialized as a single JSON line using the configured serializer. +/// Thread-safe for a single writer; designed to be used by the flusher consumer loop. +/// +public sealed class FileAppendFlushSink(string path, IEntrySerializer serializer) : IFlushSink +{ + private readonly string _path = path ?? throw new ArgumentNullException(nameof(path)); + private readonly IEntrySerializer _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); + + /// + public async Task AppendSnapshotAsync(IReadOnlyList<(long position, TEntry entry)> snapshot, CancellationToken cancellationToken = default) + { + if (snapshot.Count == 0) + { + return; + } + + string? dir = Path.GetDirectoryName(_path); + if (!string.IsNullOrEmpty(dir) && !Directory.Exists(dir)) + { + Directory.CreateDirectory(dir); + } + + StringBuilder sb = new(); + for (int i = 0; i < snapshot.Count; i++) + { + (long pos, TEntry entry) = snapshot[i]; + string line = _serializer.Serialize(pos, entry); + sb.Append(line).Append('\n'); + } + + byte[] bytes = Encoding.UTF8.GetBytes(sb.ToString()); + + using FileStream fs = new(_path, FileMode.Append, FileAccess.Write, FileShare.Read, 4096, useAsync: true); + await fs.WriteAsync(bytes.AsMemory(), cancellationToken).ConfigureAwait(false); + await fs.FlushAsync(cancellationToken).ConfigureAwait(false); + } +} diff --git a/src/Coven.Scriveners.FileScrivener/FileScrivener.cs b/src/Coven.Scriveners.FileScrivener/FileScrivener.cs new file mode 100644 index 0000000..e4807ea --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/FileScrivener.cs @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: BUSL-1.1 + +using Coven.Core; + +namespace Coven.Scriveners.FileScrivener; + +/// +/// File-backed scrivener that delegates in-memory semantics to an internal , +/// enabling snapshot-based flushing by a companion daemon. This type itself does not perform I/O. +/// +/// The journal entry type. +public sealed class FileScrivener(IScrivener inner) : IScrivener where TEntry : notnull +{ + private readonly IScrivener _inner = inner ?? throw new ArgumentNullException(nameof(inner)); + + /// + public Task WriteAsync(TEntry entry, CancellationToken cancellationToken = default) + => _inner.WriteAsync(entry, cancellationToken); + + /// + public IAsyncEnumerable<(long journalPosition, TEntry entry)> TailAsync(long afterPosition = 0, CancellationToken cancellationToken = default) + => _inner.TailAsync(afterPosition, cancellationToken); + + /// + public IAsyncEnumerable<(long journalPosition, TEntry entry)> ReadBackwardAsync(long beforePosition = long.MaxValue, CancellationToken cancellationToken = default) + => _inner.ReadBackwardAsync(beforePosition, cancellationToken); + + /// + public Task<(long journalPosition, TEntry entry)> WaitForAsync(long afterPosition, Func match, CancellationToken cancellationToken = default) + => _inner.WaitForAsync(afterPosition, match, cancellationToken); + + /// + public Task<(long journalPosition, TDerived entry)> WaitForAsync(long afterPosition, Func match, CancellationToken cancellationToken = default) where TDerived : TEntry + => _inner.WaitForAsync(afterPosition, match, cancellationToken); +} diff --git a/src/Coven.Scriveners.FileScrivener/FileScrivenerSchema.cs b/src/Coven.Scriveners.FileScrivener/FileScrivenerSchema.cs new file mode 100644 index 0000000..aafc9c0 --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/FileScrivenerSchema.cs @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: BUSL-1.1 + +namespace Coven.Scriveners.FileScrivener; + +/// +/// Schema/version constants for FileScrivener on-disk format. +/// +internal static class FileScrivenerSchema +{ + /// + /// A free-form schema version string written to each line. + /// Change this when making incompatible on-disk format changes. + /// + public const string CurrentVersion = "1"; +} diff --git a/src/Coven.Scriveners.FileScrivener/FlusherDaemon.cs b/src/Coven.Scriveners.FileScrivener/FlusherDaemon.cs new file mode 100644 index 0000000..7e30ea0 --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/FlusherDaemon.cs @@ -0,0 +1,219 @@ +// SPDX-License-Identifier: BUSL-1.1 + +using System.Threading.Channels; +using Coven.Core; +using Coven.Daemonology; +using Microsoft.Extensions.Logging; + +namespace Coven.Scriveners.FileScrivener; + +/// +/// Tails a FileScrivener and flushes snapshots to a sink when a predicate is met. +/// Implements a producer (tail→snapshot→enqueue) and consumer (dequeue→append) pair with buffer reuse (swap mechanics). +/// +internal sealed class FlusherDaemon( + IScrivener scrivener, + IFlushSink sink, + IFlushPredicate predicate, + FileScrivenerConfig config, + ILogger> logger, + IScrivener daemonJournal) + : ContractDaemon(daemonJournal), IAsyncDisposable where TEntry : notnull +{ + private readonly IScrivener _scrivener = scrivener ?? throw new ArgumentNullException(nameof(scrivener)); + private readonly IFlushSink _sink = sink ?? throw new ArgumentNullException(nameof(sink)); + private readonly IFlushPredicate _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); + private readonly FileScrivenerConfig _config = config ?? throw new ArgumentNullException(nameof(config)); + private readonly ILogger> _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + // Producer-owned active buffer. Only the producer task appends to the list. + // Reference swaps are done atomically to avoid duplication during shutdown races. + private List<(long position, TEntry entry)> _activeSnapshot = []; + + private Channel> _flushQueue = Channel.CreateUnbounded>(); + private Channel> _pool = Channel.CreateUnbounded>(); + private CancellationTokenSource? _cts; + private Task? _producer; + private Task? _consumer; + + public override async Task Start(CancellationToken cancellationToken) + { + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + CancellationToken ct = _cts.Token; + + // Flush queue (bounded) + _flushQueue = Channel.CreateBounded>(new BoundedChannelOptions(_config.FlushQueueCapacity) + { + SingleReader = true, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait + }); + + // Buffer pool (bounded). Slightly larger than flush queue to always have an active buffer. + int poolCapacity = _config.FlushQueueCapacity + 2; + _pool = Channel.CreateBounded>(new BoundedChannelOptions(poolCapacity) + { + SingleReader = false, + SingleWriter = false, + FullMode = BoundedChannelFullMode.Wait + }); + // Pre-fill the pool to avoid allocations in steady state. Capacity is tuned for amortized growth. + for (int i = 0; i < poolCapacity; i++) + { + _pool.Writer.TryWrite(new List<(long position, TEntry entry)>(capacity: 128)); + } + + // Rent initial active buffer + if (!_pool.Reader.TryRead(out _activeSnapshot!)) + { + _activeSnapshot = new List<(long position, TEntry entry)>(capacity: 128); + } + + _producer = Task.Run(() => ProducerAsync(ct), ct); + _consumer = Task.Run(() => ConsumerAsync(ct), ct); + + await Transition(Status.Running, cancellationToken).ConfigureAwait(false); + } + + public override async Task Shutdown(CancellationToken cancellationToken) + { + _cts?.Cancel(); + try + { + if (_producer is not null && _consumer is not null) + { + try + { + await Task.WhenAll(_producer, _consumer).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // cooperative shutdown + } + } + } + finally + { + await Transition(Status.Completed, cancellationToken).ConfigureAwait(false); + } + } + + private async Task> RentBufferAsync(CancellationToken ct) + { + return _pool.Reader.TryRead(out List<(long position, TEntry entry)>? buf) + ? buf + : await _pool.Reader.ReadAsync(ct).ConfigureAwait(false); + } + + private async Task ProducerAsync(CancellationToken ct) + { + try + { + await foreach ((long pos, TEntry entry) in _scrivener.TailAsync(0, ct)) + { + // Append to current snapshot. + // Expected state: _activeSnapshot is exclusively owned by producer, safe to append without locking. + _activeSnapshot.Add((pos, entry)); + + if (_predicate.ShouldFlush(_activeSnapshot)) + { + // Rent a fresh buffer, then atomically swap references to avoid any window + // where shutdown could observe the pre-swap active list. + List<(long position, TEntry entry)> fresh = await RentBufferAsync(ct).ConfigureAwait(false); + fresh.Clear(); + + // Re-check after allocation to avoid swapping on stale predicate truth. + if (_predicate.ShouldFlush(_activeSnapshot)) + { + List<(long position, TEntry entry)> toFlush = Interlocked.Exchange(ref _activeSnapshot, fresh); + FlusherLog.SnapshotFlushTriggered(_logger, toFlush.Count); + + await _flushQueue.Writer.WriteAsync(toFlush, ct).ConfigureAwait(false); + FlusherLog.SnapshotEnqueued(_logger, toFlush.Count); + } + else + { + // Predicate flipped; return the unused fresh buffer to the pool. + await _pool.Writer.WriteAsync(fresh, ct).ConfigureAwait(false); + } + } + } + } + catch (OperationCanceledException) + { + FlusherLog.ProducerCanceled(_logger); + } + catch (Exception ex) + { + FlusherLog.ProducerFailed(_logger, ex); + await Fail(ex, ct).ConfigureAwait(false); + } + finally + { + // On completion/cancel, push any remaining snapshot. Atomically detach the active buffer + // to avoid any chance of double-enqueue with an in-flight swap. + List<(long position, TEntry entry)> remainder = Interlocked.Exchange(ref _activeSnapshot, new List<(long position, TEntry entry)>(capacity: 128)); + if (remainder.Count > 0) + { + try + { + _flushQueue.Writer.TryWrite(remainder); + FlusherLog.ShutdownRemainderEnqueued(_logger, remainder.Count); + } + catch + { + // ignore on shutdown + } + } + _flushQueue.Writer.TryComplete(); + } + } + + private async Task ConsumerAsync(CancellationToken ct) + { + try + { + ChannelReader> reader = _flushQueue.Reader; + while (await reader.WaitToReadAsync(ct).ConfigureAwait(false)) + { + while (reader.TryRead(out List<(long position, TEntry entry)>? batch)) + { + // Persist ordered batch to sink. + await _sink.AppendSnapshotAsync(batch, ct).ConfigureAwait(false); + FlusherLog.SnapshotAppended(_logger, batch.Count); + // Return buffer to pool for reuse by producer. + batch.Clear(); + await _pool.Writer.WriteAsync(batch, ct).ConfigureAwait(false); + } + } + } + catch (OperationCanceledException) + { + FlusherLog.ConsumerCanceled(_logger); + } + catch (Exception ex) + { + FlusherLog.ConsumerFailed(_logger, ex); + await Fail(ex, ct).ConfigureAwait(false); + } + } + + public async ValueTask DisposeAsync() + { + try + { + if (Status != Status.Completed) + { + await Shutdown(CancellationToken.None).ConfigureAwait(false); + } + } + finally + { + _cts?.Dispose(); + _producer = null; + _consumer = null; + GC.SuppressFinalize(this); + } + return; + } +} diff --git a/src/Coven.Scriveners.FileScrivener/FlusherLog.cs b/src/Coven.Scriveners.FileScrivener/FlusherLog.cs new file mode 100644 index 0000000..0dbf8d0 --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/FlusherLog.cs @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: BUSL-1.1 + +using Microsoft.Extensions.Logging; + +namespace Coven.Scriveners.FileScrivener; + +internal static class FlusherLog +{ + private static readonly Action _producerCanceled = + LoggerMessage.Define( + LogLevel.Information, + new EventId(4000, nameof(ProducerCanceled)), + "Flusher producer canceled."); + + private static readonly Action _producerFailed = + LoggerMessage.Define( + LogLevel.Error, + new EventId(4001, nameof(ProducerFailed)), + "Flusher producer failed."); + + private static readonly Action _consumerCanceled = + LoggerMessage.Define( + LogLevel.Information, + new EventId(4002, nameof(ConsumerCanceled)), + "Flusher consumer canceled."); + + private static readonly Action _consumerFailed = + LoggerMessage.Define( + LogLevel.Error, + new EventId(4003, nameof(ConsumerFailed)), + "Flusher consumer failed."); + + internal static void ProducerCanceled(ILogger logger) => _producerCanceled(logger, null); + internal static void ProducerFailed(ILogger logger, Exception ex) => _producerFailed(logger, ex); + internal static void ConsumerCanceled(ILogger logger) => _consumerCanceled(logger, null); + internal static void ConsumerFailed(ILogger logger, Exception ex) => _consumerFailed(logger, ex); + + // Info breadcrumbs for normal operation + private static readonly Action _snapshotFlushTriggered = + LoggerMessage.Define( + LogLevel.Information, + new EventId(4010, nameof(SnapshotFlushTriggered)), + "Flusher predicate met; capturing snapshot of {Count} entries."); + + private static readonly Action _snapshotEnqueued = + LoggerMessage.Define( + LogLevel.Debug, + new EventId(4011, nameof(SnapshotEnqueued)), + "Flusher enqueued snapshot of {Count} entries."); + + private static readonly Action _snapshotAppended = + LoggerMessage.Define( + LogLevel.Information, + new EventId(4012, nameof(SnapshotAppended)), + "Flusher appended snapshot of {Count} entries."); + + private static readonly Action _shutdownRemainderEnqueued = + LoggerMessage.Define( + LogLevel.Information, + new EventId(4013, nameof(ShutdownRemainderEnqueued)), + "Flusher shutdown: enqueued remainder of {Count} entries."); + + internal static void SnapshotFlushTriggered(ILogger logger, int count) => _snapshotFlushTriggered(logger, count, null); + internal static void SnapshotEnqueued(ILogger logger, int count) => _snapshotEnqueued(logger, count, null); + internal static void SnapshotAppended(ILogger logger, int count) => _snapshotAppended(logger, count, null); + internal static void ShutdownRemainderEnqueued(ILogger logger, int count) => _shutdownRemainderEnqueued(logger, count, null); +} diff --git a/src/Coven.Scriveners.FileScrivener/FlusherOptions.cs b/src/Coven.Scriveners.FileScrivener/FlusherOptions.cs new file mode 100644 index 0000000..996ab89 --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/FlusherOptions.cs @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: BUSL-1.1 + +namespace Coven.Scriveners.FileScrivener; + +/// +/// Configuration for the file-backed scrivener and flusher daemon. +/// +public sealed class FileScrivenerConfig +{ + /// + /// Destination file path for appended snapshots. Directories are created if necessary. + /// + public required string FilePath { get; init; } + + /// + /// Default threshold for snapshot size when no predicate is supplied. + /// + public int FlushThreshold { get; init; } = 100; + + /// + /// Capacity of the ring buffer (bounded channel) for flushing snapshots. + /// + public int FlushQueueCapacity { get; init; } = 8; +} diff --git a/src/Coven.Scriveners.FileScrivener/IEntrySerializer.cs b/src/Coven.Scriveners.FileScrivener/IEntrySerializer.cs new file mode 100644 index 0000000..586176e --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/IEntrySerializer.cs @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: BUSL-1.1 + +namespace Coven.Scriveners.FileScrivener; + +/// +/// Serializes a journal entry (with its assigned position) into a persistable string representation. +/// Implementations should produce a single-line format when used with newline-delimited sinks. +/// +/// The journal entry type. +public interface IEntrySerializer +{ + /// + /// Serialize the provided entry and position into a string suitable for persistence. + /// + /// The assigned journal position for the entry. + /// The entry payload. + /// Serialized string representation. + string Serialize(long position, TEntry entry); +} diff --git a/src/Coven.Scriveners.FileScrivener/IFlushPredicate.cs b/src/Coven.Scriveners.FileScrivener/IFlushPredicate.cs new file mode 100644 index 0000000..3047489 --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/IFlushPredicate.cs @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: BUSL-1.1 + +namespace Coven.Scriveners.FileScrivener; + +/// +/// Determines when the in-memory snapshot should be flushed to persistent storage. +/// +/// The journal entry type. +public interface IFlushPredicate +{ + /// + /// Returns true when the provided snapshot meets the flush criteria. + /// + /// Current in-memory snapshot buffer. + bool ShouldFlush(IReadOnlyList<(long position, TEntry entry)> snapshot); +} diff --git a/src/Coven.Scriveners.FileScrivener/IFlushSink.cs b/src/Coven.Scriveners.FileScrivener/IFlushSink.cs new file mode 100644 index 0000000..8cd96c1 --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/IFlushSink.cs @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: BUSL-1.1 + +namespace Coven.Scriveners.FileScrivener; + +/// +/// Destination for flushed snapshots emitted by the flusher consumer. +/// Implementations should persist entries in order and atomically per snapshot. +/// +/// The journal entry type. +public interface IFlushSink +{ + /// + /// Persist the ordered snapshot entries. + /// + /// Ordered list of (position, entry) pairs. + /// Cooperative cancellation token. + Task AppendSnapshotAsync(IReadOnlyList<(long position, TEntry entry)> snapshot, CancellationToken cancellationToken = default); +} diff --git a/src/Coven.Scriveners.FileScrivener/JsonEntrySerializer.cs b/src/Coven.Scriveners.FileScrivener/JsonEntrySerializer.cs new file mode 100644 index 0000000..63e7fc3 --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/JsonEntrySerializer.cs @@ -0,0 +1,21 @@ +// SPDX-License-Identifier: BUSL-1.1 + +using System.Text.Json; + +namespace Coven.Scriveners.FileScrivener; + +/// +/// JSON serializer producing an envelope of { Position, Entry } using System.Text.Json (web defaults). +/// The Entry is serialized polymorphically using its runtime type so derived properties are preserved. +/// Intended for newline-delimited output when paired with a file sink. +/// +public sealed class JsonEntrySerializer : IEntrySerializer +{ + private static readonly JsonSerializerOptions _options = new(JsonSerializerDefaults.Web); + + private sealed record Envelope(string SchemaVersion, long Position, TEntry Entry); + + /// + public string Serialize(long position, TEntry entry) + => JsonSerializer.Serialize(new Envelope(FileScrivenerSchema.CurrentVersion, position, entry), _options); +} diff --git a/src/Coven.Scriveners.FileScrivener/README.md b/src/Coven.Scriveners.FileScrivener/README.md new file mode 100644 index 0000000..622175d --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/README.md @@ -0,0 +1,155 @@ +# Coven.Scriveners.FileScrivener + +File-backed `IScrivener` with a snapshot flusher daemon. In‑process journaling remains fully in‑memory and replayable; a background daemon tails the journal and appends newline‑delimited snapshots to disk. + +## What It Provides + +- `FileScrivener`: a wrapper over `InMemoryScrivener` that performs no I/O and exposes the standard `IScrivener` API. +- `FlusherDaemon`: tails the journal, accumulates an ordered snapshot of `(position, entry)` pairs, and flushes to a sink when a predicate is met; flushes any remainder on shutdown. +- Defaults: JSON serializer (`{ schemaVersion: string, position, entry }` per line), append‑only file sink, and a count‑based flush predicate. + +## Install / DI + +```csharp +using Coven.Core; +using Coven.Daemonology; +using Coven.Scriveners.FileScrivener; +using Microsoft.Extensions.DependencyInjection; + +services.AddFileScrivener(new FileScrivenerConfig +{ + FilePath = "./journal.ndjson", // destination file (directories auto-created) + FlushThreshold = 100, // default predicate: flush every 100 entries + FlushQueueCapacity = 8 // bounded ring buffer for snapshots +}); + +// FlusherDaemon is registered as a ContractDaemon — start it in your block. +``` + +What `AddFileScrivener` registers: +- `IScrivener` → `FileScrivener` backed by a keyed `InMemoryScrivener`. +- `IEntrySerializer` → `JsonEntrySerializer` (TryAdd; overrideable). +- `IFlushSink` → `FileAppendFlushSink` (TryAdd; overrideable). +- `IFlushPredicate` → `CountThresholdFlushPredicate` using `FlushThreshold` (TryAdd; overrideable). +- `IScrivener` → `InMemoryScrivener` for daemon status events. +- `ContractDaemon` → `FlusherDaemon`. + +## How It Works + +Producer (tail): +- Tails `_scrivener.TailAsync(0, ct)` and appends `(position, entry)` pairs to a producer‑owned `_activeSnapshot` list. +- When `IFlushPredicate.ShouldFlush(snapshot)` returns `true`, it swaps the active buffer with a fresh one (rented from a bounded pool) and enqueues the full snapshot to a bounded flush queue. +- On cancellation/completion, enqueues any remaining snapshot and completes the queue. + +Consumer (flush): +- Single reader of the bounded flush queue; for each snapshot batch: + - Serializes each entry line via `IEntrySerializer` and appends to file (`FileAppendFlushSink` uses UTF‑8 NDJSON, `FileShare.Read`). + - Clears the list and returns it to the pool for reuse. + +Threading & safety: +- Single producer/consumer tasks; snapshots are persisted atomically per batch in arrival order. +- Directories are created if needed. File writes are append‑only and flushed. +- Remaining in‑memory data is flushed on shutdown. Failures are surfaced via the daemon’s status journal. + +Important: File persistence is append‑only; reading/recovery from disk is not implemented — replay comes from the in‑memory scrivener for the current process. If you need recovery, implement a startup loader that hydrates the inner scrivener from the file. + +Compatibility: The on‑disk format intentionally does not commit to backward/forward compatibility. The file scrivener makes no promises about reading any version other than the one it wrote. Each line includes a `schemaVersion` field to enable readers to detect and reject incompatible data. + +## Start the Daemon + +Start `ContractDaemon`s from your block (pattern used across the repo): + +```csharp +using Coven.Core; +using Coven.Daemonology; + +internal sealed class StartDaemonsBlock(IEnumerable daemons) : IMagikBlock +{ + private readonly IEnumerable _daemons = daemons ?? throw new ArgumentNullException(nameof(daemons)); + + public async Task DoMagik(Empty input, CancellationToken cancellationToken = default) + { + foreach (ContractDaemon d in _daemons) + { + await d.Start(cancellationToken); + // Optionally: await d.WaitFor(Status.Running, cancellationToken); + } + return input; + } +} +``` + +## Customization + +- Serializer (line format): + - Default is `JsonEntrySerializer` producing `{ schemaVersion: string, position, entry }` per line. + - The `entry` object is serialized polymorphically with System.Text.Json using type discriminators (annotated on entry base types). + - Replace by registering your own: + ```csharp + services.AddScoped, MyCustomSerializer>(); + ``` + +- Flush predicate (when to persist): + - Default: `CountThresholdFlushPredicate` using `FlushThreshold`. + - Provide time‑based or size‑based logic: + ```csharp + public sealed class TimeOrCountPredicate : IFlushPredicate + { + private readonly int _count; + private readonly TimeProvider _clock; + private DateTimeOffset _last = DateTimeOffset.UtcNow; + public TimeOrCountPredicate(int count, TimeProvider? clock = null) + { _count = count; _clock = clock ?? TimeProvider.System; } + public bool ShouldFlush(IReadOnlyList<(long position, T entry)> snapshot) + { + if (snapshot.Count >= _count) { _last = _clock.GetUtcNow(); return true; } + if (_clock.GetUtcNow() - _last >= TimeSpan.FromSeconds(2)) { _last = _clock.GetUtcNow(); return snapshot.Count > 0; } + return false; + } + } + services.AddScoped>(_ => new TimeOrCountPredicate(50)); + ``` + +- Sink (where to persist): + - Default: `FileAppendFlushSink` with async append and `FileShare.Read`. + - Swap for rotation/cloud/etc.: + ```csharp + services.AddScoped, RotatingFileSink>(); + ``` + +Note: Defaults are registered with `TryAdd`. You can register your own serializer/predicate/sink before or after `AddFileScrivener`; the last registration wins for single service resolution. + +## Configuration + +`FileScrivenerConfig`: +- `FilePath`: destination file path (directories auto‑created). +- `FlushThreshold`: default count predicate threshold (>= 1, default 100). +- `FlushQueueCapacity`: bounded snapshot queue capacity (default 8). Pool size is `FlushQueueCapacity + 2`. + +## Example End‑to‑End + +```csharp +// 1) Register journaling for a specific entry type +services.AddFileScrivener(new FileScrivenerConfig +{ + FilePath = "./data/my-entry.ndjson", + FlushThreshold = 200, + FlushQueueCapacity = 16 +}); + +// 2) Start daemons from your ritual’s first block (see above) + +// 3) Use IScrivener as usual +IScrivener journal = provider.GetRequiredService>(); +await journal.WriteAsync(new MyEntry(/* ... */)); +await foreach ((long pos, MyEntry e) in journal.TailAsync()) +{ + // application logic ... +} +``` + +## Notes & Limitations + +- Append‑only persistence; no compaction, rotation, or on‑startup replay. +- Single consumer performs file writes; designed for one process to own the file. +- Shutdown flushes any buffered entries. Unhandled exceptions in producer/consumer surface via daemon failure. diff --git a/src/Coven.Scriveners.FileScrivener/ServiceCollectionExtensions.cs b/src/Coven.Scriveners.FileScrivener/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..90e4b47 --- /dev/null +++ b/src/Coven.Scriveners.FileScrivener/ServiceCollectionExtensions.cs @@ -0,0 +1,56 @@ +// SPDX-License-Identifier: BUSL-1.1 + +using Coven.Core; +using Coven.Daemonology; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Coven.Scriveners.FileScrivener; + +/// +/// Dependency Injection helpers to register a file-backed scrivener, defaults, and the flusher daemon for a given entry type. +/// +public static class ServiceCollectionExtensions +{ + /// + /// Registers a file-backed scrivener for TEntry and its flushing daemon. + /// Adds defaults for serializer, sink, and flush predicate (count threshold). + /// Registers a `ContractDaemon` that performs tail→snapshot→flush behavior. + /// + /// The journal entry type. + /// Service collection. + /// File scrivener configuration (file path, thresholds). + /// The service collection for chaining. + public static IServiceCollection AddFileScrivener(this IServiceCollection services, FileScrivenerConfig config) + where TEntry : notnull + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(config); + + // Register the config so other classes can find it. + services.AddScoped(sp => config); + + // Inner in-memory journal (scoped) used by the file scrivener + services.AddKeyedScoped, InMemoryScrivener>("Coven.InternalFileScrivener"); + services.AddScoped>(sp => + { + IScrivener inner = sp.GetRequiredKeyedService>("Coven.InternalFileScrivener"); + return new FileScrivener(inner); + }); + + // Serializer + sink defaults (TryAdd so hosts can override) + services.TryAddScoped, JsonEntrySerializer>(); + services.TryAddScoped>(sp => new FileAppendFlushSink(config.FilePath, sp.GetRequiredService>())); + + // Predicate default based on threshold + services.TryAddScoped>(_ => new CountThresholdFlushPredicate(config.FlushThreshold)); + + // Daemon journal for status contracts + services.TryAddScoped, InMemoryScrivener>(); + + // Flusher daemon + services.AddScoped>(); + + return services; + } +} diff --git a/src/Coven.Transmutation/IImbuingTransmuter.cs b/src/Coven.Transmutation/IImbuingTransmuter.cs new file mode 100644 index 0000000..ecdbf62 --- /dev/null +++ b/src/Coven.Transmutation/IImbuingTransmuter.cs @@ -0,0 +1,33 @@ +// SPDX-License-Identifier: BUSL-1.1 + +namespace Coven.Transmutation; + +/// +/// Defines a two-parameter transmutation where an input is "imbued" with a secondary +/// reagent to produce an output. Implementations should be pure and cancel-aware. +/// +/// Primary input type. +/// Secondary input (reagent) type. +/// Output type. +public interface IImbuingTransmuter : ITransmuter<(TIn Input, TReagent Reagent), TOut> +{ + /// + /// Transmutes the given using the provided , + /// producing an instance of . + /// + /// Primary input value. + /// Secondary input value that modifies the transmutation. + /// Token to observe for cooperative cancellation. + /// A task that completes with the transmuted output. + Task Transmute(TIn Input, TReagent Reagent, CancellationToken cancellationToken = default); + + /// + /// Tuple-based transmutation to align with ; forwards to the + /// two-parameter overload for convenience and consistency. + /// + /// A tuple containing the primary input and reagent. + /// Token to observe for cooperative cancellation. + /// A task that completes with the transmuted output. + Task ITransmuter<(TIn Input, TReagent Reagent), TOut>.Transmute((TIn Input, TReagent Reagent) Input, CancellationToken cancellationToken) + => Transmute(Input.Input, Input.Reagent, cancellationToken); +} diff --git a/src/Coven.Transmutation/README.md b/src/Coven.Transmutation/README.md index fe111a1..bb91b5e 100644 --- a/src/Coven.Transmutation/README.md +++ b/src/Coven.Transmutation/README.md @@ -9,6 +9,7 @@ Pure transformations between types used across Coven. Transmuters describe how o - IBatchTransmuter: many‑to‑one transformation over a window of chunks. - BatchTransmuteResult: output plus optional remainder chunk. - LambdaTransmuter: adapter to build a transmuter from a delegate. +- IImbuingTransmuter: two‑parameter transmutation using an input plus a reagent. Also implements `ITransmuter<(TIn Input, TReagent Reagent), TOut>` for tuple‑based composition. ## Principles @@ -73,6 +74,27 @@ Remainders are useful when only part of the last chunk is consumed; the unused t var t = new LambdaTransmuter((i, ct) => Task.FromResult(i.ToString())); ``` +## Imbuing Transmutation (Input + Reagent) + +```csharp +using Coven.Transmutation; + +// Implement the two-parameter interface directly +public sealed class SumWithBiasImbuing : IImbuingTransmuter +{ + public Task Transmute(int Input, int Reagent, CancellationToken ct = default) + => Task.FromResult(Input + Reagent); +} + +// Use it directly with separate parameters +int result1 = await new SumWithBiasImbuing().Transmute(10, 5); // 15 + +// Because IImbuingTransmuter also implements ITransmuter<(int,int),int>, +// it can be used in tuple-based composition without extra adapters +ITransmuter<(int Input, int Bias), int> tupleTransmuter = new SumWithBiasImbuing(); +int result2 = await tupleTransmuter.Transmute((10, 5)); // 15 +``` + ## Tips - Keep transmuters small and testable; stitch them together via DI. diff --git a/src/Coven.sln b/src/Coven.sln index 1f0e511..9967d1d 100644 --- a/src/Coven.sln +++ b/src/Coven.sln @@ -43,7 +43,9 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Coven.Core.Streaming", "Cov EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Coven.Toys.DiscordStreaming", "toys\Coven.Toys.DiscordStreaming\Coven.Toys.DiscordStreaming.csproj", "{08F6A76C-CF62-4113-8740-CF5EF8115F99}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Coven.Core.Debug", "Coven.Core.Debug\Coven.Core.Debug.csproj", "{1264DB86-E00B-4222-B865-CF15BD752704}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Coven.Scriveners.FileScrivener", "Coven.Scriveners.FileScrivener\Coven.Scriveners.FileScrivener.csproj", "{31A02263-5D48-4EEC-B5ED-28876DEC2CB5}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Coven.Toys.FileScrivenerConsole", "toys\Coven.Toys.FileScrivenerConsole\Coven.Toys.FileScrivenerConsole.csproj", "{243BAC56-0FB0-43BC-91DB-DC341F0D64FA}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -271,18 +273,30 @@ Global {08F6A76C-CF62-4113-8740-CF5EF8115F99}.Release|x64.Build.0 = Release|Any CPU {08F6A76C-CF62-4113-8740-CF5EF8115F99}.Release|x86.ActiveCfg = Release|Any CPU {08F6A76C-CF62-4113-8740-CF5EF8115F99}.Release|x86.Build.0 = Release|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Debug|Any CPU.Build.0 = Debug|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Debug|x64.ActiveCfg = Debug|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Debug|x64.Build.0 = Debug|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Debug|x86.ActiveCfg = Debug|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Debug|x86.Build.0 = Debug|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Release|Any CPU.ActiveCfg = Release|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Release|Any CPU.Build.0 = Release|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Release|x64.ActiveCfg = Release|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Release|x64.Build.0 = Release|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Release|x86.ActiveCfg = Release|Any CPU - {1264DB86-E00B-4222-B865-CF15BD752704}.Release|x86.Build.0 = Release|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Debug|x64.ActiveCfg = Debug|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Debug|x64.Build.0 = Debug|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Debug|x86.ActiveCfg = Debug|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Debug|x86.Build.0 = Debug|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Release|Any CPU.Build.0 = Release|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Release|x64.ActiveCfg = Release|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Release|x64.Build.0 = Release|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Release|x86.ActiveCfg = Release|Any CPU + {31A02263-5D48-4EEC-B5ED-28876DEC2CB5}.Release|x86.Build.0 = Release|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Debug|x64.ActiveCfg = Debug|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Debug|x64.Build.0 = Debug|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Debug|x86.ActiveCfg = Debug|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Debug|x86.Build.0 = Debug|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Release|Any CPU.Build.0 = Release|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Release|x64.ActiveCfg = Release|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Release|x64.Build.0 = Release|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Release|x86.ActiveCfg = Release|Any CPU + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -294,5 +308,6 @@ Global {7021B9A6-B5F7-4CE8-AA8E-A9D0E02F90AF} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA} {1D6A8099-642A-4DAF-9103-A8324AE31930} = {28A4673F-9138-6225-6D15-8B1CCBEDA19D} {08F6A76C-CF62-4113-8740-CF5EF8115F99} = {28A4673F-9138-6225-6D15-8B1CCBEDA19D} + {243BAC56-0FB0-43BC-91DB-DC341F0D64FA} = {28A4673F-9138-6225-6D15-8B1CCBEDA19D} EndGlobalSection EndGlobal diff --git a/src/samples/01.DiscordAgent/DiscordAgent.csproj b/src/samples/01.DiscordAgent/DiscordAgent.csproj index 5f2e204..304d2f2 100644 --- a/src/samples/01.DiscordAgent/DiscordAgent.csproj +++ b/src/samples/01.DiscordAgent/DiscordAgent.csproj @@ -17,13 +17,17 @@ - + - + + + + + diff --git a/src/samples/01.DiscordAgent/Program.cs b/src/samples/01.DiscordAgent/Program.cs index aa69ce7..1937321 100644 --- a/src/samples/01.DiscordAgent/Program.cs +++ b/src/samples/01.DiscordAgent/Program.cs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: BUSL-1.1 using Coven.Agents.OpenAI; +using Coven.Chat; using Coven.Chat.Discord; using Coven.Core; using Coven.Core.Builder; @@ -12,6 +13,7 @@ using OpenAI.Responses; using Coven.Core.Streaming; using Coven.Agents; +using Coven.Scriveners.FileScrivener; // Configuration (env-first with fallback to defaults below) // Defaults: edit these to hardcode values when env vars are not present @@ -47,6 +49,16 @@ // Register DI HostApplicationBuilder builder = Host.CreateApplicationBuilder(args); builder.Services.AddLogging(b => b.AddConsole()); +// Persist journals to disk using FileScrivener (registered before branches to avoid TryAdd overrides) +builder.Services.AddFileScrivener(new FileScrivenerConfig +{ + FilePath = "./data/discord-chat.ndjson", + FlushThreshold = 1 +}); +builder.Services.AddFileScrivener(new FileScrivenerConfig +{ + FilePath = "./data/openai-agent.ndjson" +}); builder.Services.AddDiscordChat(discordConfig); builder.Services.AddOpenAIAgents(openAiConfig, registration => { diff --git a/src/samples/01.DiscordAgent/README.md b/src/samples/01.DiscordAgent/README.md index 04cef99..a8f3be9 100644 --- a/src/samples/01.DiscordAgent/README.md +++ b/src/samples/01.DiscordAgent/README.md @@ -51,6 +51,12 @@ Troubleshooting: - OpenAI: Confirm API key and model are valid for your account. - Networking: Ensure outbound HTTPS to Discord and OpenAI is allowed. +### Persistence (FileScrivener) +- Journals persist to NDJSON files for replay/audit: + - Chat: `./data/discord-chat.ndjson` + - Agent: `./data/openai-agent.ndjson` +- Change paths by editing `AddFileScrivener(new FileScrivenerConfig { FilePath = ... })` in `Program.cs`. + ## Extend Swap Discord for Console chat (one-line change): diff --git a/src/toys/Coven.Toys.FileScrivenerConsole/Coven.Toys.FileScrivenerConsole.csproj b/src/toys/Coven.Toys.FileScrivenerConsole/Coven.Toys.FileScrivenerConsole.csproj new file mode 100644 index 0000000..f223ebd --- /dev/null +++ b/src/toys/Coven.Toys.FileScrivenerConsole/Coven.Toys.FileScrivenerConsole.csproj @@ -0,0 +1,28 @@ + + + + Exe + net10.0 + false + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/toys/Coven.Toys.FileScrivenerConsole/PersistingEchoBlock.cs b/src/toys/Coven.Toys.FileScrivenerConsole/PersistingEchoBlock.cs new file mode 100644 index 0000000..e559036 --- /dev/null +++ b/src/toys/Coven.Toys.FileScrivenerConsole/PersistingEchoBlock.cs @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: BUSL-1.1 + +using Coven.Chat; +using Coven.Core; +using Coven.Daemonology; + +namespace Coven.Toys.FileScrivenerConsole; + +internal sealed class PersistingEchoBlock(IEnumerable daemons, IScrivener chat) + : IMagikBlock +{ + private readonly IEnumerable _daemons = daemons ?? throw new ArgumentNullException(nameof(daemons)); + private readonly IScrivener _chat = chat ?? throw new ArgumentNullException(nameof(chat)); + + public async Task DoMagik(Empty input, CancellationToken cancellationToken = default) + { + // Start all registered daemons: console gateway and file flusher + foreach (ContractDaemon d in _daemons) + { + await d.Start(cancellationToken).ConfigureAwait(false); + } + + // Echo inbound console messages to outbound chat; file flusher persists the journal + await foreach ((long _, ChatEntry entry) in _chat.TailAsync(0, cancellationToken)) + { + if (entry is ChatAfferent r) + { + await _chat.WriteAsync(new ChatEfferent("BOT", "Echo: " + r.Text), cancellationToken) + .ConfigureAwait(false); + } + } + + return input; + } +} + diff --git a/src/toys/Coven.Toys.FileScrivenerConsole/Program.cs b/src/toys/Coven.Toys.FileScrivenerConsole/Program.cs new file mode 100644 index 0000000..1e9fcd6 --- /dev/null +++ b/src/toys/Coven.Toys.FileScrivenerConsole/Program.cs @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: BUSL-1.1 + +using Coven.Chat; +using Coven.Chat.Console; +using Coven.Core; +using Coven.Core.Builder; +using Coven.Scriveners.FileScrivener; +using Coven.Toys.FileScrivenerConsole; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +// Configuration (toy-friendly defaults) +ConsoleClientConfig console = new() +{ + InputSender = "console", + OutputSender = "BOT" +}; + +FileScrivenerConfig fileConfig = new() +{ + FilePath = "./data/console-chat.ndjson", + FlushThreshold = 1, + FlushQueueCapacity = 8 +}; + +// Register DI +HostApplicationBuilder builder = Host.CreateApplicationBuilder(args); +builder.Services.AddLogging(b => b.AddConsole()); + +// Register FileScrivener before AddConsoleChat so TryAdd doesn't override +builder.Services.AddFileScrivener(fileConfig); +builder.Services.AddConsoleChat(console); + +// Wire a simple echo block that starts daemons and echoes input +builder.Services.BuildCoven(c => c.MagikBlock().Done()); + +IHost host = builder.Build(); + +// Execute ritual +ICoven coven = host.Services.GetRequiredService(); +await coven.Ritual(new Empty());