Skip to content

Commit

Permalink
KeepAliveManager: include human-readable debug data in h2 frames (#…
Browse files Browse the repository at this point in the history
…2615)

Motivation:

1. `GO_AWAY` allows including a content for debugging purposes. It helps
to debug closure based on frame-logging data.
2. `PING` frames always include data, using even/odd numbers help to
understand originator of the frame.

Modifications:

- Include content when sending `GO_AWAY` frames;
- Use even numbers for keep-alive `PING`s and odd numbers for graceful
close `PING`s;
- Test new behavior;

Result:

Possible to understand the cause for GO_AWAY and PING frame when frame
logging is enabled.
  • Loading branch information
idelpivnitskiy authored Jun 2, 2023
1 parent 4e1464d commit 8f51884
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected final void doCloseAsyncGracefully() {
keepAliveManager.initiateGracefulClose(() -> {
// no need to notifyOnClosing bcz it's already notified in NettyChannelListenableAsyncCloseable before
// invoking this method
});
}, true);
}

private void notifyOnClosingImpl() { // For access from AbstractH2ParentConnection
Expand Down Expand Up @@ -278,7 +278,7 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) {
// We trigger the graceful close process here (with no timeout) to make sure the socket is closed once
// the existing streams are closed. The MultiplexCodec may simulate a GOAWAY when the stream IDs are
// exhausted so we shouldn't rely upon our peer to close the transport.
parentContext.keepAliveManager.initiateGracefulClose(parentContext::notifyOnClosingImpl);
parentContext.keepAliveManager.initiateGracefulClose(parentContext::notifyOnClosingImpl, false);
} else if (msg instanceof Http2PingFrame) {
parentContext.keepAliveManager.pingReceived((Http2PingFrame) msg);
} else if (!(msg instanceof Http2SettingsAckFrame)) { // we ignore SETTINGS(ACK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
import io.servicetalk.http.netty.H2ProtocolConfig.KeepAlivePolicy;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DuplexChannel;
import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame;
import io.netty.handler.codec.http2.DefaultHttp2PingFrame;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2PingFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.SslHandler;
Expand All @@ -42,7 +45,9 @@
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.netty.buffer.ByteBufUtil.writeAscii;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.buffer.Unpooled.unreleasableBuffer;
import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.servicetalk.http.netty.H2KeepAlivePolicies.DEFAULT_ACK_TIMEOUT;
Expand All @@ -65,8 +70,17 @@ private enum State {
private static final Logger LOGGER = LoggerFactory.getLogger(KeepAliveManager.class);
private static final AtomicIntegerFieldUpdater<KeepAliveManager> activeStreamsUpdater =
AtomicIntegerFieldUpdater.newUpdater(KeepAliveManager.class, "activeStreams");
private static final long GRACEFUL_CLOSE_PING_CONTENT = ThreadLocalRandom.current().nextLong();
private static final long KEEP_ALIVE_PING_CONTENT = ThreadLocalRandom.current().nextLong();

// Use the last digit (even or odd) to distinguish PING frames when frame logging is enabled.
private static final long GRACEFUL_CLOSE_PING_CONTENT = ThreadLocalRandom.current().nextLong() | 0x01L; // odd
private static final long KEEP_ALIVE_PING_CONTENT = ThreadLocalRandom.current().nextLong() & ~0x01L; // even

// Frame logging dumps data in hex format. An integer helps to understand the cause without decoding the content.
static final ByteBuf LOCAL_GO_AWAY_CONTENT = staticByteBufFromAscii("0.local");
static final ByteBuf REMOTE_GO_AWAY_CONTENT = staticByteBufFromAscii("1.remote");
static final ByteBuf SECOND_GO_AWAY_CONTENT = staticByteBufFromAscii("2.second");
static final ByteBuf GC_TIMEOUT_GO_AWAY_CONTENT = staticByteBufFromAscii("3.graceful-close-timeout");
static final ByteBuf KA_TIMEOUT_GO_AWAY_CONTENT = staticByteBufFromAscii("4.keep-alive-timeout");

private volatile int activeStreams;

Expand Down Expand Up @@ -147,19 +161,19 @@ protected void channelIdle(final ChannelHandlerContext ctx, final IdleStateEvent
keepAliveState = State.KEEP_ALIVE_ACK_TIMEDOUT;
final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos);
LOGGER.debug(
"{} Timeout after {}ms waiting for keep-alive PING(ACK), writing GO_AWAY and " +
"closing the channel with activeStreams={}",
"{} Timeout after {}ms waiting for keep-alive PING(ACK), writing GO_AWAY frame " +
"and closing the channel with activeStreams={}",
this.channel, timeoutMillis, activeStreams);
final TimeoutException cause = StacklessTimeoutException.newInstance(
"Timeout after " + timeoutMillis + "ms waiting for keep-alive PING(ACK)",
KeepAliveManager.class, "keepAlivePingAckTimeout()");
channel.writeAndFlush(new DefaultHttp2GoAwayFrame(NO_ERROR))
channel.writeAndFlush(newGoAwayFrame(NO_ERROR, KA_TIMEOUT_GO_AWAY_CONTENT))
.addListener(f -> {
Throwable closeCause = cause;
if (!f.isSuccess()) {
closeCause = addSuppressed(f.cause(), cause);
LOGGER.debug("{} Failed to write the last GO_AWAY after PING(ACK) " +
"timeout, closing the channel", channel, closeCause);
LOGGER.debug("{} Failed to write the GO_AWAY frame after keep-alive " +
"PING(ACK) timeout, closing the channel", channel, closeCause);
}
close0(closeCause);
});
Expand All @@ -185,7 +199,7 @@ void pingReceived(final Http2PingFrame pingFrame) {
if (pingFrame.ack()) {
long pingAckContent = pingFrame.content();
if (pingAckContent == GRACEFUL_CLOSE_PING_CONTENT) {
LOGGER.debug("{} Graceful close PING(ACK) received, writing the second GO_AWAY, activeStreams={}",
LOGGER.debug("{} Graceful close PING(ACK) received, writing the second GO_AWAY frame, activeStreams={}",
channel, activeStreams);
cancelIfStateIsAFuture(gracefulCloseState);
gracefulCloseWriteSecondGoAway(null);
Expand Down Expand Up @@ -224,12 +238,12 @@ void channelClosed() {
inputShutdownTimeoutFuture = null;
}

void initiateGracefulClose(final Runnable whenInitiated) {
void initiateGracefulClose(final Runnable whenInitiated, final boolean local) {
EventLoop eventLoop = channel.eventLoop();
if (eventLoop.inEventLoop()) {
doCloseAsyncGracefully0(whenInitiated);
doCloseAsyncGracefully0(whenInitiated, local);
} else {
eventLoop.execute(() -> doCloseAsyncGracefully0(whenInitiated));
eventLoop.execute(() -> doCloseAsyncGracefully0(whenInitiated, local));
}
}

Expand Down Expand Up @@ -320,7 +334,7 @@ private void channelHalfShutdown(String side, Predicate<DuplexChannel> otherSide
}
}

private void doCloseAsyncGracefully0(final Runnable whenInitiated) {
private void doCloseAsyncGracefully0(final Runnable whenInitiated, final boolean local) {
assert channel.eventLoop().inEventLoop();

if (gracefulCloseState != null) {
Expand All @@ -342,7 +356,8 @@ private void doCloseAsyncGracefully0(final Runnable whenInitiated) {
// time duration for inflight frames to land, and the second GOAWAY includes the maximum known stream ID.
// To account for 2 RTTs we can send a PING and when the PING(ACK) comes back we can send the second GOAWAY.
// [1] https://tools.ietf.org/html/rfc7540#section-6.8
DefaultHttp2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(NO_ERROR);
DefaultHttp2GoAwayFrame goAwayFrame = newGoAwayFrame(NO_ERROR,
local ? LOCAL_GO_AWAY_CONTENT : REMOTE_GO_AWAY_CONTENT);
goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE);
channel.write(goAwayFrame);
channel.writeAndFlush(new DefaultHttp2PingFrame(GRACEFUL_CLOSE_PING_CONTENT)).addListener(future -> {
Expand All @@ -360,7 +375,7 @@ private void doCloseAsyncGracefully0(final Runnable whenInitiated) {
// down the connection.
final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos);
LOGGER.debug("{} Timeout after {}ms waiting for graceful close PING(ACK), writing the second " +
"GO_AWAY and closing the channel with activeStreams={}",
"GO_AWAY frame and closing the channel with activeStreams={}",
channel, timeoutMillis, activeStreams);
gracefulCloseWriteSecondGoAway(StacklessTimeoutException.newInstance(
"Timeout after " + timeoutMillis + "ms waiting for graceful close PING(ACK)",
Expand All @@ -379,10 +394,12 @@ private void gracefulCloseWriteSecondGoAway(@Nullable final Throwable cause) {

gracefulCloseState = State.GRACEFUL_CLOSE_SECOND_GO_AWAY_SENT;

channel.writeAndFlush(new DefaultHttp2GoAwayFrame(NO_ERROR)).addListener(future -> {
channel.writeAndFlush(newGoAwayFrame(NO_ERROR, cause == null ?
SECOND_GO_AWAY_CONTENT : GC_TIMEOUT_GO_AWAY_CONTENT)).addListener(future -> {
if (!future.isSuccess()) {
final Throwable closeCause = cause == null ? future.cause() : addSuppressed(future.cause(), cause);
LOGGER.debug("{} Failed to write the second GO_AWAY, closing the channel", channel, closeCause);
LOGGER.debug("{} Failed to write the second GO_AWAY frame{}, closing the channel",
channel, cause == null ? "" : " after graceful close PING(ACK) timeout", closeCause);
close0(closeCause);
} else if (cause != null || activeStreams == 0) {
close0(cause);
Expand Down Expand Up @@ -465,6 +482,16 @@ private void cancelIfStateIsAFuture(@Nullable final Object state) {
}
}

private static DefaultHttp2GoAwayFrame newGoAwayFrame(final Http2Error error, final ByteBuf content) {
return new DefaultHttp2GoAwayFrame(error, content.duplicate());
}

private static ByteBuf staticByteBufFromAscii(final String str) {
ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(str.length());
writeAscii(buf, str);
return unreleasableBuffer(buf.asReadOnly());
}

private static final class StacklessTimeoutException extends TimeoutException {
private static final long serialVersionUID = -8647261218787418981L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.concurrent.internal.TestTimeoutConstants.CI;
import static io.servicetalk.http.netty.H2KeepAlivePolicies.DEFAULT_IDLE_DURATION;
import static io.servicetalk.http.netty.KeepAliveManager.GC_TIMEOUT_GO_AWAY_CONTENT;
import static io.servicetalk.http.netty.KeepAliveManager.KA_TIMEOUT_GO_AWAY_CONTENT;
import static io.servicetalk.http.netty.KeepAliveManager.LOCAL_GO_AWAY_CONTENT;
import static io.servicetalk.http.netty.KeepAliveManager.SECOND_GO_AWAY_CONTENT;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.time.Duration.ofMillis;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -110,7 +115,7 @@ void keepAliveDisallowedWithNoActiveStreams(boolean duplex) {
void keepAliveAllowedWithNoActiveStreams(boolean duplex) {
setUp(duplex, true);
manager.channelIdle();
verifyWrite(instanceOf(Http2PingFrame.class));
verifyKeepAlivePingFrame();
verifyPingAckTimeoutScheduled();
}

Expand All @@ -120,7 +125,7 @@ void keepAliveWithActiveStreams(boolean duplex) {
setUp(duplex, false);
addActiveStream(manager);
manager.channelIdle();
verifyWrite(instanceOf(Http2PingFrame.class));
verifyKeepAlivePingFrame();
verifyPingAckTimeoutScheduled();
}

Expand All @@ -130,7 +135,7 @@ void keepAlivePingAckReceived(boolean duplex) {
setUp(duplex, false);
addActiveStream(manager);
manager.channelIdle();
Http2PingFrame ping = verifyWrite(instanceOf(Http2PingFrame.class));
Http2PingFrame ping = verifyKeepAlivePingFrame();
ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled();

manager.pingReceived(new DefaultHttp2PingFrame(ping.content(), true));
Expand All @@ -148,7 +153,7 @@ void keepAlivePingAckWithUnknownContent(boolean duplex) throws Exception {
setUp(duplex, false);
Http2StreamChannel activeStream = addActiveStream(manager);
manager.channelIdle();
Http2PingFrame ping = verifyWrite(instanceOf(Http2PingFrame.class));
Http2PingFrame ping = verifyKeepAlivePingFrame();
ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled();

manager.pingReceived(new DefaultHttp2PingFrame(ping.content() + 1, true));
Expand All @@ -165,7 +170,7 @@ void keepAliveMissingPingAck(boolean duplex) throws Exception {
setUp(duplex, false);
Http2StreamChannel activeStream = addActiveStream(manager);
manager.channelIdle();
verifyWrite(instanceOf(Http2PingFrame.class));
verifyKeepAlivePingFrame();
verifyChannelCloseOnMissingPingAck(verifyPingAckTimeoutScheduled(), duplex);
activeStream.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
Expand Down Expand Up @@ -207,7 +212,7 @@ void gracefulCloseNoActiveStreamsMissingPingAck(boolean duplex) throws Exception

ScheduledTask pingAckTimeoutTask = scheduledTasks.take();
pingAckTimeoutTask.runTask();
verifySecondGoAway(duplex);
verifySecondGoAway(duplex, GC_TIMEOUT_GO_AWAY_CONTENT);
shutdownInputIfDuplexChannel();
channel.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
Expand All @@ -222,7 +227,7 @@ void gracefulCloseActiveStreamsMissingPingAck(boolean duplex) throws Exception {

ScheduledTask pingAckTimeoutTask = scheduledTasks.take();
pingAckTimeoutTask.runTask();
verifySecondGoAway(duplex);
verifySecondGoAway(duplex, GC_TIMEOUT_GO_AWAY_CONTENT);

channel.closeFuture().await();
activeStream.closeFuture().await();
Expand All @@ -240,7 +245,7 @@ void gracefulClosePendingPingsCloseConnection(boolean duplex) throws Exception {
assertThat("Channel closed.", channel.isOpen(), is(true));

manager.channelIdle();
verifyWrite(instanceOf(Http2PingFrame.class));
verifyKeepAlivePingFrame();
verifyChannelCloseOnMissingPingAck(verifyPingAckTimeoutScheduled(), duplex);
activeStream.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
Expand Down Expand Up @@ -279,7 +284,7 @@ void channelClosedDuringPing(boolean duplex) {
setUp(duplex, false);
addActiveStream(manager);
manager.channelIdle();
verifyWrite(instanceOf(Http2PingFrame.class));
verifyKeepAlivePingFrame();
ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled();

manager.channelClosed();
Expand Down Expand Up @@ -325,7 +330,7 @@ void failureToWritePingClosesChannel(boolean duplex) throws Exception {
void failureToWriteLastGoAwayAfterPingAckTimeoutClosesChannel(boolean duplex) throws Exception {
setUp(duplex, true);
manager.channelIdle();
verifyWrite(instanceOf(Http2PingFrame.class));
verifyKeepAlivePingFrame();
ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled();
failWrite.set(true);
ackTimeoutTask.runTask();
Expand Down Expand Up @@ -360,7 +365,7 @@ private void verifyNoOtherActionPostClose(final KeepAliveManager manager) {
verifyNoScheduledTasks();

Runnable whenInitiated = mock(Runnable.class);
manager.initiateGracefulClose(whenInitiated);
manager.initiateGracefulClose(whenInitiated, true);
verify(whenInitiated, never()).run();
verifyNoWrite();
verifyNoScheduledTasks();
Expand All @@ -380,7 +385,7 @@ private void sendGracefulClosePingAckAndVerifySecondGoAway(final KeepAliveManage
ScheduledTask pingAckTimeoutTask = scheduledTasks.take();
manager.pingReceived(new DefaultHttp2PingFrame(pingFrame.content(), true));
assertThat("Ping ack task not cancelled.", pingAckTimeoutTask.promise.isCancelled(), is(true));
verifySecondGoAway(duplex);
verifySecondGoAway(duplex, SECOND_GO_AWAY_CONTENT);

pingAckTimeoutTask.runTask();

Expand All @@ -392,11 +397,13 @@ private void sendGracefulClosePingAckAndVerifySecondGoAway(final KeepAliveManage
}
}

private void verifySecondGoAway(boolean duplex) {
private void verifySecondGoAway(boolean duplex, ByteBuf expectedContent) {
Http2GoAwayFrame secondGoAway = verifyWrite(instanceOf(Http2GoAwayFrame.class));
assertThat("Unexpected error in go_away", secondGoAway.errorCode(), is(Http2Error.NO_ERROR.code()));
assertThat("Unexpected extra stream ids", secondGoAway.extraStreamIds(), is(0));
assertThat("Unexpected last stream id", secondGoAway.lastStreamId(), is(-1));
assertThat("Unexpected content", secondGoAway.content().toString(US_ASCII),
is(expectedContent.toString(US_ASCII)));
if (duplex) {
verifyAtMostOneScheduledTasks();
} else {
Expand All @@ -411,14 +418,31 @@ private Http2PingFrame initiateGracefulCloseVerifyGoAwayAndPing(final KeepAliveM
assertThat("Unexpected error in go_away", firstGoAway.errorCode(), is(Http2Error.NO_ERROR.code()));
assertThat("Unexpected extra stream ids", firstGoAway.extraStreamIds(), is(Integer.MAX_VALUE));
assertThat("Unexpected last stream id", firstGoAway.lastStreamId(), is(-1));
Http2PingFrame pingFrame = verifyWrite(instanceOf(Http2PingFrame.class));
assertThat("Unexpected content", firstGoAway.content().toString(US_ASCII),
is(LOCAL_GO_AWAY_CONTENT.toString(US_ASCII)));
Http2PingFrame pingFrame = verifyGracefulClosePingFrame();
verifyNoWrite();
return pingFrame;
}

private Http2PingFrame verifyGracefulClosePingFrame() {
return verifyPingFrame(false);
}

private Http2PingFrame verifyKeepAlivePingFrame() {
return verifyPingFrame(true);
}

private Http2PingFrame verifyPingFrame(boolean even) {
Http2PingFrame pingFrame = verifyWrite(instanceOf(Http2PingFrame.class));
assertThat("Unexpected ping ack content.", pingFrame.content() % 2 == 0, is(even));
assertThat("Unexpected ping ack flag.", pingFrame.ack(), is(false));
return pingFrame;
}

private void initiateGracefulClose(final KeepAliveManager manager) {
Runnable whenInitiated = mock(Runnable.class);
manager.initiateGracefulClose(whenInitiated);
manager.initiateGracefulClose(whenInitiated, true);
verify(whenInitiated).run();
}

Expand Down Expand Up @@ -468,12 +492,7 @@ private Http2StreamChannel addActiveStream(final KeepAliveManager manager) {
private void verifyChannelCloseOnMissingPingAck(final ScheduledTask ackTimeoutTask, boolean duplex)
throws InterruptedException {
ackTimeoutTask.runTask();
verifyWrite(instanceOf(Http2GoAwayFrame.class));
if (duplex) {
verifyAtMostOneScheduledTasks();
} else {
verifyNoScheduledTasks();
}
verifySecondGoAway(duplex, KA_TIMEOUT_GO_AWAY_CONTENT);
shutdownInputIfDuplexChannel();
assertThat("Channel not closed.", channel.isOpen(), is(false));
}
Expand Down

0 comments on commit 8f51884

Please sign in to comment.