Skip to content

Commit

Permalink
KeepAliveManager: report an exception to ConnectionObserver (#2614)
Browse files Browse the repository at this point in the history
Motivation:

When `KeepAliveManager` closes a channel due to an error, it force-closes
the `Channel` but does not report the error to
`ConnectionObserver`.

Modifications:
- `KeepAliveManager` uses `ChannelCloseUtils` to make errors visible for
`ConnectionObserver`;
- When timeout fires, create `TimeoutException` that will be reported to
`ConnectionObserver`;
- If `KeepAliveManager` doesn't receive `PING(ACK)` during the graceful closure
process, force-close the channel regardless of the number of active streams;
- Enhance `KeepAliveManagerTest` to verify new behavior;

Result:

All errors observed by `KeepAliveManager` are visible to
`ConnectionObserver`.
  • Loading branch information
idelpivnitskiy authored Jun 1, 2023
1 parent 1071da9 commit 4e1464d
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.internal.ThrowableUtils;
import io.servicetalk.http.netty.H2ProtocolConfig.KeepAlivePolicy;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -35,6 +37,7 @@

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand All @@ -43,6 +46,7 @@
import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.servicetalk.http.netty.H2KeepAlivePolicies.DEFAULT_ACK_TIMEOUT;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.lang.Boolean.TRUE;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

Expand Down Expand Up @@ -141,17 +145,23 @@ protected void channelIdle(final ChannelHandlerContext ctx, final IdleStateEvent
keepAliveState = scheduler.afterDuration(() -> {
if (keepAliveState != null) {
keepAliveState = State.KEEP_ALIVE_ACK_TIMEDOUT;
final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos);
LOGGER.debug(
"{} Timeout after {}ms waiting for keep-alive PING(ACK), writing GO_AWAY and " +
"closing the channel with activeStreams={}",
this.channel, NANOSECONDS.toMillis(pingAckTimeoutNanos), activeStreams);
this.channel, timeoutMillis, activeStreams);
final TimeoutException cause = StacklessTimeoutException.newInstance(
"Timeout after " + timeoutMillis + "ms waiting for keep-alive PING(ACK)",
KeepAliveManager.class, "keepAlivePingAckTimeout()");
channel.writeAndFlush(new DefaultHttp2GoAwayFrame(NO_ERROR))
.addListener(f -> {
Throwable closeCause = cause;
if (!f.isSuccess()) {
closeCause = addSuppressed(f.cause(), cause);
LOGGER.debug("{} Failed to write the last GO_AWAY after PING(ACK) " +
"timeout, closing the channel", channel, f.cause());
"timeout, closing the channel", channel, closeCause);
}
close0(f.cause());
close0(closeCause);
});
}
}, pingAckTimeoutNanos, NANOSECONDS);
Expand All @@ -178,7 +188,7 @@ void pingReceived(final Http2PingFrame pingFrame) {
LOGGER.debug("{} Graceful close PING(ACK) received, writing the second GO_AWAY, activeStreams={}",
channel, activeStreams);
cancelIfStateIsAFuture(gracefulCloseState);
gracefulCloseWriteSecondGoAway();
gracefulCloseWriteSecondGoAway(null);
} else if (pingAckContent == KEEP_ALIVE_PING_CONTENT) {
LOGGER.trace("{} PING(ACK) received, activeStreams={}", channel, activeStreams);
cancelIfStateIsAFuture(keepAliveState);
Expand Down Expand Up @@ -294,12 +304,18 @@ private void channelHalfShutdown(String side, Predicate<DuplexChannel> otherSide
gracefulCloseState != State.CLOSED) {
// If we have not started the graceful close process, or waiting for ack/read to complete the graceful
// close process just force a close now because we will not read any more data.
LOGGER.debug("{} Observed {} shutdown, graceful close is not started or in progress, must force " +
final String state = gracefulCloseState == null ? "not started" : "in progress";
final IllegalStateException cause = new IllegalStateException("Observed " + side +
" shutdown while graceful closure is " + state);
LOGGER.debug("{} Observed {} shutdown while graceful closure is {}, must force " +
"channel closure with activeStreams={}, gracefulCloseState={}, keepAliveState={}",
channel, side, activeStreams, gracefulCloseState, keepAliveState);
channel.close();
channel, side, state, activeStreams, gracefulCloseState, keepAliveState, cause);
ChannelCloseUtils.close(channel, cause);
}
} else {
LOGGER.debug("{} Observed {} shutdown, closing non-duplex channel with " +
"activeStreams={}, gracefulCloseState={}, keepAliveState={}",
channel, side, activeStreams, gracefulCloseState, keepAliveState);
channel.close();
}
}
Expand Down Expand Up @@ -342,16 +358,19 @@ private void doCloseAsyncGracefully0(final Runnable whenInitiated) {
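// If the PING(ACK) times out, either the peer is unresponsive or we under estimated the 2RTT time.
// In both cases write the second GO_AWAY and then close the channel with a TimeoutException,
// regardless of the number of active streams, so that the timeout is visible to ConnectionObserver.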
LOGGER.debug(
"{} Timeout after {}ms waiting for graceful close PING(ACK), writing the second GO_AWAY",
channel, NANOSECONDS.toMillis(pingAckTimeoutNanos));
gracefulCloseWriteSecondGoAway();
final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos);
LOGGER.debug("{} Timeout after {}ms waiting for graceful close PING(ACK), writing the second " +
"GO_AWAY and closing the channel with activeStreams={}",
channel, timeoutMillis, activeStreams);
gracefulCloseWriteSecondGoAway(StacklessTimeoutException.newInstance(
"Timeout after " + timeoutMillis + "ms waiting for graceful close PING(ACK)",
KeepAliveManager.class, "gracefulClosePingAckTimeout()"));
}, pingAckTimeoutNanos, NANOSECONDS);
}
});
}

private void gracefulCloseWriteSecondGoAway() {
private void gracefulCloseWriteSecondGoAway(@Nullable final Throwable cause) {
assert channel.eventLoop().inEventLoop();

if (gracefulCloseState == State.GRACEFUL_CLOSE_SECOND_GO_AWAY_SENT) {
Expand All @@ -362,10 +381,11 @@ private void gracefulCloseWriteSecondGoAway() {

channel.writeAndFlush(new DefaultHttp2GoAwayFrame(NO_ERROR)).addListener(future -> {
if (!future.isSuccess()) {
LOGGER.debug("{} Failed to write the second GO_AWAY, closing the channel", channel, future.cause());
close0(future.cause());
} else if (activeStreams == 0) {
close0(null);
final Throwable closeCause = cause == null ? future.cause() : addSuppressed(future.cause(), cause);
LOGGER.debug("{} Failed to write the second GO_AWAY, closing the channel", channel, closeCause);
close0(closeCause);
} else if (cause != null || activeStreams == 0) {
close0(cause);
}
});
}
Expand All @@ -383,7 +403,7 @@ private void close0(@Nullable Throwable cause) {

if (cause != null) {
// Previous write failed with an exception, close immediately.
channel.close();
ChannelCloseUtils.close(channel, cause);
return;
}
// The way netty H2 stream state machine works, we may trigger stream closures during writes with flushes
Expand Down Expand Up @@ -422,9 +442,12 @@ private void doShutdownOutput() {
if (duplexChannel.isInputShutdown()) {
return;
}
final long timeoutMillis = NANOSECONDS.toMillis(pingAckTimeoutNanos);
LOGGER.debug("{} Timeout after {}ms waiting for InputShutdown, closing the channel",
channel, NANOSECONDS.toMillis(pingAckTimeoutNanos));
channel.close();
channel, timeoutMillis);
ChannelCloseUtils.close(channel, StacklessTimeoutException.newInstance(
"Timeout after " + timeoutMillis + "ms waiting for InputShutdown",
KeepAliveManager.class, "doShutdownOutput()"));
}, pingAckTimeoutNanos, NANOSECONDS);
}
});
Expand All @@ -441,4 +464,22 @@ private void cancelIfStateIsAFuture(@Nullable final Object state) {
}
}
}

/**
 * A {@link TimeoutException} whose {@link #fillInStackTrace()} is a no-op, so constructing it does not walk the
 * current call stack. Instances are obtained through {@link #newInstance(String, Class, String)}.
 */
private static final class StacklessTimeoutException extends TimeoutException {
    private static final long serialVersionUID = -8647261218787418981L;

    private StacklessTimeoutException(final String message) {
        super(message);
    }

    /**
     * Creates a new exception and hands it to {@code ThrowableUtils.unknownStackTrace} together with the
     * originating class and method name.
     *
     * @param message the detail message of the exception
     * @param clazz the class passed to {@code ThrowableUtils.unknownStackTrace}
     * @param method the method name passed to {@code ThrowableUtils.unknownStackTrace}
     * @return the exception returned by {@code ThrowableUtils.unknownStackTrace}
     */
    static StacklessTimeoutException newInstance(final String message, final Class<?> clazz, final String method) {
        final StacklessTimeoutException exception = new StacklessTimeoutException(message);
        // NOTE(review): presumably this attaches a synthetic stack trace naming clazz/method - confirm against
        // ThrowableUtils.
        return ThrowableUtils.unknownStackTrace(exception, clazz, method);
    }

    @Override
    public Throwable fillInStackTrace() {
        // Skip capturing the stack trace: it is the costly part of creating an exception.
        return this;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.internal.DeliberateException;
import io.servicetalk.http.netty.H2ProtocolConfig.KeepAlivePolicy;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.netty.internal.ConnectionObserverInitializer;
import io.servicetalk.transport.netty.internal.EmbeddedDuplexChannel;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -45,7 +48,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

import static io.netty.util.ReferenceCountUtil.release;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
Expand All @@ -60,6 +65,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand All @@ -71,6 +77,7 @@ class KeepAliveManagerTest {

private final BlockingQueue<ScheduledTask> scheduledTasks = new LinkedBlockingQueue<>();
private final AtomicBoolean failWrite = new AtomicBoolean();
private final ConnectionObserver connectionObserver = mock(ConnectionObserver.class);
private EmbeddedChannel channel;
private KeepAliveManager manager;

Expand All @@ -84,6 +91,7 @@ private void setUp(boolean duplex, boolean allowPingWithoutActiveStreams) {
FailWriteHandler failWriteHandler = new FailWriteHandler();
channel = duplex ? new EmbeddedDuplexChannel(true, failWriteHandler, managerHandler)
: new EmbeddedChannel(failWriteHandler, managerHandler);
new ConnectionObserverInitializer(connectionObserver, false, false).init(channel);
manager = newManager(allowPingWithoutActiveStreams, channel);
managerHandler.keepAliveManager(manager);
}
Expand Down Expand Up @@ -138,7 +146,7 @@ void keepAlivePingAckReceived(boolean duplex) {
@ValueSource(booleans = {true, false})
void keepAlivePingAckWithUnknownContent(boolean duplex) throws Exception {
setUp(duplex, false);
addActiveStream(manager);
Http2StreamChannel activeStream = addActiveStream(manager);
manager.channelIdle();
Http2PingFrame ping = verifyWrite(instanceOf(Http2PingFrame.class));
ScheduledTask ackTimeoutTask = verifyPingAckTimeoutScheduled();
Expand All @@ -147,16 +155,20 @@ void keepAlivePingAckWithUnknownContent(boolean duplex) throws Exception {
assertThat("Ping ack timeout task cancelled.", ackTimeoutTask.promise.isCancelled(), is(false));

verifyChannelCloseOnMissingPingAck(ackTimeoutTask, duplex);
activeStream.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
@ValueSource(booleans = {true, false})
void keepAliveMissingPingAck(boolean duplex) throws Exception {
setUp(duplex, false);
addActiveStream(manager);
Http2StreamChannel activeStream = addActiveStream(manager);
manager.channelIdle();
verifyWrite(instanceOf(Http2PingFrame.class));
verifyChannelCloseOnMissingPingAck(verifyPingAckTimeoutScheduled(), duplex);
activeStream.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -168,6 +180,7 @@ void gracefulCloseNoActiveStreams(boolean duplex) throws Exception {
sendGracefulClosePingAckAndVerifySecondGoAway(manager, pingFrame, duplex);
shutdownInputIfDuplexChannel();
channel.closeFuture().await();
verifyConnectionObserver(null);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -183,6 +196,7 @@ void gracefulCloseWithActiveStreams(boolean duplex) throws Exception {
activeStream.close().sync().await();
shutdownInputIfDuplexChannel();
channel.closeFuture().await();
verifyConnectionObserver(null);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -196,6 +210,7 @@ void gracefulCloseNoActiveStreamsMissingPingAck(boolean duplex) throws Exception
verifySecondGoAway(duplex);
shutdownInputIfDuplexChannel();
channel.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -209,17 +224,16 @@ void gracefulCloseActiveStreamsMissingPingAck(boolean duplex) throws Exception {
pingAckTimeoutTask.runTask();
verifySecondGoAway(duplex);

assertThat("Channel closed.", channel.isOpen(), is(true));
activeStream.close().sync().await();
shutdownInputIfDuplexChannel();
channel.closeFuture().await();
activeStream.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
@ValueSource(booleans = {true, false})
void gracefulClosePendingPingsCloseConnection(boolean duplex) throws Exception {
setUp(duplex, false);
addActiveStream(manager);
Http2StreamChannel activeStream = addActiveStream(manager);
Http2PingFrame pingFrame = initiateGracefulCloseVerifyGoAwayAndPing(manager);

sendGracefulClosePingAckAndVerifySecondGoAway(manager, pingFrame, duplex);
Expand All @@ -228,6 +242,8 @@ void gracefulClosePendingPingsCloseConnection(boolean duplex) throws Exception {
manager.channelIdle();
verifyWrite(instanceOf(Http2PingFrame.class));
verifyChannelCloseOnMissingPingAck(verifyPingAckTimeoutScheduled(), duplex);
activeStream.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand Down Expand Up @@ -291,6 +307,7 @@ void duplexGracefulCloseNoInputShutdown() throws Exception {
inputShutdownTimeoutTask.runTask();
closeLatch.await();
channel.closeFuture().await();
verifyConnectionObserver(TimeoutException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -300,6 +317,7 @@ void failureToWritePingClosesChannel(boolean duplex) throws Exception {
failWrite.set(true);
manager.channelIdle();
channel.closeFuture().await();
verifyConnectionObserver(DeliberateException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -312,6 +330,7 @@ void failureToWriteLastGoAwayAfterPingAckTimeoutClosesChannel(boolean duplex) th
failWrite.set(true);
ackTimeoutTask.runTask();
channel.closeFuture().await();
verifyConnectionObserver(DeliberateException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -321,6 +340,7 @@ void failureToWriteFirstGoAwayClosesChannel(boolean duplex) throws Exception {
failWrite.set(true);
initiateGracefulClose(manager);
channel.closeFuture().await();
verifyConnectionObserver(DeliberateException.class);
}

@ParameterizedTest(name = "{displayName} [{index}] duplex={0}")
Expand All @@ -331,6 +351,7 @@ void failureToWriteSecondGoAwayClosesChannel(boolean duplex) throws Exception {
failWrite.set(true);
manager.pingReceived(new DefaultHttp2PingFrame(pingFrame.content(), true));
channel.closeFuture().await();
verifyConnectionObserver(DeliberateException.class);
}

private void verifyNoOtherActionPostClose(final KeepAliveManager manager) {
Expand Down Expand Up @@ -440,6 +461,7 @@ private Http2StreamChannel addActiveStream(final KeepAliveManager manager) {
return channel.newSucceededFuture();
});
manager.trackActiveStream(stream);
channel.closeFuture().addListener(f -> stream.close());
return stream;
}

Expand Down Expand Up @@ -482,6 +504,16 @@ private void shutdownInputIfDuplexChannel() throws InterruptedException {
}
}

/**
 * Verifies which {@code connectionClosed} callback the mocked {@code connectionObserver} received.
 *
 * @param exceptionClass the expected type of the error passed to {@code connectionClosed(Throwable)}, or
 * {@code null} if the no-argument {@code connectionClosed()} is expected instead
 */
private void verifyConnectionObserver(@Nullable Class<? extends Throwable> exceptionClass) {
    if (exceptionClass == null) {
        // Closure without an error: only the no-arg callback must have been invoked.
        verify(connectionObserver).connectionClosed();
        verify(connectionObserver, never()).connectionClosed(any(Throwable.class));
        return;
    }
    // Closure with an error: only the callback that takes a Throwable of the expected type must have been invoked.
    verify(connectionObserver).connectionClosed(any(exceptionClass));
    verify(connectionObserver, never()).connectionClosed();
}

private static final class ScheduledTask {
final Runnable task;
final Promise<?> promise;
Expand Down

0 comments on commit 4e1464d

Please sign in to comment.