Skip to content

Commit d9885c6

Browse files
authored
refactor: split endpoint reader and writer (#2392)
1 parent dd9ede7 commit d9885c6

5 files changed

Lines changed: 161 additions & 90 deletions

File tree

logs/exceptions.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,6 @@
3434

3535
### Proto.Tests.ActorTests.StopActorWithLongRunningTask
3636
`Proto.TestKit.TestKitException: Expected user message of type System.Threading.Tasks.TaskCanceledException, but received system message of type Proto.Stopping`
37+
38+
### Proto.Cluster.Tests.GossipCoreTests.Large_cluster_should_get_topology_consensus
39+
`Expected x.consensus to be True, but found False.`

logs/log1756126139.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
## Log 1756126139
2+
- Split connection reader and writer loops into dedicated `ConnectionReader` and `ConnectionWriter` types under `Endpoints/Runner` to improve separation of concerns and testability.
3+
- Updated `ConnectionRunner` to orchestrate the new types and moved it into the new `Runner` directory.
4+
- Verified .NET SDK installation and rebuilt solution.
5+
- Ran core test suites; `Proto.Actor.Tests` and `Proto.Remote.Tests` passed, while `Proto.Cluster.Tests` had one failing test (`GossipCoreTests.Large_cluster_should_get_topology_consensus`).
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
namespace Proto.Remote;
2+
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Grpc.Core;
7+
using Microsoft.Extensions.Logging;
8+
9+
/// <summary>
10+
/// Handles incoming messages from the remote endpoint.
11+
/// </summary>
12+
internal sealed class ConnectionReader
13+
{
14+
private readonly string _address;
15+
private readonly ActorSystem _system;
16+
private readonly IConnectionMode _mode;
17+
private readonly ILogger _logger;
18+
19+
public ConnectionReader(string address, ActorSystem system, IConnectionMode mode, ILogger logger)
20+
{
21+
_address = address;
22+
_system = system;
23+
_mode = mode;
24+
_logger = logger;
25+
}
26+
27+
public async Task RunAsync(CancellationToken token, AsyncDuplexStreamingCall<RemoteMessage, RemoteMessage> call, string actorSystemId, CancellationTokenSource cts)
28+
{
29+
try
30+
{
31+
while (await call.ResponseStream.MoveNext(token).ConfigureAwait(false))
32+
{
33+
var currentMessage = call.ResponseStream.Current;
34+
switch (currentMessage.MessageTypeCase)
35+
{
36+
case RemoteMessage.MessageTypeOneofCase.DisconnectRequest:
37+
_logger.ReceivedDisconnectionRequest(_system.Address, _address);
38+
var terminated = new EndpointTerminatedEvent(false, _address, actorSystemId);
39+
_system.EventStream.Publish(terminated);
40+
break;
41+
default:
42+
_mode.HandleMessage(currentMessage, _address);
43+
break;
44+
}
45+
}
46+
47+
_logger.ReaderFinished(_system.Address, _address);
48+
}
49+
catch (OperationCanceledException)
50+
{
51+
_logger.ReaderCancelledDebug(_system.Address, _address);
52+
}
53+
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
54+
{
55+
_logger.ReaderCancelledWarning(_system.Address, _address);
56+
}
57+
catch (Exception e)
58+
{
59+
_logger.ReaderError(_system.Address, _address, e.GetType().Name);
60+
cts.Cancel();
61+
throw;
62+
}
63+
}
64+
}

src/Proto.Remote/Endpoints/ConnectionRunner.cs renamed to src/Proto.Remote/Endpoints/Runner/ConnectionRunner.cs

Lines changed: 7 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
namespace Proto.Remote;
22

33
using System;
4-
using System.Diagnostics;
5-
using System.Linq;
64
using System.Threading;
75
using System.Threading.Tasks;
86
using Grpc.Core;
@@ -25,6 +23,8 @@ public sealed class ConnectionRunner
2523
private readonly Action _onDisconnected;
2624
private readonly Action<double> _recordWriteDuration;
2725
private readonly ILogger _logger;
26+
private readonly ConnectionWriter _writer;
27+
private readonly ConnectionReader _reader;
2828

2929
public ConnectionRunner(string address, IEndpoint endpoint, ActorSystem system, RemoteConfig remoteConfig,
3030
IConnectionMode mode, TimeSpan backoff, int maxRetries, Random random, CancellationToken stopToken,
@@ -43,6 +43,9 @@ public ConnectionRunner(string address, IEndpoint endpoint, ActorSystem system,
4343
_onDisconnected = onDisconnected;
4444
_recordWriteDuration = recordWriteDuration;
4545
_logger = logger;
46+
47+
_writer = new ConnectionWriter(address, endpoint, system, remoteConfig, recordWriteDuration, logger);
48+
_reader = new ConnectionReader(address, system, mode, logger);
4649
}
4750

4851
public async Task RunAsync()
@@ -96,8 +99,8 @@ public async Task RunAsync()
9699

97100
var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(_stopToken, cts.Token).Token;
98101

99-
var writer = RunWriterAsync(combinedToken, call, cts);
100-
var reader = RunReaderAsync(combinedToken, call, actorSystemId, cts);
102+
var writer = _writer.RunAsync(combinedToken, call, cts);
103+
var reader = _reader.RunAsync(combinedToken, call, actorSystemId, cts);
101104

102105
_logger.Connected(_system.Address, _address);
103106

@@ -150,92 +153,6 @@ public async Task RunAsync()
150153
}
151154
}
152155

153-
private async Task RunWriterAsync(CancellationToken token, AsyncDuplexStreamingCall<RemoteMessage, RemoteMessage> call, CancellationTokenSource cts)
154-
{
155-
while (!token.IsCancellationRequested)
156-
{
157-
while (_endpoint.OutgoingStash.TryPop(out var messages))
158-
{
159-
var batch = MessageBatchFactory.CreateBatch(_system, _remoteConfig, messages);
160-
try
161-
{
162-
var sw = Stopwatch.StartNew();
163-
await call.RequestStream.WriteAsync(new RemoteMessage { MessageBatch = batch }, token).ConfigureAwait(false);
164-
sw.Stop();
165-
_recordWriteDuration(sw.Elapsed.TotalSeconds);
166-
}
167-
catch (Exception)
168-
{
169-
_ = _endpoint.OutgoingStash.Append(messages);
170-
cts.Cancel();
171-
throw;
172-
}
173-
}
174-
175-
try
176-
{
177-
await foreach (var messages in _endpoint.Outgoing.Reader.ReadAllAsync(token).ConfigureAwait(false))
178-
{
179-
var batch = MessageBatchFactory.CreateBatch(_system, _remoteConfig, messages);
180-
try
181-
{
182-
var sw = Stopwatch.StartNew();
183-
await call.RequestStream.WriteAsync(new RemoteMessage { MessageBatch = batch }, token).ConfigureAwait(false);
184-
sw.Stop();
185-
_recordWriteDuration(sw.Elapsed.TotalSeconds);
186-
}
187-
catch (Exception)
188-
{
189-
_ = _endpoint.OutgoingStash.Append(messages);
190-
cts.Cancel();
191-
throw;
192-
}
193-
}
194-
}
195-
catch (OperationCanceledException)
196-
{
197-
_logger.WriterCancelled(_system.Address, _address);
198-
}
199-
}
200-
}
201-
202-
private async Task RunReaderAsync(CancellationToken token, AsyncDuplexStreamingCall<RemoteMessage, RemoteMessage> call, string actorSystemId, CancellationTokenSource cts)
203-
{
204-
try
205-
{
206-
while (await call.ResponseStream.MoveNext(token).ConfigureAwait(false))
207-
{
208-
var currentMessage = call.ResponseStream.Current;
209-
switch (currentMessage.MessageTypeCase)
210-
{
211-
case RemoteMessage.MessageTypeOneofCase.DisconnectRequest:
212-
_logger.ReceivedDisconnectionRequest(_system.Address, _address);
213-
var terminated = new EndpointTerminatedEvent(false, _address, actorSystemId);
214-
_system.EventStream.Publish(terminated);
215-
break;
216-
default:
217-
_mode.HandleMessage(currentMessage, _address);
218-
break;
219-
}
220-
}
221-
222-
_logger.ReaderFinished(_system.Address, _address);
223-
}
224-
catch (OperationCanceledException)
225-
{
226-
_logger.ReaderCancelledDebug(_system.Address, _address);
227-
}
228-
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
229-
{
230-
_logger.ReaderCancelledWarning(_system.Address, _address);
231-
}
232-
catch (Exception e)
233-
{
234-
_logger.ReaderError(_system.Address, _address, e.GetType().Name);
235-
cts.Cancel();
236-
throw;
237-
}
238-
}
239156

240157
private bool ShouldStop(RestartStatistics rs)
241158
{
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
namespace Proto.Remote;
2+
3+
using System;
4+
using System.Diagnostics;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Grpc.Core;
8+
using Microsoft.Extensions.Logging;
9+
using Proto.Extensions;
10+
using System.Linq;
11+
12+
/// <summary>
13+
/// Handles sending message batches to the remote endpoint.
14+
/// </summary>
15+
internal sealed class ConnectionWriter
16+
{
17+
private readonly string _address;
18+
private readonly IEndpoint _endpoint;
19+
private readonly RemoteConfig _remoteConfig;
20+
private readonly ActorSystem _system;
21+
private readonly Action<double> _recordWriteDuration;
22+
private readonly ILogger _logger;
23+
24+
public ConnectionWriter(string address, IEndpoint endpoint, ActorSystem system, RemoteConfig remoteConfig, Action<double> recordWriteDuration, ILogger logger)
25+
{
26+
_address = address;
27+
_endpoint = endpoint;
28+
_system = system;
29+
_remoteConfig = remoteConfig;
30+
_recordWriteDuration = recordWriteDuration;
31+
_logger = logger;
32+
}
33+
34+
public async Task RunAsync(CancellationToken token, AsyncDuplexStreamingCall<RemoteMessage, RemoteMessage> call, CancellationTokenSource cts)
35+
{
36+
while (!token.IsCancellationRequested)
37+
{
38+
while (_endpoint.OutgoingStash.TryPop(out var messages))
39+
{
40+
var batch = MessageBatchFactory.CreateBatch(_system, _remoteConfig, messages);
41+
try
42+
{
43+
var sw = Stopwatch.StartNew();
44+
await call.RequestStream.WriteAsync(new RemoteMessage { MessageBatch = batch }, token).ConfigureAwait(false);
45+
sw.Stop();
46+
_recordWriteDuration(sw.Elapsed.TotalSeconds);
47+
}
48+
catch (Exception)
49+
{
50+
_ = _endpoint.OutgoingStash.Append(messages);
51+
cts.Cancel();
52+
throw;
53+
}
54+
}
55+
56+
try
57+
{
58+
await foreach (var messages in _endpoint.Outgoing.Reader.ReadAllAsync(token).ConfigureAwait(false))
59+
{
60+
var batch = MessageBatchFactory.CreateBatch(_system, _remoteConfig, messages);
61+
try
62+
{
63+
var sw = Stopwatch.StartNew();
64+
await call.RequestStream.WriteAsync(new RemoteMessage { MessageBatch = batch }, token).ConfigureAwait(false);
65+
sw.Stop();
66+
_recordWriteDuration(sw.Elapsed.TotalSeconds);
67+
}
68+
catch (Exception)
69+
{
70+
_ = _endpoint.OutgoingStash.Append(messages);
71+
cts.Cancel();
72+
throw;
73+
}
74+
}
75+
}
76+
catch (OperationCanceledException)
77+
{
78+
_logger.WriterCancelled(_system.Address, _address);
79+
}
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)