diff --git a/src/Coven.Agents.OpenAI/AgentMaxLengthWindowPolicy.cs b/src/Coven.Agents.OpenAI/AgentMaxLengthWindowPolicy.cs new file mode 100644 index 0000000..3592d29 --- /dev/null +++ b/src/Coven.Agents.OpenAI/AgentMaxLengthWindowPolicy.cs @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: BUSL-1.1 +using Coven.Core.Streaming; + +namespace Coven.Agents.OpenAI; + +/// +/// Emits when recent agent chunk length reaches a max. Minimal lookback of 1. +/// +public sealed class AgentMaxLengthWindowPolicy : IWindowPolicy +{ + private readonly int _max; + + public AgentMaxLengthWindowPolicy(int max) + { + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(max, 0); + _max = max; + } + + public int MinChunkLookback => 1; + + public bool ShouldEmit(StreamWindow window) + { + int total = 0; + foreach (AgentAfferentChunk chunk in window.PendingChunks) + { + if (!string.IsNullOrEmpty(chunk.Text)) + { + total += chunk.Text.Length; + if (total >= _max) + { + return true; + } + } + } + return false; + } +} diff --git a/src/Coven.Agents.OpenAI/AgentParagraphWindowPolicy.cs b/src/Coven.Agents.OpenAI/AgentParagraphWindowPolicy.cs new file mode 100644 index 0000000..44baef1 --- /dev/null +++ b/src/Coven.Agents.OpenAI/AgentParagraphWindowPolicy.cs @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: BUSL-1.1 +using System.Text; +using Coven.Core.Streaming; + +namespace Coven.Agents.OpenAI; + +/// +/// Emits agent output when the recent text (last 1–2 chunks) ends at a paragraph boundary. +/// Uses a minimal lookback of 2 to handle boundaries that straddle chunk edges. +/// +public sealed class AgentParagraphWindowPolicy : IWindowPolicy +{ + public int MinChunkLookback => 2; + + public bool ShouldEmit(StreamWindow window) + { + StringBuilder stringBuilder = new(); + foreach (AgentAfferentChunk chunk in window.PendingChunks) + { + if (!string.IsNullOrEmpty(chunk.Text)) + { + stringBuilder.Append(chunk.Text); + } + } + + if (stringBuilder.Length == 0) + { + return false; + } + + string concatenatedWindow = stringBuilder.ToString(); + return concatenatedWindow.EndsWith("\r\n\r\n", StringComparison.Ordinal) || concatenatedWindow.EndsWith("\n\n", StringComparison.Ordinal); + } +} diff --git a/src/Coven.Agents.OpenAI/AgentThoughtMaxLengthWindowPolicy.cs b/src/Coven.Agents.OpenAI/AgentThoughtMaxLengthWindowPolicy.cs new file mode 100644 index 0000000..0ec1aff --- /dev/null +++ b/src/Coven.Agents.OpenAI/AgentThoughtMaxLengthWindowPolicy.cs @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: BUSL-1.1 +using Coven.Core.Streaming; + +namespace Coven.Agents.OpenAI; + +/// +/// Emits when recent agent thought chunk length reaches a max. Minimal lookback of 1. +/// +public sealed class AgentThoughtMaxLengthWindowPolicy : IWindowPolicy +{ + private readonly int _max; + + public AgentThoughtMaxLengthWindowPolicy(int max) + { + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(max, 0); + _max = max; + } + + public int MinChunkLookback => 1; + + public bool ShouldEmit(StreamWindow window) + { + int total = 0; + foreach (AgentAfferentThoughtChunk chunk in window.PendingChunks) + { + if (!string.IsNullOrEmpty(chunk.Text)) + { + total += chunk.Text.Length; + if (total >= _max) + { + return true; + } + } + } + return false; + } +} + diff --git a/src/Coven.Agents.OpenAI/AgentThoughtParagraphWindowPolicy.cs b/src/Coven.Agents.OpenAI/AgentThoughtParagraphWindowPolicy.cs new file mode 100644 index 0000000..480dfd7 --- /dev/null +++ b/src/Coven.Agents.OpenAI/AgentThoughtParagraphWindowPolicy.cs @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: BUSL-1.1 +using System.Text; +using Coven.Core.Streaming; + +namespace Coven.Agents.OpenAI; + +/// +/// Emits agent thought output when the recent text (last 1–2 chunks) ends at a paragraph boundary. +/// Uses a minimal lookback of 2 to handle boundaries that straddle chunk edges. +/// +public sealed class AgentThoughtParagraphWindowPolicy : IWindowPolicy +{ + public int MinChunkLookback => 2; + + public bool ShouldEmit(StreamWindow window) + { + StringBuilder stringBuilder = new(); + foreach (AgentAfferentThoughtChunk chunk in window.PendingChunks) + { + if (!string.IsNullOrEmpty(chunk.Text)) + { + stringBuilder.Append(chunk.Text); + } + } + + if (stringBuilder.Length == 0) + { + return false; + } + + string concatenatedWindow = stringBuilder.ToString(); + return concatenatedWindow.EndsWith("\r\n\r\n", StringComparison.Ordinal) || concatenatedWindow.EndsWith("\n\n", StringComparison.Ordinal); + } +} + diff --git a/src/Coven.Agents.OpenAI/AgentThoughtSentenceWindowPolicy.cs b/src/Coven.Agents.OpenAI/AgentThoughtSentenceWindowPolicy.cs new file mode 100644 index 0000000..fe96807 --- /dev/null +++ b/src/Coven.Agents.OpenAI/AgentThoughtSentenceWindowPolicy.cs @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: BUSL-1.1 +using System.Text; +using Coven.Core.Streaming; + +namespace Coven.Agents.OpenAI; + +/// +/// Emits when the concatenated thought text ends at a sentence boundary. +/// A sentence boundary is a trailing '.', '!' or '?' (ignoring trailing whitespace). +/// +public sealed class AgentThoughtSentenceWindowPolicy : IWindowPolicy +{ + // 4 chunks should be generous for windowing sentence termination + public int MinChunkLookback => 4; + + public bool ShouldEmit(StreamWindow window) + { + StringBuilder sb = new(); + foreach (AgentAfferentThoughtChunk chunk in window.PendingChunks) + { + if (!string.IsNullOrEmpty(chunk.Text)) + { + sb.Append(chunk.Text); + } + } + + return sb.Length != 0 && EndsWithSentenceBoundary(sb); + } + + private static bool EndsWithSentenceBoundary(StringBuilder sb) + { + int i = sb.Length - 1; + while (i >= 0 && char.IsWhiteSpace(sb[i])) + { + i--; + } + + if (i < 0) + { + return false; + } + + char c = sb[i]; + return c is '.' or '!' or '?'; + } +} + diff --git a/src/Coven.Agents.OpenAI/AgentThoughtSummaryMarkerWindowPolicy.cs b/src/Coven.Agents.OpenAI/AgentThoughtSummaryMarkerWindowPolicy.cs new file mode 100644 index 0000000..7db789a --- /dev/null +++ b/src/Coven.Agents.OpenAI/AgentThoughtSummaryMarkerWindowPolicy.cs @@ -0,0 +1,83 @@ +// SPDX-License-Identifier: BUSL-1.1 +using System.Text; +using Coven.Core.Streaming; + +namespace Coven.Agents.OpenAI; + +/// +/// Emits when a summary marker is observed in the thought stream. +/// The marker is any bold Markdown segment ("**...**") followed by a newline sequence. +/// Recognized sequences: "\n\n", "\r\n\r\n", or "\r\n". +/// +public sealed class AgentThoughtSummaryMarkerWindowPolicy : IWindowPolicy +{ + public int MinChunkLookback => 10; + + public bool ShouldEmit(StreamWindow window) + { + StringBuilder stringBuilder = new(); + foreach (AgentAfferentThoughtChunk chunk in window.PendingChunks) + { + if (!string.IsNullOrEmpty(chunk.Text)) + { + stringBuilder.Append(chunk.Text); + } + } + + if (stringBuilder.Length == 0) + { + return false; + } + + string text = stringBuilder.ToString(); + ReadOnlySpan span = text.AsSpan(); + return HasBoldFollowedByNewline(span); + } + + private static bool HasBoldFollowedByNewline(ReadOnlySpan span) + { + int position = 0; + while (position < span.Length) + { + int start = span[position..].IndexOf("**"); + if (start < 0) + { + return false; + } + start += position; + + int afterOpen = start + 2; + if (afterOpen >= span.Length) + { + return false; + } + + int end = span[afterOpen..].IndexOf("**"); + if (end < 0) + { + // Unmatched opener; advance past it and continue scanning later content + position = start + 2; + continue; + } + end += afterOpen; + + // Require non-empty content between markers + if (end > start + 2) + { + int after = end + 2; + ReadOnlySpan tail = after <= span.Length ? span[after..] : []; + + if (tail.StartsWith("\r\n\r\n", StringComparison.Ordinal) || + tail.StartsWith("\n\n", StringComparison.Ordinal) || + tail.StartsWith("\r\n", StringComparison.Ordinal)) + { + return true; + } + } + + position = end + 2; + } + + return false; + } +} diff --git a/src/Coven.Agents.OpenAI/AgentThoughtSummaryShatterPolicy.cs b/src/Coven.Agents.OpenAI/AgentThoughtSummaryShatterPolicy.cs new file mode 100644 index 0000000..70fe1f8 --- /dev/null +++ b/src/Coven.Agents.OpenAI/AgentThoughtSummaryShatterPolicy.cs @@ -0,0 +1,110 @@ +// SPDX-License-Identifier: BUSL-1.1 +using Coven.Core.Streaming; + +namespace Coven.Agents.OpenAI; + +/// +/// Shatters AgentThought outputs on the first matched "summary marker": +/// any bold Markdown segment ("**...**") immediately followed by a newline sequence +/// ("\n\n", "\r\n\r\n", or "\r\n"). +/// +/// When a boundary is found, emits two AgentThought entries: +/// - First: everything before the bold segment +/// - Second: the bold segment plus the newline sequence and any remaining text +/// +/// If no boundary exists, produces no outputs (forward unchanged). +/// +public sealed class AgentThoughtSummaryShatterPolicy : IShatterPolicy +{ + private static class Grammar + { + // Token for a Markdown bold delimiter + public const string Bold = "**"; + // Ordered newline sequences that define a "paragraph boundary" + public static readonly string[] _paragraphBoundaries = ["\r\n\r\n", "\n\n", "\r\n"]; + } + + public IEnumerable Shatter(AgentEntry entry) + { + if (entry is not AgentThought thought || string.IsNullOrEmpty(thought.Text)) + { + yield break; + } + + string text = thought.Text; + // Locate the first boundary where a bold segment is immediately followed by a newline sequence. + // The returned index is the position of the bold opener; we split BEFORE it. + int splitIndex = IndexOfSummaryBoundary(text); + if (splitIndex < 0) + { + yield break; + } + + // Split before the header: first = preface text; second = header + newline(s) + remainder. + string first = text[..splitIndex]; + string second = text[splitIndex..]; + + // Emit only non-empty chunks. + if (first.Length > 0) + { + yield return new AgentThought(thought.Sender, first); + } + if (second.Length > 0) + { + yield return new AgentThought(thought.Sender, second); + } + } + + private static int IndexOfSummaryBoundary(string s) + { + ReadOnlySpan span = s.AsSpan(); + int position = 0; + while (position < span.Length) + { + // Find the next bold opener starting at the current scan position. + int start = span[position..].IndexOf(Grammar.Bold); + if (start < 0) + { + return -1; + } + // Convert relative index to absolute index within the source span. + start += position; + + // Index immediately after the opening bold token. + int afterOpen = start + Grammar.Bold.Length; + if (afterOpen >= span.Length) { return -1; } + + // Search for the matching bold closer after the opener. + int end = span[afterOpen..].IndexOf(Grammar.Bold); + if (end < 0) + { + // No closer found; advance past the opener to allow subsequent matches later in the text. + position = start + Grammar.Bold.Length; + continue; + } + end += afterOpen; + + // Require non-empty content between the opener and closer (i.e., at least one character inside the bold segment). + if (end > start + Grammar.Bold.Length) + { + // Index immediately after the closing bold token. + int after = end + Grammar.Bold.Length; + // Slice of text following the bold segment; used to detect newline sequences. + ReadOnlySpan tail = after <= span.Length ? span[after..] : []; + // Bold header followed by newline(s) — split BEFORE the bold segment. + foreach (string nl in Grammar._paragraphBoundaries) + { + if (tail.StartsWith(nl, StringComparison.Ordinal)) + { + // Found a boundary: return the index of the bold opener so callers split BEFORE it. + return start; + } + } + } + + // Advance scan position to just after the bold closer and continue searching. + position = end + Grammar.Bold.Length; + } + return -1; + } +} diff --git a/src/Coven.Agents.OpenAI/DefaultOpenAITranscriptBuilder.cs b/src/Coven.Agents.OpenAI/DefaultOpenAITranscriptBuilder.cs index 98ca601..65d422a 100644 --- a/src/Coven.Agents.OpenAI/DefaultOpenAITranscriptBuilder.cs +++ b/src/Coven.Agents.OpenAI/DefaultOpenAITranscriptBuilder.cs @@ -14,7 +14,7 @@ internal sealed class DefaultOpenAITranscriptBuilder( private readonly IScrivener _journal = journal ?? throw new ArgumentNullException(nameof(journal)); private readonly ITransmuter _entryToItem = entryToItem ?? throw new ArgumentNullException(nameof(entryToItem)); - public async Task> BuildAsync(OpenAIOutgoing newest, int maxMessages, CancellationToken cancellationToken) + public async Task> BuildAsync(OpenAIEfferent newest, int maxMessages, CancellationToken cancellationToken) { List buffer = []; diff --git a/src/Coven.Agents.OpenAI/IOpenAIGatewayConnection.cs b/src/Coven.Agents.OpenAI/IOpenAIGatewayConnection.cs index 49c325a..d5f4a54 100644 --- a/src/Coven.Agents.OpenAI/IOpenAIGatewayConnection.cs +++ b/src/Coven.Agents.OpenAI/IOpenAIGatewayConnection.cs @@ -5,6 +5,6 @@ namespace Coven.Agents.OpenAI; internal interface IOpenAIGatewayConnection { Task ConnectAsync(); - Task SendAsync(OpenAIOutgoing outgoing, CancellationToken cancellationToken); + Task SendAsync(OpenAIEfferent outgoing, CancellationToken cancellationToken); } diff --git a/src/Coven.Agents.OpenAI/IOpenAITranscriptBuilder.cs b/src/Coven.Agents.OpenAI/IOpenAITranscriptBuilder.cs index 4f5b86c..03b7e09 100644 --- a/src/Coven.Agents.OpenAI/IOpenAITranscriptBuilder.cs +++ b/src/Coven.Agents.OpenAI/IOpenAITranscriptBuilder.cs @@ -6,6 +6,6 @@ namespace Coven.Agents.OpenAI; internal interface IOpenAITranscriptBuilder { - Task> BuildAsync(OpenAIOutgoing newest, int maxMessages, CancellationToken cancellationToken); + Task> BuildAsync(OpenAIEfferent newest, int maxMessages, CancellationToken cancellationToken); } diff --git a/src/Coven.Agents.OpenAI/OpenAIAgentSession.cs b/src/Coven.Agents.OpenAI/OpenAIAgentSession.cs index 4eb869d..702e9ee 100644 --- a/src/Coven.Agents.OpenAI/OpenAIAgentSession.cs +++ b/src/Coven.Agents.OpenAI/OpenAIAgentSession.cs @@ -2,6 +2,7 @@ using Coven.Core; using Coven.Transmutation; +using Coven.Core.Streaming; using Microsoft.Extensions.Logging; namespace Coven.Agents.OpenAI; @@ -10,6 +11,7 @@ internal sealed class OpenAIAgentSession( IOpenAIGatewayConnection gateway, IScrivener openAIJournal, IScrivener agentJournal, + IShatterPolicy shatterPolicy, IBiDirectionalTransmuter transmuter, ILogger logger, CancellationToken sessionToken) : IAsyncDisposable @@ -17,6 +19,7 @@ internal sealed class OpenAIAgentSession( private readonly IOpenAIGatewayConnection _gateway = gateway ?? throw new ArgumentNullException(nameof(gateway)); private readonly IScrivener _openAIJournal = openAIJournal ?? throw new ArgumentNullException(nameof(openAIJournal)); private readonly IScrivener _agentJournal = agentJournal ?? throw new ArgumentNullException(nameof(agentJournal)); + private readonly IShatterPolicy _shatterPolicy = shatterPolicy ?? throw new ArgumentNullException(nameof(shatterPolicy)); private readonly IBiDirectionalTransmuter _transmuter = transmuter ?? throw new ArgumentNullException(nameof(transmuter)); private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); private readonly CancellationToken _sessionToken = sessionToken; @@ -41,7 +44,30 @@ public async Task StartAsync() } OpenAILog.OpenAIToAgentsObserved(_logger, entry.GetType().Name, position); - AgentEntry agent = await _transmuter.TransmuteIn(entry, ct).ConfigureAwait(false); + + // Session-local shattering for OpenAI thought chunks on paragraph boundary + if (entry is OpenAIAfferentThoughtChunk) + { + bool produced = false; + IEnumerable outputs = _shatterPolicy.Shatter(entry) ?? []; + foreach (OpenAIEntry openAIEntry in outputs) + { + if (openAIEntry is OpenAIAfferentThoughtChunk) + { + produced = true; + AgentEntry agentChunk = await _transmuter.TransmuteAfferent(openAIEntry, ct).ConfigureAwait(false); + long pos = await _agentJournal.WriteAsync(agentChunk, ct).ConfigureAwait(false); + OpenAILog.OpenAIToAgentsAppended(_logger, agentChunk.GetType().Name, pos); + } + } + + if (produced) + { + continue; // do not forward the original chunk + } + } + + AgentEntry agent = await _transmuter.TransmuteAfferent(entry, ct).ConfigureAwait(false); OpenAILog.OpenAIToAgentsTransmuted(_logger, entry.GetType().Name, agent.GetType().Name); long agentPos = await _agentJournal.WriteAsync(agent, ct).ConfigureAwait(false); OpenAILog.OpenAIToAgentsAppended(_logger, agent.GetType().Name, agentPos); @@ -65,13 +91,14 @@ public async Task StartAsync() { await foreach ((long position, AgentEntry entry) in _agentJournal.TailAsync(0, ct)) { - if (entry is AgentAck) + // Early filtering: ignore drafts and acks to avoid loops/noise + if (entry is AgentEntryDraft or AgentAck) { continue; } OpenAILog.AgentsToOpenAIObserved(_logger, entry.GetType().Name, position); - OpenAIEntry openAI = await _transmuter.TransmuteOut(entry, ct).ConfigureAwait(false); + OpenAIEntry openAI = await _transmuter.TransmuteEfferent(entry, ct).ConfigureAwait(false); OpenAILog.AgentsToOpenAITransmuted(_logger, entry.GetType().Name, openAI.GetType().Name); long aiPos = await _openAIJournal.WriteAsync(openAI, ct).ConfigureAwait(false); OpenAILog.AgentsToOpenAIAppended(_logger, openAI.GetType().Name, aiPos); diff --git a/src/Coven.Agents.OpenAI/OpenAIAgentSessionFactory.cs b/src/Coven.Agents.OpenAI/OpenAIAgentSessionFactory.cs index adc8840..a387524 100644 --- a/src/Coven.Agents.OpenAI/OpenAIAgentSessionFactory.cs +++ b/src/Coven.Agents.OpenAI/OpenAIAgentSessionFactory.cs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: BUSL-1.1 using Coven.Core; +using Coven.Core.Streaming; using Coven.Transmutation; using Microsoft.Extensions.Logging; @@ -11,14 +12,16 @@ internal sealed class OpenAIAgentSessionFactory( IScrivener openAIJournal, IScrivener agentJournal, IBiDirectionalTransmuter transmuter, + IShatterPolicy shatterPolicy, ILogger logger) { private readonly IOpenAIGatewayConnection _gatewayConnection = gatewayConnection ?? throw new ArgumentNullException(nameof(gatewayConnection)); private readonly IScrivener _openAIJournal = openAIJournal ?? throw new ArgumentNullException(nameof(openAIJournal)); private readonly IScrivener _agentJournal = agentJournal ?? throw new ArgumentNullException(nameof(agentJournal)); private readonly IBiDirectionalTransmuter _transmuter = transmuter ?? throw new ArgumentNullException(nameof(transmuter)); + private readonly IShatterPolicy _shatterPolicy = shatterPolicy ?? throw new ArgumentNullException(nameof(shatterPolicy)); private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); public OpenAIAgentSession Create(CancellationToken sessionToken) - => new(_gatewayConnection, _openAIJournal, _agentJournal, _transmuter, _logger, sessionToken); + => new(_gatewayConnection, _openAIJournal, _agentJournal, _shatterPolicy, _transmuter, _logger, sessionToken); } diff --git a/src/Coven.Agents.OpenAI/OpenAIChunkBatchTransmuter.cs b/src/Coven.Agents.OpenAI/OpenAIChunkBatchTransmuter.cs new file mode 100644 index 0000000..144f1d0 --- /dev/null +++ b/src/Coven.Agents.OpenAI/OpenAIChunkBatchTransmuter.cs @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: BUSL-1.1 +using System.Text; +using Coven.Transmutation; + +namespace Coven.Agents.OpenAI; + +public sealed class OpenAIChunkBatchTransmuter : IBatchTransmuter +{ + public Task> Transmute(IEnumerable Input, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(Input); + + string sender = string.Empty; + string responseId = string.Empty; + string model = string.Empty; + DateTimeOffset timestamp = DateTimeOffset.MinValue; + StringBuilder sb = new(); + + foreach (OpenAIAfferentChunk chunk in Input) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!string.IsNullOrEmpty(chunk.Sender)) + { + sender = chunk.Sender; + } + if (!string.IsNullOrEmpty(chunk.ResponseId)) + { + responseId = chunk.ResponseId; + } + if (!string.IsNullOrEmpty(chunk.Model)) + { + model = chunk.Model; + } + if (chunk.Timestamp != default) + { + timestamp = chunk.Timestamp; + } + if (!string.IsNullOrEmpty(chunk.Text)) + { + sb.Append(chunk.Text); + } + } + + OpenAIThought output = new(sender, sb.ToString(), responseId, timestamp, model); + return Task.FromResult(new BatchTransmuteResult(output, false, null)); + } +} diff --git a/src/Coven.Agents.OpenAI/OpenAIClientConfig.cs b/src/Coven.Agents.OpenAI/OpenAIClientConfig.cs index d7abdc0..5099dfe 100644 --- a/src/Coven.Agents.OpenAI/OpenAIClientConfig.cs +++ b/src/Coven.Agents.OpenAI/OpenAIClientConfig.cs @@ -5,7 +5,7 @@ namespace Coven.Agents.OpenAI; /// /// Minimal configuration for the OpenAI Responses integration. /// -public sealed class OpenAIClientConfig +public sealed record OpenAIClientConfig { public required string ApiKey { get; init; } public required string Model { get; init; } @@ -16,5 +16,9 @@ public sealed class OpenAIClientConfig public float? Temperature { get; init; } public float? TopP { get; init; } public int? MaxOutputTokens { get; init; } - public int? HistoryClip { get; init; } + // Max number of transcript items to include; default is unlimited + public int HistoryClip { get; init; } = int.MaxValue; + + // Configures reasoning options for models that support it. + public ReasoningConfig Reasoning { get; init; } = new ReasoningConfig(); } diff --git a/src/Coven.Agents.OpenAI/OpenAIEntry.cs b/src/Coven.Agents.OpenAI/OpenAIEntry.cs index 3e9fe90..2ee2b10 100644 --- a/src/Coven.Agents.OpenAI/OpenAIEntry.cs +++ b/src/Coven.Agents.OpenAI/OpenAIEntry.cs @@ -6,12 +6,12 @@ public abstract record OpenAIEntry( string Sender ); -public sealed record OpenAIOutgoing( +public sealed record OpenAIEfferent( string Sender, string Text ) : OpenAIEntry(Sender); -public sealed record OpenAIIncoming( +public sealed record OpenAIAfferent( string Sender, string Text, string ResponseId, @@ -19,7 +19,16 @@ public sealed record OpenAIIncoming( string Model ) : OpenAIEntry(Sender); -public sealed record OpenAIIncomingChunk( +public sealed record OpenAIAfferentChunk( + string Sender, + string Text, + string ResponseId, + DateTimeOffset Timestamp, + string Model +) : OpenAIEntry(Sender); + +// Streaming thought chunks (afferent): model streams thoughts back +public sealed record OpenAIAfferentThoughtChunk( string Sender, string Text, string ResponseId, @@ -40,6 +49,12 @@ public sealed record OpenAIAck( string Text ) : OpenAIEntry(Sender); +// Streaming thought chunks (efferent): agent streams thoughts out +public sealed record OpenAIEfferentThoughtChunk( + string Sender, + string Text +) : OpenAIEntry(Sender); + public sealed record OpenAIStreamCompleted( string Sender, string ResponseId, diff --git a/src/Coven.Agents.OpenAI/OpenAIEntryToResponseItemTransmuter.cs b/src/Coven.Agents.OpenAI/OpenAIEntryToResponseItemTransmuter.cs index 10102d0..1b5262f 100644 --- a/src/Coven.Agents.OpenAI/OpenAIEntryToResponseItemTransmuter.cs +++ b/src/Coven.Agents.OpenAI/OpenAIEntryToResponseItemTransmuter.cs @@ -17,8 +17,8 @@ public sealed class OpenAIEntryToResponseItemTransmuter : ITransmuter Task.FromResult(ResponseItem.CreateUserMessageItem(u.Text)), - OpenAIIncoming a => Task.FromResult(ResponseItem.CreateAssistantMessageItem(a.Text)), + OpenAIEfferent u => Task.FromResult(ResponseItem.CreateUserMessageItem(u.Text)), + OpenAIAfferent a => Task.FromResult(ResponseItem.CreateAssistantMessageItem(a.Text)), // Thoughts/acks do not participate in prompt construction. _ => Task.FromResult(null) }; diff --git a/src/Coven.Agents.OpenAI/OpenAILog.cs b/src/Coven.Agents.OpenAI/OpenAILog.cs index 480c733..ab59cdb 100644 --- a/src/Coven.Agents.OpenAI/OpenAILog.cs +++ b/src/Coven.Agents.OpenAI/OpenAILog.cs @@ -14,55 +14,55 @@ internal static class OpenAILog private static readonly Action _outboundSendStart = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(3006, nameof(OutboundSendStart)), "Sending request to OpenAI (length {Length})."); private static readonly Action _outboundSendSucceeded = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(3007, nameof(OutboundSendSucceeded)), "Sent request to OpenAI."); private static readonly Action _openAIScrivenerAppended = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3050, nameof(OpenAIScrivenerAppended)), "OpenAIScrivener appended {EntryType} to internal journal at position {Position}."); private static readonly Action _openAIToAgentsObserved = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3030, nameof(OpenAIToAgentsObserved)), "OpenAI→Agents observed {EntryType} at position {Position}."); private static readonly Action _openAIToAgentsTransmuted = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3031, nameof(OpenAIToAgentsTransmuted)), "OpenAI→Agents transmuted {FromType} → {ToType}."); private static readonly Action _openAIToAgentsAppended = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3032, nameof(OpenAIToAgentsAppended)), "OpenAI→Agents appended {EntryType} to Agents journal at position {Position}."); private static readonly Action _agentsToOpenAIObserved = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3040, nameof(AgentsToOpenAIObserved)), "Agents→OpenAI observed {EntryType} at position {Position}."); private static readonly Action _agentsToOpenAITransmuted = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3041, nameof(AgentsToOpenAITransmuted)), "Agents→OpenAI transmuted {FromType} → {ToType}."); private static readonly Action _agentsToOpenAIAppended = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3042, nameof(AgentsToOpenAIAppended)), "Agents→OpenAI appended {EntryType} to OpenAI journal at position {Position}."); diff --git a/src/Coven.Agents.OpenAI/OpenAIMaxLengthWindowPolicy.cs b/src/Coven.Agents.OpenAI/OpenAIMaxLengthWindowPolicy.cs new file mode 100644 index 0000000..da05ea6 --- /dev/null +++ b/src/Coven.Agents.OpenAI/OpenAIMaxLengthWindowPolicy.cs @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: BUSL-1.1 +using Coven.Core.Streaming; + +namespace Coven.Agents.OpenAI; + +/// +/// Emits when the recent OpenAI chunk(s) length reaches a max. +/// Minimal lookback of 1; intended as a safety cap in combination with semantic policies. +/// +public sealed class OpenAIMaxLengthWindowPolicy : IWindowPolicy +{ + private readonly int _max; + + public OpenAIMaxLengthWindowPolicy(int max) + { + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(max, 0); + _max = max; + } + + public int MinChunkLookback => 1; + + public bool ShouldEmit(StreamWindow window) + { + int total = 0; + foreach (OpenAIAfferentChunk chunk in window.PendingChunks) + { + if (!string.IsNullOrEmpty(chunk.Text)) + { + total += chunk.Text.Length; + if (total >= _max) + { + return true; + } + } + } + return false; + } +} + diff --git a/src/Coven.Agents.OpenAI/OpenAIParagraphShatterPolicy.cs b/src/Coven.Agents.OpenAI/OpenAIParagraphShatterPolicy.cs new file mode 100644 index 0000000..6c254f7 --- /dev/null +++ b/src/Coven.Agents.OpenAI/OpenAIParagraphShatterPolicy.cs @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: BUSL-1.1 +using Coven.Core.Streaming; + +namespace Coven.Agents.OpenAI; + +/// +/// Shatters OpenAI afferent chunks at the first paragraph boundary. +/// A paragraph boundary is a double newline sequence ("\r\n\r\n" or "\n\n"). +/// When a boundary is found, emits exactly two chunks: +/// - First: original text up to and including the boundary +/// - Second: remainder of the original text +/// If no boundary exists, produces no outputs (forward unchanged). +/// +public sealed class OpenAIThoughtParagraphShatterPolicy : IShatterPolicy +{ + public IEnumerable Shatter(OpenAIEntry entry) + { + if (entry is not OpenAIAfferentThoughtChunk chunk || string.IsNullOrEmpty(chunk.Text)) + { + yield break; + } + + string text = chunk.Text; + + // Prefer CRLF CRLF over LF LF when both could match starting at same position + int boundaryIndex = IndexOfParagraphBoundary(text, out int boundaryLength); + if (boundaryIndex < 0) + { + yield break; // no change + } + + int splitAfter = boundaryIndex + boundaryLength; + string first = text[..splitAfter]; + string second = text[splitAfter..]; + + if (first.Length > 0) + { + yield return new OpenAIAfferentThoughtChunk(chunk.Sender, first, chunk.ResponseId, chunk.Timestamp, chunk.Model); + } + if (second.Length > 0) + { + yield return new OpenAIAfferentThoughtChunk(chunk.Sender, second, chunk.ResponseId, chunk.Timestamp, chunk.Model); + } + } + + private static int IndexOfParagraphBoundary(string s, out int boundaryLength) + { + boundaryLength = 0; + for (int i = 0; i < s.Length; i++) + { + char c = s[i]; + // CRLF CRLF + if (c == '\r' && i + 3 < s.Length && s[i + 1] == '\n' && s[i + 2] == '\r' && s[i + 3] == '\n') + { + boundaryLength = 4; + return i; + } + // LF LF + if (c == '\n' && i + 1 < s.Length && s[i + 1] == '\n') + { + boundaryLength = 2; + return i; + } + } + return -1; + } +} diff --git a/src/Coven.Agents.OpenAI/OpenAIParagraphWindowPolicy.cs b/src/Coven.Agents.OpenAI/OpenAIParagraphWindowPolicy.cs new file mode 100644 index 0000000..b9ad0cf --- /dev/null +++ b/src/Coven.Agents.OpenAI/OpenAIParagraphWindowPolicy.cs @@ -0,0 +1,54 @@ +// SPDX-License-Identifier: BUSL-1.1 +using System.Text; +using Coven.Core.Streaming; + +namespace Coven.Agents.OpenAI; + +/// +/// Emits when the concatenated OpenAI text (last 1–2 chunks) ends at a paragraph boundary. +/// A paragraph boundary is a double newline sequence ("\r\n\r\n" or "\n\n"). +/// Uses a minimal lookback of 2 to account for boundaries that straddle chunk edges. +/// +public sealed class OpenAIParagraphWindowPolicy : IWindowPolicy +{ + public int MinChunkLookback => 2; + + public bool ShouldEmit(StreamWindow window) + { + StringBuilder sb = new(); + foreach (OpenAIAfferentChunk chunk in window.PendingChunks) + { + if (!string.IsNullOrEmpty(chunk.Text)) + { + sb.Append(chunk.Text); + } + } + + if (sb.Length == 0) + { + return false; + } + + string s = TrimEndExceptNewlines(sb.ToString()); + return s.EndsWith("\r\n\r\n", StringComparison.Ordinal) || s.EndsWith("\n\n", StringComparison.Ordinal); + } + + private static string TrimEndExceptNewlines(string s) + { + int end = s.Length; + while (end > 0) + { + char c = s[end - 1]; + if (c is ' ' or '\t') + { + end--; + } + else + { + break; + } + } + return end == s.Length ? s : s[..end]; + } +} + diff --git a/src/Coven.Agents.OpenAI/OpenAIRequestGatewayConnection.cs b/src/Coven.Agents.OpenAI/OpenAIRequestGatewayConnection.cs index ba66255..9c9d65c 100644 --- a/src/Coven.Agents.OpenAI/OpenAIRequestGatewayConnection.cs +++ b/src/Coven.Agents.OpenAI/OpenAIRequestGatewayConnection.cs @@ -2,6 +2,7 @@ using System.ClientModel; using Coven.Core; +using Coven.Transmutation; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using OpenAI; @@ -14,13 +15,15 @@ internal sealed class OpenAIRequestGatewayConnection( [FromKeyedServices("Coven.InternalOpenAIScrivener")] IScrivener journal, ILogger logger, OpenAIClient openAIClient, - IOpenAITranscriptBuilder transcriptBuilder) : IOpenAIGatewayConnection + IOpenAITranscriptBuilder transcriptBuilder, + ITransmuter responseOptionsTransmuter) : IOpenAIGatewayConnection { private readonly OpenAIClientConfig _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); private readonly IScrivener _journal = journal ?? throw new ArgumentNullException(nameof(journal)); private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); private readonly OpenAIResponseClient _client = openAIClient.GetOpenAIResponseClient(configuration.Model) ?? throw new ArgumentNullException(nameof(openAIClient)); private readonly IOpenAITranscriptBuilder _transcriptBuilder = transcriptBuilder ?? throw new ArgumentNullException(nameof(transcriptBuilder)); + private readonly ITransmuter _responseOptionsTransmuter = responseOptionsTransmuter ?? throw new ArgumentNullException(nameof(responseOptionsTransmuter)); public Task ConnectAsync() { @@ -28,19 +31,14 @@ public Task ConnectAsync() return Task.CompletedTask; } - public async Task SendAsync(OpenAIOutgoing outgoing, CancellationToken cancellationToken) + public async Task SendAsync(OpenAIEfferent outgoing, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); - List input = await _transcriptBuilder.BuildAsync(outgoing, _configuration.HistoryClip ?? int.MaxValue, cancellationToken).ConfigureAwait(false); + List input = await _transcriptBuilder.BuildAsync(outgoing, _configuration.HistoryClip, cancellationToken).ConfigureAwait(false); OpenAILog.OutboundSendStart(_logger, input.Count); - ResponseCreationOptions options = new() - { - Temperature = _configuration.Temperature, - TopP = _configuration.TopP, - MaxOutputTokenCount = _configuration.MaxOutputTokens - }; + ResponseCreationOptions options = await _responseOptionsTransmuter.Transmute(_configuration, cancellationToken).ConfigureAwait(false); OpenAIResponse response; try @@ -58,15 +56,36 @@ public async Task SendAsync(OpenAIOutgoing outgoing, CancellationToken cancellat throw; } + // Surface any reasoning/thought summaries if present in the non-streaming response. + if (response.OutputItems is not null) + { + foreach (ResponseItem item in response.OutputItems) + { + if (item is ReasoningResponseItem reasoning) + { + string summary = reasoning.GetSummaryText(); + if (!string.IsNullOrEmpty(summary)) + { + OpenAIThought thought = new( + Sender: "openai", + Text: summary, + ResponseId: response.Id, + Timestamp: response.CreatedAt, + Model: response.Model); + await _journal.WriteAsync(thought, cancellationToken).ConfigureAwait(false); + } + } + } + } + string text = response.GetOutputText() ?? string.Empty; - OpenAIIncoming incoming = new( + OpenAIAfferent incoming = new( Sender: "openai", Text: text, ResponseId: response.Id, Timestamp: response.CreatedAt, Model: response.Model); - await _journal.WriteAsync(incoming, cancellationToken).ConfigureAwait(false); } } diff --git a/src/Coven.Agents.OpenAI/OpenAIResponseOptionsTransmuter.cs b/src/Coven.Agents.OpenAI/OpenAIResponseOptionsTransmuter.cs new file mode 100644 index 0000000..a295dff --- /dev/null +++ b/src/Coven.Agents.OpenAI/OpenAIResponseOptionsTransmuter.cs @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: BUSL-1.1 + +using Coven.Transmutation; +using OpenAI.Responses; + +namespace Coven.Agents.OpenAI; + +internal sealed class OpenAIResponseOptionsTransmuter : ITransmuter +{ + public Task Transmute(OpenAIClientConfig Input, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(Input); + cancellationToken.ThrowIfCancellationRequested(); + + ResponseCreationOptions options = new() + { + Temperature = Input.Temperature, + TopP = Input.TopP, + MaxOutputTokenCount = Input.MaxOutputTokens + }; + + if (Input.Reasoning is not null) + { + options.ReasoningOptions = new ResponseReasoningOptions() + { + ReasoningEffortLevel = Input.Reasoning.Effort switch + { + ReasoningEffort.Low => ResponseReasoningEffortLevel.Low, + ReasoningEffort.Medium => ResponseReasoningEffortLevel.Medium, + ReasoningEffort.High => ResponseReasoningEffortLevel.High, + _ => null + }, + ReasoningSummaryVerbosity = Input.Reasoning.SummaryVerbosity switch + { + ReasoningSummaryVerbosity.Auto => ResponseReasoningSummaryVerbosity.Auto, + ReasoningSummaryVerbosity.Detailed => ResponseReasoningSummaryVerbosity.Detailed, + ReasoningSummaryVerbosity.Concise => ResponseReasoningSummaryVerbosity.Concise, + _ => null + } + }; + } + + return Task.FromResult(options); + } +} diff --git a/src/Coven.Agents.OpenAI/OpenAIScrivener.cs b/src/Coven.Agents.OpenAI/OpenAIScrivener.cs index b50538a..f05f025 100644 --- a/src/Coven.Agents.OpenAI/OpenAIScrivener.cs +++ b/src/Coven.Agents.OpenAI/OpenAIScrivener.cs @@ -27,7 +27,7 @@ public OpenAIScrivener( public async Task WriteAsync(OpenAIEntry entry, CancellationToken cancellationToken = default) { - if (entry is OpenAIOutgoing outgoing) + if (entry is OpenAIEfferent outgoing) { await _gateway.SendAsync(outgoing, cancellationToken).ConfigureAwait(false); } diff --git a/src/Coven.Agents.OpenAI/OpenAIStreamingGatewayConnection.cs b/src/Coven.Agents.OpenAI/OpenAIStreamingGatewayConnection.cs index 783cbd0..057fc7c 100644 --- a/src/Coven.Agents.OpenAI/OpenAIStreamingGatewayConnection.cs +++ b/src/Coven.Agents.OpenAI/OpenAIStreamingGatewayConnection.cs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: BUSL-1.1 +using System.Reflection; using Coven.Core; +using Coven.Transmutation; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using OpenAI; @@ -13,13 +15,15 @@ internal sealed class OpenAIStreamingGatewayConnection( [FromKeyedServices("Coven.InternalOpenAIScrivener")] IScrivener journal, ILogger logger, OpenAIClient openAIClient, - IOpenAITranscriptBuilder transcriptBuilder) : IOpenAIGatewayConnection + IOpenAITranscriptBuilder transcriptBuilder, + ITransmuter responseOptionsTransmuter) : IOpenAIGatewayConnection { private readonly OpenAIClientConfig _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); private readonly IScrivener _journal = journal ?? throw new ArgumentNullException(nameof(journal)); private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger)); private readonly OpenAIResponseClient _client = openAIClient.GetOpenAIResponseClient(configuration.Model) ?? throw new ArgumentNullException(nameof(openAIClient)); private readonly IOpenAITranscriptBuilder _transcriptBuilder = transcriptBuilder ?? throw new ArgumentNullException(nameof(transcriptBuilder)); + private readonly ITransmuter _responseOptionsTransmuter = responseOptionsTransmuter ?? throw new ArgumentNullException(nameof(responseOptionsTransmuter)); public Task ConnectAsync() { @@ -27,19 +31,14 @@ public Task ConnectAsync() return Task.CompletedTask; } - public async Task SendAsync(OpenAIOutgoing outgoing, CancellationToken cancellationToken) + public async Task SendAsync(OpenAIEfferent outgoing, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); - List input = await _transcriptBuilder.BuildAsync(outgoing, _configuration.HistoryClip ?? int.MaxValue, cancellationToken).ConfigureAwait(false); + List input = await _transcriptBuilder.BuildAsync(outgoing, _configuration.HistoryClip, cancellationToken).ConfigureAwait(false); OpenAILog.OutboundSendStart(_logger, input.Count); - ResponseCreationOptions options = new() - { - Temperature = _configuration.Temperature, - TopP = _configuration.TopP, - MaxOutputTokenCount = _configuration.MaxOutputTokens - }; + ResponseCreationOptions options = await _responseOptionsTransmuter.Transmute(_configuration, cancellationToken).ConfigureAwait(false); string model = _configuration.Model; string responseId = string.Empty; @@ -62,7 +61,7 @@ public async Task SendAsync(OpenAIOutgoing outgoing, CancellationToken cancellat case StreamingResponseOutputTextDeltaUpdate textDelta: if (!string.IsNullOrEmpty(textDelta.Delta)) { - OpenAIIncomingChunk chunk = new( + OpenAIAfferentChunk chunk = new( Sender: "openai", Text: textDelta.Delta, ResponseId: responseId, @@ -85,6 +84,18 @@ public async Task SendAsync(OpenAIOutgoing outgoing, CancellationToken cancellat case StreamingResponseFailedUpdate failed when failed.Response is not null: throw new InvalidOperationException($"OpenAI streaming failed: {failed.Response.Status}"); + case var rsDelta when TryGetReasoningSummaryTextDelta(update, out string? rDelta) && !string.IsNullOrEmpty(rDelta): + { + OpenAIAfferentThoughtChunk tChunk = new( + Sender: "openai", + Text: rDelta, + ResponseId: responseId, + Timestamp: createdAt, + Model: model); + await _journal.WriteAsync(tChunk, cancellationToken).ConfigureAwait(false); + break; + } + default: break; } @@ -106,4 +117,30 @@ public async Task SendAsync(OpenAIOutgoing outgoing, CancellationToken cancellat Model: model); await _journal.WriteAsync(done, cancellationToken).ConfigureAwait(false); } + + /// + /// Filthy shim. Do not copy this, this is bad code. We just have this here to handle people being too slow to update the official SDK. + /// + /// + /// + /// + private static bool TryGetReasoningSummaryTextDelta(object? update, out string? delta) + { + delta = null; + if (update is null) + { + return false; + } + + Type t = update.GetType(); + // Current SDK keeps these types internal; detect by simple name to avoid tight coupling. + if (t.Name is "StreamingResponseReasoningSummaryTextDeltaUpdate" or "InternalResponseReasoningSummaryTextDeltaEvent") + { + // internal/public both expose a string Delta; grab it reflexively + PropertyInfo? prop = t.GetProperty("Delta", BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + delta = prop?.GetValue(update)?.ToString(); + return !string.IsNullOrEmpty(delta); + } + return false; + } } diff --git a/src/Coven.Agents.OpenAI/OpenAITransmuter.cs b/src/Coven.Agents.OpenAI/OpenAITransmuter.cs index cf5babe..eddb714 100644 --- a/src/Coven.Agents.OpenAI/OpenAITransmuter.cs +++ b/src/Coven.Agents.OpenAI/OpenAITransmuter.cs @@ -6,31 +6,40 @@ namespace Coven.Agents.OpenAI; public sealed class OpenAITransmuter : IBiDirectionalTransmuter { - public Task TransmuteIn(OpenAIEntry Input, CancellationToken cancellationToken) + public Task TransmuteAfferent(OpenAIEntry Input, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); return Input switch { - OpenAIIncoming incoming => Task.FromResult(new AgentResponse(incoming.Sender, incoming.Text)), - OpenAIIncomingChunk chunk => Task.FromResult(new AgentChunk(chunk.Sender, chunk.Text)), + OpenAIAfferent incoming => Task.FromResult(new AgentResponse(incoming.Sender, incoming.Text)), + // Today, Afferent chunks include thoughts, tomorrow who knows. + OpenAIAfferentChunk chunk => Task.FromResult(new AgentAfferentChunk(chunk.Sender, chunk.Text)), + // Streaming thought chunks from OpenAI surface as afferent thought drafts + OpenAIAfferentThoughtChunk tChunk => Task.FromResult(new AgentAfferentThoughtChunk(tChunk.Sender, tChunk.Text)), + // A full OpenAIThought should surface as a fixed AgentThought OpenAIThought thought => Task.FromResult(new AgentThought(thought.Sender, thought.Text)), OpenAIStreamCompleted done => Task.FromResult(new AgentStreamCompleted(done.Sender)), - OpenAIOutgoing outgoing => Task.FromResult(new AgentAck(outgoing.Sender, outgoing.Text)), + OpenAIEfferent outgoing => Task.FromResult(new AgentAck(outgoing.Sender)), _ => throw new ArgumentOutOfRangeException(nameof(Input)) }; } - public Task TransmuteOut(AgentEntry Output, CancellationToken cancellationToken) + public Task TransmuteEfferent(AgentEntry Output, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); return Output switch { - AgentPrompt prompt => Task.FromResult(new OpenAIOutgoing(prompt.Sender, prompt.Text)), + AgentPrompt prompt => Task.FromResult(new OpenAIEfferent(prompt.Sender, prompt.Text)), AgentResponse response => Task.FromResult(new OpenAIAck(response.Sender, response.Text)), AgentThought thought => Task.FromResult(new OpenAIAck(thought.Sender, thought.Text)), - AgentChunk chunk => Task.FromResult(new OpenAIAck(chunk.Sender, chunk.Text)), + AgentEfferentChunk affChunk => Task.FromResult(new OpenAIAck(affChunk.Sender, affChunk.Text)), + AgentAfferentChunk effChunk => Task.FromResult(new OpenAIAck(effChunk.Sender, effChunk.Text)), + // Streaming efferent thought drafts map to OpenAI efferent thought chunk (not forwarded by gateway today) + AgentEfferentThoughtChunk etChunk => Task.FromResult(new OpenAIEfferentThoughtChunk(etChunk.Sender, etChunk.Text)), + // Afferent thought drafts are not sent outward; ack for completeness + AgentAfferentThoughtChunk atChunk => Task.FromResult(new OpenAIAck(atChunk.Sender, atChunk.Text)), AgentStreamCompleted done => Task.FromResult(new OpenAIAck(done.Sender, string.Empty)), _ => throw new ArgumentOutOfRangeException(nameof(Output)) }; diff --git a/src/Coven.Agents.OpenAI/ReasoningConfig.cs b/src/Coven.Agents.OpenAI/ReasoningConfig.cs new file mode 100644 index 0000000..0d3d196 --- /dev/null +++ b/src/Coven.Agents.OpenAI/ReasoningConfig.cs @@ -0,0 +1,10 @@ +// SPDX-License-Identifier: BUSL-1.1 + +namespace Coven.Agents.OpenAI; + +public sealed class ReasoningConfig +{ + // When reasoning is provided, assume enabled and default required settings. + public ReasoningEffort Effort { get; init; } = ReasoningEffort.Medium; + public ReasoningSummaryVerbosity SummaryVerbosity { get; init; } = ReasoningSummaryVerbosity.Auto; +} diff --git a/src/Coven.Agents.OpenAI/ReasoningEffort.cs b/src/Coven.Agents.OpenAI/ReasoningEffort.cs new file mode 100644 index 0000000..ccb4601 --- /dev/null +++ b/src/Coven.Agents.OpenAI/ReasoningEffort.cs @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: BUSL-1.1 + +namespace Coven.Agents.OpenAI; + +/// +/// Reasoning effort configuration for models that support reasoning. +/// This is mapped internally to the OpenAI SDK's effort levels. +/// +public enum ReasoningEffort +{ + Low, + Medium, + High +} + diff --git a/src/Coven.Agents.OpenAI/ReasoningSummaryVerbosity.cs b/src/Coven.Agents.OpenAI/ReasoningSummaryVerbosity.cs new file mode 100644 index 0000000..23631e8 --- /dev/null +++ b/src/Coven.Agents.OpenAI/ReasoningSummaryVerbosity.cs @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: BUSL-1.1 + +namespace Coven.Agents.OpenAI; + +public enum ReasoningSummaryVerbosity +{ + Auto, + Concise, + Detailed +} + diff --git a/src/Coven.Agents.OpenAI/ServiceCollectionExtensions.cs b/src/Coven.Agents.OpenAI/ServiceCollectionExtensions.cs index 9821b95..c57c2c2 100644 --- a/src/Coven.Agents.OpenAI/ServiceCollectionExtensions.cs +++ b/src/Coven.Agents.OpenAI/ServiceCollectionExtensions.cs @@ -74,29 +74,62 @@ public static IServiceCollection AddOpenAIAgents(this IServiceCollection service services.AddScoped, OpenAITransmuter>(); services.TryAddScoped, OpenAIEntryToResponseItemTransmuter>(); services.TryAddScoped(); + // Session-local shattering for OpenAI chunks: split on paragraph boundary + services.TryAddScoped>(_ => new OpenAIThoughtParagraphShatterPolicy()); + // Windowed output shattering for Agent thoughts: split on summary marker + services.TryAddScoped>(_ => new AgentThoughtSummaryShatterPolicy()); services.AddScoped, InMemoryScrivener>(); services.AddScoped(); services.AddScoped(); - // When streaming is enabled, include generic windowing daemon bound to Agent types + // When streaming is enabled, include generic windowing daemons for OpenAI and Agent types if (registration.StreamingEnabled) { + // Provide default window policies that can be overridden by the host. + // Paragraph boundary first, then a safety cap to avoid unbounded buffers. + services.TryAddScoped>(_ => + new CompositeWindowPolicy( + new AgentParagraphWindowPolicy(), + new AgentMaxLengthWindowPolicy(4096) + )); + services.TryAddScoped>(_ => + new CompositeWindowPolicy( + new AgentThoughtSummaryMarkerWindowPolicy(), + new AgentThoughtMaxLengthWindowPolicy(4096) + )); + + services.AddScoped(sp => + { + IScrivener daemonEvents = sp.GetRequiredService>(); + IScrivener agentJournal = sp.GetRequiredService>(); + + // Allow DI to provide a custom window policy via registration + IWindowPolicy policy = sp.GetRequiredService>(); + IBatchTransmuter batchTransmuter = + sp.GetRequiredService>(); + IShatterPolicy? shatterPolicy = sp.GetService>(); + + return new StreamWindowingDaemon( + daemonEvents, agentJournal, policy, batchTransmuter, shatterPolicy); + }); + services.AddScoped(sp => { IScrivener daemonEvents = sp.GetRequiredService>(); IScrivener agentJournal = sp.GetRequiredService>(); - // Allow DI to provide a custom window policy; fall back to final-only - IWindowPolicy policy = - sp.GetService>() ?? new LambdaWindowPolicy(1, _ => false); - ITransmuter, BatchTransmuteResult> batchTransmuter = - sp.GetRequiredService, BatchTransmuteResult>>(); + IWindowPolicy policy = sp.GetRequiredService>(); + IBatchTransmuter batchTransmuter = + sp.GetRequiredService>(); + IShatterPolicy? shatterPolicy = sp.GetService>(); - return new StreamWindowingDaemon( - daemonEvents, agentJournal, policy, batchTransmuter); + return new StreamWindowingDaemon( + daemonEvents, agentJournal, policy, batchTransmuter, shatterPolicy); }); } - services.TryAddScoped, BatchTransmuteResult>, AgentChunkBatchTransmuter>(); + services.TryAddScoped, AgentAfferentBatchTransmuter>(); + services.TryAddScoped, AgentAfferentThoughtBatchTransmuter>(); + services.TryAddScoped, OpenAIResponseOptionsTransmuter>(); return services; } } diff --git a/src/Coven.Agents/AgentChunkBatchTransmuter.cs b/src/Coven.Agents/AgentAfferentBatchTransmuter.cs similarity index 57% rename from src/Coven.Agents/AgentChunkBatchTransmuter.cs rename to src/Coven.Agents/AgentAfferentBatchTransmuter.cs index b7e3f7e..7cb0c05 100644 --- a/src/Coven.Agents/AgentChunkBatchTransmuter.cs +++ b/src/Coven.Agents/AgentAfferentBatchTransmuter.cs @@ -1,19 +1,18 @@ // SPDX-License-Identifier: BUSL-1.1 using System.Text; -using Coven.Core.Streaming; using Coven.Transmutation; namespace Coven.Agents; -public sealed class AgentChunkBatchTransmuter : ITransmuter, BatchTransmuteResult> +public sealed class AgentAfferentBatchTransmuter : IBatchTransmuter { - public Task> Transmute(IEnumerable Input, CancellationToken cancellationToken = default) + public Task> Transmute(IEnumerable Input, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(Input); string sender = string.Empty; StringBuilder sb = new(); - foreach (AgentChunk chunk in Input) + foreach (AgentAfferentChunk chunk in Input) { cancellationToken.ThrowIfCancellationRequested(); if (!string.IsNullOrEmpty(chunk.Sender)) @@ -27,6 +26,6 @@ public Task> Transmute(IEnumerab } AgentResponse output = new(sender, sb.ToString()); - return Task.FromResult(new BatchTransmuteResult(output, false, null)); + return Task.FromResult(new BatchTransmuteResult(output, false, null)); } } diff --git a/src/Coven.Agents/AgentAfferentThoughtBatchTransmuter.cs b/src/Coven.Agents/AgentAfferentThoughtBatchTransmuter.cs new file mode 100644 index 0000000..5373314 --- /dev/null +++ b/src/Coven.Agents/AgentAfferentThoughtBatchTransmuter.cs @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: BUSL-1.1 +using System.Text; +using Coven.Transmutation; + +namespace Coven.Agents; + +public sealed class AgentAfferentThoughtBatchTransmuter : IBatchTransmuter +{ + public Task> Transmute(IEnumerable Input, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(Input); + + string sender = string.Empty; + StringBuilder sb = new(); + foreach (AgentAfferentThoughtChunk chunk in Input) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!string.IsNullOrEmpty(chunk.Sender)) + { + sender = chunk.Sender; + } + if (!string.IsNullOrEmpty(chunk.Text)) + { + sb.Append(chunk.Text); + } + } + + AgentThought output = new(sender, sb.ToString()); + return Task.FromResult(new BatchTransmuteResult(output, false, null)); + } +} diff --git a/src/Coven.Agents/AgentEfferentBatchTransmuter.cs b/src/Coven.Agents/AgentEfferentBatchTransmuter.cs new file mode 100644 index 0000000..00cf9a0 --- /dev/null +++ b/src/Coven.Agents/AgentEfferentBatchTransmuter.cs @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: BUSL-1.1 +using System.Text; +using Coven.Transmutation; + +namespace Coven.Agents; + +public sealed class AgentEfferentBatchTransmuter : IBatchTransmuter +{ + public Task> Transmute(IEnumerable Input, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(Input); + + string sender = string.Empty; + StringBuilder sb = new(); + foreach (AgentEfferentChunk chunk in Input) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!string.IsNullOrEmpty(chunk.Sender)) + { + sender = chunk.Sender; + } + if (!string.IsNullOrEmpty(chunk.Text)) + { + sb.Append(chunk.Text); + } + } + + AgentPrompt output = new(sender, sb.ToString()); + return Task.FromResult(new BatchTransmuteResult(output, false, null)); + } +} diff --git a/src/Coven.Agents/AgentEntry.cs b/src/Coven.Agents/AgentEntry.cs index c5042f5..1a9d984 100644 --- a/src/Coven.Agents/AgentEntry.cs +++ b/src/Coven.Agents/AgentEntry.cs @@ -3,17 +3,25 @@ namespace Coven.Agents; // Minimal agent entry union used with IScrivener -public abstract record AgentEntry(string Sender, string Text); +public abstract record AgentEntry(string Sender); -public sealed record AgentPrompt(string Sender, string Text) : AgentEntry(Sender, Text); +// Unfixed/draft entries that should never be forwarded by adapters directly +public abstract record AgentEntryDraft(string Sender) : AgentEntry(Sender); -public sealed record AgentResponse(string Sender, string Text) : AgentEntry(Sender, Text); +public sealed record AgentPrompt(string Sender, string Text) : AgentEntry(Sender); -public sealed record AgentThought(string Sender, string Text) : AgentEntry(Sender, Text); +public sealed record AgentResponse(string Sender, string Text) : AgentEntry(Sender); -public sealed record AgentAck(string Sender, string Text) : AgentEntry(Sender, Text); +public sealed record AgentThought(string Sender, string Text) : AgentEntry(Sender); + +public sealed record AgentAck(string Sender) : AgentEntry(Sender); // Streaming additions -public sealed record AgentChunk(string Sender, string Text) : AgentEntry(Sender, Text); +public sealed record AgentEfferentChunk(string Sender, string Text) : AgentEntryDraft(Sender); + +public sealed record AgentAfferentChunk(string Sender, string Text) : AgentEntryDraft(Sender); +public sealed record AgentEfferentThoughtChunk(string Sender, string Text) : AgentEntryDraft(Sender); + +public sealed record AgentAfferentThoughtChunk(string Sender, string Text) : AgentEntryDraft(Sender); -public sealed record AgentStreamCompleted(string Sender) : AgentEntry(Sender, ""); +public sealed record AgentStreamCompleted(string Sender) : AgentEntryDraft(Sender); diff --git a/src/Coven.Chat.Console/ConsoleChatSession.cs b/src/Coven.Chat.Console/ConsoleChatSession.cs index 69666cf..6456772 100644 --- a/src/Coven.Chat.Console/ConsoleChatSession.cs +++ b/src/Coven.Chat.Console/ConsoleChatSession.cs @@ -39,7 +39,7 @@ public async Task StartAsync() } ConsoleLog.ConsoleToChatObserved(_logger, entry.GetType().Name, position); - ChatEntry chat = await _transmuter.TransmuteIn(entry, ct).ConfigureAwait(false); + ChatEntry chat = await _transmuter.TransmuteAfferent(entry, ct).ConfigureAwait(false); ConsoleLog.ConsoleToChatTransmuted(_logger, entry.GetType().Name, chat.GetType().Name); long chatPos = await _chatJournal.WriteAsync(chat, ct).ConfigureAwait(false); ConsoleLog.ConsoleToChatAppended(_logger, chat.GetType().Name, chatPos); @@ -64,13 +64,13 @@ public async Task StartAsync() await foreach ((long position, ChatEntry entry) in _chatJournal.TailAsync(0, ct)) { // Forward only fixed ChatOutgoing to console - if (entry is not ChatOutgoing) + if (entry is not ChatEfferent) { continue; } ConsoleLog.ChatToConsoleObserved(_logger, entry.GetType().Name, position); - ConsoleEntry console = await _transmuter.TransmuteOut(entry, ct).ConfigureAwait(false); + ConsoleEntry console = await _transmuter.TransmuteEfferent(entry, ct).ConfigureAwait(false); ConsoleLog.ChatToConsoleTransmuted(_logger, entry.GetType().Name, console.GetType().Name); long consolePos = await _consoleJournal.WriteAsync(console, ct).ConfigureAwait(false); ConsoleLog.ChatToConsoleAppended(_logger, console.GetType().Name, consolePos); diff --git a/src/Coven.Chat.Console/ConsoleEntry.cs b/src/Coven.Chat.Console/ConsoleEntry.cs index 212364d..7f8a9de 100644 --- a/src/Coven.Chat.Console/ConsoleEntry.cs +++ b/src/Coven.Chat.Console/ConsoleEntry.cs @@ -13,12 +13,12 @@ public sealed record ConsoleAck( string Text ) : ConsoleEntry(Sender, Text); -public sealed record ConsoleIncoming( +public sealed record ConsoleAfferent( string Sender, string Text ) : ConsoleEntry(Sender, Text); -public sealed record ConsoleOutgoing( +public sealed record ConsoleEfferent( string Sender, string Text ) : ConsoleEntry(Sender, Text); diff --git a/src/Coven.Chat.Console/ConsoleGatewayConnection.cs b/src/Coven.Chat.Console/ConsoleGatewayConnection.cs index 00e3097..6c5dfbb 100644 --- a/src/Coven.Chat.Console/ConsoleGatewayConnection.cs +++ b/src/Coven.Chat.Console/ConsoleGatewayConnection.cs @@ -50,10 +50,10 @@ public Task ConnectAsync(CancellationToken cancellationToken) } string sender = _configuration.InputSender; - ConsoleIncoming incoming = new(sender, line); + ConsoleAfferent incoming = new(sender, line); ConsoleLog.InboundUserLineReceived(_logger, sender, line.Length); long pos = await _scrivener.WriteAsync(incoming, ct).ConfigureAwait(false); - ConsoleLog.InboundAppendedToJournal(_logger, nameof(ConsoleIncoming), pos); + ConsoleLog.InboundAppendedToJournal(_logger, nameof(ConsoleAfferent), pos); } }, cancellationToken); diff --git a/src/Coven.Chat.Console/ConsoleLog.cs b/src/Coven.Chat.Console/ConsoleLog.cs index 5770791..77e12f3 100644 --- a/src/Coven.Chat.Console/ConsoleLog.cs +++ b/src/Coven.Chat.Console/ConsoleLog.cs @@ -18,13 +18,13 @@ internal static class ConsoleLog private static readonly Action _outboundSendStart = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Information, new EventId(3006, nameof(OutboundSendStart)), "Sending console line (length {Length})."); private static readonly Action _outboundSendSucceeded = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(3007, nameof(OutboundSendSucceeded)), "Sent console line."); @@ -48,55 +48,55 @@ internal static class ConsoleLog private static readonly Action _inboundEmptySkipped = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(3011, nameof(InboundEmptySkipped)), "Inbound empty/whitespace line skipped."); private static readonly Action _inboundAppendedToJournal = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3012, nameof(InboundAppendedToJournal)), "Appended inbound {EntryType} to Console journal at position {Position}."); private static readonly Action _consoleToChatObserved = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3020, nameof(ConsoleToChatObserved)), "Console→Chat observed {EntryType} at position {Position}."); private static readonly Action _consoleToChatTransmuted = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3021, nameof(ConsoleToChatTransmuted)), "Console→Chat transmuted {FromType} → {ToType}."); private static readonly Action _consoleToChatAppended = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3022, nameof(ConsoleToChatAppended)), "Console→Chat appended {EntryType} to Chat journal at position {Position}."); private static readonly Action _chatToConsoleObserved = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3030, nameof(ChatToConsoleObserved)), "Chat→Console observed {EntryType} at position {Position}."); private static readonly Action _chatToConsoleTransmuted = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3031, nameof(ChatToConsoleTransmuted)), "Chat→Console transmuted {FromType} → {ToType}."); private static readonly Action _chatToConsoleAppended = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3032, nameof(ChatToConsoleAppended)), "Chat→Console appended {EntryType} to Console journal at position {Position}."); private static readonly Action _consoleScrivenerAppended = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(3040, nameof(ConsoleScrivenerAppended)), "ConsoleScrivener appended {EntryType} to internal journal at position {Position}."); diff --git a/src/Coven.Chat.Console/ConsoleScrivener.cs b/src/Coven.Chat.Console/ConsoleScrivener.cs index 81266f1..9016abc 100644 --- a/src/Coven.Chat.Console/ConsoleScrivener.cs +++ b/src/Coven.Chat.Console/ConsoleScrivener.cs @@ -25,7 +25,7 @@ public ConsoleScrivener( public async Task WriteAsync(ConsoleEntry entry, CancellationToken cancellationToken = default) { - if (entry is ConsoleOutgoing) + if (entry is ConsoleEfferent) { await _gateway.SendAsync(entry.Text, cancellationToken).ConfigureAwait(false); } diff --git a/src/Coven.Chat.Console/ConsoleTransmuter.cs b/src/Coven.Chat.Console/ConsoleTransmuter.cs index ff61fba..2ad25b7 100644 --- a/src/Coven.Chat.Console/ConsoleTransmuter.cs +++ b/src/Coven.Chat.Console/ConsoleTransmuter.cs @@ -6,32 +6,32 @@ public sealed class ConsoleTransmuter(ConsoleClientConfig config) : IBiDirection { private readonly ConsoleClientConfig _config = config ?? throw new ArgumentNullException(nameof(config)); - public Task TransmuteIn(ConsoleEntry Input, CancellationToken cancellationToken) + public Task TransmuteAfferent(ConsoleEntry Input, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); return Input switch { - ConsoleIncoming incoming => Task.FromResult(new ChatIncoming(incoming.Sender, incoming.Text)), - ConsoleOutgoing outgoing => Task.FromResult(new ChatAck(outgoing.Sender, outgoing.Text)), + ConsoleAfferent incoming => Task.FromResult(new ChatAfferent(incoming.Sender, incoming.Text)), + ConsoleEfferent outgoing => Task.FromResult(new ChatAck(outgoing.Sender, outgoing.Text)), _ => throw new ArgumentOutOfRangeException(nameof(Input)) }; } - public Task TransmuteOut(ChatEntry Output, CancellationToken cancellationToken) + public Task TransmuteEfferent(ChatEntry Output, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); return Output switch { - ChatOutgoing outgoing => Task.FromResult(new ConsoleOutgoing(_config.OutputSender, outgoing.Text)), + ChatEfferent outgoing => Task.FromResult(new ConsoleEfferent(_config.OutputSender, outgoing.Text)), // Internal/unfixed artifacts or inbound: acknowledge only - ChatOutgoingDraft draft => Task.FromResult(new ConsoleAck(draft.Sender, draft.Text)), + ChatEfferentDraft draft => Task.FromResult(new ConsoleAck(draft.Sender, draft.Text)), ChatChunk chunk => Task.FromResult(new ConsoleAck(chunk.Sender, chunk.Text)), ChatStreamCompleted done => Task.FromResult(new ConsoleAck(done.Sender, string.Empty)), - ChatIncoming incoming => Task.FromResult(new ConsoleAck(incoming.Sender, incoming.Text)), - ChatIncomingDraft incomingDraft => Task.FromResult(new ConsoleAck(incomingDraft.Sender, incomingDraft.Text)), + ChatAfferent incoming => Task.FromResult(new ConsoleAck(incoming.Sender, incoming.Text)), + ChatAfferentDraft incomingDraft => Task.FromResult(new ConsoleAck(incomingDraft.Sender, incomingDraft.Text)), _ => throw new ArgumentOutOfRangeException(nameof(Output)) }; } diff --git a/src/Coven.Chat.Discord/DiscordChatSession.cs b/src/Coven.Chat.Discord/DiscordChatSession.cs index 3f79f95..1156a80 100644 --- a/src/Coven.Chat.Discord/DiscordChatSession.cs +++ b/src/Coven.Chat.Discord/DiscordChatSession.cs @@ -41,7 +41,7 @@ public async Task StartAsync() } DiscordLog.DiscordToChatObserved(_logger, entry.GetType().Name, position); - ChatEntry chat = await _transmuter.TransmuteIn(entry, ct).ConfigureAwait(false); + ChatEntry chat = await _transmuter.TransmuteAfferent(entry, ct).ConfigureAwait(false); DiscordLog.DiscordToChatTransmuted(_logger, entry.GetType().Name, chat.GetType().Name); long chatPos = await _chatJournal.WriteAsync(chat, ct).ConfigureAwait(false); DiscordLog.DiscordToChatAppended(_logger, chat.GetType().Name, chatPos); @@ -68,7 +68,7 @@ public async Task StartAsync() DiscordLog.ChatToDiscordObserved(_logger, entry.GetType().Name, position); // Session-local shattering for drafts - if (entry is ChatOutgoingDraft draft) + if (entry is ChatEfferentDraft draft) { bool produced = false; IEnumerable outputs = _shatterPolicy.Shatter(draft) ?? []; @@ -90,18 +90,18 @@ public async Task StartAsync() } // Fallback: convert draft to fixed and write for forwarding - long fixedPos = await _chatJournal.WriteAsync(new ChatOutgoing(draft.Sender, draft.Text), ct).ConfigureAwait(false); - DiscordLog.ChatToDiscordAppended(_logger, nameof(ChatOutgoing), fixedPos); + long fixedPos = await _chatJournal.WriteAsync(new ChatEfferent(draft.Sender, draft.Text), ct).ConfigureAwait(false); + DiscordLog.ChatToDiscordAppended(_logger, nameof(ChatEfferent), fixedPos); continue; // handled by subsequent iterations } // Forward only fixed ChatOutgoing to Discord - if (entry is not ChatOutgoing) + if (entry is not ChatEfferent) { continue; } - DiscordEntry discord = await _transmuter.TransmuteOut(entry, ct).ConfigureAwait(false); + DiscordEntry discord = await _transmuter.TransmuteEfferent(entry, ct).ConfigureAwait(false); DiscordLog.ChatToDiscordTransmuted(_logger, entry.GetType().Name, discord.GetType().Name); long discPos = await _discordJournal.WriteAsync(discord, ct).ConfigureAwait(false); DiscordLog.ChatToDiscordAppended(_logger, discord.GetType().Name, discPos); diff --git a/src/Coven.Chat.Discord/DiscordEntry.cs b/src/Coven.Chat.Discord/DiscordEntry.cs index b018c04..8751115 100644 --- a/src/Coven.Chat.Discord/DiscordEntry.cs +++ b/src/Coven.Chat.Discord/DiscordEntry.cs @@ -11,12 +11,12 @@ public sealed record DiscordAck( string Text ) : DiscordEntry(Sender, Text); -public sealed record DiscordIncoming( +public sealed record DiscordAfferent( string Sender, string Text, string MessageId, DateTimeOffset Timestamp) : DiscordEntry(Sender, Text); -public sealed record DiscordOutgoing( +public sealed record DiscordEfferent( string Sender, string Text) : DiscordEntry(Sender, Text); diff --git a/src/Coven.Chat.Discord/DiscordGatewayConnection.cs b/src/Coven.Chat.Discord/DiscordGatewayConnection.cs index 68ea223..602d375 100644 --- a/src/Coven.Chat.Discord/DiscordGatewayConnection.cs +++ b/src/Coven.Chat.Discord/DiscordGatewayConnection.cs @@ -135,7 +135,7 @@ private async Task OnMessageReceivedAsync(SocketMessage message) throw new InvalidOperationException("Discord message author username is missing."); } - DiscordIncoming incoming = new( + DiscordAfferent incoming = new( Sender: sender, Text: message.Content ?? string.Empty, MessageId: message.Id.ToString(CultureInfo.InvariantCulture), @@ -145,7 +145,7 @@ private async Task OnMessageReceivedAsync(SocketMessage message) // Scrivener is responsible for synchronizing etc DiscordLog.InboundUserMessageReceived(_logger, sender, incoming.Text.Length); long position = await _scrivener.WriteAsync(incoming).ConfigureAwait(false); - DiscordLog.InboundAppendedToJournal(_logger, nameof(DiscordIncoming), position); + DiscordLog.InboundAppendedToJournal(_logger, nameof(DiscordAfferent), position); } public void Dispose() diff --git a/src/Coven.Chat.Discord/DiscordLog.cs b/src/Coven.Chat.Discord/DiscordLog.cs index 000aba9..d26e819 100644 --- a/src/Coven.Chat.Discord/DiscordLog.cs +++ b/src/Coven.Chat.Discord/DiscordLog.cs @@ -19,13 +19,13 @@ internal static class DiscordLog // Outbound send breadcrumbs private static readonly Action _outboundSendStart = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(2006, nameof(OutboundSendStart)), "Sending message to channel {ChannelId} (length {Length})."); private static readonly Action _outboundSendSucceeded = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(2007, nameof(OutboundSendSucceeded)), "Sent message to channel {ChannelId}."); @@ -44,25 +44,25 @@ internal static class DiscordLog // Channel resolution breadcrumbs private static readonly Action _channelCacheHit = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(2010, nameof(ChannelCacheHit)), "Channel cache hit for {ChannelId}."); private static readonly Action _channelCacheMiss = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(2011, nameof(ChannelCacheMiss)), "Channel cache miss for {ChannelId}; falling back to REST."); private static readonly Action _channelRestFetchStart = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(2012, nameof(ChannelRestFetchStart)), "Fetching channel {ChannelId} via REST."); private static readonly Action _channelRestFetchSuccess = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(2013, nameof(ChannelRestFetchSuccess)), "Fetched channel {ChannelId} via REST."); @@ -93,58 +93,58 @@ internal static class DiscordLog private static readonly Action _inboundBotMessageObserved = LoggerMessage.Define( - LogLevel.Debug, + LogLevel.Trace, new EventId(2021, nameof(InboundBotMessageObserved)), "Observed bot-authored message from {Sender} (length {Length}); recording ACK."); private static readonly Action _inboundAppendedToJournal = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(2022, nameof(InboundAppendedToJournal)), "Appended inbound {EntryType} to Discord journal at position {Position}."); // Pump: Discord -> Chat private static readonly Action _discordToChatObserved = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(2030, nameof(DiscordToChatObserved)), "Discord→Chat observed {EntryType} at position {Position}."); private static readonly Action _discordToChatTransmuted = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(2031, nameof(DiscordToChatTransmuted)), "Discord→Chat transmuted {FromType} → {ToType}."); private static readonly Action _discordToChatAppended = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(2032, nameof(DiscordToChatAppended)), "Discord→Chat appended {EntryType} to Chat journal at position {Position}."); // Pump: Chat -> Discord private static readonly Action _chatToDiscordObserved = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(2040, nameof(ChatToDiscordObserved)), "Chat→Discord observed {EntryType} at position {Position}."); private static readonly Action _chatToDiscordTransmuted = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(2041, nameof(ChatToDiscordTransmuted)), "Chat→Discord transmuted {FromType} → {ToType}."); private static readonly Action _chatToDiscordAppended = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(2042, nameof(ChatToDiscordAppended)), "Chat→Discord appended {EntryType} to Discord journal at position {Position}."); // DiscordScrivener internal append private static readonly Action _discordScrivenerAppended = LoggerMessage.Define( - LogLevel.Information, + LogLevel.Trace, new EventId(2050, nameof(DiscordScrivenerAppended)), "DiscordScrivener appended {EntryType} to internal journal at position {Position}."); diff --git a/src/Coven.Chat.Discord/DiscordScrivener.cs b/src/Coven.Chat.Discord/DiscordScrivener.cs index 88875db..a858338 100644 --- a/src/Coven.Chat.Discord/DiscordScrivener.cs +++ b/src/Coven.Chat.Discord/DiscordScrivener.cs @@ -29,7 +29,7 @@ public DiscordScrivener([FromKeyedServices("Coven.InternalDiscordScrivener")] IS public async Task WriteAsync(DiscordEntry entry, CancellationToken cancellationToken = default) { // Only send actual outbound messages to Discord. ACKs and inbound entries must not be sent. - if (entry is DiscordOutgoing) + if (entry is DiscordEfferent) { await _discordClient.SendAsync(entry.Text, cancellationToken).ConfigureAwait(false); } diff --git a/src/Coven.Chat.Discord/DiscordTransmuter.cs b/src/Coven.Chat.Discord/DiscordTransmuter.cs index 852a89f..61b7001 100644 --- a/src/Coven.Chat.Discord/DiscordTransmuter.cs +++ b/src/Coven.Chat.Discord/DiscordTransmuter.cs @@ -4,48 +4,33 @@ namespace Coven.Chat.Discord; public class DiscordTransmuter : IBiDirectionalTransmuter { - public Task TransmuteIn(DiscordEntry Input, CancellationToken cancellationToken) + public Task TransmuteAfferent(DiscordEntry Input, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); return Input switch { - DiscordIncoming incoming => Task.FromResult(new ChatIncoming(incoming.Sender, incoming.Text)), - DiscordOutgoing outgoing => Task.FromResult(new ChatAck(outgoing.Sender, outgoing.Text)), + DiscordAfferent incoming => Task.FromResult(new ChatAfferent(incoming.Sender, incoming.Text)), + DiscordEfferent outgoing => Task.FromResult(new ChatAck(outgoing.Sender, outgoing.Text)), _ => throw new ArgumentOutOfRangeException(nameof(Input)) }; } - public Task TransmuteOut(ChatEntry Output, CancellationToken cancellationToken) + public Task TransmuteEfferent(ChatEntry Output, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); return Output switch { - ChatOutgoing outgoing => Task.FromResult(new DiscordOutgoing(outgoing.Sender, outgoing.Text)), + ChatEfferent outgoing => Task.FromResult(new DiscordEfferent(outgoing.Sender, outgoing.Text)), // Internal/unfixed artifacts: acknowledge only to prevent loops - ChatOutgoingDraft draft => Task.FromResult(new DiscordAck(draft.Sender, draft.Text)), + ChatEfferentDraft draft => Task.FromResult(new DiscordAck(draft.Sender, draft.Text)), ChatChunk chunk => Task.FromResult(new DiscordAck(chunk.Sender, chunk.Text)), ChatStreamCompleted done => Task.FromResult(new DiscordAck(done.Sender, string.Empty)), - ChatIncoming incoming => Task.FromResult(new DiscordAck(incoming.Sender, incoming.Text)), - ChatIncomingDraft incomingDraft => Task.FromResult(new DiscordAck(incomingDraft.Sender, incomingDraft.Text)), + ChatAfferent incoming => Task.FromResult(new DiscordAck(incoming.Sender, incoming.Text)), + ChatAfferentDraft incomingDraft => Task.FromResult(new DiscordAck(incomingDraft.Sender, incomingDraft.Text)), _ => throw new ArgumentOutOfRangeException(nameof(Output)) }; } - - // Flows - // Incoming from discord. - // Create DiscordIncoming in the DiscordScrivener - // Read by the tail loop - // Create ChatIncoming in the ChatScrivener - // Read by the tail loop - // Create DiscordAck in the DiscordScrivener (acks are not pumped between scriveners) - - // Outgoing to discord. - // Create ChatOutgoing in the ChatScrivener - // Read by the tail loop - // Create DiscordOutging in the DiscordScrivener - // Read by tail loop - // Create ChatAck in the ChatScrivener (acks are not pumped between scriveners) } diff --git a/src/Coven.Chat/ChatChunkBatchTransmuter.cs b/src/Coven.Chat/ChatChunkBatchTransmuter.cs index 06ec6b5..6457ba0 100644 --- a/src/Coven.Chat/ChatChunkBatchTransmuter.cs +++ b/src/Coven.Chat/ChatChunkBatchTransmuter.cs @@ -1,13 +1,12 @@ // SPDX-License-Identifier: BUSL-1.1 using System.Text; -using Coven.Core.Streaming; using Coven.Transmutation; namespace Coven.Chat; -public sealed class ChatChunkBatchTransmuter : ITransmuter, BatchTransmuteResult> +public sealed class ChatChunkBatchTransmuter : IBatchTransmuter { - public Task> Transmute(IEnumerable Input, CancellationToken cancellationToken = default) + public Task> Transmute(IEnumerable Input, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(Input); @@ -26,7 +25,7 @@ public Task> Transmute(IEnumerable } } - ChatOutgoing output = new(sender, sb.ToString()); - return Task.FromResult(new BatchTransmuteResult(output, false, null)); + ChatEfferent output = new(sender, sb.ToString()); + return Task.FromResult(new BatchTransmuteResult(output, false, null)); } } diff --git a/src/Coven.Chat/ChatEntry.cs b/src/Coven.Chat/ChatEntry.cs index b235a71..e5dc5e2 100644 --- a/src/Coven.Chat/ChatEntry.cs +++ b/src/Coven.Chat/ChatEntry.cs @@ -8,16 +8,16 @@ public abstract record ChatEntry(string Sender); // Unfixed/draft entries that should never be forwarded by adapters directly public abstract record ChatEntryDraft(string Sender) : ChatEntry(Sender); -public sealed record ChatOutgoing(string Sender, string Text) : ChatEntry(Sender); +public sealed record ChatEfferent(string Sender, string Text) : ChatEntry(Sender); -public sealed record ChatIncoming(string Sender, string Text) : ChatEntry(Sender); +public sealed record ChatAfferent(string Sender, string Text) : ChatEntry(Sender); public sealed record ChatAck(string Sender, string Text) : ChatEntry(Sender); // Streaming additions -public sealed record ChatOutgoingDraft(string Sender, string Text) : ChatEntryDraft(Sender); +public sealed record ChatEfferentDraft(string Sender, string Text) : ChatEntryDraft(Sender); -public sealed record ChatIncomingDraft(string Sender, string Text) : ChatEntryDraft(Sender); +public sealed record ChatAfferentDraft(string Sender, string Text) : ChatEntryDraft(Sender); public sealed record ChatChunk(string Sender, string Text) : ChatEntryDraft(Sender); diff --git a/src/Coven.Chat/ServiceCollectionExtensions.cs b/src/Coven.Chat/ServiceCollectionExtensions.cs index e194baa..1e6ab0e 100644 --- a/src/Coven.Chat/ServiceCollectionExtensions.cs +++ b/src/Coven.Chat/ServiceCollectionExtensions.cs @@ -26,14 +26,15 @@ public static IServiceCollection AddChatWindowing(this IServiceCollection servic // Prefer a DI-registered policy; fall back to final-only if none provided IWindowPolicy policy = sp.GetService>() ?? new LambdaWindowPolicy(1, _ => false); - ITransmuter, BatchTransmuteResult> batchTransmuter = - sp.GetRequiredService, BatchTransmuteResult>>(); + IBatchTransmuter batchTransmuter = + sp.GetRequiredService>(); + IShatterPolicy? shatterPolicy = sp.GetService>(); - return new StreamWindowingDaemon( - daemonEvents, chatJournal, policy, batchTransmuter); + return new StreamWindowingDaemon( + daemonEvents, chatJournal, policy, batchTransmuter, shatterPolicy); }); - services.TryAddScoped, BatchTransmuteResult>, ChatChunkBatchTransmuter>(); + services.TryAddScoped, ChatChunkBatchTransmuter>(); return services; } diff --git a/src/Coven.Chat/Shattering/ChatChunkMaxLengthShatterPolicy.cs b/src/Coven.Chat/Shattering/ChatChunkMaxLengthShatterPolicy.cs index 068d819..b77bf1c 100644 --- a/src/Coven.Chat/Shattering/ChatChunkMaxLengthShatterPolicy.cs +++ b/src/Coven.Chat/Shattering/ChatChunkMaxLengthShatterPolicy.cs @@ -21,7 +21,7 @@ public IEnumerable Shatter(ChatEntry entry) { switch (entry) { - case ChatOutgoingDraft draft: + case ChatEfferentDraft draft: foreach (string part in Split(draft.Text)) { yield return new ChatChunk(draft.Sender, part); diff --git a/src/Coven.Chat/Shattering/ChatParagraphShatterPolicy.cs b/src/Coven.Chat/Shattering/ChatParagraphShatterPolicy.cs index a8af938..b8c3d23 100644 --- a/src/Coven.Chat/Shattering/ChatParagraphShatterPolicy.cs +++ b/src/Coven.Chat/Shattering/ChatParagraphShatterPolicy.cs @@ -18,13 +18,13 @@ public IEnumerable Shatter(ChatEntry entry) { switch (entry) { - case ChatOutgoingDraft outgoingDraft: + case ChatEfferentDraft outgoingDraft: foreach (string part in SplitParagraphs(outgoingDraft.Text)) { yield return new ChatChunk(outgoingDraft.Sender, part); } yield break; - case ChatIncomingDraft incomingDraft: + case ChatAfferentDraft incomingDraft: foreach (string part in SplitParagraphs(incomingDraft.Text)) { yield return new ChatChunk(incomingDraft.Sender, part); diff --git a/src/Coven.Chat/Shattering/ChatSentenceShatterPolicy.cs b/src/Coven.Chat/Shattering/ChatSentenceShatterPolicy.cs index b8053d2..da194d5 100644 --- a/src/Coven.Chat/Shattering/ChatSentenceShatterPolicy.cs +++ b/src/Coven.Chat/Shattering/ChatSentenceShatterPolicy.cs @@ -18,13 +18,13 @@ public IEnumerable Shatter(ChatEntry entry) { switch (entry) { - case ChatOutgoingDraft outgoingDraft: + case ChatEfferentDraft outgoingDraft: foreach (string s in SplitSentences(outgoingDraft.Text)) { yield return new ChatChunk(outgoingDraft.Sender, s); } yield break; - case ChatIncomingDraft incomingDraft: + case ChatAfferentDraft incomingDraft: foreach (string s in SplitSentences(incomingDraft.Text)) { yield return new ChatChunk(incomingDraft.Sender, s); diff --git a/src/Coven.Core.Streaming/StreamWindowingDaemon.cs b/src/Coven.Core.Streaming/StreamWindowingDaemon.cs index 86da0c3..69fa9cf 100644 --- a/src/Coven.Core.Streaming/StreamWindowingDaemon.cs +++ b/src/Coven.Core.Streaming/StreamWindowingDaemon.cs @@ -8,7 +8,8 @@ public sealed class StreamWindowingDaemon( IScrivener daemonEvents, IScrivener journal, IWindowPolicy windowPolicy, - ITransmuter, BatchTransmuteResult> batchTransmuter + IBatchTransmuter batchTransmuter, + IShatterPolicy? shatterPolicy = null ) : ContractDaemon(daemonEvents), IAsyncDisposable where TEntry : notnull where TChunk : TEntry @@ -17,7 +18,8 @@ ITransmuter, BatchTransmuteResult> batchTra { private readonly IScrivener _journal = journal ?? throw new ArgumentNullException(nameof(journal)); private readonly IWindowPolicy _windowPolicy = windowPolicy ?? throw new ArgumentNullException(nameof(windowPolicy)); - private readonly ITransmuter, BatchTransmuteResult> _batchTransmuter = batchTransmuter ?? throw new ArgumentNullException(nameof(batchTransmuter)); + private readonly IBatchTransmuter _batchTransmuter = batchTransmuter ?? throw new ArgumentNullException(nameof(batchTransmuter)); + private readonly IShatterPolicy? _shatterPolicy = shatterPolicy; private CancellationTokenSource? _linkedCancellationSource; private Task? _pumpTask; @@ -131,7 +133,27 @@ private async Task EmitBufferAsync(List buffer, CancellationToken cancel } BatchTransmuteResult result = await _batchTransmuter.Transmute(buffer, cancellationToken).ConfigureAwait(false); - await _journal.WriteAsync(result.Output, cancellationToken).ConfigureAwait(false); + + if (_shatterPolicy is not null) + { + bool any = false; + IEnumerable shards = _shatterPolicy.Shatter(result.Output) ?? []; + foreach (TEntry shard in shards) + { + any = true; + await _journal.WriteAsync(shard, cancellationToken).ConfigureAwait(false); + } + + if (!any) + { + // No shatter output produced; forward original output as-is + await _journal.WriteAsync(result.Output, cancellationToken).ConfigureAwait(false); + } + } + else + { + await _journal.WriteAsync(result.Output, cancellationToken).ConfigureAwait(false); + } if (result.HasRemainder && result.Remainder is not null) { @@ -161,4 +183,3 @@ public async ValueTask DisposeAsync() } } } - diff --git a/src/Coven.Core.Streaming/BatchTransmuteResult.cs b/src/Coven.Transmutation/BatchTransmuteResult.cs similarity index 93% rename from src/Coven.Core.Streaming/BatchTransmuteResult.cs rename to src/Coven.Transmutation/BatchTransmuteResult.cs index 3459662..1b73e97 100644 --- a/src/Coven.Core.Streaming/BatchTransmuteResult.cs +++ b/src/Coven.Transmutation/BatchTransmuteResult.cs @@ -1,5 +1,5 @@ // SPDX-License-Identifier: BUSL-1.1 -namespace Coven.Core.Streaming; +namespace Coven.Transmutation; /// /// Result of batch-transmuting a set of chunks into a single output, diff --git a/src/Coven.Transmutation/CompositeBiDirectionalTransmuter.cs b/src/Coven.Transmutation/CompositeBiDirectionalTransmuter.cs index 14b7809..554fe28 100644 --- a/src/Coven.Transmutation/CompositeBiDirectionalTransmuter.cs +++ b/src/Coven.Transmutation/CompositeBiDirectionalTransmuter.cs @@ -6,21 +6,21 @@ namespace Coven.Transmutation; /// /// The primary input type. /// The primary output type. -/// Transmuter used for . -/// Transmuter used for . +/// Transmuter used for . +/// Transmuter used for . public class CompositeBiDirectionalTransmuter(ITransmuter InTramsuter, ITransmuter OutTransmuter) : IBiDirectionalTransmuter { private readonly ITransmuter _inTransmuter = InTramsuter; private readonly ITransmuter _outTransmuter = OutTransmuter; /// - public Task TransmuteIn(TIn Input, CancellationToken cancellationToken = default) + public Task TransmuteAfferent(TIn Input, CancellationToken cancellationToken = default) { return _inTransmuter.Transmute(Input, cancellationToken); } /// - public Task TransmuteOut(TOut Output, CancellationToken cancellationToken = default) + public Task TransmuteEfferent(TOut Output, CancellationToken cancellationToken = default) { return _outTransmuter.Transmute(Output, cancellationToken); } diff --git a/src/Coven.Transmutation/IBatchTransmuter.cs b/src/Coven.Transmutation/IBatchTransmuter.cs new file mode 100644 index 0000000..77caede --- /dev/null +++ b/src/Coven.Transmutation/IBatchTransmuter.cs @@ -0,0 +1,11 @@ +namespace Coven.Transmutation; + +/// +/// Specialization for batch transmutation from a sequence of chunks to a single output plus optional remainder. +/// +/// Input chunk type. +/// Output type. +public interface IBatchTransmuter : ITransmuter, BatchTransmuteResult> +{ +} + diff --git a/src/Coven.Transmutation/IBiDirectionalTransmuter.cs b/src/Coven.Transmutation/IBiDirectionalTransmuter.cs index fa14544..bc03017 100644 --- a/src/Coven.Transmutation/IBiDirectionalTransmuter.cs +++ b/src/Coven.Transmutation/IBiDirectionalTransmuter.cs @@ -14,7 +14,7 @@ public interface IBiDirectionalTransmuter /// The input value. /// Token to observe for cooperative cancellation. /// A task producing the transmuted . - Task TransmuteIn(TIn Input, CancellationToken cancellationToken = default); + Task TransmuteAfferent(TIn Input, CancellationToken cancellationToken = default); /// /// Transmutes a value of back to . @@ -22,5 +22,5 @@ public interface IBiDirectionalTransmuter /// The output value to reverse-transmute. /// Token to observe for cooperative cancellation. /// A task producing the transmuted . - Task TransmuteOut(TOut Output, CancellationToken cancellationToken = default); + Task TransmuteEfferent(TOut Output, CancellationToken cancellationToken = default); } diff --git a/src/samples/01.DiscordAgent/DiscordOpenAITemplatingTransmuter.cs b/src/samples/01.DiscordAgent/DiscordOpenAITemplatingTransmuter.cs index 6a1551d..b79ccf7 100644 --- a/src/samples/01.DiscordAgent/DiscordOpenAITemplatingTransmuter.cs +++ b/src/samples/01.DiscordAgent/DiscordOpenAITemplatingTransmuter.cs @@ -24,11 +24,11 @@ public sealed class DiscordOpenAITemplatingTransmuter : ITransmuter Task.FromResult( + OpenAIEfferent u => Task.FromResult( ResponseItem.CreateUserMessageItem($"[discord username:{u.Sender}] {u.Text}")), // Decorate assistant content for clarity in logs and evals. - OpenAIIncoming a => Task.FromResult( + OpenAIAfferent a => Task.FromResult( ResponseItem.CreateAssistantMessageItem($"[assistant:{a.Model}] {a.Text}")), // Drop thoughts/acks from the prompt input by returning null. diff --git a/src/samples/01.DiscordAgent/Program.cs b/src/samples/01.DiscordAgent/Program.cs index 1584dd0..77ba55a 100644 --- a/src/samples/01.DiscordAgent/Program.cs +++ b/src/samples/01.DiscordAgent/Program.cs @@ -8,6 +8,8 @@ using Microsoft.Extensions.Logging; using Coven.Transmutation; using OpenAI.Responses; +using Coven.Core.Streaming; +using Coven.Agents; // Configuration DiscordClientConfig discordConfig = new() @@ -26,7 +28,24 @@ HostApplicationBuilder builder = Host.CreateApplicationBuilder(args); builder.Services.AddLogging(b => b.AddConsole()); builder.Services.AddDiscordChat(discordConfig); -builder.Services.AddOpenAIAgents(openAiConfig); +builder.Services.AddOpenAIAgents(openAiConfig, registration => +{ + registration.EnableStreaming(); +}); + +// Override windowing policies independently for outputs and thoughts +// Output chunk policy: paragraph-first with a tighter max length cap +builder.Services.AddScoped>(_ => + new CompositeWindowPolicy( + new AgentParagraphWindowPolicy(), + new AgentMaxLengthWindowPolicy(1024))); + +// // Thought chunk policy: summary-marker, sentence, paragraph; independent cap +// builder.Services.AddScoped>(_ => +// new CompositeWindowPolicy( +// new AgentThoughtSummaryMarkerWindowPolicy(), +// new AgentThoughtMaxLengthWindowPolicy(2048))); + // Override default OpenAI entry → ResponseItem mapping with sample templating builder.Services.AddScoped, DiscordOpenAITemplatingTransmuter>(); builder.Services.BuildCoven(c => c.MagikBlock().Done()); diff --git a/src/samples/01.DiscordAgent/RouterBlock.cs b/src/samples/01.DiscordAgent/RouterBlock.cs index 5cfcdde..db33e95 100644 --- a/src/samples/01.DiscordAgent/RouterBlock.cs +++ b/src/samples/01.DiscordAgent/RouterBlock.cs @@ -25,7 +25,7 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke { await foreach ((long _, ChatEntry? entry) in _chat.TailAsync(0, cancellationToken)) { - if (entry is ChatIncoming inc) + if (entry is ChatAfferent inc) { await _agents.WriteAsync(new AgentPrompt(inc.Sender, inc.Text), cancellationToken).ConfigureAwait(false); } @@ -39,10 +39,11 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke switch (entry) { case AgentResponse r: - await _chat.WriteAsync(new ChatOutgoingDraft("BOT", r.Text), cancellationToken).ConfigureAwait(false); + await _chat.WriteAsync(new ChatEfferentDraft("BOT", r.Text), cancellationToken).ConfigureAwait(false); break; case AgentThought t: - await _chat.WriteAsync(new ChatOutgoingDraft("BOT", t.Text), cancellationToken).ConfigureAwait(false); + // Uncomment below if you want to output thoughts :) + // await _chat.WriteAsync(new ChatEfferentDraft("BOT", t.Text), cancellationToken).ConfigureAwait(false); break; default: break; diff --git a/src/toys/Coven.Toys.ConsoleChat/EchoBlock.cs b/src/toys/Coven.Toys.ConsoleChat/EchoBlock.cs index 8918cb3..ecefa5f 100644 --- a/src/toys/Coven.Toys.ConsoleChat/EchoBlock.cs +++ b/src/toys/Coven.Toys.ConsoleChat/EchoBlock.cs @@ -15,9 +15,9 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke await foreach ((long _, ChatEntry? entry) in _scrivener.TailAsync(0, cancellationToken)) { - if (entry is ChatIncoming r) + if (entry is ChatAfferent r) { - await _scrivener.WriteAsync(new ChatOutgoing("BOT", "Echo: " + r.Text), cancellationToken); + await _scrivener.WriteAsync(new ChatEfferent("BOT", "Echo: " + r.Text), cancellationToken); } } diff --git a/src/toys/Coven.Toys.ConsoleOpenAI/Program.cs b/src/toys/Coven.Toys.ConsoleOpenAI/Program.cs index 1cd009a..e1f7643 100644 --- a/src/toys/Coven.Toys.ConsoleOpenAI/Program.cs +++ b/src/toys/Coven.Toys.ConsoleOpenAI/Program.cs @@ -17,7 +17,7 @@ OpenAIClientConfig openAiConfig = new() { ApiKey = "", // set your key - Model = "gpt-5-2025-08-07" // choose the model + Model = "gpt-5-2025-08-07" }; // Register DI @@ -32,4 +32,3 @@ // Execute ritual ICoven coven = host.Services.GetRequiredService(); await coven.Ritual(new Empty()); - diff --git a/src/toys/Coven.Toys.ConsoleOpenAI/RouterBlock.cs b/src/toys/Coven.Toys.ConsoleOpenAI/RouterBlock.cs index b2d8632..3075f38 100644 --- a/src/toys/Coven.Toys.ConsoleOpenAI/RouterBlock.cs +++ b/src/toys/Coven.Toys.ConsoleOpenAI/RouterBlock.cs @@ -27,7 +27,7 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke { await foreach ((long _, ChatEntry? entry) in _chat.TailAsync(0, cancellationToken)) { - if (entry is ChatIncoming inc) + if (entry is ChatAfferent inc) { await _agents.WriteAsync(new AgentPrompt(inc.Sender, inc.Text), cancellationToken).ConfigureAwait(false); } @@ -42,10 +42,10 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke switch (entry) { case AgentResponse r: - await _chat.WriteAsync(new ChatOutgoing("BOT", r.Text), cancellationToken).ConfigureAwait(false); + await _chat.WriteAsync(new ChatEfferent("BOT", r.Text), cancellationToken).ConfigureAwait(false); break; case AgentThought t: - await _chat.WriteAsync(new ChatOutgoing("BOT", t.Text), cancellationToken).ConfigureAwait(false); + await _chat.WriteAsync(new ChatEfferent("BOT", t.Text), cancellationToken).ConfigureAwait(false); break; default: break; diff --git a/src/toys/Coven.Toys.ConsoleOpenAIStreaming/Program.cs b/src/toys/Coven.Toys.ConsoleOpenAIStreaming/Program.cs index 485dfe7..7048acc 100644 --- a/src/toys/Coven.Toys.ConsoleOpenAIStreaming/Program.cs +++ b/src/toys/Coven.Toys.ConsoleOpenAIStreaming/Program.cs @@ -1,7 +1,9 @@ +using Coven.Agents; using Coven.Agents.OpenAI; using Coven.Chat.Console; using Coven.Core; using Coven.Core.Builder; +using Coven.Core.Streaming; using Coven.Toys.ConsoleOpenAIStreaming; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -17,7 +19,7 @@ OpenAIClientConfig openAiConfig = new() { ApiKey = "", // set your key - Model = "gpt-5-2025-08-07" // choose the model + Model = "gpt-5-2025-08-07" }; // Register DI @@ -25,12 +27,26 @@ builder.Services.AddLogging(b => b.AddConsole()); builder.Services.AddConsoleChat(consoleConfig); -// Enable OpenAI streaming with sensible segmentation +// Enable OpenAI streaming builder.Services.AddOpenAIAgents(openAiConfig, registration => { - registration.EnableStreaming(); // default: final-only segmentation + registration.EnableStreaming(); }); +// Override windowing policies independently for outputs and thoughts +// Output chunk policy: paragraph-first with a tighter max length cap +builder.Services.AddScoped>(_ => + new CompositeWindowPolicy( + new AgentParagraphWindowPolicy(), + new AgentMaxLengthWindowPolicy(1024))); + +// Thought chunk policy: summary-marker, sentence, paragraph; independent cap +builder.Services.AddScoped>(_ => + new CompositeWindowPolicy( + new AgentThoughtSummaryMarkerWindowPolicy(), + new AgentThoughtSentenceWindowPolicy(), + new AgentThoughtMaxLengthWindowPolicy(2048))); + builder.Services.BuildCoven(c => c.MagikBlock().Done()); IHost host = builder.Build(); diff --git a/src/toys/Coven.Toys.ConsoleOpenAIStreaming/RouterBlock.cs b/src/toys/Coven.Toys.ConsoleOpenAIStreaming/RouterBlock.cs index f3a142f..106b689 100644 --- a/src/toys/Coven.Toys.ConsoleOpenAIStreaming/RouterBlock.cs +++ b/src/toys/Coven.Toys.ConsoleOpenAIStreaming/RouterBlock.cs @@ -27,7 +27,7 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke { await foreach ((long _, ChatEntry? entry) in _chat.TailAsync(0, cancellationToken)) { - if (entry is ChatIncoming inc) + if (entry is ChatAfferent inc) { await _agents.WriteAsync(new AgentPrompt(inc.Sender, inc.Text), cancellationToken).ConfigureAwait(false); } @@ -35,7 +35,6 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke }, cancellationToken); // Pump 2: Agents -> Chat (responses) - // For streaming, we surface segmented AgentResponse entries (emitted by the segmentation daemon). Task agentsToChat = Task.Run(async () => { await foreach ((long _, AgentEntry? entry) in _agents.TailAsync(0, cancellationToken)) @@ -43,10 +42,10 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke switch (entry) { case AgentResponse r: - await _chat.WriteAsync(new ChatOutgoing("BOT", r.Text), cancellationToken).ConfigureAwait(false); + await _chat.WriteAsync(new ChatEfferent("BOT", r.Text), cancellationToken).ConfigureAwait(false); break; case AgentThought t: - await _chat.WriteAsync(new ChatOutgoing("BOT", t.Text), cancellationToken).ConfigureAwait(false); + await _chat.WriteAsync(new ChatEfferent("BOT", t.Text), cancellationToken).ConfigureAwait(false); break; default: // Ignore prompts, acks, and raw chunks here to avoid duplicates @@ -59,4 +58,3 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke return input; } } - diff --git a/src/toys/Coven.Toys.DiscordChat/EchoBlock.cs b/src/toys/Coven.Toys.DiscordChat/EchoBlock.cs index f7115b7..8f8a0c9 100644 --- a/src/toys/Coven.Toys.DiscordChat/EchoBlock.cs +++ b/src/toys/Coven.Toys.DiscordChat/EchoBlock.cs @@ -23,8 +23,8 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke // It's also okay to take the extra cycle to look at them rather than short circuiting, we might use them as confirmation of receipt in the future. switch (entry) { - case ChatIncoming r: - await _scrivener.WriteAsync(new ChatOutgoing("BOT", r.Text), cancellationToken); + case ChatAfferent r: + await _scrivener.WriteAsync(new ChatEfferent("BOT", r.Text), cancellationToken); break; default: break; diff --git a/src/toys/Coven.Toys.DiscordStreaming/StreamingBlock.cs b/src/toys/Coven.Toys.DiscordStreaming/StreamingBlock.cs index 542a696..56d348b 100644 --- a/src/toys/Coven.Toys.DiscordStreaming/StreamingBlock.cs +++ b/src/toys/Coven.Toys.DiscordStreaming/StreamingBlock.cs @@ -20,9 +20,9 @@ public async Task DoMagik(Empty input, CancellationToken cancellationToke // Tail the chat journal and convert incoming to draft outgoing. await foreach ((long _, ChatEntry entry) in _scrivener.TailAsync(0, cancellationToken)) { - if (entry is ChatIncoming i) + if (entry is ChatAfferent i) { - await _scrivener.WriteAsync(new ChatOutgoingDraft("BOT", i.Text), cancellationToken).ConfigureAwait(false); + await _scrivener.WriteAsync(new ChatEfferentDraft("BOT", i.Text), cancellationToken).ConfigureAwait(false); } }