diff --git a/core/src/main/java/com/linecorp/armeria/common/ShuttingDownException.java b/core/src/main/java/com/linecorp/armeria/common/ShuttingDownException.java new file mode 100644 index 00000000000..44798508024 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/ShuttingDownException.java @@ -0,0 +1,47 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common; + +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.common.util.Sampler; +import com.linecorp.armeria.server.Server; + +/** + * A {@link CancellationException} raised when a {@link Server} cannot handle a request because it's shutting + * down. + */ +@UnstableApi +public final class ShuttingDownException extends CancellationException { + private static final long serialVersionUID = -4963725400532294491L; + + private static final ShuttingDownException INSTANCE = new ShuttingDownException(false); + + /** + * Returns a singleton {@link ShuttingDownException} or newly-created exception depending on + * the result of {@link Sampler#isSampled(Object)} of {@link Flags#verboseExceptionSampler()}. + */ + public static ShuttingDownException get() { + return Flags.verboseExceptionSampler().isSampled(ShuttingDownException.class) ? + new ShuttingDownException() : INSTANCE; + } + + private ShuttingDownException() {} + + private ShuttingDownException(@SuppressWarnings("unused") boolean dummy) { + super(null, null, false, false); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/Http1ObjectEncoder.java b/core/src/main/java/com/linecorp/armeria/internal/common/Http1ObjectEncoder.java index dd2478064b0..fb67fecb353 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/Http1ObjectEncoder.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/Http1ObjectEncoder.java @@ -16,7 +16,6 @@ package com.linecorp.armeria.internal.common; -import static com.linecorp.armeria.internal.client.ClosedStreamExceptionUtil.newClosedSessionException; import static java.util.Objects.requireNonNull; import java.util.AbstractMap.SimpleImmutableEntry; @@ -392,7 +391,7 @@ protected final void updateClosedId(int id) { protected abstract boolean isPing(int id); @Override - public final void close() { + public final void close(Throwable cause) { if (closed) { return; } @@ -403,7 +402,6 @@ public final void close() { return; } - final ClosedSessionException cause = newClosedSessionException(ch); for (Queue> queue : pendingWritesMap.values()) { for (;;) { final Entry e = queue.poll(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/Http2ObjectEncoder.java b/core/src/main/java/com/linecorp/armeria/internal/common/Http2ObjectEncoder.java index 44a4867442d..04a30737550 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/Http2ObjectEncoder.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/Http2ObjectEncoder.java @@ -124,7 +124,7 @@ protected final boolean isStreamPresentAndWritable(int streamId) { } @Override - public final void close() { + public final void close(Throwable unused) { closed = true; keepAliveHandler().destroy(); } diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/HttpObjectEncoder.java b/core/src/main/java/com/linecorp/armeria/internal/common/HttpObjectEncoder.java index 41560972bc3..b904d12efc5 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/HttpObjectEncoder.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/HttpObjectEncoder.java @@ -94,10 +94,10 @@ default ChannelFuture writeReset(int id, int streamId, Http2Error error, /** * Releases the resources related with this encoder and fails any unfinished writes. */ - void close(); + void close(Throwable cause); /** - * Returns {@code true} if {@link #close()} is called. + * Returns {@code true} if {@link #close(Throwable)} is called. */ boolean isClosed(); diff --git a/core/src/main/java/com/linecorp/armeria/server/AggregatingDecodedHttpRequest.java b/core/src/main/java/com/linecorp/armeria/server/AggregatingDecodedHttpRequest.java index 73331f1f252..cc5c3ad3680 100644 --- a/core/src/main/java/com/linecorp/armeria/server/AggregatingDecodedHttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/server/AggregatingDecodedHttpRequest.java @@ -59,6 +59,7 @@ final class AggregatingDecodedHttpRequest extends AggregatingStreamMessage aggregationFuture = new CompletableFuture<>(); + private final CompletableFuture whenResponseSent = new CompletableFuture<>(); AggregatingDecodedHttpRequest(EventLoop eventLoop, int id, int streamId, RequestHeaders headers, boolean keepAlive, long maxRequestLength, @@ -89,6 +90,12 @@ public CompletableFuture aggregate(AggregationOptions opt return super.aggregate(options); } + @Nullable + @Override + public ServiceRequestContext requestContext() { + return ctx; + } + @Override public RoutingContext routingContext() { return routingCtx; @@ -234,6 +241,11 @@ public CompletableFuture whenAggregated() { return aggregationFuture; } + @Override + public CompletableFuture whenResponseSent() { + return whenResponseSent; + } + @Override public ExchangeType exchangeType() { return exchangeType; diff --git a/core/src/main/java/com/linecorp/armeria/server/DecodedHttpRequest.java b/core/src/main/java/com/linecorp/armeria/server/DecodedHttpRequest.java index a298c53c690..80e394d0a52 100644 --- a/core/src/main/java/com/linecorp/armeria/server/DecodedHttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/server/DecodedHttpRequest.java @@ -75,6 +75,9 @@ static DecodedHttpRequest of(boolean endOfStream, EventLoop eventLoop, int id, i void init(ServiceRequestContext ctx); + @Nullable + ServiceRequestContext requestContext(); + RoutingContext routingContext(); /** @@ -118,6 +121,11 @@ default CompletableFuture whenAggregated() { return null; } + /** + * Returns a {@link CompletableFuture} that is completed when the response is fully sent. + */ + CompletableFuture whenResponseSent(); + /** * Returns the {@link ExchangeType} that determines whether to stream an {@link HttpRequest} or * {@link HttpResponse}. diff --git a/core/src/main/java/com/linecorp/armeria/server/DefaultGracefulShutdown.java b/core/src/main/java/com/linecorp/armeria/server/DefaultGracefulShutdown.java new file mode 100644 index 00000000000..eb202a57ce5 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/server/DefaultGracefulShutdown.java @@ -0,0 +1,82 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server; + +import java.time.Duration; +import java.util.Objects; +import java.util.function.BiFunction; + +import com.google.common.base.MoreObjects; + +import com.linecorp.armeria.common.HttpRequest; + +final class DefaultGracefulShutdown implements GracefulShutdown { + + private final Duration quietPeriod; + private final Duration timeout; + private final BiFunction errorFunction; + + DefaultGracefulShutdown(Duration quietPeriod, Duration timeout, + BiFunction errorFunction) { + this.quietPeriod = quietPeriod; + this.timeout = timeout; + this.errorFunction = errorFunction; + } + + @Override + public Duration quietPeriod() { + return quietPeriod; + } + + @Override + public Duration timeout() { + return timeout; + } + + @Override + public Throwable shutdownError(ServiceRequestContext ctx, HttpRequest request) { + return errorFunction.apply(ctx, request); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DefaultGracefulShutdown)) { + return false; + } + final DefaultGracefulShutdown that = (DefaultGracefulShutdown) o; + return quietPeriod.equals(that.quietPeriod) && + timeout.equals(that.timeout) && + errorFunction.equals(that.errorFunction); + } + + @Override + public int hashCode() { + return Objects.hash(quietPeriod, timeout, errorFunction); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("quietPeriod", quietPeriod) + .add("timeout", timeout) + .add("errorFunction", errorFunction) + .toString(); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/server/DefaultServerConfig.java b/core/src/main/java/com/linecorp/armeria/server/DefaultServerConfig.java index 02cdad33d55..93afcbdb24d 100644 --- a/core/src/main/java/com/linecorp/armeria/server/DefaultServerConfig.java +++ b/core/src/main/java/com/linecorp/armeria/server/DefaultServerConfig.java @@ -91,8 +91,7 @@ final class DefaultServerConfig implements ServerConfig { private final int http1MaxHeaderSize; private final int http1MaxChunkSize; - private final Duration gracefulShutdownQuietPeriod; - private final Duration gracefulShutdownTimeout; + private final GracefulShutdown gracefulShutdown; private final BlockingTaskExecutor blockingTaskExecutor; @@ -135,7 +134,7 @@ final class DefaultServerConfig implements ServerConfig { long http2MaxStreamsPerConnection, int http2MaxFrameSize, long http2MaxHeaderListSize, int http2MaxResetFramesPerWindow, int http2MaxResetFramesWindowSeconds, int http1MaxInitialLineLength, int http1MaxHeaderSize, - int http1MaxChunkSize, Duration gracefulShutdownQuietPeriod, Duration gracefulShutdownTimeout, + int http1MaxChunkSize, GracefulShutdown gracefulShutdown, BlockingTaskExecutor blockingTaskExecutor, MeterRegistry meterRegistry, int proxyProtocolMaxTlvSize, Map, Object> channelOptions, @@ -183,12 +182,7 @@ final class DefaultServerConfig implements ServerConfig { http1MaxHeaderSize, "http1MaxHeaderSize"); this.http1MaxChunkSize = validateNonNegative( http1MaxChunkSize, "http1MaxChunkSize"); - this.gracefulShutdownQuietPeriod = validateNonNegative(requireNonNull( - gracefulShutdownQuietPeriod), "gracefulShutdownQuietPeriod"); - this.gracefulShutdownTimeout = validateNonNegative(requireNonNull( - gracefulShutdownTimeout), "gracefulShutdownTimeout"); - validateGreaterThanOrEqual(gracefulShutdownTimeout, "gracefulShutdownTimeout", - gracefulShutdownQuietPeriod, "gracefulShutdownQuietPeriod"); + this.gracefulShutdown = requireNonNull(gracefulShutdown, "gracefulShutdown"); requireNonNull(blockingTaskExecutor, "blockingTaskExecutor"); this.blockingTaskExecutor = monitorBlockingTaskExecutor(blockingTaskExecutor, meterRegistry); @@ -369,7 +363,7 @@ static Duration validateNonNegative(Duration duration, String fieldName) { static void validateGreaterThanOrEqual(Duration larger, String largerFieldName, Duration smaller, String smallerFieldName) { if (larger.compareTo(smaller) < 0) { - throw new IllegalArgumentException(largerFieldName + " must be greater than or equal to" + + throw new IllegalArgumentException(largerFieldName + " must be greater than or equal to " + smallerFieldName); } } @@ -586,12 +580,17 @@ public int http2MaxResetFramesWindowSeconds() { @Override public Duration gracefulShutdownQuietPeriod() { - return gracefulShutdownQuietPeriod; + return gracefulShutdown.quietPeriod(); } @Override public Duration gracefulShutdownTimeout() { - return gracefulShutdownTimeout; + return gracefulShutdown.timeout(); + } + + @Override + public GracefulShutdown gracefulShutdown() { + return gracefulShutdown; } @Override @@ -702,7 +701,7 @@ public String toString() { http2InitialConnectionWindowSize(), http2InitialStreamWindowSize(), http2MaxStreamsPerConnection(), http2MaxFrameSize(), http2MaxHeaderListSize(), http1MaxInitialLineLength(), http1MaxHeaderSize(), http1MaxChunkSize(), - proxyProtocolMaxTlvSize(), gracefulShutdownQuietPeriod(), gracefulShutdownTimeout(), + proxyProtocolMaxTlvSize(), gracefulShutdown(), blockingTaskExecutor(), meterRegistry(), channelOptions(), childChannelOptions(), clientAddressSources(), clientAddressTrustedProxyFilter(), clientAddressFilter(), @@ -723,7 +722,7 @@ static String toString( int http2InitialStreamWindowSize, long http2MaxStreamsPerConnection, int http2MaxFrameSize, long http2MaxHeaderListSize, long http1MaxInitialLineLength, long http1MaxHeaderSize, long http1MaxChunkSize, int proxyProtocolMaxTlvSize, - Duration gracefulShutdownQuietPeriod, Duration gracefulShutdownTimeout, + GracefulShutdown gracefulShutdown, @Nullable BlockingTaskExecutor blockingTaskExecutor, @Nullable MeterRegistry meterRegistry, Map, ?> channelOptions, Map, ?> childChannelOptions, @@ -799,10 +798,8 @@ static String toString( buf.append(http1MaxChunkSize); buf.append("B, proxyProtocolMaxTlvSize: "); buf.append(proxyProtocolMaxTlvSize); - buf.append("B, gracefulShutdownQuietPeriod: "); - buf.append(gracefulShutdownQuietPeriod); - buf.append(", gracefulShutdownTimeout: "); - buf.append(gracefulShutdownTimeout); + buf.append("B, gracefulShutdown: "); + buf.append(gracefulShutdown); if (blockingTaskExecutor != null) { buf.append(", blockingTaskExecutor: "); buf.append(blockingTaskExecutor); diff --git a/core/src/main/java/com/linecorp/armeria/server/DefaultServerErrorHandler.java b/core/src/main/java/com/linecorp/armeria/server/DefaultServerErrorHandler.java index 8e8d5c4dd74..c6005da4a7f 100644 --- a/core/src/main/java/com/linecorp/armeria/server/DefaultServerErrorHandler.java +++ b/core/src/main/java/com/linecorp/armeria/server/DefaultServerErrorHandler.java @@ -27,6 +27,7 @@ import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.ShuttingDownException; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.internal.common.RequestContextExtension; @@ -95,6 +96,11 @@ public HttpResponse onServiceException(ServiceRequestContext ctx, Throwable caus return internalRenderStatus(ctx, ctx.request().headers(), status, cause); } + if (cause instanceof ShuttingDownException) { + return internalRenderStatus(ctx, ctx.request().headers(), + HttpStatus.SERVICE_UNAVAILABLE, cause); + } + return internalRenderStatus(ctx, ctx.request().headers(), HttpStatus.INTERNAL_SERVER_ERROR, cause); } diff --git a/core/src/main/java/com/linecorp/armeria/server/EmptyContentDecodedHttpRequest.java b/core/src/main/java/com/linecorp/armeria/server/EmptyContentDecodedHttpRequest.java index f8485648293..bd68819b1a0 100644 --- a/core/src/main/java/com/linecorp/armeria/server/EmptyContentDecodedHttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/server/EmptyContentDecodedHttpRequest.java @@ -58,6 +58,7 @@ final class EmptyContentDecodedHttpRequest implements DecodedHttpRequest { @Nullable private CompletableFuture aggregateFuture; + private final CompletableFuture whenResponseSent = new CompletableFuture<>(); @Nullable private HttpResponse response; @@ -84,6 +85,12 @@ public void init(ServiceRequestContext ctx) { this.ctx = ctx; } + @Nullable + @Override + public ServiceRequestContext requestContext() { + return ctx; + } + @Override public RoutingContext routingContext() { return routingContext; @@ -234,6 +241,11 @@ public boolean isResponseAborted() { return abortResponseCause != null; } + @Override + public CompletableFuture whenResponseSent() { + return whenResponseSent; + } + @Override public ExchangeType exchangeType() { return exchangeType; diff --git a/core/src/main/java/com/linecorp/armeria/server/GracefulShutdown.java b/core/src/main/java/com/linecorp/armeria/server/GracefulShutdown.java new file mode 100644 index 00000000000..81657596352 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/server/GracefulShutdown.java @@ -0,0 +1,68 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server; + +import java.time.Duration; + +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.ShuttingDownException; +import com.linecorp.armeria.common.annotation.UnstableApi; + +/** + * Configures the graceful shutdown behavior of a {@link Server}. + */ +@UnstableApi +public interface GracefulShutdown { + + /** + * Returns a new {@link GracefulShutdownBuilder}. + */ + static GracefulShutdownBuilder builder() { + return new GracefulShutdownBuilder(); + } + + /** + * Returns a {@link GracefulShutdown} that disables the graceful shutdown feature. + */ + static GracefulShutdown disabled() { + return GracefulShutdownBuilder.DISABLED; + } + + /** + * Returns the quiet period to wait for active requests to go end before shutting down. + * {@link Duration#ZERO} means the server will stop right away without waiting. + */ + Duration quietPeriod(); + + /** + * Returns the amount of time to wait before shutting down the server regardless of active requests. + * This should be set to a time greater than {@code quietPeriod} to ensure the server shuts down even + * if there is a stuck request. + */ + Duration timeout(); + + /** + * Returns an {@link Throwable} to terminate a pending request when the server is shutting down. + * The exception will be converted to an {@link HttpResponse} by {@link ServerErrorHandler}. + * + *

If null is returned, the request will be terminated with {@link ShuttingDownException} that will be + * converted to an {@link HttpStatus#SERVICE_UNAVAILABLE} response. + */ + Throwable shutdownError(ServiceRequestContext ctx, HttpRequest request); +} diff --git a/core/src/main/java/com/linecorp/armeria/server/GracefulShutdownBuilder.java b/core/src/main/java/com/linecorp/armeria/server/GracefulShutdownBuilder.java new file mode 100644 index 00000000000..a06032894e8 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/server/GracefulShutdownBuilder.java @@ -0,0 +1,117 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server; + +import static com.linecorp.armeria.server.DefaultServerConfig.validateGreaterThanOrEqual; +import static com.linecorp.armeria.server.DefaultServerConfig.validateNonNegative; +import static java.util.Objects.requireNonNull; + +import java.time.Duration; +import java.util.function.BiFunction; + +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.ShuttingDownException; +import com.linecorp.armeria.common.annotation.UnstableApi; + +/** + * Builds a {@link GracefulShutdown}. + */ +@UnstableApi +public final class GracefulShutdownBuilder { + + // Defaults to no graceful shutdown. + private static final Duration DEFAULT_GRACEFUL_SHUTDOWN_QUIET_PERIOD = Duration.ZERO; + private static final Duration DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ZERO; + private static final BiFunction DEFAULT_ERROR_FUNCTION = + (ctx, req) -> ShuttingDownException.get(); + + static final GracefulShutdown DISABLED = GracefulShutdown.builder().build(); + + private Duration quietPeriod = DEFAULT_GRACEFUL_SHUTDOWN_QUIET_PERIOD; + private Duration timeout = DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT; + private BiFunction errorFunction = DEFAULT_ERROR_FUNCTION; + + GracefulShutdownBuilder() {} + + /** + * Sets the quiet period to wait for active requests to go end before shutting down. + * {@link Duration#ZERO} means the server will stop right away without waiting. + * + *

The default is {@link Duration#ZERO}. + */ + public GracefulShutdownBuilder quietPeriod(Duration quietPeriod) { + requireNonNull(quietPeriod, "quietPeriod"); + this.quietPeriod = validateNonNegative(quietPeriod, "quietPeriod"); + return this; + } + + /** + * Sets the quiet period millis to wait for active requests to go end before shutting down. + * 0 means the server will stop right away without waiting. + * + *

The default is 0. + */ + public GracefulShutdownBuilder quietPeriodMillis(long quietPeriodMillis) { + return quietPeriod(Duration.ofMillis(quietPeriodMillis)); + } + + /** + * Sets the amount of time to wait before shutting down the server regardless of active requests. + * This should be set to a time greater than {@code quietPeriod} to ensure the server shuts down even + * if there is a stuck request. + * + *

The default is {@link Duration#ZERO}. + */ + public GracefulShutdownBuilder timeout(Duration timeout) { + requireNonNull(timeout, "timeout"); + this.timeout = validateNonNegative(timeout, "timeout"); + return this; + } + + /** + * Sets the amount of time to wait before shutting down the server regardless of active requests. + * This should be set to a time greater than {@code quietPeriod} to ensure the server shuts down even + * if there is a stuck request. + * + *

The default is {@link Duration#ZERO}. + */ + public GracefulShutdownBuilder timeoutMillis(long timeoutMillis) { + return timeout(Duration.ofMillis(timeoutMillis)); + } + + /** + * Sets the function that returns an {@link Throwable} to terminate a pending request when the server is + * shutting down. If unspecified, the request will be terminated with {@link ShuttingDownException} that + * will be converted to an {@link HttpStatus#SERVICE_UNAVAILABLE} response. + */ + public GracefulShutdownBuilder shutdownErrorFunction( + BiFunction errorFunction) { + requireNonNull(errorFunction, "errorFunction"); + //noinspection unchecked + this.errorFunction = (BiFunction) errorFunction; + return this; + } + + /** + * Builds a new {@link GracefulShutdown} with the configured parameters. + */ + public GracefulShutdown build() { + validateGreaterThanOrEqual(timeout, "timeout", quietPeriod, "quietPeriod"); + return new DefaultGracefulShutdown(quietPeriod, timeout, errorFunction); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/server/Http1RequestDecoder.java b/core/src/main/java/com/linecorp/armeria/server/Http1RequestDecoder.java index cc35c9d52f3..4951a6f89db 100644 --- a/core/src/main/java/com/linecorp/armeria/server/Http1RequestDecoder.java +++ b/core/src/main/java/com/linecorp/armeria/server/Http1RequestDecoder.java @@ -438,7 +438,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc pipeline.context(Http2ServerConnectionHandler.class); final Http2ServerConnectionHandler connectionHandler = (Http2ServerConnectionHandler) connectionHandlerCtx.handler(); - encoder.close(); + encoder.close(ClosedSessionException.get()); // The HTTP/2 encoder will be used when a protocol violation error occurs after upgrading to HTTP/2 // that is directly written by 'fail()'. encoder = connectionHandler.getOrCreateResponseEncoder(connectionHandlerCtx); diff --git a/core/src/main/java/com/linecorp/armeria/server/HttpServerHandler.java b/core/src/main/java/com/linecorp/armeria/server/HttpServerHandler.java index f3722de3e91..6a0ee610f51 100644 --- a/core/src/main/java/com/linecorp/armeria/server/HttpServerHandler.java +++ b/core/src/main/java/com/linecorp/armeria/server/HttpServerHandler.java @@ -20,6 +20,7 @@ import static com.linecorp.armeria.common.SessionProtocol.H1C; import static com.linecorp.armeria.common.SessionProtocol.H2; import static com.linecorp.armeria.common.SessionProtocol.H2C; +import static com.linecorp.armeria.internal.client.ClosedStreamExceptionUtil.newClosedSessionException; import static com.linecorp.armeria.internal.common.HttpHeadersUtil.CLOSE_STRING; import static com.linecorp.armeria.internal.common.RequestContextUtil.NOOP_CONTEXT_HOOK; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; @@ -54,6 +55,7 @@ import com.linecorp.armeria.common.ResponseHeaders; import com.linecorp.armeria.common.ResponseHeadersBuilder; import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.ShuttingDownException; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.logging.RequestLogBuilder; import com.linecorp.armeria.common.metric.NoopMeterRegistry; @@ -90,6 +92,7 @@ final class HttpServerHandler extends ChannelInboundHandlerAdapter implements Ht private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class); + private static final CompletableFuture[] EMPTY_FUTURES = {}; private static final String ALLOWED_METHODS_STRING = HttpMethod.knownMethods().stream().map(HttpMethod::name).collect(Collectors.joining(",")); @@ -197,6 +200,7 @@ static void safeClose(Channel ch) { private final IdentityHashMap unfinishedRequests; private boolean isReading; private boolean isCleaning; + private boolean isClosing; private boolean handledLastRequest; HttpServerHandler(ServerConfig config, @@ -228,11 +232,23 @@ public int unfinishedRequests() { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - if (responseEncoder != null) { - // Immediately close responseEncoder so that a late response is completed with - // a ClosedSessionException. - responseEncoder.close(); + cleanup(ctx.channel(), false, null); + } + + CompletableFuture shutdown(Channel channel) { + final CompletableFuture completionFuture = new CompletableFuture<>(); + // This method is called from a startStopExecutor + channel.eventLoop().execute(() -> { + cleanup(channel, true, completionFuture); + }); + return completionFuture; + } + + private void cleanup(Channel ch, boolean shutdown, @Nullable CompletableFuture completionFuture) { + if (isClosing) { + return; } + isClosing = true; // Give the unfinished streaming responses a chance to close themselves before we abort them, // so that successful responses are not aborted due to a race condition like the following: @@ -249,29 +265,71 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { case H1C: case H1: // XXX(trustin): How much time is 'a little bit'? - ctx.channel().eventLoop().schedule(this::cleanup, 1, TimeUnit.SECONDS); + ch.eventLoop().schedule(() -> cleanup0(ch, shutdown, completionFuture), 1, TimeUnit.SECONDS); break; default: // HTTP/2 is unaffected by this issue because a client is expected to wait for a frame with // endOfStream set. - cleanup(); + cleanup0(ch, shutdown, completionFuture); } } - private void cleanup() { + private void cleanup0(Channel ch, boolean shutdown, @Nullable CompletableFuture completionFuture) { + final Throwable defaultCause = shutdown ? ShuttingDownException.get() : newClosedSessionException(ch); if (!unfinishedRequests.isEmpty()) { isCleaning = true; - final ClosedSessionException cause = ClosedSessionException.get(); unfinishedRequests.forEach((req, res) -> { // An HTTP2 request is cancelled by Http2RequestDecoder.onRstStreamRead() final boolean cancel = !protocol.isMultiplex(); // Mark the request stream as closed due to disconnection. + Throwable cause = null; + if (shutdown) { + cause = shutdownError(req); + } + if (cause == null) { + cause = defaultCause; + } req.abortResponse(cause, cancel); }); + + if (completionFuture != null) { + final CompletableFuture[] futures = + unfinishedRequests.keySet().stream() + .map(DecodedHttpRequest::whenResponseSent) + .toArray(CompletableFuture[]::new); + CompletableFuture.allOf(futures).handle((unused0, unused1) -> { + completionFuture.complete(null); + // responseEncoder.close() should be called after writing all unfinished responses. + if (responseEncoder != null) { + responseEncoder.close(defaultCause); + } + return null; + }); + } else { + if (responseEncoder != null) { + responseEncoder.close(defaultCause); + } + } + unfinishedRequests.clear(); } } + @Nullable + private Throwable shutdownError(DecodedHttpRequest req) { + final ServiceRequestContext ctx = req.requestContext(); + if (ctx == null) { + return null; + } + try { + return config.gracefulShutdown().shutdownError(ctx, req); + } catch (Exception e) { + logger.warn("{} Unexpected exception from gracefulShutdown.shutdownError(): {}", + ctx, config.gracefulShutdown(), e); + return null; + } + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { isReading = true; // Cleared in channelReadComplete() @@ -302,7 +360,7 @@ private void handleHttp2Settings(ChannelHandlerContext ctx, Http2Settings h2sett final Http2ServerConnectionHandler connectionHandler = (Http2ServerConnectionHandler) connectionHandlerCtx.handler(); if (responseEncoder instanceof Http1ObjectEncoder) { - responseEncoder.close(); + responseEncoder.close(ClosedSessionException.get()); } responseEncoder = connectionHandler.getOrCreateResponseEncoder(connectionHandlerCtx); @@ -437,7 +495,7 @@ private void handleRequest(ChannelHandlerContext ctx, DecodedHttpRequest req) th // A future which is completed when the all response objects are written to channel and // the returned promises are done. - final CompletableFuture resWriteFuture = new CompletableFuture<>(); + final CompletableFuture resWriteFuture = req.whenResponseSent(); resWriteFuture.handle(handler.responseCompleteHandler); // Set the response to the request in order to be able to immediately abort the response diff --git a/core/src/main/java/com/linecorp/armeria/server/Server.java b/core/src/main/java/com/linecorp/armeria/server/Server.java index 9beb62f4ab2..a535c587a7f 100644 --- a/core/src/main/java/com/linecorp/armeria/server/Server.java +++ b/core/src/main/java/com/linecorp/armeria/server/Server.java @@ -26,6 +26,7 @@ import java.net.SocketAddress; import java.security.cert.Certificate; import java.security.cert.X509Certificate; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; @@ -75,6 +76,7 @@ import com.linecorp.armeria.common.util.StartStopSupport; import com.linecorp.armeria.common.util.TlsEngineType; import com.linecorp.armeria.common.util.TransportType; +import com.linecorp.armeria.common.util.UnmodifiableFuture; import com.linecorp.armeria.common.util.Version; import com.linecorp.armeria.internal.common.RequestTargetCache; import com.linecorp.armeria.internal.common.util.ChannelUtil; @@ -501,11 +503,11 @@ private final class ServerStartStopSupport extends StartStopSupport doStart(@Nullable Void arg) { - if (config().gracefulShutdownQuietPeriod().isZero()) { + if (config().gracefulShutdown().quietPeriod().isZero()) { gracefulShutdownSupport = GracefulShutdownSupport.createDisabled(); } else { gracefulShutdownSupport = - GracefulShutdownSupport.create(config().gracefulShutdownQuietPeriod(), + GracefulShutdownSupport.create(config().gracefulShutdown().quietPeriod(), config().blockingTaskExecutor()); } @@ -618,7 +620,7 @@ protected CompletionStage doStop(@Nullable Void arg) { gracefulShutdownExecutor.schedule(() -> { quietPeriodFuture.cancel(false); doStop(future, gracefulShutdownExecutor); - }, config.gracefulShutdownTimeout().toMillis(), TimeUnit.MILLISECONDS); + }, config.gracefulShutdown().timeout().toMillis(), TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { // Can be rejected if quiet period is complete already. } @@ -655,45 +657,48 @@ private void doStop(CompletableFuture future, lock.unlock(); } - // Close all accepted sockets. - ChannelUtil.close(connectionLimitingHandler.children()).handle((unused3, unused4) -> { - // Shut down the worker group if necessary. - final Future workerShutdownFuture; - if (config.shutdownWorkerGroupOnStop()) { - workerShutdownFuture = config.workerGroup().shutdownGracefully(); - } else { - workerShutdownFuture = ImmediateEventExecutor.INSTANCE.newSucceededFuture(null); - } - - workerShutdownFuture.addListener(unused5 -> { - final Set bossGroups = - Server.this.serverChannels.stream() - .map(ch -> ch.eventLoop().parent()) - .collect(toImmutableSet()); - - // If started to shutdown before initializing a boss group, - // complete the future immediately. - if (bossGroups.isEmpty()) { - finishDoStop(future); - return; + shutdownServerHandlers().handle((unused3, unused4) -> { + // Close all accepted sockets. + ChannelUtil.close(connectionLimitingHandler.children()).handle((unused5, unused6) -> { + // Shut down the worker group if necessary. + final Future workerShutdownFuture; + if (config.shutdownWorkerGroupOnStop()) { + workerShutdownFuture = config.workerGroup().shutdownGracefully(); + } else { + workerShutdownFuture = ImmediateEventExecutor.INSTANCE.newSucceededFuture(null); } - // Shut down all boss groups and wait until they are terminated. - final AtomicInteger remainingBossGroups = new AtomicInteger(bossGroups.size()); - bossGroups.forEach(bossGroup -> { - bossGroup.shutdownGracefully(); - bossGroup.terminationFuture().addListener(unused6 -> { - if (remainingBossGroups.decrementAndGet() != 0) { - // There are more boss groups to terminate. - return; - } - - // Boss groups have been terminated completely. + workerShutdownFuture.addListener(unused7 -> { + final Set bossGroups = + Server.this.serverChannels.stream() + .map(ch -> ch.eventLoop().parent()) + .collect(toImmutableSet()); + + // If started to shutdown before initializing a boss group, + // complete the future immediately. + if (bossGroups.isEmpty()) { finishDoStop(future); + return; + } + + // Shut down all boss groups and wait until they are terminated. + final AtomicInteger remainingBossGroups = new AtomicInteger(bossGroups.size()); + bossGroups.forEach(bossGroup -> { + bossGroup.shutdownGracefully(); + bossGroup.terminationFuture().addListener(unused8 -> { + if (remainingBossGroups.decrementAndGet() != 0) { + // There are more boss groups to terminate. + return; + } + + // Boss groups have been terminated completely. + finishDoStop(future); + }); }); }); - }); + return null; + }); return null; }); @@ -701,6 +706,30 @@ private void doStop(CompletableFuture future, }); } + private CompletableFuture shutdownServerHandlers() { + if (config.gracefulShutdown().timeout().isZero()) { + return UnmodifiableFuture.completedFuture(null); + } + + final Set children = connectionLimitingHandler.children(); + final List> closeFutures = new ArrayList<>(children.size()); + for (Channel ch : children) { + final HttpServerHandler serverHandler = ch.pipeline().get(HttpServerHandler.class); + if (serverHandler != null) { + closeFutures.add(serverHandler.shutdown(ch)); + } + } + + // The future returned by shutdown() will be always completed successfully. + final CompletableFuture> combined = CompletableFutures.allAsList(closeFutures); + config.workerGroup().schedule(() -> { + combined.complete(ImmutableList.of()); + // TODO(ikhoon): Make the timeout configurable. + // Wait for up to 1 second to send shutdown responses to clients. + }, 1, TimeUnit.SECONDS); + return combined.thenApply(ignored -> null); + } + private void finishDoStop(CompletableFuture future) { serverChannels.clear(); diff --git a/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java b/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java index 5674d209f67..aacb6b1a7a0 100644 --- a/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java @@ -23,7 +23,6 @@ import static com.linecorp.armeria.common.SessionProtocol.HTTP; import static com.linecorp.armeria.common.SessionProtocol.HTTPS; import static com.linecorp.armeria.common.SessionProtocol.PROXY; -import static com.linecorp.armeria.server.DefaultServerConfig.validateGreaterThanOrEqual; import static com.linecorp.armeria.server.DefaultServerConfig.validateIdleTimeoutMillis; import static com.linecorp.armeria.server.DefaultServerConfig.validateMaxNumConnections; import static com.linecorp.armeria.server.DefaultServerConfig.validateNonNegative; @@ -216,8 +215,7 @@ public final class ServerBuilder implements TlsSetters, ServiceConfigsBuilder whenResponseSent = new CompletableFuture<>(); + private boolean shouldResetOnlyIfRemoteIsOpen; @Nullable @@ -83,6 +87,12 @@ public void init(ServiceRequestContext ctx) { this.ctx = ctx; } + @Nullable + @Override + public ServiceRequestContext requestContext() { + return ctx; + } + @Override public RoutingContext routingContext() { return routingCtx; @@ -213,6 +223,11 @@ public boolean isResponseAborted() { return abortResponseCause != null; } + @Override + public CompletableFuture whenResponseSent() { + return whenResponseSent; + } + @Override public ExchangeType exchangeType() { return exchangeType; diff --git a/core/src/main/java/com/linecorp/armeria/server/UpdatableServerConfig.java b/core/src/main/java/com/linecorp/armeria/server/UpdatableServerConfig.java index 1b82f01a805..e3c327bdb64 100644 --- a/core/src/main/java/com/linecorp/armeria/server/UpdatableServerConfig.java +++ b/core/src/main/java/com/linecorp/armeria/server/UpdatableServerConfig.java @@ -240,6 +240,11 @@ public Duration gracefulShutdownTimeout() { return delegate.gracefulShutdownTimeout(); } + @Override + public GracefulShutdown gracefulShutdown() { + return delegate.gracefulShutdown(); + } + @Override public BlockingTaskExecutor blockingTaskExecutor() { return delegate.blockingTaskExecutor(); diff --git a/core/src/test/java/com/linecorp/armeria/GracefulShutdownBuilderTest.java b/core/src/test/java/com/linecorp/armeria/GracefulShutdownBuilderTest.java new file mode 100644 index 00000000000..9fd60d8cb95 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/GracefulShutdownBuilderTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.common.ShuttingDownException; +import com.linecorp.armeria.internal.testing.AnticipatedException; +import com.linecorp.armeria.server.GracefulShutdown; + +class GracefulShutdownBuilderTest { + + @Test + void testInvalidValues() { + assertThatThrownBy(() -> GracefulShutdown.builder().quietPeriodMillis(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageMatching("quietPeriod:.*?\\(expected: >= 0\\)"); + + assertThatThrownBy(() -> GracefulShutdown.builder().timeoutMillis(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageMatching("timeout:.*?\\(expected: >= 0\\)"); + + assertThatThrownBy(() -> { + GracefulShutdown.builder() + .quietPeriodMillis(10) + .timeoutMillis(5) + .build(); + }) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageMatching("timeout must be greater than or equal to quietPeriod"); + } + + @Test + void testDefault() { + final GracefulShutdown gracefulShutdown = GracefulShutdown.builder().build(); + assertThat(gracefulShutdown.quietPeriod()).isZero(); + assertThat(gracefulShutdown.timeout()).isZero(); + assertThat(gracefulShutdown.shutdownError(null, null)) + .isInstanceOf(ShuttingDownException.class); + } + + @Test + void testCustomValues() { + final GracefulShutdown gracefulShutdown = + GracefulShutdown.builder() + .quietPeriod(Duration.ofSeconds(1)) + .timeout(Duration.ofSeconds(2)) + .shutdownErrorFunction((ctx, req) -> new AnticipatedException("test")) + .build(); + assertThat(gracefulShutdown.quietPeriod()).isEqualTo(Duration.ofSeconds(1)); + assertThat(gracefulShutdown.timeout()).isEqualTo(Duration.ofSeconds(2)); + assertThat(gracefulShutdown.shutdownError(null, null)) + .isInstanceOf(AnticipatedException.class); + } +} diff --git a/core/src/test/java/com/linecorp/armeria/server/CustomGracefulShutDownTest.java b/core/src/test/java/com/linecorp/armeria/server/CustomGracefulShutDownTest.java new file mode 100644 index 00000000000..0e91480af88 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/server/CustomGracefulShutDownTest.java @@ -0,0 +1,95 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.client.logging.LoggingClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.ShuttingDownException; +import com.linecorp.armeria.internal.testing.AnticipatedException; + +class CustomGracefulShutDownTest { + + @ArgumentsSource(GracefulShutdownProvider.class) + @ParameterizedTest + void testGracefulShutdown(GracefulShutdown gracefulShutdown, Class expectedCause, + HttpStatus expectedStatus) { + final CompletableFuture whenReceived = new CompletableFuture<>(); + final Server server = + Server.builder() + .service("/", (ctx, req) -> { + whenReceived.complete(ctx); + return HttpResponse.streaming(); + }) + .gracefulShutdown(gracefulShutdown) + .errorHandler((ctx, cause) -> { + if (cause instanceof AnticipatedException) { + return HttpResponse.of(HttpStatus.BAD_GATEWAY); + } + return null; + }) + .build(); + server.start().join(); + final WebClient client = WebClient.builder("http://127.0.0.1:" + server.activeLocalPort()) + .responseTimeoutMillis(0) + .decorator(LoggingClient.newDecorator()) + .build(); + final CompletableFuture res = client.get("/").aggregate(); + final ServiceRequestContext sctx = whenReceived.join(); + final CompletableFuture closeFuture = server.stop(); + final AggregatedHttpResponse response = res.join(); + assertThat(response.status()).isEqualTo(expectedStatus); + assertThat(sctx.log().whenComplete().join().responseCause()).isInstanceOf(expectedCause); + closeFuture.join(); + } + + private static class GracefulShutdownProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext context) { + final GracefulShutdown customError = + GracefulShutdown.builder() + .quietPeriod(Duration.ofMillis(500)) + .timeout(Duration.ofMillis(500)) + .shutdownErrorFunction( + (ctx, req) -> new AnticipatedException()) + .build(); + + final GracefulShutdown defaultError = + GracefulShutdown.builder() + .quietPeriod(Duration.ofMillis(200)) + .timeout(Duration.ofMillis(200)) + .build(); + return Stream.of( + Arguments.of(defaultError, ShuttingDownException.class, HttpStatus.SERVICE_UNAVAILABLE), + Arguments.of(customError, AnticipatedException.class, HttpStatus.BAD_GATEWAY)); + } + } +} diff --git a/spring/boot3-autoconfigure/src/test/java/com/linecorp/armeria/spring/ArmeriaSettingsConfigurationTest.java b/spring/boot3-autoconfigure/src/test/java/com/linecorp/armeria/spring/ArmeriaSettingsConfigurationTest.java index b111becec47..44c2cae2b6d 100644 --- a/spring/boot3-autoconfigure/src/test/java/com/linecorp/armeria/spring/ArmeriaSettingsConfigurationTest.java +++ b/spring/boot3-autoconfigure/src/test/java/com/linecorp/armeria/spring/ArmeriaSettingsConfigurationTest.java @@ -115,8 +115,8 @@ void buildServerBasedOnProperties() { assertThat(defaultVirtualHost.verboseResponses()).isTrue(); // ArmeriaServerConfigurator overrides the properties from ArmeriaSettings. - assertThat(config.gracefulShutdownTimeout().toMillis()).isEqualTo(10000); - assertThat(config.gracefulShutdownQuietPeriod().toMillis()).isEqualTo(1000); + assertThat(config.gracefulShutdown().timeout().toMillis()).isEqualTo(10000); + assertThat(config.gracefulShutdown().quietPeriod().toMillis()).isEqualTo(1000); assertThat(config.dependencyInjector().getInstance(Object.class)).isSameAs(dummyObject); assertThat(config.errorHandler().onServiceException(null, null)).isSameAs(dummyResponse); diff --git a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/server/GracefulShutdownIntegrationTest.java b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/server/GracefulShutdownIntegrationTest.java index ae797287207..beaceffd36b 100644 --- a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/server/GracefulShutdownIntegrationTest.java +++ b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/server/GracefulShutdownIntegrationTest.java @@ -34,8 +34,9 @@ import org.slf4j.LoggerFactory; import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.InvalidResponseHeadersException; +import com.linecorp.armeria.client.logging.LoggingClient; import com.linecorp.armeria.client.thrift.ThriftClients; -import com.linecorp.armeria.common.ClosedSessionException; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.logging.RequestLog; @@ -45,6 +46,7 @@ import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.logging.AccessLogWriter; +import com.linecorp.armeria.server.logging.LoggingService; import com.linecorp.armeria.server.thrift.THttpService; import com.linecorp.armeria.testing.junit5.server.ServerExtension; @@ -74,6 +76,7 @@ protected void configure(ServerBuilder sb) throws Exception { ServiceRequestContext.current().eventLoop().schedule( () -> resultHandler.onComplete(milliseconds), milliseconds, MILLISECONDS); })); + sb.decorator(LoggingService.newDecorator()); final AccessLogWriter writer1 = new AccessLogWriter() { @Override @@ -211,7 +214,9 @@ void interruptsSlowRequests() throws Exception { client.sleep(30000L); completed.set(true); } catch (TTransportException cause) { - assertThat(cause).hasCauseInstanceOf(ClosedSessionException.class); + assertThat(cause).hasCauseInstanceOf(InvalidResponseHeadersException.class); + assertThat(((InvalidResponseHeadersException) cause.getCause()).headers().status()) + .isEqualTo(HttpStatus.SERVICE_UNAVAILABLE); latch2.countDown(); } catch (Throwable t) { logger.error("Unexpected failure:", t); @@ -272,6 +277,7 @@ void testHardTimeout() throws Exception { private SleepService.Iface newClient() { return ThriftClients.builder(server.httpUri()) + .decorator(LoggingClient.newDecorator()) .path("/sleep") .factory(clientFactory) .build(SleepService.Iface.class);