4545import io .grpc .internal .ClientTransport .PingCallback ;
4646import io .grpc .internal .GrpcUtil ;
4747import io .grpc .internal .Http2Ping ;
48- import io .grpc .internal .ManagedClientTransport ;
4948import io .netty .buffer .ByteBuf ;
5049import io .netty .buffer .ByteBufUtil ;
5150import io .netty .buffer .Unpooled ;
8685import java .util .logging .Level ;
8786import java .util .logging .Logger ;
8887
89- import javax .annotation .Nullable ;
90-
9188/**
9289 * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
9390 * the context of the Netty Channel thread.
@@ -108,14 +105,13 @@ class NettyClientHandler extends AbstractNettyHandler {
108105 Status .UNAVAILABLE .withDescription ("Stream IDs have been exhausted" );
109106
110107 private final Http2Connection .PropertyKey streamKey ;
108+ private final ClientTransportLifecycleManager lifecycleManager ;
111109 private final Ticker ticker ;
112110 private final Random random = new Random ();
113111 private WriteQueue clientWriteQueue ;
114112 private Http2Ping ping ;
115- private Status goAwayStatus ;
116- private Throwable goAwayStatusThrowable ;
117113
118- static NettyClientHandler newHandler (ManagedClientTransport . Listener listener ,
114+ static NettyClientHandler newHandler (ClientTransportLifecycleManager lifecycleManager ,
119115 int flowControlWindow , int maxHeaderListSize ,
120116 Ticker ticker ) {
121117 Preconditions .checkArgument (maxHeaderListSize > 0 , "maxHeaderListSize must be positive" );
@@ -124,19 +120,20 @@ static NettyClientHandler newHandler(ManagedClientTransport.Listener listener,
124120 Http2FrameReader frameReader = new DefaultHttp2FrameReader (headersDecoder );
125121 Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter ();
126122 Http2Connection connection = new DefaultHttp2Connection (false );
127- return newHandler (connection , frameReader , frameWriter , listener , flowControlWindow , ticker );
123+ return newHandler (
124+ connection , frameReader , frameWriter , lifecycleManager , flowControlWindow , ticker );
128125 }
129126
130127 @ VisibleForTesting
131128 static NettyClientHandler newHandler (Http2Connection connection ,
132129 Http2FrameReader frameReader ,
133130 Http2FrameWriter frameWriter ,
134- final ManagedClientTransport . Listener listener ,
131+ ClientTransportLifecycleManager lifecycleManager ,
135132 int flowControlWindow ,
136133 Ticker ticker ) {
137134 Preconditions .checkNotNull (connection , "connection" );
138135 Preconditions .checkNotNull (frameReader , "frameReader" );
139- Preconditions .checkNotNull (listener , "listener " );
136+ Preconditions .checkNotNull (lifecycleManager , "lifecycleManager " );
140137 Preconditions .checkArgument (flowControlWindow > 0 , "flowControlWindow must be positive" );
141138 Preconditions .checkNotNull (ticker , "ticker" );
142139
@@ -145,18 +142,7 @@ static NettyClientHandler newHandler(Http2Connection connection,
145142 frameWriter = new Http2OutboundFrameLogger (frameWriter , frameLogger );
146143
147144 StreamBufferingEncoder encoder = new StreamBufferingEncoder (
148- new DefaultHttp2ConnectionEncoder (connection , frameWriter )) {
149- private boolean firstSettings = true ;
150-
151- @ Override
152- public ChannelFuture writeSettingsAck (ChannelHandlerContext ctx , ChannelPromise promise ) {
153- if (firstSettings ) {
154- firstSettings = false ;
155- listener .transportReady ();
156- }
157- return super .writeSettingsAck (ctx , promise );
158- }
159- };
145+ new DefaultHttp2ConnectionEncoder (connection , frameWriter ));
160146
161147 // Create the local flow controller configured to auto-refill the connection window.
162148 connection .local ().flowController (new DefaultHttp2LocalFlowController (connection ,
@@ -170,13 +156,15 @@ public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise
170156 settings .initialWindowSize (flowControlWindow );
171157 settings .maxConcurrentStreams (0 );
172158
173- return new NettyClientHandler (decoder , encoder , settings , ticker );
159+ return new NettyClientHandler (decoder , encoder , settings , lifecycleManager , ticker );
174160 }
175161
176162 private NettyClientHandler (Http2ConnectionDecoder decoder ,
177163 StreamBufferingEncoder encoder , Http2Settings settings ,
164+ ClientTransportLifecycleManager lifecycleManager ,
178165 Ticker ticker ) {
179166 super (decoder , encoder , settings );
167+ this .lifecycleManager = lifecycleManager ;
180168 this .ticker = ticker ;
181169
182170 // Set the frame listener on the decoder.
@@ -187,22 +175,11 @@ private NettyClientHandler(Http2ConnectionDecoder decoder,
187175 connection .addListener (new Http2ConnectionAdapter () {
188176 @ Override
189177 public void onGoAwayReceived (int lastStreamId , long errorCode , ByteBuf debugData ) {
190- goAwayStatus (statusFromGoAway (errorCode , ByteBufUtil .getBytes (debugData )));
191- goingAway ();
178+ goingAway (statusFromGoAway (errorCode , ByteBufUtil .getBytes (debugData )));
192179 }
193180 });
194181 }
195182
196- /**
197- * Return the reason the handler failed. Only intended to be used by {@link NettyClientTransport}.
198- * Most other classes should retrieve the transport's shutdown status, since it may be more
199- * complete.
200- */
201- @ Nullable
202- public Status errorStatus () {
203- return goAwayStatus ;
204- }
205-
206183 /**
207184 * Handler for commands sent from the stream.
208185 */
@@ -220,10 +197,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
220197 } else if (msg instanceof SendPingCommand ) {
221198 sendPingFrame (ctx , (SendPingCommand ) msg , promise );
222199 } else if (msg instanceof GracefulCloseCommand ) {
223- // Explicitly flush to create any buffered streams before sending GOAWAY.
224- // TODO(ejona): determine if the need to flush is a bug in Netty
225- flush (ctx );
226- close (ctx , promise );
200+ gracefulClose (ctx , (GracefulCloseCommand ) msg , promise );
227201 } else if (msg == NOOP_MESSAGE ) {
228202 ctx .write (Unpooled .EMPTY_BUFFER , promise );
229203 } else {
@@ -278,7 +252,8 @@ private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception
278252 @ Override
279253 public void close (ChannelHandlerContext ctx , ChannelPromise promise ) throws Exception {
280254 logger .fine ("Network channel being closed by the application." );
281- goAwayStatus (Status .UNAVAILABLE .withDescription ("Channel requested transport shutdown" ));
255+ lifecycleManager .notifyShutdown (
256+ Status .UNAVAILABLE .withDescription ("Transport closed for unknown reason" ));
282257 super .close (ctx , promise );
283258 }
284259
@@ -289,15 +264,17 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
289264 public void channelInactive (ChannelHandlerContext ctx ) throws Exception {
290265 try {
291266 logger .fine ("Network channel is closed" );
292- goAwayStatus (goAwayStatus ().augmentDescription ("Network channel closed" ));
293- cancelPing ();
267+ lifecycleManager .notifyShutdown (
268+ Status .UNAVAILABLE .withDescription ("Network closed for unknown reason" ));
269+ cancelPing (lifecycleManager .getShutdownThrowable ());
294270 // Report status to the application layer for any open streams
295271 connection ().forEachActiveStream (new Http2StreamVisitor () {
296272 @ Override
297273 public boolean visit (Http2Stream stream ) throws Http2Exception {
298274 NettyClientStream clientStream = clientStream (stream );
299275 if (clientStream != null ) {
300- clientStream .transportReportStatus (goAwayStatus , false , new Metadata ());
276+ clientStream .transportReportStatus (
277+ lifecycleManager .getShutdownStatus (), false , new Metadata ());
301278 }
302279 return true ;
303280 }
@@ -312,7 +289,8 @@ public boolean visit(Http2Stream stream) throws Http2Exception {
312289 protected void onConnectionError (ChannelHandlerContext ctx , Throwable cause ,
313290 Http2Exception http2Ex ) {
314291 logger .log (Level .FINE , "Caught a connection error" , cause );
315- goAwayStatus (Utils .statusFromThrowable (cause ));
292+ lifecycleManager .notifyShutdown (Utils .statusFromThrowable (cause ));
293+ // Parent class will shut down the Channel
316294 super .onConnectionError (ctx , cause , http2Ex );
317295 }
318296
@@ -342,9 +320,9 @@ protected boolean isGracefulShutdownComplete() {
342320 */
343321 private void createStream (CreateStreamCommand command , final ChannelPromise promise )
344322 throws Exception {
345- if (goAwayStatus != null ) {
323+ if (lifecycleManager . getShutdownThrowable () != null ) {
346324 // The connection is going away, just terminate the stream now.
347- promise .setFailure (goAwayStatusThrowable );
325+ promise .setFailure (lifecycleManager . getShutdownThrowable () );
348326 return ;
349327 }
350328
@@ -398,8 +376,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
398376 if (cause instanceof StreamBufferingEncoder .Http2GoAwayException ) {
399377 StreamBufferingEncoder .Http2GoAwayException e =
400378 (StreamBufferingEncoder .Http2GoAwayException ) cause ;
401- goAwayStatus (statusFromGoAway (e .errorCode (), e .debugData ()));
402- promise .setFailure (goAwayStatusThrowable );
379+ lifecycleManager . notifyShutdown (statusFromGoAway (e .errorCode (), e .debugData ()));
380+ promise .setFailure (lifecycleManager . getShutdownThrowable () );
403381 } else {
404382 promise .setFailure (cause );
405383 }
@@ -434,9 +412,9 @@ private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
434412 */
435413 private void sendPingFrame (ChannelHandlerContext ctx , SendPingCommand msg ,
436414 ChannelPromise promise ) {
437- // Don't check goAwayStatus since we want to allow pings after shutdown but before termination.
438- // After termination, messages will no longer arrive because the pipeline clears all handlers on
439- // channel close.
415+ // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
416+ // but before termination. After termination, messages will no longer arrive because the
417+ // pipeline clears all handlers on channel close.
440418
441419 PingCallback callback = msg .callback ();
442420 Executor executor = msg .executor ();
@@ -477,12 +455,22 @@ public void operationComplete(ChannelFuture future) throws Exception {
477455 });
478456 }
479457
458+ private void gracefulClose (ChannelHandlerContext ctx , GracefulCloseCommand msg ,
459+ ChannelPromise promise ) throws Exception {
460+ lifecycleManager .notifyShutdown (msg .getStatus ());
461+ // Explicitly flush to create any buffered streams before sending GOAWAY.
462+ // TODO(ejona): determine if the need to flush is a bug in Netty
463+ flush (ctx );
464+ close (ctx , promise );
465+ }
466+
480467 /**
481468 * Handler for a GOAWAY being either sent or received. Fails any streams created after the
482469 * last known stream.
483470 */
484- private void goingAway () {
485- final Status goAwayStatus = goAwayStatus ();
471+ private void goingAway (Status status ) {
472+ lifecycleManager .notifyShutdown (status );
473+ final Status goAwayStatus = lifecycleManager .getShutdownStatus ();
486474 final int lastKnownStream = connection ().local ().lastStreamKnownByPeer ();
487475 try {
488476 connection ().forEachActiveStream (new Http2StreamVisitor () {
@@ -503,27 +491,9 @@ public boolean visit(Http2Stream stream) throws Http2Exception {
503491 }
504492 }
505493
506- /**
507- * Returns the appropriate status used to represent the cause for GOAWAY.
508- */
509- private Status goAwayStatus () {
510- if (goAwayStatus != null ) {
511- return goAwayStatus ;
512- }
513- return Status .UNAVAILABLE .withDescription ("Connection going away, but for unknown reason" );
514- }
515-
516- private void goAwayStatus (Status status ) {
517- // Don't overwrite if we already have a goAwayStatus.
518- if (goAwayStatus == null ) {
519- goAwayStatus = status ;
520- goAwayStatusThrowable = status .asException ();
521- }
522- }
523-
524- private void cancelPing () {
494+ private void cancelPing (Throwable t ) {
525495 if (ping != null ) {
526- ping .failed (goAwayStatus (). asException () );
496+ ping .failed (t );
527497 ping = null ;
528498 }
529499 }
@@ -565,6 +535,16 @@ private Http2Stream requireHttp2Stream(int streamId) {
565535 }
566536
567537 private class FrameListener extends Http2FrameAdapter {
538+ private boolean firstSettings = true ;
539+
540+ @ Override
541+ public void onSettingsRead (ChannelHandlerContext ctx , Http2Settings settings ) {
542+ if (firstSettings ) {
543+ firstSettings = false ;
544+ lifecycleManager .notifyReady ();
545+ }
546+ }
547+
568548 @ Override
569549 public int onDataRead (ChannelHandlerContext ctx , int streamId , ByteBuf data , int padding ,
570550 boolean endOfStream ) throws Http2Exception {
0 commit comments