Skip to content

Commit 75eb52b

Browse files
authored
Coalesce support (#870)
1 parent b9a80aa commit 75eb52b

File tree

10 files changed

+182
-30
lines changed

10 files changed

+182
-30
lines changed

src/Client/NetDaemon.HassClient.Tests/HomeAssistantClientTest/HomeAssistantClientTests.cs

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ public async Task TestConnectWithHomeAShouldReturnConnection()
3939

4040
connection.Should().NotBeNull();
4141
}
42+
43+
[Fact]
44+
public async Task TestConnectWithOldVersionHomeAShouldReturnConnection()
45+
{
46+
var client = GetDefaultConnectOkToOldVersionHomeAssistantClient();
47+
48+
var connection = await client.ConnectAsync("host", 1, true, "token", "path", CancellationToken.None)
49+
.ConfigureAwait(false);
50+
51+
connection.Should().NotBeNull();
52+
}
4253

4354
[Fact]
4455
public async Task TestConnectWithHomeAssistantNotReadyShouldThrowException()
@@ -83,15 +94,24 @@ private HomeAssistantClient GetDefaultAuthorizedHomeAssistantClient()
8394
Type = "auth_required"
8495
}
8596
);
97+
_pipeline.AddAuthResponse(
98+
new HassAuthResponse()
99+
{
100+
Type = "auth_ok",
101+
HaVersion = "2023.1.0"
102+
}
103+
);
86104
_pipeline.AddResponse(
87105
new HassMessage
88106
{
89-
Type = "auth_ok"
90-
}
107+
Id = 1,
108+
Type = "result",
109+
Success = true
110+
} //{"id":1,"type":"result","success":true,"result":null}
91111
);
92112
return GetDefaultHomeAssistantClient();
93113
}
94-
114+
95115
/// <summary>
96116
/// Return a pre authenticated and running state
97117
/// HomeAssistantClient
@@ -108,10 +128,53 @@ private HomeAssistantClient GetDefaultConnectOkHomeAssistantClient()
108128
Type = "auth_required"
109129
}
110130
);
131+
_pipeline.AddAuthResponse(
132+
new HassAuthResponse()
133+
{
134+
Type = "auth_ok",
135+
HaVersion = "2023.1.0"
136+
}
137+
);
138+
_pipeline.AddResponse(
139+
new HassMessage
140+
{
141+
Id = 1,
142+
Type = "result",
143+
Success = true
144+
} //{"id":1,"type":"result","success":true,"result":null}
145+
);
146+
147+
// The add the fake config state that says running
148+
_haConnectionMock.AddConfigResponseMessage(
149+
new HassConfig
150+
{
151+
State = "RUNNING"
152+
}
153+
);
154+
return GetDefaultHomeAssistantClient();
155+
}
156+
157+
/// <summary>
158+
/// Return a pre authenticated and running state
159+
/// HomeAssistantClient with version less than 2023.9.x
160+
/// </summary>
161+
private HomeAssistantClient GetDefaultConnectOkToOldVersionHomeAssistantClient()
162+
{
163+
// For a successful connection we need success on authorization
164+
// and success on getting a config message that has state="RUNNING"
165+
166+
// First add the authorization responses from pipeline
111167
_pipeline.AddResponse(
112168
new HassMessage
113169
{
114-
Type = "auth_ok"
170+
Type = "auth_required"
171+
}
172+
);
173+
_pipeline.AddAuthResponse(
174+
new HassAuthResponse()
175+
{
176+
Type = "auth_ok",
177+
HaVersion = "2022.8.99"
115178
}
116179
);
117180
// The add the fake config state that says running

src/Client/NetDaemon.HassClient.Tests/HomeAssistantClientTest/TransportPipelineMock.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,34 @@ namespace NetDaemon.HassClient.Tests.HomeAssistantClientTest;
33
internal class TransportPipelineMock : Mock<IWebSocketClientTransportPipeline>
44
{
55
private readonly Channel<HassMessage> _responseMessageChannel = Channel.CreateBounded<HassMessage>(100);
6+
private readonly Channel<HassAuthResponse> _authResponseMessageChannel = Channel.CreateBounded<HassAuthResponse>(2);
67

78
public TransportPipelineMock()
89
{
9-
Setup(n => n.GetNextMessageAsync<HassMessage>(It.IsAny<CancellationToken>())).Returns(
10+
Setup(n => n.GetNextMessagesAsync<HassMessage>(It.IsAny<CancellationToken>())).Returns(
1011
async (CancellationToken _) =>
11-
await _responseMessageChannel.Reader.ReadAsync(CancellationToken.None).ConfigureAwait(false));
12+
{
13+
14+
var msg = await _responseMessageChannel.Reader.ReadAsync(CancellationToken.None).ConfigureAwait(false);
15+
return new HassMessage[] { msg };
16+
});
17+
18+
Setup(n => n.GetNextMessagesAsync<HassAuthResponse>(It.IsAny<CancellationToken>())).Returns(
19+
async (CancellationToken _) =>
20+
{
21+
22+
var msg = await _authResponseMessageChannel.Reader.ReadAsync(CancellationToken.None).ConfigureAwait(false);
23+
return new [] { msg };
24+
});
1225
}
1326

1427
public void AddResponse(HassMessage msg)
1528
{
1629
_responseMessageChannel.Writer.TryWrite(msg);
1730
}
31+
32+
public void AddAuthResponse(HassAuthResponse msg)
33+
{
34+
_authResponseMessageChannel.Writer.TryWrite(msg);
35+
}
1836
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
{
2-
"type": "auth_ok"
2+
"type": "auth_ok",
3+
"ha_version": "23.1.0"
34
}

src/Client/NetDaemon.HassClient.Tests/Net/WebSocketTransportPipelineTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ public async Task TestGetNextMessageAsyncGetsCorrectMessage()
1717
WsMock.AddResponse(@"{""type"": ""auth_required""}");
1818

1919
// ACT
20-
var msg = await DefaultPipeline.GetNextMessageAsync<HassMessage>(CancellationToken.None).ConfigureAwait(false);
20+
var msg = await DefaultPipeline.GetNextMessagesAsync<HassMessage>(CancellationToken.None).ConfigureAwait(false);
2121

2222
// ASSERT
23-
msg.Type
23+
msg[0].Type
2424
.Should()
2525
.BeEquivalentTo("auth_required");
2626
}
@@ -33,7 +33,7 @@ public async Task TestGetNextMessageAsyncOnClosedSocketShouldCastException()
3333

3434
// ACT AND ASSERT
3535
await Assert.ThrowsAsync<ApplicationException>(async () =>
36-
await DefaultPipeline.GetNextMessageAsync<HassMessage>(CancellationToken.None).ConfigureAwait(false));
36+
await DefaultPipeline.GetNextMessagesAsync<HassMessage>(CancellationToken.None).ConfigureAwait(false));
3737
}
3838

3939
[Fact]
@@ -48,10 +48,10 @@ public async Task TestGetNextMessageAsyncBigMessageInChunks()
4848
WsMock.AddResponse(sb.ToString());
4949

5050
// ACT
51-
var msg = await DefaultPipeline.GetNextMessageAsync<ChunkedMessagesTestClass>(CancellationToken.None)
51+
var msg = await DefaultPipeline.GetNextMessagesAsync<ChunkedMessagesTestClass>(CancellationToken.None)
5252
.ConfigureAwait(false);
5353

54-
msg.BigChunkedMessage
54+
msg[0].BigChunkedMessage
5555
.Should()
5656
.HaveLength(8180);
5757

@@ -107,7 +107,7 @@ public async Task TestGetNextMessageAsyncOnRemoteClosingWebsocketShouldThrowExce
107107

108108
// The operation should be cancelled when remote closes websocket
109109
await Assert.ThrowsAsync<OperationCanceledException>(async () =>
110-
await DefaultPipeline.GetNextMessageAsync<HassMessage>(CancellationToken.None).ConfigureAwait(false));
110+
await DefaultPipeline.GetNextMessagesAsync<HassMessage>(CancellationToken.None).ConfigureAwait(false));
111111

112112
// CloseOutput should always be called when
113113
// a close frame are sent from the remote websocket
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace NetDaemon.Client.HomeAssistant.Model;
2+
3+
public record HassAuthResponse : HassMessageBase
4+
{
5+
[JsonPropertyName("ha_version")] public string HaVersion { get; init; } = String.Empty;
6+
7+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System.Runtime.InteropServices.JavaScript;
2+
3+
namespace NetDaemon.Client.Internal.HomeAssistant.Commands;
4+
5+
internal record SupportedFeaturesCommand : CommandMessage
6+
{
7+
public SupportedFeaturesCommand()
8+
{
9+
Type = "supported_features";
10+
}
11+
12+
[JsonPropertyName("features")] public Features? Features { get; init; } = null;
13+
}
14+
15+
internal record Features
16+
{
17+
[JsonPropertyName("coalesce_messages")] public short? CoalesceMessages { get; init; } = 1;
18+
}

src/Client/NetDaemon.HassClient/Internal/HomeAssistantClient.cs

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,12 @@ public async Task<IHomeAssistantConnection> ConnectAsync(string host, int port,
4040

4141
var transportPipeline = _transportPipelineFactory.New(ws);
4242

43-
await HandleAuthorizationSequence(token, transportPipeline, cancelToken).ConfigureAwait(false);
43+
var hassVersionInfo = await HandleAuthorizationSequenceAndReturnHassVersionInfo(token, transportPipeline, cancelToken).ConfigureAwait(false);
44+
45+
if (Version.Parse(hassVersionInfo) >= new Version(2022, 9))
46+
{
47+
await AddCoalesceSupport(transportPipeline, cancelToken).ConfigureAwait(false);
48+
}
4449

4550
var connection = _connectionFactory.New(transportPipeline);
4651

@@ -60,6 +65,28 @@ public async Task<IHomeAssistantConnection> ConnectAsync(string host, int port,
6065
}
6166
}
6267

68+
private async Task AddCoalesceSupport(IWebSocketClientTransportPipeline transportPipeline, CancellationToken cancelToken)
69+
{
70+
var supportedFeaturesCommandMsg = new SupportedFeaturesCommand
71+
{Id = 1, Features = new Features() { CoalesceMessages = 1 }};
72+
73+
// Send the supported features command
74+
await transportPipeline.SendMessageAsync(
75+
supportedFeaturesCommandMsg,
76+
cancelToken
77+
).ConfigureAwait(false);
78+
79+
// Get the result from command
80+
var resultMsg = await transportPipeline
81+
.GetNextMessagesAsync<HassMessage>(cancelToken).ConfigureAwait(false);
82+
83+
if (resultMsg.Single().Success == true)
84+
{
85+
return;
86+
}
87+
throw new InvalidOperationException($"Failed to get result from supported feature command : {resultMsg.Single()}");
88+
}
89+
6390
private static Uri GetHomeAssistantWebSocketUri(string host, int port, bool ssl, string websocketPath)
6491
{
6592
return new Uri($"{(ssl ? "wss" : "ws")}://{host}:{port}/{websocketPath}");
@@ -78,17 +105,17 @@ private static async Task<bool> CheckIfRunning(IHomeAssistantConnection connecti
78105
return config.State == "RUNNING";
79106
}
80107

81-
private static async Task HandleAuthorizationSequence(string token,
108+
private static async Task<string> HandleAuthorizationSequenceAndReturnHassVersionInfo(string token,
82109
IWebSocketClientTransportPipeline transportPipeline, CancellationToken cancelToken)
83110
{
84111
var connectTimeoutTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancelToken);
85112
connectTimeoutTokenSource.CancelAfter(5000);
86113
// Begin the authorization sequence
87114
// Expect 'auth_required'
88-
var msg = await transportPipeline.GetNextMessageAsync<HassMessage>(connectTimeoutTokenSource.Token)
115+
var msg = await transportPipeline.GetNextMessagesAsync<HassMessage>(connectTimeoutTokenSource.Token)
89116
.ConfigureAwait(false);
90-
if (msg.Type != "auth_required")
91-
throw new ApplicationException($"Unexpected type: '{msg.Type}' expected 'auth_required'");
117+
if (msg[0].Type != "auth_required")
118+
throw new ApplicationException($"Unexpected type: '{msg[0].Type}' expected 'auth_required'");
92119

93120
// Now send the auth message to Home Assistant
94121
await transportPipeline.SendMessageAsync(
@@ -97,19 +124,20 @@ await transportPipeline.SendMessageAsync(
97124
).ConfigureAwait(false);
98125
// Now get the result
99126
var authResultMessage = await transportPipeline
100-
.GetNextMessageAsync<HassMessage>(connectTimeoutTokenSource.Token).ConfigureAwait(false);
127+
.GetNextMessagesAsync<HassAuthResponse>(connectTimeoutTokenSource.Token).ConfigureAwait(false);
101128

102-
switch (authResultMessage.Type)
129+
switch (authResultMessage.Single().Type)
103130
{
104131
case "auth_ok":
105-
return;
132+
133+
return authResultMessage[0].HaVersion;
106134

107135
case "auth_invalid":
108136
await transportPipeline.CloseAsync().ConfigureAwait(false);
109137
throw new HomeAssistantConnectionException(DisconnectReason.Unauthorized);
110138

111139
default:
112-
throw new ApplicationException($"Unexpected response ({authResultMessage.Type})");
140+
throw new ApplicationException($"Unexpected response ({authResultMessage.Single().Type})");
113141
}
114142
}
115143
}

src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,14 @@ private async Task HandleNewMessages()
208208
{
209209
while (!_internalCancelSource.IsCancellationRequested)
210210
{
211-
var msg = await _transportPipeline.GetNextMessageAsync<HassMessage>(_internalCancelSource.Token)
211+
var msg = await _transportPipeline.GetNextMessagesAsync<HassMessage>(_internalCancelSource.Token)
212212
.ConfigureAwait(false);
213213
try
214214
{
215-
_hassMessageSubject.OnNext(msg);
215+
foreach (var obj in msg)
216+
{
217+
_hassMessageSubject.OnNext(obj);
218+
}
216219
}
217220
catch (Exception e)
218221
{

src/Client/NetDaemon.HassClient/Internal/Net/ITransportPipeline.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ internal interface IWebSocketClientTransportPipeline : IAsyncDisposable
1414
/// <summary>
1515
/// Gets next message from pipeline
1616
/// </summary>
17-
ValueTask<T> GetNextMessageAsync<T>(CancellationToken cancellationToken) where T : class;
17+
ValueTask<T[]> GetNextMessagesAsync<T>(CancellationToken cancellationToken) where T : class;
1818

1919
/// <summary>
2020
/// Sends a message to the pipeline

src/Client/NetDaemon.HassClient/Internal/Net/WebSocketTransportPipeline.cs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public async ValueTask DisposeAsync()
4545
await _ws.DisposeAsync().ConfigureAwait(false);
4646
}
4747

48-
public async ValueTask<T> GetNextMessageAsync<T>(CancellationToken cancelToken) where T : class
48+
public async ValueTask<T[]> GetNextMessagesAsync<T>(CancellationToken cancelToken) where T : class
4949
{
5050
if (_ws.State != WebSocketState.Open)
5151
throw new ApplicationException("Cannot send data on a closed socket!");
@@ -60,7 +60,7 @@ public async ValueTask<T> GetNextMessageAsync<T>(CancellationToken cancelToken)
6060
// the pipeline for new data written from websocket input
6161
// We want the processing to start before we read data
6262
// from the websocket so the pipeline is not getting full
63-
var serializeTask = ReadMessageFromPipelineAndSerializeAsync<T>(combinedTokenSource.Token);
63+
var serializeTask = ReadMessagesFromPipelineAndSerializeAsync<T>(combinedTokenSource.Token);
6464
await ReadMessageFromWebSocketAndWriteToPipelineAsync(combinedTokenSource.Token).ConfigureAwait(false);
6565
var result = await serializeTask.ConfigureAwait(false);
6666
// File.WriteAllText("./json_result.json", JsonSerializer.Serialize<T>(result, _defaultSerializerOptions));
@@ -96,15 +96,29 @@ public Task SendMessageAsync<T>(T message, CancellationToken cancelToken) where
9696
/// </summary>
9797
/// <param name="cancelToken">Cancellation token</param>
9898
/// <typeparam name="T">The type to serialize to</typeparam>
99-
private async ValueTask<T> ReadMessageFromPipelineAndSerializeAsync<T>(CancellationToken cancelToken)
99+
private async Task<T[]> ReadMessagesFromPipelineAndSerializeAsync<T>(CancellationToken cancelToken)
100100
{
101101
try
102102
{
103-
var message = await JsonSerializer.DeserializeAsync<T>(_pipe.Reader.AsStream(),
103+
var message = await JsonSerializer.DeserializeAsync<JsonElement?>(_pipe.Reader.AsStream(),
104104
cancellationToken: cancelToken).ConfigureAwait(false)
105105
?? throw new ApplicationException(
106106
"Deserialization of websocket returned empty result (null)");
107-
return message;
107+
if (message.ValueKind == JsonValueKind.Array)
108+
{
109+
// This is a coalesced message containing multiple messages so we need to
110+
// deserialize it as an array
111+
var obj = message.Deserialize<T[]>() ?? throw new ApplicationException(
112+
"Deserialization of websocket returned empty result (null)");
113+
return obj;
114+
}
115+
else
116+
{
117+
// This is normal message and we deserialize it as object
118+
var obj = message.Deserialize<T>() ?? throw new ApplicationException(
119+
"Deserialization of websocket returned empty result (null)");
120+
return new T[] { obj };
121+
}
108122
}
109123
finally
110124
{

0 commit comments

Comments
 (0)