diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java index 9fe6cfb916..bf7232b159 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java @@ -158,7 +158,7 @@ protected final void doCloseAsyncGracefully() { keepAliveManager.initiateGracefulClose(() -> { // no need to notifyOnClosing bcz it's already notified in NettyChannelListenableAsyncCloseable before // invoking this method - }); + }, true); } private void notifyOnClosingImpl() { // For access from AbstractH2ParentConnection @@ -278,7 +278,7 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) { // We trigger the graceful close process here (with no timeout) to make sure the socket is closed once // the existing streams are closed. The MultiplexCodec may simulate a GOAWAY when the stream IDs are // exhausted so we shouldn't rely upon our peer to close the transport. - parentContext.keepAliveManager.initiateGracefulClose(parentContext::notifyOnClosingImpl); + parentContext.keepAliveManager.initiateGracefulClose(parentContext::notifyOnClosingImpl, false); } else if (msg instanceof Http2PingFrame) { parentContext.keepAliveManager.pingReceived((Http2PingFrame) msg); } else if (!(msg instanceof Http2SettingsAckFrame)) { // we ignore SETTINGS(ACK) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/KeepAliveManager.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/KeepAliveManager.java index ba6b97c002..acdc7a1c53 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/KeepAliveManager.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/KeepAliveManager.java @@ -19,12 +19,15 @@ import io.servicetalk.http.netty.H2ProtocolConfig.KeepAlivePolicy; import io.servicetalk.transport.netty.internal.ChannelCloseUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoop; import io.netty.channel.socket.DuplexChannel; import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame; import io.netty.handler.codec.http2.DefaultHttp2PingFrame; +import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2PingFrame; import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.ssl.SslHandler; @@ -42,7 +45,9 @@ import java.util.function.Predicate; import javax.annotation.Nullable; +import static io.netty.buffer.ByteBufUtil.writeAscii; import static io.netty.buffer.Unpooled.EMPTY_BUFFER; +import static io.netty.buffer.Unpooled.unreleasableBuffer; import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE; import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; import static io.servicetalk.http.netty.H2KeepAlivePolicies.DEFAULT_ACK_TIMEOUT; @@ -65,8 +70,17 @@ private enum State { private static final Logger LOGGER = LoggerFactory.getLogger(KeepAliveManager.class); private static final AtomicIntegerFieldUpdater activeStreamsUpdater = AtomicIntegerFieldUpdater.newUpdater(KeepAliveManager.class, "activeStreams"); - private static final long GRACEFUL_CLOSE_PING_CONTENT = ThreadLocalRandom.current().nextLong(); - private static final long KEEP_ALIVE_PING_CONTENT = ThreadLocalRandom.current().nextLong(); + + // Use the last digit (even or odd) to distinguish PING frames when frame logging is enabled. + private static final long GRACEFUL_CLOSE_PING_CONTENT = ThreadLocalRandom.current().nextLong() | 0x01L; // odd + private static final long KEEP_ALIVE_PING_CONTENT = ThreadLocalRandom.current().nextLong() & ~0x01L; // even + + // Frame logging dumps data in hex format. An integer helps to understand the cause without decoding the content. + static final ByteBuf LOCAL_GO_AWAY_CONTENT = staticByteBufFromAscii("0.local"); + static final ByteBuf REMOTE_GO_AWAY_CONTENT = staticByteBufFromAscii("1.remote"); + static final ByteBuf SECOND_GO_AWAY_CONTENT = staticByteBufFromAscii("2.second"); + static final ByteBuf GC_TIMEOUT_GO_AWAY_CONTENT = staticByteBufFromAscii("3.graceful-close-timeout"); + static final ByteBuf KA_TIMEOUT_GO_AWAY_CONTENT = staticByteBufFromAscii("4.keep-alive-timeout"); private volatile int activeStreams; @@ -147,19 +161,19 @@ protected void channelIdle(final ChannelHandlerContext ctx, final IdleStateEvent keepAliveState = State.KEEP_ALIVE_ACK_TIMEDOUT; final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos); LOGGER.debug( - "{} Timeout after {}ms waiting for keep-alive PING(ACK), writing GO_AWAY and " + - "closing the channel with activeStreams={}", + "{} Timeout after {}ms waiting for keep-alive PING(ACK), writing GO_AWAY frame " + + "and closing the channel with activeStreams={}", this.channel, timeoutMillis, activeStreams); final TimeoutException cause = StacklessTimeoutException.newInstance( "Timeout after " + timeoutMillis + "ms waiting for keep-alive PING(ACK)", KeepAliveManager.class, "keepAlivePingAckTimeout()"); - channel.writeAndFlush(new DefaultHttp2GoAwayFrame(NO_ERROR)) + channel.writeAndFlush(newGoAwayFrame(NO_ERROR, KA_TIMEOUT_GO_AWAY_CONTENT)) .addListener(f -> { Throwable closeCause = cause; if (!f.isSuccess()) { closeCause = addSuppressed(f.cause(), cause); - LOGGER.debug("{} Failed to write the last GO_AWAY after PING(ACK) " + - "timeout, closing the channel", channel, closeCause); + LOGGER.debug("{} Failed to write the GO_AWAY frame after keep-alive " + + "PING(ACK) timeout, closing the channel", channel, closeCause); } close0(closeCause); }); @@ -185,7 +199,7 @@ void pingReceived(final Http2PingFrame pingFrame) { if (pingFrame.ack()) { long pingAckContent = pingFrame.content(); if (pingAckContent == GRACEFUL_CLOSE_PING_CONTENT) { - LOGGER.debug("{} Graceful close PING(ACK) received, writing the second GO_AWAY, activeStreams={}", + LOGGER.debug("{} Graceful close PING(ACK) received, writing the second GO_AWAY frame, activeStreams={}", channel, activeStreams); cancelIfStateIsAFuture(gracefulCloseState); gracefulCloseWriteSecondGoAway(null); @@ -224,12 +238,12 @@ void channelClosed() { inputShutdownTimeoutFuture = null; } - void initiateGracefulClose(final Runnable whenInitiated) { + void initiateGracefulClose(final Runnable whenInitiated, final boolean local) { EventLoop eventLoop = channel.eventLoop(); if (eventLoop.inEventLoop()) { - doCloseAsyncGracefully0(whenInitiated); + doCloseAsyncGracefully0(whenInitiated, local); } else { - eventLoop.execute(() -> doCloseAsyncGracefully0(whenInitiated)); + eventLoop.execute(() -> doCloseAsyncGracefully0(whenInitiated, local)); } } @@ -320,7 +334,7 @@ private void channelHalfShutdown(String side, Predicate otherSide } } - private void doCloseAsyncGracefully0(final Runnable whenInitiated) { + private void doCloseAsyncGracefully0(final Runnable whenInitiated, final boolean local) { assert channel.eventLoop().inEventLoop(); if (gracefulCloseState != null) { @@ -342,7 +356,8 @@ private void doCloseAsyncGracefully0(final Runnable whenInitiated) { // time duration for inflight frames to land, and the second GOAWAY includes the maximum known stream ID. // To account for 2 RTTs we can send a PING and when the PING(ACK) comes back we can send the second GOAWAY. // [1] https://tools.ietf.org/html/rfc7540#section-6.8 - DefaultHttp2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(NO_ERROR); + DefaultHttp2GoAwayFrame goAwayFrame = newGoAwayFrame(NO_ERROR, + local ? LOCAL_GO_AWAY_CONTENT : REMOTE_GO_AWAY_CONTENT); goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE); channel.write(goAwayFrame); channel.writeAndFlush(new DefaultHttp2PingFrame(GRACEFUL_CLOSE_PING_CONTENT)).addListener(future -> { @@ -360,7 +375,7 @@ private void doCloseAsyncGracefully0(final Runnable whenInitiated) { // down the connection. final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos); LOGGER.debug("{} Timeout after {}ms waiting for graceful close PING(ACK), writing the second " + - "GO_AWAY and closing the channel with activeStreams={}", + "GO_AWAY frame and closing the channel with activeStreams={}", channel, timeoutMillis, activeStreams); gracefulCloseWriteSecondGoAway(StacklessTimeoutException.newInstance( "Timeout after " + timeoutMillis + "ms waiting for graceful close PING(ACK)", @@ -379,10 +394,12 @@ private void gracefulCloseWriteSecondGoAway(@Nullable final Throwable cause) { gracefulCloseState = State.GRACEFUL_CLOSE_SECOND_GO_AWAY_SENT; - channel.writeAndFlush(new DefaultHttp2GoAwayFrame(NO_ERROR)).addListener(future -> { + channel.writeAndFlush(newGoAwayFrame(NO_ERROR, cause == null ? + SECOND_GO_AWAY_CONTENT : GC_TIMEOUT_GO_AWAY_CONTENT)).addListener(future -> { if (!future.isSuccess()) { final Throwable closeCause = cause == null ? future.cause() : addSuppressed(future.cause(), cause); - LOGGER.debug("{} Failed to write the second GO_AWAY, closing the channel", channel, closeCause); + LOGGER.debug("{} Failed to write the second GO_AWAY frame{}, closing the channel", + channel, cause == null ? "" : " after graceful close PING(ACK) timeout", closeCause); close0(closeCause); } else if (cause != null || activeStreams == 0) { close0(cause); @@ -465,6 +482,16 @@ private void cancelIfStateIsAFuture(@Nullable final Object state) { } } + private static DefaultHttp2GoAwayFrame newGoAwayFrame(final Http2Error error, final ByteBuf content) { + return new DefaultHttp2GoAwayFrame(error, content.duplicate()); + } + + private static ByteBuf staticByteBufFromAscii(final String str) { + ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(str.length()); + writeAscii(buf, str); + return unreleasableBuffer(buf.asReadOnly()); + } + private static final class StacklessTimeoutException extends TimeoutException { private static final long serialVersionUID = -8647261218787418981L; diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java index 7db1475c96..1de1fbc89d 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/KeepAliveManagerTest.java @@ -56,6 +56,11 @@ import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static io.servicetalk.concurrent.internal.TestTimeoutConstants.CI; import static io.servicetalk.http.netty.H2KeepAlivePolicies.DEFAULT_IDLE_DURATION; +import static io.servicetalk.http.netty.KeepAliveManager.GC_TIMEOUT_GO_AWAY_CONTENT; +import static io.servicetalk.http.netty.KeepAliveManager.KA_TIMEOUT_GO_AWAY_CONTENT; +import static io.servicetalk.http.netty.KeepAliveManager.LOCAL_GO_AWAY_CONTENT; +import static io.servicetalk.http.netty.KeepAliveManager.SECOND_GO_AWAY_CONTENT; +import static java.nio.charset.StandardCharsets.US_ASCII; import static java.time.Duration.ofMillis; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -110,7 +115,7 @@ void keepAliveDisallowedWithNoActiveStreams(boolean duplex) { void keepAliveAllowedWithNoActiveStreams(boolean duplex) { setUp(duplex, true); manager.channelIdle(); - verifyWrite(instanceOf(Http2PingFrame.class)); + verifyKeepAlivePingFrame(); verifyPingAckTimeoutScheduled(); } @@ -120,7 +125,7 @@ void keepAliveWithActiveStreams(boolean duplex) { setUp(duplex, false); addActiveStream(manager); manager.channelIdle(); - verifyWrite(instanceOf(Http2PingFrame.class)); + verifyKeepAlivePingFrame(); verifyPingAckTimeoutScheduled(); } @@ -130,7 +135,7 @@ void keepAlivePingAckReceived(boolean duplex) { setUp(duplex, false); addActiveStream(manager); manager.channelIdle(); - Http2PingFrame ping = verifyWrite(instanceOf(Http2PingFrame.class)); + Http2PingFrame ping = verifyKeepAlivePingFrame(); ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled(); manager.pingReceived(new DefaultHttp2PingFrame(ping.content(), true)); @@ -148,7 +153,7 @@ void keepAlivePingAckWithUnknownContent(boolean duplex) throws Exception { setUp(duplex, false); Http2StreamChannel activeStream = addActiveStream(manager); manager.channelIdle(); - Http2PingFrame ping = verifyWrite(instanceOf(Http2PingFrame.class)); + Http2PingFrame ping = verifyKeepAlivePingFrame(); ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled(); manager.pingReceived(new DefaultHttp2PingFrame(ping.content() + 1, true)); @@ -165,7 +170,7 @@ void keepAliveMissingPingAck(boolean duplex) throws Exception { setUp(duplex, false); Http2StreamChannel activeStream = addActiveStream(manager); manager.channelIdle(); - verifyWrite(instanceOf(Http2PingFrame.class)); + verifyKeepAlivePingFrame(); verifyChannelCloseOnMissingPingAck(verifyPingAckTimeoutScheduled(), duplex); activeStream.closeFuture().await(); verifyConnectionObserver(TimeoutException.class); @@ -207,7 +212,7 @@ void gracefulCloseNoActiveStreamsMissingPingAck(boolean duplex) throws Exception ScheduledTask pingAckTimeoutTask = scheduledTasks.take(); pingAckTimeoutTask.runTask(); - verifySecondGoAway(duplex); + verifySecondGoAway(duplex, GC_TIMEOUT_GO_AWAY_CONTENT); shutdownInputIfDuplexChannel(); channel.closeFuture().await(); verifyConnectionObserver(TimeoutException.class); @@ -222,7 +227,7 @@ void gracefulCloseActiveStreamsMissingPingAck(boolean duplex) throws Exception { ScheduledTask pingAckTimeoutTask = scheduledTasks.take(); pingAckTimeoutTask.runTask(); - verifySecondGoAway(duplex); + verifySecondGoAway(duplex, GC_TIMEOUT_GO_AWAY_CONTENT); channel.closeFuture().await(); activeStream.closeFuture().await(); @@ -240,7 +245,7 @@ void gracefulClosePendingPingsCloseConnection(boolean duplex) throws Exception { assertThat("Channel closed.", channel.isOpen(), is(true)); manager.channelIdle(); - verifyWrite(instanceOf(Http2PingFrame.class)); + verifyKeepAlivePingFrame(); verifyChannelCloseOnMissingPingAck(verifyPingAckTimeoutScheduled(), duplex); activeStream.closeFuture().await(); verifyConnectionObserver(TimeoutException.class); @@ -279,7 +284,7 @@ void channelClosedDuringPing(boolean duplex) { setUp(duplex, false); addActiveStream(manager); manager.channelIdle(); - verifyWrite(instanceOf(Http2PingFrame.class)); + verifyKeepAlivePingFrame(); ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled(); manager.channelClosed(); @@ -325,7 +330,7 @@ void failureToWritePingClosesChannel(boolean duplex) throws Exception { void failureToWriteLastGoAwayAfterPingAckTimeoutClosesChannel(boolean duplex) throws Exception { setUp(duplex, true); manager.channelIdle(); - verifyWrite(instanceOf(Http2PingFrame.class)); + verifyKeepAlivePingFrame(); ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled(); failWrite.set(true); ackTimeoutTask.runTask(); @@ -360,7 +365,7 @@ private void verifyNoOtherActionPostClose(final KeepAliveManager manager) { verifyNoScheduledTasks(); Runnable whenInitiated = mock(Runnable.class); - manager.initiateGracefulClose(whenInitiated); + manager.initiateGracefulClose(whenInitiated, true); verify(whenInitiated, never()).run(); verifyNoWrite(); verifyNoScheduledTasks(); @@ -380,7 +385,7 @@ private void sendGracefulClosePingAckAndVerifySecondGoAway(final KeepAliveManage ScheduledTask pingAckTimeoutTask = scheduledTasks.take(); manager.pingReceived(new DefaultHttp2PingFrame(pingFrame.content(), true)); assertThat("Ping ack task not cancelled.", pingAckTimeoutTask.promise.isCancelled(), is(true)); - verifySecondGoAway(duplex); + verifySecondGoAway(duplex, SECOND_GO_AWAY_CONTENT); pingAckTimeoutTask.runTask(); @@ -392,11 +397,13 @@ private void sendGracefulClosePingAckAndVerifySecondGoAway(final KeepAliveManage } } - private void verifySecondGoAway(boolean duplex) { + private void verifySecondGoAway(boolean duplex, ByteBuf expectedContent) { Http2GoAwayFrame secondGoAway = verifyWrite(instanceOf(Http2GoAwayFrame.class)); assertThat("Unexpected error in go_away", secondGoAway.errorCode(), is(Http2Error.NO_ERROR.code())); assertThat("Unexpected extra stream ids", secondGoAway.extraStreamIds(), is(0)); assertThat("Unexpected last stream id", secondGoAway.lastStreamId(), is(-1)); + assertThat("Unexpected content", secondGoAway.content().toString(US_ASCII), + is(expectedContent.toString(US_ASCII))); if (duplex) { verifyAtMostOneScheduledTasks(); } else { @@ -411,14 +418,31 @@ private Http2PingFrame initiateGracefulCloseVerifyGoAwayAndPing(final KeepAliveM assertThat("Unexpected error in go_away", firstGoAway.errorCode(), is(Http2Error.NO_ERROR.code())); assertThat("Unexpected extra stream ids", firstGoAway.extraStreamIds(), is(Integer.MAX_VALUE)); assertThat("Unexpected last stream id", firstGoAway.lastStreamId(), is(-1)); - Http2PingFrame pingFrame = verifyWrite(instanceOf(Http2PingFrame.class)); + assertThat("Unexpected content", firstGoAway.content().toString(US_ASCII), + is(LOCAL_GO_AWAY_CONTENT.toString(US_ASCII))); + Http2PingFrame pingFrame = verifyGracefulClosePingFrame(); verifyNoWrite(); return pingFrame; } + private Http2PingFrame verifyGracefulClosePingFrame() { + return verifyPingFrame(false); + } + + private Http2PingFrame verifyKeepAlivePingFrame() { + return verifyPingFrame(true); + } + + private Http2PingFrame verifyPingFrame(boolean even) { + Http2PingFrame pingFrame = verifyWrite(instanceOf(Http2PingFrame.class)); + assertThat("Unexpected ping ack content.", pingFrame.content() % 2 == 0, is(even)); + assertThat("Unexpected ping ack flag.", pingFrame.ack(), is(false)); + return pingFrame; + } + private void initiateGracefulClose(final KeepAliveManager manager) { Runnable whenInitiated = mock(Runnable.class); - manager.initiateGracefulClose(whenInitiated); + manager.initiateGracefulClose(whenInitiated, true); verify(whenInitiated).run(); } @@ -468,12 +492,7 @@ private Http2StreamChannel addActiveStream(final KeepAliveManager manager) { private void verifyChannelCloseOnMissingPingAck(final ScheduledTask ackTimeoutTask, boolean duplex) throws InterruptedException { ackTimeoutTask.runTask(); - verifyWrite(instanceOf(Http2GoAwayFrame.class)); - if (duplex) { - verifyAtMostOneScheduledTasks(); - } else { - verifyNoScheduledTasks(); - } + verifySecondGoAway(duplex, KA_TIMEOUT_GO_AWAY_CONTENT); shutdownInputIfDuplexChannel(); assertThat("Channel not closed.", channel.isOpen(), is(false)); }