Skip to content

Commit d5f14de

Browse files
committed
realtimehub-refactor
1 parent 9c032cb commit d5f14de

File tree

26 files changed

+640
-192
lines changed

26 files changed

+640
-192
lines changed

BotSharp.sln

+11
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Test.BrowserUse",
143143
EndProject
144144
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.LLM.Tests", "tests\BotSharp.LLM.Tests\BotSharp.LLM.Tests.csproj", "{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}"
145145
EndProject
146+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Test.RealtimeVoice", "tests\BotSharp.Test.RealtimeVoice\BotSharp.Test.RealtimeVoice.csproj", "{B067B126-88CD-4282-BEEF-7369B64423EF}"
147+
EndProject
146148
Global
147149
GlobalSection(SolutionConfigurationPlatforms) = preSolution
148150
Debug|Any CPU = Debug|Any CPU
@@ -599,6 +601,14 @@ Global
599601
{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}.Release|Any CPU.Build.0 = Release|Any CPU
600602
{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}.Release|x64.ActiveCfg = Release|Any CPU
601603
{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}.Release|x64.Build.0 = Release|Any CPU
604+
{B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
605+
{B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|Any CPU.Build.0 = Debug|Any CPU
606+
{B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|x64.ActiveCfg = Debug|Any CPU
607+
{B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|x64.Build.0 = Debug|Any CPU
608+
{B067B126-88CD-4282-BEEF-7369B64423EF}.Release|Any CPU.ActiveCfg = Release|Any CPU
609+
{B067B126-88CD-4282-BEEF-7369B64423EF}.Release|Any CPU.Build.0 = Release|Any CPU
610+
{B067B126-88CD-4282-BEEF-7369B64423EF}.Release|x64.ActiveCfg = Release|Any CPU
611+
{B067B126-88CD-4282-BEEF-7369B64423EF}.Release|x64.Build.0 = Release|Any CPU
602612
EndGlobalSection
603613
GlobalSection(SolutionProperties) = preSolution
604614
HideSolutionNode = FALSE
@@ -668,6 +678,7 @@ Global
668678
{970BE341-9AC8-99A5-6572-E703C1E02FCB} = {E29DC6C4-5E57-48C5-BCB0-6B8F84782749}
669679
{7D0DB012-9798-4BB9-B15B-A5B0B7B3B094} = {32FAFFFE-A4CB-4FEE-BF7C-84518BBC6DCC}
670680
{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E} = {32FAFFFE-A4CB-4FEE-BF7C-84518BBC6DCC}
681+
{B067B126-88CD-4282-BEEF-7369B64423EF} = {32FAFFFE-A4CB-4FEE-BF7C-84518BBC6DCC}
671682
EndGlobalSection
672683
GlobalSection(ExtensibilityGlobals) = postSolution
673684
SolutionGuid = {A9969D89-C98B-40A5-A12B-FC87E55B3A19}

src/Infrastructure/BotSharp.Abstraction/Agents/Settings/AgentSettings.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ namespace BotSharp.Abstraction.Agents.Settings;
22

33
public class AgentSettings
44
{
5-
public string DataDir { get; set; } = string.Empty;
5+
public string DataDir { get; set; } = "agents";
66
public string TemplateFormat { get; set; } = "liquid";
77
public string HostAgentId { get; set; } = string.Empty;
88
public bool EnableTranslator { get; set; } = false;

src/Infrastructure/BotSharp.Abstraction/Conversations/Settings/ConversationSetting.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ namespace BotSharp.Abstraction.Conversations.Settings;
22

33
public class ConversationSetting
44
{
5-
public string DataDir { get; set; }
5+
public string DataDir { get; set; } = "conversations";
66
public string ChatCompletion { get; set; }
77
public bool EnableKnowledgeBase { get; set; }
88
public bool ShowVerboseLog { get; set; }

src/Infrastructure/BotSharp.Abstraction/MLTasks/IRealTimeCompletion.cs

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Task Connect(RealtimeHubConnection conn,
1818
Action<RoleDialogModel> onInputAudioTranscriptionCompleted,
1919
Action onUserInterrupted);
2020
Task AppenAudioBuffer(string message);
21+
Task AppenAudioBuffer(ArraySegment<byte> data, int length);
2122

2223
Task SendEventToModel(object message);
2324
Task Disconnect();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace BotSharp.Abstraction.Realtime.Enums;
2+
3+
/// <summary>
/// Status value for a stream channel. In this change set it is the type of
/// <c>StreamReceiveResult.Status</c> and of the <c>status</c> argument to
/// <c>IStreamChannel.CloseAsync</c>.
/// </summary>
/// <remarks>
/// Values start at 1, so <c>default(StreamChannelStatus)</c> (0) matches neither member.
/// NOTE(review): confirm whether an unset status should be treated as open or as an error.
/// </remarks>
public enum StreamChannelStatus
4+
{
5+
// Presumably: the channel is usable for send/receive - confirm against implementations.
Open = 1,
6+
// Presumably: the channel has been (or is being) shut down - confirm against implementations.
Closed = 2
7+
}

src/Infrastructure/BotSharp.Abstraction/Realtime/IRealtimeHub.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ public interface IRealtimeHub
1515
IRealTimeCompletion Completer { get; }
1616
IRealTimeCompletion SetCompleter(string provider);
1717

18-
Task Listen(WebSocket userWebSocket, Action<string> onUserMessageReceived);
18+
Task ConnectToModel(Func<string, Task> responseToUser);
1919
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using BotSharp.Abstraction.Realtime.Enums;
2+
using BotSharp.Abstraction.Realtime.Models;
3+
using System.Threading;
4+
5+
namespace BotSharp.Abstraction.Realtime;
6+
7+
/// <summary>
/// Abstraction over a bidirectional byte stream tied to a conversation.
/// This change set registers <c>WaveStremChannel</c> as the scoped implementation
/// in <c>RealtimePlugin.RegisterDI</c>.
/// </summary>
/// <remarks>
/// NOTE(review): the Receive/Send/Close shape resembles the WebSocket calls that were
/// removed from <c>RealtimeHub.Listen</c> in the same commit; the exact contract of each
/// method is implementation-defined and not visible here.
/// </remarks>
public interface IStreamChannel
8+
{
9+
/// <summary>Connects the channel for the given conversation.</summary>
/// <param name="conversationId">Identifier of the conversation this channel serves.</param>
Task ConnectAsync(string conversationId);
10+
/// <summary>Receives data from the channel into <paramref name="buffer"/>.</summary>
/// <param name="buffer">Destination segment for the received bytes.</param>
/// <param name="cancellation">Token used to cancel the receive.</param>
/// <returns>
/// A <c>StreamReceiveResult</c> carrying <c>Status</c>, <c>Count</c> and <c>EndOfMessage</c>.
/// Presumably <c>Count</c> is the number of bytes written into <paramref name="buffer"/> - TODO confirm.
/// </returns>
Task<StreamReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellation);
11+
/// <summary>Sends <paramref name="data"/> over the channel.</summary>
/// <param name="data">Bytes to send.</param>
/// <param name="cancellation">Token used to cancel the send.</param>
Task SendAsync(byte[] data, CancellationToken cancellation);
12+
/// <summary>Closes the channel.</summary>
/// <param name="status">Status to close with.</param>
/// <param name="description">Free-text description supplied with the close.</param>
/// <param name="cancellation">Token used to cancel the close.</param>
Task CloseAsync(StreamChannelStatus status, string description, CancellationToken cancellation);
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace BotSharp.Abstraction.Realtime.Models;
2+
3+
/// <summary>
/// Base shape of a model response event; <c>ModelResponseMediaEvent</c> derives from it.
/// </summary>
public class ModelResponseEvent
4+
{
5+
// Serialized under the lowercase JSON key "event".
[JsonPropertyName("event")]
6+
/// <summary>Event name; defaults to the empty string when not set.</summary>
public string Event { get; set; } = string.Empty;
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace BotSharp.Abstraction.Realtime.Models;
2+
3+
/// <summary>
/// A <c>ModelResponseEvent</c> that additionally carries a media payload.
/// </summary>
public class ModelResponseMediaEvent : ModelResponseEvent
4+
{
5+
// Serialized under the lowercase JSON key "media".
[JsonPropertyName("media")]
6+
/// <summary>
/// Media payload as a string. Initialized with <c>null!</c>, so it is actually
/// <c>null</c> until assigned despite the non-nullable declaration.
/// NOTE(review): the payload encoding (e.g. base64 audio) is not visible here - confirm.
/// </summary>
public string Media { get; set; } = null!;
7+
}

src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs

+3-5
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ namespace BotSharp.Abstraction.Realtime.Models;
44

55
public class RealtimeHubConnection
66
{
7-
public string Event { get; set; } = null!;
87
public string StreamId { get; set; } = null!;
98
public string? LastAssistantItemId { get; set; } = null!;
109
public long LatestMediaTimestamp { get; set; }
@@ -13,10 +12,9 @@ public class RealtimeHubConnection
1312
public ConcurrentQueue<string> MarkQueue { get; set; } = new();
1413
public string CurrentAgentId { get; set; } = null!;
1514
public string ConversationId { get; set; } = null!;
16-
public string Data { get; set; } = string.Empty;
17-
public Func<string, object> OnModelMessageReceived { get; set; } = null!;
18-
public Func<object> OnModelAudioResponseDone { get; set; } = null!;
19-
public Func<object> OnModelUserInterrupted { get; set; } = null!;
15+
public Func<string, string> OnModelMessageReceived { get; set; } = null!;
16+
public Func<string> OnModelAudioResponseDone { get; set; } = null!;
17+
public Func<string> OnModelUserInterrupted { get; set; } = null!;
2018

2119
public void ResetResponseState()
2220
{

src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeModelSettings.cs

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ namespace BotSharp.Abstraction.Realtime.Models;
22

33
public class RealtimeModelSettings
44
{
5+
public string InputAudioFormat { get; set; } = "g711_ulaw";
6+
public string OutputAudioFormat { get; set; } = "g711_ulaw";
57
public string Voice { get; set; } = "alloy";
68
public float Temperature { get; set; } = 0.8f;
79
public int MaxResponseOutputTokens { get; set; } = 512;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using BotSharp.Abstraction.Realtime.Enums;
2+
3+
namespace BotSharp.Abstraction.Realtime.Models;
4+
5+
/// <summary>
/// Result returned by <c>IStreamChannel.ReceiveAsync</c>.
/// </summary>
public class StreamReceiveResult
{
    /// <summary>Channel status reported together with this receive.</summary>
    public StreamChannelStatus Status { get; set; }

    /// <summary>
    /// Count reported for this receive.
    /// Presumably the number of bytes written into the caller's buffer - TODO confirm.
    /// </summary>
    public int Count { get; set; }

    /// <summary>
    /// Whether this receive completed a message.
    /// </summary>
    /// <remarks>
    /// This used to be a get-only auto-property on a class with no constructor, so it
    /// could never be anything other than <c>false</c>. It is settable now, like
    /// <see cref="Status"/> and <see cref="Count"/>, so channel implementations can
    /// actually report message boundaries.
    /// </remarks>
    public bool EndOfMessage { get; set; }
}

src/Infrastructure/BotSharp.Core.Realtime/BotSharp.Core.Realtime.csproj

+4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
<Nullable>enable</Nullable>
1111
</PropertyGroup>
1212

13+
<ItemGroup>
14+
<PackageReference Include="NAudio" />
15+
</ItemGroup>
16+
1317
<ItemGroup>
1418
<ProjectReference Include="..\BotSharp.Abstraction\BotSharp.Abstraction.csproj" />
1519
<ProjectReference Include="..\BotSharp.Core\BotSharp.Core.csproj" />

src/Infrastructure/BotSharp.Core.Realtime/RealtimePlugin.cs

+1
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ public void RegisterDI(IServiceCollection services, IConfiguration config)
2323

2424
services.AddScoped<IRealtimeHub, RealtimeHub>();
2525
services.AddScoped<IConversationHook, RealtimeConversationHook>();
26+
services.AddScoped<IStreamChannel, WaveStremChannel>();
2627
}
2728
}

src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs

+8-111
Original file line numberDiff line numberDiff line change
@@ -21,53 +21,9 @@ public RealtimeHub(IServiceProvider services, ILogger<RealtimeHub> logger)
2121
_logger = logger;
2222
}
2323

24-
public async Task Listen(WebSocket userWebSocket,
25-
Action<string> onUserMessageReceived)
24+
public async Task ConnectToModel(Func<string, Task> responseToUser)
2625
{
27-
var buffer = new byte[1024 * 32];
28-
WebSocketReceiveResult result;
29-
30-
do
31-
{
32-
Array.Clear(buffer, 0, buffer.Length);
33-
result = await userWebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
34-
string receivedText = Encoding.UTF8.GetString(buffer, 0, result.Count);
35-
36-
if (string.IsNullOrEmpty(receivedText))
37-
{
38-
continue;
39-
}
40-
41-
onUserMessageReceived(receivedText);
42-
43-
if (_conn.Event == "user_connected")
44-
{
45-
await ConnectToModel(userWebSocket);
46-
}
47-
else if (_conn.Event == "user_data_received")
48-
{
49-
await _completer.AppenAudioBuffer(_conn.Data);
50-
}
51-
else if (_conn.Event == "user_dtmf_receiving")
52-
{
53-
}
54-
else if (_conn.Event == "user_dtmf_received")
55-
{
56-
await HandleUserDtmfReceived();
57-
}
58-
else if (_conn.Event == "user_disconnected")
59-
{
60-
await _completer.Disconnect();
61-
await HandleUserDisconnected();
62-
}
63-
} while (!result.CloseStatus.HasValue);
64-
65-
await userWebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
66-
}
67-
68-
private async Task ConnectToModel(WebSocket userWebSocket)
69-
{
70-
var hookProvider = _services.GetRequiredService<ConversationHookProvider>();
26+
var hookProvider = _services.GetService<ConversationHookProvider>();
7127
var convService = _services.GetRequiredService<IConversationService>();
7228
convService.SetConversationId(_conn.ConversationId, []);
7329
var conversation = await convService.GetConversation(_conn.ConversationId);
@@ -103,7 +59,7 @@ await _completer.Connect(_conn,
10359
onModelAudioDeltaReceived: async (audioDeltaData, itemId) =>
10460
{
10561
var data = _conn.OnModelMessageReceived(audioDeltaData);
106-
await SendEventToUser(userWebSocket, data);
62+
await responseToUser(data);
10763

10864
// If this is the first delta of a new response, set the start timestamp
10965
if (!_conn.ResponseStartTimestamp.HasValue)
@@ -118,12 +74,12 @@ await _completer.Connect(_conn,
11874
}
11975

12076
// Send mark messages to Media Streams so we know if and when AI response playback is finished
121-
await SendMark(userWebSocket, _conn);
77+
// await SendMark(userWebSocket, _conn);
12278
},
12379
onModelAudioResponseDone: async () =>
12480
{
12581
var data = _conn.OnModelAudioResponseDone();
126-
await SendEventToUser(userWebSocket, data);
82+
await responseToUser(data);
12783
},
12884
onAudioTranscriptDone: async transcript =>
12985
{
@@ -151,7 +107,7 @@ await _completer.Connect(_conn,
151107
dialogs.Add(message);
152108
storage.Append(_conn.ConversationId, message);
153109

154-
foreach (var hook in hookProvider.HooksOrderByPriority)
110+
foreach (var hook in hookProvider?.HooksOrderByPriority ?? [])
155111
{
156112
hook.SetAgent(agent)
157113
.SetConversation(conversation);
@@ -172,7 +128,7 @@ await _completer.Connect(_conn,
172128
storage.Append(_conn.ConversationId, message);
173129
routing.Context.SetMessageId(_conn.ConversationId, message.MessageId);
174130

175-
foreach (var hook in hookProvider.HooksOrderByPriority)
131+
foreach (var hook in hookProvider?.HooksOrderByPriority ?? [])
176132
{
177133
hook.SetAgent(agent)
178134
.SetConversation(conversation);
@@ -186,69 +142,10 @@ await _completer.Connect(_conn,
186142
_conn.ResetResponseState();
187143

188144
var data = _conn.OnModelUserInterrupted();
189-
await SendEventToUser(userWebSocket, data);
145+
await responseToUser(data);
190146
});
191147
}
192148

193-
private async Task SendMark(WebSocket userWebSocket, RealtimeHubConnection conn)
194-
{
195-
if (!string.IsNullOrEmpty(conn.StreamId))
196-
{
197-
var markEvent = new
198-
{
199-
@event = "mark",
200-
streamSid = conn.StreamId,
201-
mark = new { name = "responsePart" }
202-
};
203-
await SendEventToUser(userWebSocket, markEvent);
204-
conn.MarkQueue.Enqueue("responsePart");
205-
}
206-
}
207-
208-
private async Task HandleUserDtmfReceived()
209-
{
210-
var routing = _services.GetRequiredService<IRoutingService>();
211-
var hookProvider = _services.GetRequiredService<ConversationHookProvider>();
212-
var agentService = _services.GetRequiredService<IAgentService>();
213-
var agent = await agentService.LoadAgent(_conn.CurrentAgentId);
214-
var dialogs = routing.Context.GetDialogs();
215-
var convService = _services.GetRequiredService<IConversationService>();
216-
var conversation = await convService.GetConversation(_conn.ConversationId);
217-
218-
var message = new RoleDialogModel(AgentRole.User, _conn.Data)
219-
{
220-
CurrentAgentId = routing.Context.GetCurrentAgentId()
221-
};
222-
dialogs.Add(message);
223-
224-
var storage = _services.GetRequiredService<IConversationStorage>();
225-
storage.Append(_conn.ConversationId, message);
226-
227-
foreach (var hook in hookProvider.HooksOrderByPriority)
228-
{
229-
hook.SetAgent(agent)
230-
.SetConversation(conversation);
231-
232-
await hook.OnMessageReceived(message);
233-
}
234-
235-
await _completer.InsertConversationItem(message);
236-
var instruction = await _completer.UpdateSession(_conn);
237-
await _completer.TriggerModelInference($"{instruction}\r\n\r\nReply based on the user input: {message.Content}");
238-
}
239-
240-
private async Task HandleUserDisconnected()
241-
{
242-
243-
}
244-
245-
private async Task SendEventToUser(WebSocket webSocket, object message)
246-
{
247-
var data = JsonSerializer.Serialize(message);
248-
var buffer = Encoding.UTF8.GetBytes(data);
249-
await webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);
250-
}
251-
252149
public RealtimeHubConnection SetHubConnection(string conversationId)
253150
{
254151
_conn = new RealtimeHubConnection

0 commit comments

Comments
 (0)