Skip to content

Commit 8a2ca1a

Browse files
authored
Correctly handle http conn state when origin responds before LastHttpContent is sent (#2129)
This PR ensures we close the origin connection (rather than returning it to the pool) when an HTTP/1.1 origin sends a response before zuul finishes streaming the request body to it. If an origin responds before zuul finishes sending the request body without signally to close the connection, the outbound `HttpClientCodec` encoder is left mid-body (state `ST_CONTENT_NON_CHUNK`). When the connection is reused for the next request, the encoder rejects it with `IllegalStateException: unexpected message type: DefaultHttpRequest, state: 1`, resulting in a HTTP 500 response to an unrelated request. Per RFC 9112 / RFC 7230, an origin that produces a response without consuming the full request body is expected to signal `Connection: close` so the connection is not reused. However, some origins don't, so we ensure the connection pool correctly protects against this case.
1 parent 3b98e3a commit 8a2ca1a

8 files changed

Lines changed: 436 additions & 3 deletions

File tree

zuul-core/src/main/java/com/netflix/netty/common/HttpClientLifecycleChannelHandler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.netty.handler.codec.http.HttpRequest;
2525
import io.netty.handler.codec.http.HttpResponse;
2626
import io.netty.handler.codec.http.LastHttpContent;
27+
import io.netty.util.AttributeKey;
2728

2829
/**
2930
* @author michaels
@@ -32,6 +33,9 @@ public class HttpClientLifecycleChannelHandler extends HttpLifecycleChannelHandl
3233
public static final ChannelHandler INBOUND_CHANNEL_HANDLER = new HttpClientLifecycleInboundChannelHandler();
3334
public static final ChannelHandler OUTBOUND_CHANNEL_HANDLER = new HttpClientLifecycleOutboundChannelHandler();
3435

36+
public static final AttributeKey<Boolean> ATTR_OUTBOUND_LAST_CONTENT_PENDING =
37+
AttributeKey.newInstance("_outbound_last_content_pending");
38+
3539
@ChannelHandler.Sharable
3640
private static class HttpClientLifecycleInboundChannelHandler extends ChannelInboundHandlerAdapter {
3741
@Override
@@ -64,9 +68,19 @@ private static class HttpClientLifecycleOutboundChannelHandler extends ChannelOu
6468
@Override
6569
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
6670
if (msg instanceof HttpRequest httpRequest) {
71+
ctx.channel().attr(ATTR_OUTBOUND_LAST_CONTENT_PENDING).set(Boolean.TRUE);
6772
fireStartEvent(ctx, httpRequest);
6873
}
6974

75+
if (msg instanceof LastHttpContent) {
76+
// only clear after content write succeeds to account for retrans
77+
promise.addListener(future -> {
78+
if (future.isSuccess()) {
79+
ctx.channel().attr(ATTR_OUTBOUND_LAST_CONTENT_PENDING).set(null);
80+
}
81+
});
82+
}
83+
7084
super.write(ctx, msg, promise);
7185
}
7286

zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/ConnectionPoolHandler.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.netflix.netty.common.HttpLifecycleChannelHandler.CompleteEvent;
2020
import static com.netflix.netty.common.HttpLifecycleChannelHandler.CompleteReason;
2121

22+
import com.netflix.netty.common.HttpClientLifecycleChannelHandler;
2223
import com.netflix.spectator.api.Spectator;
2324
import com.netflix.zuul.netty.ChannelUtils;
2425
import com.netflix.zuul.origins.OriginName;
@@ -81,6 +82,15 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
8182
+ ChannelUtils.channelInfoForLogging(ctx.channel());
8283
metrics.headerCloseCounter().increment();
8384
closeConnection(ctx, msg);
85+
} else if (isOutboundLastContentPending(ctx)) {
86+
// response arrived before zuul finished writing the request body; the
87+
// HttpClientCodec encoder is mid-body and the connection cannot be reused
88+
metrics.outboundIncompleteCounter().increment();
89+
String msg = "Origin channel for origin - " + originName
90+
+ " - response completed before request body fully written, closing. "
91+
+ ChannelUtils.channelInfoForLogging(ctx.channel());
92+
LOG.warn(msg);
93+
closeConnection(ctx, msg);
8494
} else {
8595
conn.setConnectionState(PooledConnection.ConnectionState.WRITE_READY);
8696
conn.release();
@@ -148,6 +158,12 @@ private void flagCloseAndReleaseConnection(PooledConnection pooledConnection) {
148158
}
149159
}
150160

161+
private static boolean isOutboundLastContentPending(ChannelHandlerContext ctx) {
162+
return Boolean.TRUE.equals(ctx.channel()
163+
.attr(HttpClientLifecycleChannelHandler.ATTR_OUTBOUND_LAST_CONTENT_PENDING)
164+
.get());
165+
}
166+
151167
private static String getConnectionHeader(CompleteEvent completeEvt) {
152168
HttpResponse response = completeEvt.getResponse();
153169
if (response != null) {

zuul-core/src/main/java/com/netflix/zuul/netty/connectionpool/ConnectionPoolMetrics.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ public record ConnectionPoolMetrics(
5050
Counter inactiveCounter,
5151
Counter errorCounter,
5252
Counter headerCloseCounter,
53-
Counter sslCloseCompletionCounter) {
53+
Counter sslCloseCompletionCounter,
54+
Counter outboundIncompleteCounter) {
5455

5556
public static ConnectionPoolMetrics create(OriginName originName, Registry registry) {
5657
Counter createNewConnCounter = newCounter("connectionpool_create", originName, registry);
@@ -77,6 +78,7 @@ public static ConnectionPoolMetrics create(OriginName originName, Registry regis
7778
Counter errorCounter = newCounter("connectionpool_error", originName, registry);
7879
Counter headerCloseCounter = newCounter("connectionpool_headerClose", originName, registry);
7980
Counter sslCloseCompletionCounter = newCounter("connectionpool_sslClose", originName, registry);
81+
Counter outboundIncompleteCounter = newCounter("connectionpool_outboundIncomplete", originName, registry);
8082

8183
PercentileTimer connEstablishTimer = PercentileTimer.get(
8284
registry, registry.createId("connectionpool_createTiming", "id", originName.getMetricId()));
@@ -107,7 +109,8 @@ public static ConnectionPoolMetrics create(OriginName originName, Registry regis
107109
inactiveCounter,
108110
errorCounter,
109111
headerCloseCounter,
110-
sslCloseCompletionCounter);
112+
sslCloseCompletionCounter,
113+
outboundIncompleteCounter);
111114
}
112115

113116
private static Counter newCounter(String metricName, OriginName originName, Registry registry) {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2026 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.netflix.netty.common;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import io.netty.buffer.Unpooled;
22+
import io.netty.channel.embedded.EmbeddedChannel;
23+
import io.netty.handler.codec.http.DefaultHttpContent;
24+
import io.netty.handler.codec.http.DefaultHttpRequest;
25+
import io.netty.handler.codec.http.DefaultLastHttpContent;
26+
import io.netty.handler.codec.http.HttpMethod;
27+
import io.netty.handler.codec.http.HttpVersion;
28+
import org.junit.jupiter.api.AfterEach;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
32+
class HttpClientLifecycleChannelHandlerTest {
33+
34+
private EmbeddedChannel channel;
35+
36+
@BeforeEach
37+
void setup() {
38+
channel = new EmbeddedChannel(HttpClientLifecycleChannelHandler.OUTBOUND_CHANNEL_HANDLER);
39+
}
40+
41+
@AfterEach
42+
void tearDown() {
43+
channel.finishAndReleaseAll();
44+
}
45+
46+
@Test
47+
void lastContentPendingAfterRequestHeadersOnly() {
48+
channel.writeOutbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/foo"));
49+
channel.writeOutbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[] {1, 2, 3})));
50+
51+
assertThat(channel.attr(HttpClientLifecycleChannelHandler.ATTR_OUTBOUND_LAST_CONTENT_PENDING)
52+
.get())
53+
.isEqualTo(Boolean.TRUE);
54+
}
55+
56+
@Test
57+
void lastContentPendingClearedAfterLastHttpContent() {
58+
channel.writeOutbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/foo"));
59+
channel.writeOutbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[] {1, 2, 3})));
60+
channel.writeOutbound(new DefaultLastHttpContent());
61+
62+
assertThat(channel.attr(HttpClientLifecycleChannelHandler.ATTR_OUTBOUND_LAST_CONTENT_PENDING)
63+
.get())
64+
.isNull();
65+
}
66+
67+
@Test
68+
void lastContentPendingResetsOnNewRequest() {
69+
channel.writeOutbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/first"));
70+
channel.writeOutbound(new DefaultLastHttpContent());
71+
assertThat(channel.attr(HttpClientLifecycleChannelHandler.ATTR_OUTBOUND_LAST_CONTENT_PENDING)
72+
.get())
73+
.isNull();
74+
75+
// mimic the response side completing the previous request so a second start event will fire
76+
channel.attr(HttpLifecycleChannelHandler.ATTR_STATE).set(null);
77+
78+
channel.writeOutbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/second"));
79+
assertThat(channel.attr(HttpClientLifecycleChannelHandler.ATTR_OUTBOUND_LAST_CONTENT_PENDING)
80+
.get())
81+
.isEqualTo(Boolean.TRUE);
82+
}
83+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2026 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.netflix.zuul.netty.connectionpool;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.verify;
21+
22+
import com.netflix.netty.common.HttpClientLifecycleChannelHandler;
23+
import com.netflix.netty.common.HttpLifecycleChannelHandler.CompleteEvent;
24+
import com.netflix.netty.common.HttpLifecycleChannelHandler.CompleteReason;
25+
import com.netflix.spectator.api.DefaultRegistry;
26+
import com.netflix.zuul.discovery.DiscoveryResult;
27+
import com.netflix.zuul.origins.OriginName;
28+
import io.netty.channel.embedded.EmbeddedChannel;
29+
import org.junit.jupiter.api.AfterEach;
30+
import org.junit.jupiter.api.BeforeEach;
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.extension.ExtendWith;
33+
import org.mockito.Mock;
34+
import org.mockito.junit.jupiter.MockitoExtension;
35+
36+
@ExtendWith(MockitoExtension.class)
37+
class ConnectionPoolHandlerTest {
38+
39+
@Mock
40+
private ClientChannelManager channelManager;
41+
42+
private DefaultRegistry registry;
43+
private ConnectionPoolMetrics metrics;
44+
private ConnectionPoolHandler handler;
45+
private EmbeddedChannel channel;
46+
47+
@BeforeEach
48+
void setup() {
49+
registry = new DefaultRegistry();
50+
OriginName originName = OriginName.fromVipAndApp("whatever", "whatever");
51+
metrics = ConnectionPoolMetrics.create(originName, registry);
52+
handler = new ConnectionPoolHandler(metrics);
53+
channel = new EmbeddedChannel(handler);
54+
}
55+
56+
@AfterEach
57+
void tearDown() {
58+
channel.finishAndReleaseAll();
59+
}
60+
61+
@Test
62+
void sessionCompleteReleasesConnectionToPoolWhenNothingPending() {
63+
PooledConnection conn = newPooledConnection();
64+
65+
channel.pipeline().fireUserEventTriggered(new CompleteEvent(CompleteReason.SESSION_COMPLETE, null, null));
66+
67+
verify(channelManager).release(conn);
68+
assertThat(metrics.outboundIncompleteCounter().count()).isZero();
69+
assertThat(channel.isOpen()).isTrue();
70+
}
71+
72+
@Test
73+
void sessionCompleteClosesConnectionWhenLastContentStillPending() {
74+
PooledConnection conn = newPooledConnection();
75+
channel.attr(HttpClientLifecycleChannelHandler.ATTR_OUTBOUND_LAST_CONTENT_PENDING)
76+
.set(Boolean.TRUE);
77+
78+
channel.pipeline().fireUserEventTriggered(new CompleteEvent(CompleteReason.SESSION_COMPLETE, null, null));
79+
80+
assertThat(metrics.outboundIncompleteCounter().count()).isEqualTo(1);
81+
assertThat(conn.isShouldClose()).isTrue();
82+
verify(channelManager).release(conn);
83+
}
84+
85+
private PooledConnection newPooledConnection() {
86+
return new PooledConnection(
87+
channel,
88+
DiscoveryResult.EMPTY,
89+
channelManager,
90+
registry.counter("close"),
91+
registry.counter("closeBusy"));
92+
}
93+
}

zuul-core/src/test/java/com/netflix/zuul/netty/connectionpool/ConnectionPoolMetricsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public void validateMetricNames() {
6060
validateCounter("connectionpool_error", metrics.errorCounter());
6161
validateCounter("connectionpool_headerClose", metrics.headerCloseCounter());
6262
validateCounter("connectionpool_sslClose", metrics.sslCloseCompletionCounter());
63+
validateCounter("connectionpool_outboundIncomplete", metrics.outboundIncompleteCounter());
6364
}
6465

6566
private void validateCounter(String name, Counter counter) {

zuul-integration-test/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ dependencies {
1414

1515
testImplementation 'org.wiremock:wiremock:3.10.0'
1616
testImplementation 'javax.servlet:javax.servlet-api:4.0.1'
17-
testImplementation libraries.assertj, libraries.jupiterApi, libraries.jupiterParams, libraries.jupiterEngine, libraries.junitPlatformLauncher, libraries.jupiterMockito, libraries.okhttp
17+
testImplementation libraries.assertj, libraries.awaitility, libraries.jupiterApi, libraries.jupiterParams, libraries.jupiterEngine, libraries.junitPlatformLauncher, libraries.jupiterMockito, libraries.okhttp
1818
testImplementation "io.netty:netty-transport-native-io_uring"
1919
// testImplementation "io.netty.incubator:netty-transport-native-io_uring::linux-x86_64"
2020
testImplementation "org.slf4j:slf4j-api:2.0.16"

0 commit comments

Comments
 (0)