@@ -7,6 +7,7 @@ namespace Proto.Remote;
77using System . Threading . Tasks ;
88using Grpc . Core ;
99using Grpc . Net . Client ;
10+ using Microsoft . Extensions . Logging ;
1011using Proto . Extensions ;
1112
1213public sealed class ConnectionRunner
@@ -23,16 +24,11 @@ public sealed class ConnectionRunner
2324 private readonly Action _onConnected ;
2425 private readonly Action _onDisconnected ;
2526 private readonly Action < double > _recordWriteDuration ;
26- private readonly Action < string > _logInfo ;
27- private readonly Action < string > _logDebug ;
28- private readonly Action < string > _logWarning ;
29- private readonly Action < Exception , string > _logError ;
27+ private readonly ILogger _logger ;
3028
3129 public ConnectionRunner ( string address , IEndpoint endpoint , ActorSystem system , RemoteConfig remoteConfig ,
3230 IConnectionMode mode , TimeSpan backoff , int maxRetries , Random random , CancellationToken stopToken ,
33- Action onConnected , Action onDisconnected , Action < double > recordWriteDuration ,
34- Action < string > logInfo , Action < string > logDebug , Action < string > logWarning ,
35- Action < Exception , string > logError )
31+ Action onConnected , Action onDisconnected , Action < double > recordWriteDuration , ILogger logger )
3632 {
3733 _address = address ;
3834 _endpoint = endpoint ;
@@ -46,10 +42,7 @@ public ConnectionRunner(string address, IEndpoint endpoint, ActorSystem system,
4642 _onConnected = onConnected ;
4743 _onDisconnected = onDisconnected ;
4844 _recordWriteDuration = recordWriteDuration ;
49- _logInfo = logInfo ;
50- _logDebug = logDebug ;
51- _logWarning = logWarning ;
52- _logError = logError ;
45+ _logger = logger ;
5346 }
5447
5548 public async Task RunAsync ( )
@@ -62,7 +55,7 @@ public async Task RunAsync()
6255 var cts = new CancellationTokenSource ( ) ;
6356 try
6457 {
65- _logInfo ( $ "[ServerConnector][ { _system . Address } ] Connecting to { _address } " ) ;
58+ _logger . Connecting ( _system . Address , _address ) ;
6659
6760 var addressWithProtocol = $ "{ ( _remoteConfig . UseHttps ? "https://" : "http://" ) } { _address } ";
6861 var channel = GrpcChannel . ForAddress ( addressWithProtocol , _remoteConfig . ChannelOptions ) ;
@@ -83,7 +76,7 @@ public async Task RunAsync()
8376 var connectResponse = response . ConnectResponse ;
8477 if ( connectResponse . Blocked )
8578 {
86- _logError ( new Exception ( "Blocked" ) , $ "[ServerConnector][ { _system . Address } ] Connection Refused to remote member { connectResponse . MemberId } address { _address } , we are blocked" ) ;
79+ _logger . ConnectionRefusedWeAreBlocked ( new Exception ( "Blocked" ) , _system . Address , connectResponse . MemberId , _address ) ;
8780 _system . Remote ( ) . BlockList . Block ( new [ ] { _system . Id } , "Blocked by remote member" ) ;
8881 var terminated = new EndpointTerminatedEvent ( false , _address , _system . Id ) ;
8982 _system . EventStream . Publish ( terminated ) ;
@@ -93,7 +86,7 @@ public async Task RunAsync()
9386 actorSystemId = connectResponse . MemberId ;
9487 if ( _system . Remote ( ) . BlockList . IsBlocked ( actorSystemId ) )
9588 {
96- _logError ( new Exception ( "Blocked" ) , $ "[ServerConnector][ { _system . Address } ] Connection Refused to remote member { connectResponse . MemberId } address { _address } , they are blocked" ) ;
89+ _logger . ConnectionRefusedTheyAreBlocked ( new Exception ( "Blocked" ) , _system . Address , connectResponse . MemberId , _address ) ;
9790 var terminated = new EndpointTerminatedEvent ( false , _address , _system . Id ) ;
9891 _system . EventStream . Publish ( terminated ) ;
9992 return ;
@@ -103,26 +96,26 @@ public async Task RunAsync()
10396
10497 var combinedToken = CancellationTokenSource . CreateLinkedTokenSource ( _stopToken , cts . Token ) . Token ;
10598
106- var writer = StartWriter ( combinedToken , call , cts ) ;
107- var reader = StartReader ( combinedToken , call , actorSystemId , cts ) ;
99+ var writer = RunWriterAsync ( combinedToken , call , cts ) ;
100+ var reader = RunReaderAsync ( combinedToken , call , actorSystemId , cts ) ;
108101
109- _logInfo ( $ "[ServerConnector][ { _system . Address } ] Connected to { _address } " ) ;
102+ _logger . Connected ( _system . Address , _address ) ;
110103
111104 await writer . ConfigureAwait ( false ) ;
112105 cts . Cancel ( ) ;
113106 await call . RequestStream . CompleteAsync ( ) . ConfigureAwait ( false ) ;
114107 await reader . ConfigureAwait ( false ) ;
115108
116109 _onDisconnected ( ) ;
117- _logInfo ( $ "[ServerConnector][ { _system . Address } ] Disconnected from { _address } " ) ;
110+ _logger . Disconnected ( _system . Address , _address ) ;
118111 }
119112 catch ( Exception e )
120113 {
121114 e . CheckFailFast ( ) ;
122115
123116 if ( actorSystemId is not null && _system . Remote ( ) . BlockList . IsBlocked ( actorSystemId ) )
124117 {
125- _logDebug ( $ "[ServerConnector][ { _system . Address } ] dropped connection to blocked member { actorSystemId } / { _address } " ) ;
118+ _logger . DroppedBlockedConnection ( _system . Address , actorSystemId , _address ) ;
126119 var terminated = new EndpointTerminatedEvent ( true , _address , actorSystemId ) ;
127120 _system . EventStream . Publish ( terminated ) ;
128121 break ;
@@ -132,11 +125,11 @@ public async Task RunAsync()
132125 {
133126 if ( e is RpcException { StatusCode : StatusCode . Unavailable } )
134127 {
135- _logInfo ( $ "[ServerConnector][ { _system . Address } ] Stopping connection to { _address } after retries expired because the endpoint is unavailable" ) ;
128+ _logger . StoppingUnavailable ( _system . Address , _address ) ;
136129 }
137130 else
138131 {
139- _logError ( e , $ "[ServerConnector][ { _system . Address } ] Stopping connection to { _address } after retries expired because of { e . GetType ( ) . Name } " ) ;
132+ _logger . StoppingBecauseException ( e , _system . Address , _address , e . GetType ( ) . Name ) ;
140133 }
141134
142135 var terminated = new EndpointTerminatedEvent ( true , _address , actorSystemId ) ;
@@ -148,7 +141,7 @@ public async Task RunAsync()
148141 var noise = _random . Next ( 500 ) ;
149142 var duration = TimeSpan . FromMilliseconds ( backoff + noise ) ;
150143 await Task . Delay ( duration ) . ConfigureAwait ( false ) ;
151- _logWarning ( $ "[ServerConnector][ { _system . Address } ] Restarting endpoint connection to { _address } after { duration } because of { e . GetType ( ) . Name } ( { rs . FailureCount } / { _maxRetries } )" ) ;
144+ _logger . RestartingEndpoint ( _system . Address , _address , duration , e . GetType ( ) . Name , rs . FailureCount , _maxRetries ) ;
152145 }
153146 finally
154147 {
@@ -157,13 +150,31 @@ public async Task RunAsync()
157150 }
158151 }
159152
160- private Task StartWriter ( CancellationToken token , AsyncDuplexStreamingCall < RemoteMessage , RemoteMessage > call , CancellationTokenSource cts )
153+ private async Task RunWriterAsync ( CancellationToken token , AsyncDuplexStreamingCall < RemoteMessage , RemoteMessage > call , CancellationTokenSource cts )
161154 {
162- return Task . Run ( async ( ) =>
155+ while ( ! token . IsCancellationRequested )
163156 {
164- while ( ! token . IsCancellationRequested )
157+ while ( _endpoint . OutgoingStash . TryPop ( out var messages ) )
165158 {
166- while ( _endpoint . OutgoingStash . TryPop ( out var messages ) )
159+ var batch = MessageBatchFactory . CreateBatch ( _system , _remoteConfig , messages ) ;
160+ try
161+ {
162+ var sw = Stopwatch . StartNew ( ) ;
163+ await call . RequestStream . WriteAsync ( new RemoteMessage { MessageBatch = batch } , token ) . ConfigureAwait ( false ) ;
164+ sw . Stop ( ) ;
165+ _recordWriteDuration ( sw . Elapsed . TotalSeconds ) ;
166+ }
167+ catch ( Exception )
168+ {
169+ _ = _endpoint . OutgoingStash . Append ( messages ) ;
170+ cts . Cancel ( ) ;
171+ throw ;
172+ }
173+ }
174+
175+ try
176+ {
177+ await foreach ( var messages in _endpoint . Outgoing . Reader . ReadAllAsync ( token ) . ConfigureAwait ( false ) )
167178 {
168179 var batch = MessageBatchFactory . CreateBatch ( _system , _remoteConfig , messages ) ;
169180 try
@@ -180,74 +191,50 @@ private Task StartWriter(CancellationToken token, AsyncDuplexStreamingCall<Remot
180191 throw ;
181192 }
182193 }
183-
184- try
185- {
186- await foreach ( var messages in _endpoint . Outgoing . Reader . ReadAllAsync ( token ) . ConfigureAwait ( false ) )
187- {
188- var batch = MessageBatchFactory . CreateBatch ( _system , _remoteConfig , messages ) ;
189- try
190- {
191- var sw = Stopwatch . StartNew ( ) ;
192- await call . RequestStream . WriteAsync ( new RemoteMessage { MessageBatch = batch } , token ) . ConfigureAwait ( false ) ;
193- sw . Stop ( ) ;
194- _recordWriteDuration ( sw . Elapsed . TotalSeconds ) ;
195- }
196- catch ( Exception )
197- {
198- _ = _endpoint . OutgoingStash . Append ( messages ) ;
199- cts . Cancel ( ) ;
200- throw ;
201- }
202- }
203- }
204- catch ( OperationCanceledException )
205- {
206- _logDebug ( $ "[ServerConnector][{ _system . Address } ] Writer cancelled for { _address } ") ;
207- }
208194 }
209- } ) ;
195+ catch ( OperationCanceledException )
196+ {
197+ _logger . WriterCancelled ( _system . Address , _address ) ;
198+ }
199+ }
210200 }
211201
212- private Task StartReader ( CancellationToken token , AsyncDuplexStreamingCall < RemoteMessage , RemoteMessage > call , string actorSystemId , CancellationTokenSource cts )
202+ private async Task RunReaderAsync ( CancellationToken token , AsyncDuplexStreamingCall < RemoteMessage , RemoteMessage > call , string actorSystemId , CancellationTokenSource cts )
213203 {
214- return Task . Run ( async ( ) =>
204+ try
215205 {
216- try
206+ while ( await call . ResponseStream . MoveNext ( token ) . ConfigureAwait ( false ) )
217207 {
218- while ( await call . ResponseStream . MoveNext ( token ) . ConfigureAwait ( false ) )
208+ var currentMessage = call . ResponseStream . Current ;
209+ switch ( currentMessage . MessageTypeCase )
219210 {
220- var currentMessage = call . ResponseStream . Current ;
221- switch ( currentMessage . MessageTypeCase )
222- {
223- case RemoteMessage . MessageTypeOneofCase . DisconnectRequest :
224- _logDebug ( $ "[ServerConnector][{ _system . Address } ] Received disconnection request from { _address } ") ;
225- var terminated = new EndpointTerminatedEvent ( false , _address , actorSystemId ) ;
226- _system . EventStream . Publish ( terminated ) ;
227- break ;
228- default :
229- _mode . HandleMessage ( currentMessage , _address ) ;
230- break ;
231- }
211+ case RemoteMessage . MessageTypeOneofCase . DisconnectRequest :
212+ _logger . ReceivedDisconnectionRequest ( _system . Address , _address ) ;
213+ var terminated = new EndpointTerminatedEvent ( false , _address , actorSystemId ) ;
214+ _system . EventStream . Publish ( terminated ) ;
215+ break ;
216+ default :
217+ _mode . HandleMessage ( currentMessage , _address ) ;
218+ break ;
232219 }
233-
234- _logDebug ( $ "[ServerConnector][{ _system . Address } ] Reader finished for { _address } ") ;
235220 }
236- catch ( OperationCanceledException )
237- {
238- _logDebug ( $ "[ServerConnector][{ _system . Address } ] Reader cancelled for { _address } ") ;
239- }
240- catch ( RpcException e ) when ( e . StatusCode == StatusCode . Cancelled )
241- {
242- _logWarning ( $ "[ServerConnector][{ _system . Address } ] Reader cancelled for { _address } ") ;
243- }
244- catch ( Exception e )
245- {
246- _logWarning ( $ "[ServerConnector][{ _system . Address } ] Error in reader for { _address } { e . GetType ( ) . Name } ") ;
247- cts . Cancel ( ) ;
248- throw ;
249- }
250- } ) ;
221+
222+ _logger . ReaderFinished ( _system . Address , _address ) ;
223+ }
224+ catch ( OperationCanceledException )
225+ {
226+ _logger . ReaderCancelledDebug ( _system . Address , _address ) ;
227+ }
228+ catch ( RpcException e ) when ( e . StatusCode == StatusCode . Cancelled )
229+ {
230+ _logger . ReaderCancelledWarning ( _system . Address , _address ) ;
231+ }
232+ catch ( Exception e )
233+ {
234+ _logger . ReaderError ( _system . Address , _address , e . GetType ( ) . Name ) ;
235+ cts . Cancel ( ) ;
236+ throw ;
237+ }
251238 }
252239
253240 private bool ShouldStop ( RestartStatistics rs )
0 commit comments