From 94b2ec4584caea06c4d25d68608a25c58397ec31 Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Sat, 26 Apr 2025 12:04:01 +0800 Subject: [PATCH] ZOOKEEPER-4923: Add timeout to control brand-new session establishment Refs: ZOOKEEPER-4508, ZOOKEEPER-4921, ZOOKEEPER-4923 and https://lists.apache.org/thread/nfb9z7rhgglbjzfxvg4z2m3pks53b3c1 --- .../java/org/apache/zookeeper/ClientCnxn.java | 40 +++++++++++++++- .../java/org/apache/zookeeper/ZooKeeper.java | 3 ++ .../zookeeper/client/ZooKeeperBuilder.java | 17 +++++++ .../zookeeper/client/ZooKeeperOptions.java | 11 +++++ .../ClientCnxnSocketFragilityTest.java | 6 ++- .../zookeeper/ClientRequestTimeoutTest.java | 4 ++ .../zookeeper/test/SessionTimeoutTest.java | 46 +++++++++++++++++++ 7 files changed, 125 insertions(+), 2 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index ed03359f7fe..64c225015fa 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -168,6 +168,8 @@ static class AuthData { private final int sessionTimeout; + private final long newSessionTimeout; + private final ZKWatchManager watchManager; private long sessionId; @@ -398,6 +400,36 @@ public ClientCnxn( long sessionId, byte[] sessionPasswd, boolean canBeReadOnly + ) throws IOException { + this(hostProvider, sessionTimeout, Long.MAX_VALUE, clientConfig, defaultWatcher, clientCnxnSocket, sessionId, sessionPasswd, canBeReadOnly); + } + + /** + * Creates a connection object. The actual network connect doesn't get + * established until needed. The start() instance method must be called + * after construction. + * + * @param hostProvider the list of ZooKeeper servers to connect to + * @param sessionTimeout the timeout for connections. + * @param newSessionTimeout the timeout before giving up brand-new session establishment. + * @param clientConfig the client configuration. + * @param defaultWatcher default watcher for this connection + * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty) + * @param sessionId session id if re-establishing session + * @param sessionPasswd session passwd if re-establishing session + * @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning + * @throws IOException in cases of broken network + */ + public ClientCnxn( + HostProvider hostProvider, + int sessionTimeout, + long newSessionTimeout, + ZKClientConfig clientConfig, + Watcher defaultWatcher, + ClientCnxnSocket clientCnxnSocket, + long sessionId, + byte[] sessionPasswd, + boolean canBeReadOnly ) throws IOException { this.hostProvider = hostProvider; this.sessionTimeout = sessionTimeout; @@ -413,6 +445,7 @@ public ClientCnxn( this.connectTimeout = sessionTimeout / hostProvider.size(); this.readTimeout = sessionTimeout * 2 / 3; this.expirationTimeout = sessionTimeout * 4 / 3; + this.newSessionTimeout = newSessionTimeout == 0 ? expirationTimeout : newSessionTimeout; this.sendThread = new SendThread(clientCnxnSocket); this.eventThread = new EventThread(); @@ -1192,7 +1225,12 @@ public void run() { to = connectTimeout - clientCnxnSocket.getIdleSend(); } - int expiration = sessionId == 0 ? Integer.MAX_VALUE : expirationTimeout - clientCnxnSocket.getIdleRecv(); + long expiration; + if (sessionId == 0) { + expiration = newSessionTimeout - clientCnxnSocket.getIdleRecv(); + } else { + expiration = expirationTimeout - clientCnxnSocket.getIdleRecv(); + } if (expiration <= 0) { String warnInfo = String.format( "Client session timed out, have not heard from server in %dms for session id 0x%s", diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java index c6ca3bef0a7..1a71bd74fb2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java @@ -672,6 +672,7 @@ public ZooKeeper( ClientCnxn createConnection( HostProvider hostProvider, int sessionTimeout, + long newSessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, @@ -682,6 +683,7 @@ ClientCnxn createConnection( return new ClientCnxn( hostProvider, sessionTimeout, + newSessionTimeout, clientConfig, defaultWatcher, clientCnxnSocket, @@ -1111,6 +1113,7 @@ public ZooKeeper(ZooKeeperOptions options) throws IOException { cnxn = createConnection( hostProvider, sessionTimeout, + options.getNewSessionTimeoutMs(), this.clientConfig, watcher, getClientCnxnSocket(), diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java index 36f815ab2da..4642b7a0865 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java @@ -39,6 +39,7 @@ public class ZooKeeperBuilder { private final String connectString; private final Duration sessionTimeout; + private Duration newSessionTimeout = Duration.ofSeconds(Long.MAX_VALUE, 999_999_999L); private Function, HostProvider> hostProvider; private Watcher defaultWatcher; private boolean canBeReadOnly = false; @@ -128,6 +129,21 @@ public ZooKeeperBuilder withSession(long sessionId, byte[] sessionPasswd) { return this; } + /** + * Specifies timeout to establish a brand-new session. + * + * @param timeout timeout to get {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} in establishing a + * brand-new session. {@code Duration.ofSeconds(Long.MAX_VALUE, 999_999_999L)}, which is the default, + * means endless retry until connected. {@code Duration.ZERO} means a sensible value deduced from + * specified session timeout, currently, it is approximate {@code sessionTimeout * 4 / 3}. + * @return this + * @since 3.10.0 + */ + public ZooKeeperBuilder withNewSessionTimeout(Duration timeout) { + this.newSessionTimeout = timeout; + return this; + } + /** * Specifies the client config used to construct ZooKeeper instances. * @@ -152,6 +168,7 @@ public ZooKeeperOptions toOptions() { return new ZooKeeperOptions( connectString, sessionTimeout, + newSessionTimeout, defaultWatcher, hostProvider, canBeReadOnly, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java index 52a173ebfff..2605cb0f8c4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java @@ -33,6 +33,7 @@ public class ZooKeeperOptions { private final String connectString; private final Duration sessionTimeout; + private final Duration newSessionTimeout; private final Watcher defaultWatcher; private final Function, HostProvider> hostProvider; private final boolean canBeReadOnly; @@ -42,6 +43,7 @@ public class ZooKeeperOptions { ZooKeeperOptions(String connectString, Duration sessionTimeout, + Duration newSessionTimeout, Watcher defaultWatcher, Function, HostProvider> hostProvider, boolean canBeReadOnly, @@ -50,6 +52,7 @@ public class ZooKeeperOptions { ZKClientConfig clientConfig) { this.connectString = connectString; this.sessionTimeout = sessionTimeout; + this.newSessionTimeout = newSessionTimeout; this.hostProvider = hostProvider; this.defaultWatcher = defaultWatcher; this.canBeReadOnly = canBeReadOnly; @@ -66,6 +69,14 @@ public int getSessionTimeoutMs() { return (int) Long.min(Integer.MAX_VALUE, sessionTimeout.toMillis()); } + public long getNewSessionTimeoutMs() { + try { + return newSessionTimeout.toMillis(); + } catch (ArithmeticException ignored) { + return Long.MAX_VALUE; + } + } + public Watcher getDefaultWatcher() { return defaultWatcher; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java index 54426f0b6e2..50ca6a9673e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java @@ -282,6 +282,7 @@ class CustomClientCnxn extends ClientCnxn { public CustomClientCnxn( HostProvider hostProvider, int sessionTimeout, + long newSessionTimeout, ZKClientConfig zkClientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, @@ -292,6 +293,7 @@ public CustomClientCnxn( super( hostProvider, sessionTimeout, + newSessionTimeout, zkClientConfig, defaultWatcher, clientCnxnSocket, @@ -357,6 +359,7 @@ public boolean isAlive() { ClientCnxn createConnection( HostProvider hostProvider, int sessionTimeout, + long newSessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, @@ -369,6 +372,7 @@ ClientCnxn createConnection( ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn( hostProvider, sessionTimeout, + newSessionTimeout, clientConfig, defaultWatcher, clientCnxnSocket, @@ -378,4 +382,4 @@ ClientCnxn createConnection( return ClientCnxnSocketFragilityTest.this.cnxn; } } -} \ No newline at end of file +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java index 93f801cabc2..a05825ce2de 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java @@ -225,6 +225,7 @@ class CustomClientCnxn extends ClientCnxn { CustomClientCnxn( HostProvider hostProvider, int sessionTimeout, + long newSessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, @@ -235,6 +236,7 @@ class CustomClientCnxn extends ClientCnxn { super( hostProvider, sessionTimeout, + newSessionTimeout, clientConfig, defaultWatcher, clientCnxnSocket, @@ -286,6 +288,7 @@ public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher ClientCnxn createConnection( HostProvider hostProvider, int sessionTimeout, + long newSessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, @@ -296,6 +299,7 @@ ClientCnxn createConnection( return new CustomClientCnxn( hostProvider, sessionTimeout, + newSessionTimeout, clientConfig, defaultWatcher, clientCnxnSocket, diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java index 9f5943f6821..7668e1dfefd 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java @@ -19,6 +19,7 @@ package org.apache.zookeeper.test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThan; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -29,6 +30,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -42,6 +44,7 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.client.ZooKeeperBuilder; import org.apache.zookeeper.common.Time; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -201,6 +204,49 @@ public void testSessionExpirationWhenNoServerUp() throws Exception { assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS)); assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null)); } + + // when: try to establish a brand-new session using builder with default newSessionTimeout + watcher.reset(); + try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, Duration.ofMillis(sessionTimeout)) + .withDefaultWatcher(watcher) + .build()) { + // then: never Expired + assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS)); + assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null)); + } + + // when: try to establish a brand-new session using builder with Duration.ZERO newSessionTimeout + watcher.reset(); + long start = Time.currentElapsedTime(); + try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, Duration.ofMillis(sessionTimeout)) + .withDefaultWatcher(watcher) + .withNewSessionTimeout(Duration.ZERO) + .build()) { + // then: get Expired after some delay + watcher.expired.join(); + long elapsed = Time.currentElapsedTime() - start; + assertThat(elapsed, greaterThan((long) sessionTimeout)); + assertThat(elapsed, lessThan(sessionTimeout * 10L)); + // then: future request will get SessionExpiredException + assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null)); + } + + // when: try to establish a brand-new session using builder with custom newSessionTimeout + watcher.reset(); + start = Time.currentElapsedTime(); + Duration newSessionTimeout = Duration.ofMillis(300); + try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, Duration.ofMillis(30000)) + .withDefaultWatcher(watcher) + .withNewSessionTimeout(newSessionTimeout) + .build()) { + // then: get Expired after newSessionTimeout + watcher.expired.join(); + long elapsed = Time.currentElapsedTime() - start; + assertThat(elapsed, greaterThanOrEqualTo(newSessionTimeout.toMillis())); + assertThat(elapsed, lessThan(newSessionTimeout.toMillis() * 10)); + // then: future request will get SessionExpiredException + assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null)); + } } @Test