From 00910e109b16ad893e9fd5c3edea8f0915a53310 Mon Sep 17 00:00:00 2001 From: shirx Date: Sun, 28 Sep 2025 21:07:03 +0800 Subject: [PATCH] drc no need proxy to close channel --- .../integratedtest/metaserver/proxy/LocalProxyConfig.java | 5 +++++ .../ctrip/xpipe/redis/proxy/config/DefaultProxyConfig.java | 7 +++++++ .../com/ctrip/xpipe/redis/proxy/config/ProxyConfig.java | 5 +++++ .../com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnel.java | 4 ++-- .../java/com/ctrip/xpipe/redis/proxy/TestProxyConfig.java | 5 +++++ 5 files changed, 24 insertions(+), 2 deletions(-) diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/metaserver/proxy/LocalProxyConfig.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/metaserver/proxy/LocalProxyConfig.java index ce4770ac70..c6166a17cd 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/metaserver/proxy/LocalProxyConfig.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/metaserver/proxy/LocalProxyConfig.java @@ -91,6 +91,11 @@ public boolean isCompressEnabled() { return compress; } + @Override + public boolean allowCloseChannel() { + return true; + } + @Override public CompressAlgorithm getCompressAlgorithm() { return new CompressAlgorithm() { diff --git a/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/config/DefaultProxyConfig.java b/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/config/DefaultProxyConfig.java index 18663ec112..f682caa232 100644 --- a/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/config/DefaultProxyConfig.java +++ b/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/config/DefaultProxyConfig.java @@ -68,6 +68,8 @@ public class DefaultProxyConfig extends AbstractConfigBean implements ProxyConfi private static final String KEY_CROSS_REGION_TRAFFIC_CONTROL_LIMIT = "proxy.cross.region.traffic.control.limit"; + private static final String ALLOW_CLOSE_CHANNEL = "proxy.allow.close.channel"; + public DefaultProxyConfig() { CompositeConfig compositeConfig = new CompositeConfig(); compositeConfig.addConfig(Config.DEFAULT); @@ -215,4 +217,9 @@ public boolean isCrossRegionTrafficControlEnabled() { public long getCrossRegionTrafficControlLimit() { return getLongProperty(KEY_CROSS_REGION_TRAFFIC_CONTROL_LIMIT, 200 * 1024 * 1024L); // 100MB/s default } + + @Override + public boolean allowCloseChannel() { + return getBooleanProperty(ALLOW_CLOSE_CHANNEL, true); + } } diff --git a/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/config/ProxyConfig.java b/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/config/ProxyConfig.java index 95a34885a5..78b10da141 100644 --- a/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/config/ProxyConfig.java +++ b/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/config/ProxyConfig.java @@ -61,4 +61,9 @@ public interface ProxyConfig extends TLSConfig { */ long getCrossRegionTrafficControlLimit(); + /** + * DRC does not allow the proxy to actively close the channel during backpressure. + * @return true if allowing the proxy to actively close the connection, false otherwise. + */ + boolean allowCloseChannel(); } diff --git a/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnel.java b/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnel.java index 5006ce0557..03054ffef4 100644 --- a/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnel.java +++ b/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnel.java @@ -236,10 +236,10 @@ protected void check() { long maxBlockWait = config.getBlockWaitBaseMill() + DefaultProxyServer.WRITE_HIGH_WATER_MARK / config.getBlockWaitRate(); long localFrontendBlockFrom = frontendBlockFrom.get(); long localBackendBlockFrom = backendBlockFrom.get(); - if (localFrontendBlockFrom > 0 && current - localFrontendBlockFrom > maxBlockWait) { + if (localFrontendBlockFrom > 0 && current - localFrontendBlockFrom > maxBlockWait && config.allowCloseChannel()) { logger.warn("[check][frontendLongBlock][{}] close", current - localFrontendBlockFrom); frontend.release(); - } else if (localBackendBlockFrom > 0 && current - localBackendBlockFrom > maxBlockWait) { + } else if (localBackendBlockFrom > 0 && current - localBackendBlockFrom > maxBlockWait && config.allowCloseChannel()) { logger.warn("[check][backendLongBlock][{}] close", current - localBackendBlockFrom); backend.release(); } diff --git a/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/TestProxyConfig.java b/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/TestProxyConfig.java index 2b87b78ce3..b4bf99e44e 100644 --- a/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/TestProxyConfig.java +++ b/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/TestProxyConfig.java @@ -186,4 +186,9 @@ public TestProxyConfig setStartMonitor(boolean startMonitor) { public void setCompress(boolean compress) { this.compress = compress; } + + @Override + public boolean allowCloseChannel() { + return true; + } }