diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index 12a12e77c9..77e086995f 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -1113,11 +1113,6 @@ public RedisFuture>> functionList(String libraryName) { return dispatch(commandBuilder.functionList(libraryName)); } - @Override - public void flushCommands() { - connection.flushCommands(); - } - @Override public RedisFuture flushall() { return dispatch(commandBuilder.flushall()); @@ -1553,11 +1548,6 @@ public RedisFuture info(String section) { return dispatch(commandBuilder.info(section)); } - @Override - public boolean isOpen() { - return connection.isOpen(); - } - @Override public RedisFuture ftCreate(K index, CreateArgs options, List> fieldArgs) { return dispatch(searchCommandBuilder.ftCreate(index, options, fieldArgs)); @@ -2603,11 +2593,6 @@ public RedisFuture setGet(K key, V value, SetArgs setArgs) { return dispatch(commandBuilder.setGet(key, value, setArgs)); } - @Override - public void setAutoFlushCommands(boolean autoFlush) { - connection.setAutoFlushCommands(autoFlush); - } - @Override public void setTimeout(Duration timeout) { connection.setTimeout(timeout); diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index 65c1ed3e3a..7d3717bfe0 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -492,10 +492,6 @@ public Mono clientUnblock(long id, UnblockType type) { return createMono(() -> commandBuilder.clientUnblock(id, type)); } - public void close() { - connection.close(); - } - @Override public Mono clusterAddSlots(int... slots) { return createMono(() -> commandBuilder.clusterAddslots(slots)); @@ -1177,11 +1173,6 @@ public Flux> functionList(String libraryName) { return createDissolvingFlux(() -> commandBuilder.functionList(libraryName)); } - @Override - public void flushCommands() { - connection.flushCommands(); - } - @Override public Mono flushall() { return createMono(commandBuilder::flushall); @@ -1618,11 +1609,6 @@ public Mono info(String section) { return createMono(() -> commandBuilder.info(section)); } - @Override - public boolean isOpen() { - return connection.isOpen(); - } - @Override public Mono ftCreate(K index, CreateArgs options, List> fieldArgs) { return createMono(() -> searchCommandBuilder.ftCreate(index, options, fieldArgs)); @@ -2688,11 +2674,6 @@ public Mono setGet(K key, V value, SetArgs setArgs) { return createMono(() -> commandBuilder.setGet(key, value, setArgs)); } - @Override - public void setAutoFlushCommands(boolean autoFlush) { - connection.setAutoFlushCommands(autoFlush); - } - @Override public void setTimeout(Duration timeout) { connection.setTimeout(timeout); diff --git a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java index 76ad7e0239..abd692c531 100644 --- a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java @@ -188,34 +188,4 @@ public interface BaseRedisAsyncCommands { */ RedisFuture dispatch(ProtocolKeyword type, CommandOutput output, CommandArgs args); - /** - * @return {@code true} if the connection is open (connected and not closed). - * @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#isOpen()} method on the - * connection interface. To be removed with Lettuce 7.0. - */ - @Deprecated - boolean isOpen(); - - /** - * Disable or enable auto-flush behavior. Default is {@code true}. If autoFlushCommands is disabled, multiple commands can - * be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is - * issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis. - * - * @param autoFlush state of autoFlush. - * @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#setAutoFlushCommands(boolean)} - * method on the connection interface. To be removed with Lettuce 7.0. - */ - @Deprecated - void setAutoFlushCommands(boolean autoFlush); - - /** - * Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to - * achieve batching. No-op if channel is not connected. - * - * @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#flushCommands()} method on the - * connection interface. To be removed with Lettuce 7.0. - */ - @Deprecated - void flushCommands(); - } diff --git a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java index 0b1a848b6d..5a6a7ca945 100644 --- a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java @@ -189,36 +189,6 @@ public interface BaseRedisReactiveCommands { */ Flux dispatch(ProtocolKeyword type, CommandOutput output, CommandArgs args); - /** - * @return {@code true} if the connection is open (connected and not closed). - * @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#isOpen()} method on the - * connection interface. To be removed with Lettuce 7.0. - */ - @Deprecated - boolean isOpen(); - - /** - * Disable or enable auto-flush behavior. Default is {@code true}. If autoFlushCommands is disabled, multiple commands can - * be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is - * issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis. - * - * @param autoFlush state of autoFlush. - * @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#setAutoFlushCommands(boolean)} - * method on the connection interface. To be removed with Lettuce 7.0. - */ - @Deprecated - void setAutoFlushCommands(boolean autoFlush); - - /** - * Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to - * achieve batching. No-op if channel is not connected. - * - * @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#flushCommands()} method on the - * connection interface. To be removed with Lettuce 7.0. - */ - @Deprecated - void flushCommands(); - /** * @return the currently configured instance of the {@link JsonParser} * @since 6.5 diff --git a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java index f5dff096a0..5abaaa10db 100644 --- a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java @@ -187,12 +187,4 @@ public interface BaseRedisCommands { */ T dispatch(ProtocolKeyword type, CommandOutput output, CommandArgs args); - /** - * @return {@code true} if the connection is open (connected and not closed). - * @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#isOpen()} method on the - * connection interface. To be removed with Lettuce 7.0. - */ - @Deprecated - boolean isOpen(); - } diff --git a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java index d8f8e40923..928a7d19ea 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java @@ -148,22 +148,22 @@ public RedisFuture clientSetname(K name) { RedisURI uri = redisClusterNode.getUri(); - CompletableFuture> byNodeId = getConnectionAsync(redisClusterNode.getNodeId()); + CompletableFuture> byNodeId = getStatefulConnection(redisClusterNode.getNodeId()); executions.put("NodeId: " + redisClusterNode.getNodeId(), byNodeId.thenCompose(c -> { if (c.isOpen()) { - return c.clientSetname(name); + return c.async().clientSetname(name); } return ok; })); - CompletableFuture> byHost = getConnectionAsync(uri.getHost(), uri.getPort()); + CompletableFuture> byHost = getStatefulConnection(uri.getHost(), uri.getPort()); executions.put("HostAndPort: " + redisClusterNode.getNodeId(), byHost.thenCompose(c -> { if (c.isOpen()) { - return c.clientSetname(name); + return c.async().clientSetname(name); } return ok; })); @@ -596,9 +596,12 @@ public RedisClusterAsyncCommands getConnection(String host, int port) { return getStatefulConnection().getConnection(host, port).async(); } - private CompletableFuture> getConnectionAsync(String nodeId) { - return getConnectionProvider(). getConnectionAsync(ConnectionIntent.WRITE, nodeId) - .thenApply(StatefulRedisConnection::async); + private CompletableFuture> getStatefulConnection(String nodeId) { + return getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, nodeId); + } + + private CompletableFuture> getStatefulConnection(String host, int port) { + return getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, host, port); } private CompletableFuture> getConnectionAsync(String host, int port) { diff --git a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java index 448c4073bb..09ba59ecb7 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java @@ -136,23 +136,23 @@ public Mono clientSetname(K name) { for (RedisClusterNode redisClusterNode : getStatefulConnection().getPartitions()) { - Mono> byNodeId = getConnectionReactive(redisClusterNode.getNodeId()); + Mono> byNodeId = getStatefulConnection(redisClusterNode.getNodeId()); publishers.add(byNodeId.flatMap(conn -> { if (conn.isOpen()) { - return conn.clientSetname(name); + return conn.reactive().clientSetname(name); } return Mono.empty(); })); - Mono> byHost = getConnectionReactive(redisClusterNode.getUri().getHost(), + Mono> byHost = getStatefulConnection(redisClusterNode.getUri().getHost(), redisClusterNode.getUri().getPort()); publishers.add(byHost.flatMap(conn -> { if (conn.isOpen()) { - return conn.clientSetname(name); + return conn.reactive().clientSetname(name); } return Mono.empty(); })); @@ -441,6 +441,10 @@ public RedisClusterReactiveCommands getConnection(String nodeId) { return getStatefulConnection().getConnection(nodeId).reactive(); } + private Mono> getStatefulConnection(String nodeId) { + return getMono(getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, nodeId)); + } + private Mono> getConnectionReactive(String nodeId) { return getMono(getConnectionProvider(). getConnectionAsync(ConnectionIntent.WRITE, nodeId)) .map(StatefulRedisConnection::reactive); @@ -456,6 +460,10 @@ private Mono> getConnectionReactive(String ho .map(StatefulRedisConnection::reactive); } + private Mono> getStatefulConnection(String host, int port) { + return getMono(getConnectionProvider(). getConnectionAsync(ConnectionIntent.WRITE, host, port)); + } + @Override public StatefulRedisClusterConnection getStatefulConnection() { return (StatefulRedisClusterConnection) super.getConnection(); diff --git a/src/main/java/io/lettuce/core/sentinel/RedisSentinelAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/sentinel/RedisSentinelAsyncCommandsImpl.java index 4cf9ebbc34..d33d7f328f 100644 --- a/src/main/java/io/lettuce/core/sentinel/RedisSentinelAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/sentinel/RedisSentinelAsyncCommandsImpl.java @@ -196,15 +196,6 @@ public AsyncCommand dispatch(RedisCommand cmd) { return asyncCommand; } - public void close() { - connection.close(); - } - - @Override - public boolean isOpen() { - return connection.isOpen(); - } - @Override public StatefulRedisSentinelConnection getStatefulConnection() { return (StatefulRedisSentinelConnection) connection; diff --git a/src/main/java/io/lettuce/core/sentinel/RedisSentinelReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/sentinel/RedisSentinelReactiveCommandsImpl.java index af5937d4e2..4437bfdfb0 100644 --- a/src/main/java/io/lettuce/core/sentinel/RedisSentinelReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/sentinel/RedisSentinelReactiveCommandsImpl.java @@ -189,16 +189,6 @@ public Flux dispatch(ProtocolKeyword type, CommandOutput output, return (Flux) createFlux(() -> new Command<>(type, output, args)); } - @Override - public void close() { - getStatefulConnection().close(); - } - - @Override - public boolean isOpen() { - return getStatefulConnection().isOpen(); - } - @Override public StatefulRedisSentinelConnection getStatefulConnection() { return (StatefulRedisSentinelConnection) super.getConnection(); diff --git a/src/main/java/io/lettuce/core/sentinel/api/async/RedisSentinelAsyncCommands.java b/src/main/java/io/lettuce/core/sentinel/api/async/RedisSentinelAsyncCommands.java index cd3462d7f5..503306341b 100644 --- a/src/main/java/io/lettuce/core/sentinel/api/async/RedisSentinelAsyncCommands.java +++ b/src/main/java/io/lettuce/core/sentinel/api/async/RedisSentinelAsyncCommands.java @@ -249,11 +249,6 @@ public interface RedisSentinelAsyncCommands { */ RedisFuture dispatch(ProtocolKeyword type, CommandOutput output, CommandArgs args); - /** - * @return {@code true} if the connection is open (connected and not closed). - */ - boolean isOpen(); - /** * @return the underlying connection. */ diff --git a/src/main/java/io/lettuce/core/sentinel/api/reactive/RedisSentinelReactiveCommands.java b/src/main/java/io/lettuce/core/sentinel/api/reactive/RedisSentinelReactiveCommands.java index 04fb113034..18b01b4fad 100644 --- a/src/main/java/io/lettuce/core/sentinel/api/reactive/RedisSentinelReactiveCommands.java +++ b/src/main/java/io/lettuce/core/sentinel/api/reactive/RedisSentinelReactiveCommands.java @@ -249,11 +249,6 @@ public interface RedisSentinelReactiveCommands { */ Flux dispatch(ProtocolKeyword type, CommandOutput output, CommandArgs args); - /** - * @return {@code true} if the connection is open (connected and not closed). - */ - boolean isOpen(); - /** * @return the underlying connection. */ diff --git a/src/main/java/io/lettuce/core/sentinel/api/sync/RedisSentinelCommands.java b/src/main/java/io/lettuce/core/sentinel/api/sync/RedisSentinelCommands.java index b7089c80ee..bef0820423 100644 --- a/src/main/java/io/lettuce/core/sentinel/api/sync/RedisSentinelCommands.java +++ b/src/main/java/io/lettuce/core/sentinel/api/sync/RedisSentinelCommands.java @@ -248,11 +248,6 @@ public interface RedisSentinelCommands { */ T dispatch(ProtocolKeyword type, CommandOutput output, CommandArgs args); - /** - * @return {@code true} if the connection is open (connected and not closed). - */ - boolean isOpen(); - /** * @return the underlying connection. */ diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt index 475a9f81bc..6847dc02a4 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt @@ -188,34 +188,5 @@ interface BaseRedisCoroutinesCommands { */ fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): Flow - /** - * @return {@code true} if the connection is open (connected and not closed). - * @deprecated since 6.2. Use the corresponding [io.lettuce.core.api.StatefulConnection#isOpen()] method on the connection - * interface. To be removed with Lettuce 7.0. - */ - @Deprecated("since 6.2, to be removed with Lettuce 7") - fun isOpen(): Boolean - - /** - * Disable or enable auto-flush behavior. Default is `true`. If autoFlushCommands is disabled, multiple commands can - * be issued without writing them actually to the transport. Commands are buffered until a [flushCommands] is - * issued. After calling [flushCommands] commands are sent to the transport and executed by Redis. - * - * @param autoFlush state of autoFlush. - * @deprecated since 6.2. Use the corresponding [io.lettuce.core.api.StatefulConnection#setAutoFlushCommands(boolean)] method on the connection - * interface. To be removed with Lettuce 7.0. - */ - @Deprecated("since 6.2, to be removed with Lettuce 7") - fun setAutoFlushCommands(autoFlush: Boolean) - - /** - * Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to - * achieve batching. No-op if channel is not connected. - * @deprecated since 6.2. Use the corresponding [io.lettuce.core.api.StatefulConnection#flushCommands()] method on the connection - * interface. To be removed with Lettuce 7.0. - */ - @Deprecated("since 6.2, to be removed with Lettuce 7") - fun flushCommands() - } diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt index 6351169241..60a6f30204 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt @@ -80,11 +80,5 @@ internal class BaseRedisCoroutinesCommandsImpl(internal val op override fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): Flow = ops.dispatch(type, output, args).asFlow() - override fun isOpen(): Boolean = ops.isOpen - - override fun setAutoFlushCommands(autoFlush: Boolean) = ops.setAutoFlushCommands(autoFlush) - - override fun flushCommands() = ops.flushCommands() - } diff --git a/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommands.kt index cc8e6e4862..08198a9aab 100644 --- a/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommands.kt @@ -246,11 +246,5 @@ interface RedisSentinelCoroutinesCommands { */ fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): Flow - /** - * - * @return @code true} if the connection is open (connected and not closed). - */ - fun isOpen(): Boolean - } diff --git a/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt index d0b9cff033..254c1d128b 100644 --- a/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt @@ -99,7 +99,5 @@ internal class RedisSentinelCoroutinesCommandsImpl(internal va override fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): Flow = ops.dispatch(type, output, args).asFlow() - override fun isOpen(): Boolean = ops.isOpen - } diff --git a/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java b/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java index 7d5569f2f1..eb396c4f04 100644 --- a/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java +++ b/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java @@ -188,36 +188,4 @@ public interface BaseRedisCommands { */ T dispatch(ProtocolKeyword type, CommandOutput output, CommandArgs args); - /** - * @return {@code true} if the connection is open (connected and not closed). - * @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#isOpen()} method on the - * connection interface. To be removed with Lettuce 7.0. - */ - @Deprecated - boolean isOpen(); - - /** - * Disable or enable auto-flush behavior. Default is {@code true}. If autoFlushCommands is disabled, multiple commands can - * be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is - * issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis. - * - * @param autoFlush state of autoFlush. - * @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#setAutoFlushCommands(boolean)} - * method on the connection interface. To be removed with Lettuce 7.0. - * - */ - @Deprecated - void setAutoFlushCommands(boolean autoFlush); - - /** - * Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to - * achieve batching. No-op if channel is not connected. - * - * @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#flushCommands()} method on the - * connection interface. To be removed with Lettuce 7.0. - * - */ - @Deprecated - void flushCommands(); - } diff --git a/src/main/templates/io/lettuce/core/api/RedisSentinelCommands.java b/src/main/templates/io/lettuce/core/api/RedisSentinelCommands.java index 9e7db5137b..133d926e23 100644 --- a/src/main/templates/io/lettuce/core/api/RedisSentinelCommands.java +++ b/src/main/templates/io/lettuce/core/api/RedisSentinelCommands.java @@ -247,12 +247,6 @@ public interface RedisSentinelCommands { */ T dispatch(ProtocolKeyword type, CommandOutput output, CommandArgs args); - /** - * - * @return {@code true} if the connection is open (connected and not closed). - */ - boolean isOpen(); - /** * * @return the underlying connection. diff --git a/src/test/java/biz/paluch/redis/extensibility/MyExtendedRedisClientIntegrationTests.java b/src/test/java/biz/paluch/redis/extensibility/MyExtendedRedisClientIntegrationTests.java index 12b559a4be..076b6e384a 100644 --- a/src/test/java/biz/paluch/redis/extensibility/MyExtendedRedisClientIntegrationTests.java +++ b/src/test/java/biz/paluch/redis/extensibility/MyExtendedRedisClientIntegrationTests.java @@ -47,12 +47,12 @@ static void shutdownClient() { @Test void testPubsub() throws Exception { - StatefulRedisPubSubConnection connection = client.connectPubSub(); - RedisPubSubAsyncCommands commands = connection.async(); - assertThat(commands).isInstanceOf(RedisPubSubAsyncCommandsImpl.class); - assertThat(commands.getStatefulConnection()).isInstanceOf(MyPubSubConnection.class); - commands.set("key", "value").get(); - connection.close(); + try (StatefulRedisPubSubConnection connection = client.connectPubSub()) { + RedisPubSubAsyncCommands commands = connection.async(); + assertThat(commands).isInstanceOf(RedisPubSubAsyncCommandsImpl.class); + assertThat(connection).isInstanceOf(MyPubSubConnection.class); + commands.set("key", "value").get(); + } } } diff --git a/src/test/java/io/lettuce/core/AbstractRedisClientTest.java b/src/test/java/io/lettuce/core/AbstractRedisClientTest.java index b7a338baec..dc0c3f12ef 100644 --- a/src/test/java/io/lettuce/core/AbstractRedisClientTest.java +++ b/src/test/java/io/lettuce/core/AbstractRedisClientTest.java @@ -19,13 +19,13 @@ */ package io.lettuce.core; +import io.lettuce.core.api.StatefulRedisConnection; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.test.resource.DefaultRedisClient; -import io.lettuce.test.resource.TestClientResources; /** * @author Will Glozer @@ -37,23 +37,21 @@ public abstract class AbstractRedisClientTest extends TestSupport { protected RedisCommands redis; + protected StatefulRedisConnection statefulRedisConnection; + @BeforeAll public static void setupClient() { client = DefaultRedisClient.get(); client.setOptions(ClientOptions.create()); } - private static RedisClient newRedisClient() { - return RedisClient.create(TestClientResources.get(), RedisURI.Builder.redis(host, port).build()); - } - protected RedisCommands connect() { - RedisCommands connect = client.connect().sync(); - return connect; + statefulRedisConnection = client.connect(); + return statefulRedisConnection.sync(); } @BeforeEach - public void openConnection() throws Exception { + public void openConnection() { client.setOptions(ClientOptions.builder().build()); redis = connect(); boolean scriptRunning; @@ -77,9 +75,9 @@ public void openConnection() throws Exception { } @AfterEach - public void closeConnection() throws Exception { - if (redis != null) { - redis.getStatefulConnection().close(); + public void closeConnection() { + if (statefulRedisConnection != null) { + statefulRedisConnection.close(); } } diff --git a/src/test/java/io/lettuce/core/AsyncConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/AsyncConnectionIntegrationTests.java index 3be2c9200b..2651d09d01 100644 --- a/src/test/java/io/lettuce/core/AsyncConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/AsyncConnectionIntegrationTests.java @@ -19,48 +19,42 @@ */ package io.lettuce.core; -import static io.lettuce.TestTags.INTEGRATION_TEST; -import static org.assertj.core.api.Assertions.assertThat; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.internal.Futures; +import io.lettuce.test.Delay; +import io.lettuce.test.LettuceExtension; +import io.lettuce.test.TestFutures; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import javax.enterprise.inject.New; +import javax.inject.Inject; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import javax.inject.Inject; - -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.api.async.RedisAsyncCommands; -import io.lettuce.core.internal.Futures; -import io.lettuce.test.Delay; -import io.lettuce.test.LettuceExtension; -import io.lettuce.test.TestFutures; +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static org.assertj.core.api.Assertions.assertThat; /** * @author Will Glozer * @author Mark Paluch + * @author Hari Mani */ @Tag(INTEGRATION_TEST) @ExtendWith(LettuceExtension.class) class AsyncConnectionIntegrationTests extends TestSupport { - private final RedisClient client; - - private final StatefulRedisConnection connection; - private final RedisAsyncCommands async; @Inject - AsyncConnectionIntegrationTests(RedisClient client, StatefulRedisConnection connection) { - this.client = client; - this.connection = connection; + AsyncConnectionIntegrationTests(@New final StatefulRedisConnection connection) { this.async = connection.async(); - this.connection.sync().flushall(); + connection.sync().flushall(); } @Test @@ -94,61 +88,38 @@ void watch() { @Test void futureListener() { - - final List run = new ArrayList<>(); - - Runnable listener = () -> run.add(new Object()); - - List> futures = new ArrayList<>(); - + // using 'key' causes issues for some strange reason so using a fresh key + final String listKey = "list:" + key; + final List> futures = new ArrayList<>(); for (int i = 0; i < 1000; i++) { - futures.add(async.lpush(key, "" + i)); + futures.add(async.lpush(listKey, "" + i)); } - TestFutures.awaitOrTimeout(futures); - RedisAsyncCommands connection = client.connect().async(); - - Long len = TestFutures.getOrTimeout(connection.llen(key)); + Long len = TestFutures.getOrTimeout(async.llen(listKey)); assertThat(len.intValue()).isEqualTo(1000); - RedisFuture> sort = connection.sort(key); + RedisFuture> sort = async.sort(listKey); assertThat(sort.isCancelled()).isFalse(); - sort.thenRun(listener); + final List run = new ArrayList<>(); + sort.thenRun(() -> run.add(new Object())); TestFutures.awaitOrTimeout(sort); Delay.delay(Duration.ofMillis(100)); assertThat(run).hasSize(1); - - connection.getStatefulConnection().close(); } @Test void futureListenerCompleted() { - - final List run = new ArrayList<>(); - - Runnable listener = new Runnable() { - - @Override - public void run() { - run.add(new Object()); - } - - }; - - RedisAsyncCommands connection = client.connect().async(); - - RedisFuture set = connection.set(key, value); + final RedisFuture set = async.set(key, value); TestFutures.awaitOrTimeout(set); - set.thenRun(listener); + final List run = new ArrayList<>(); + set.thenRun(() -> run.add(new Object())); assertThat(run).hasSize(1); - - connection.getStatefulConnection().close(); } @Test diff --git a/src/test/java/io/lettuce/core/ClientIntegrationTests.java b/src/test/java/io/lettuce/core/ClientIntegrationTests.java index d9b630edc0..4361574269 100644 --- a/src/test/java/io/lettuce/core/ClientIntegrationTests.java +++ b/src/test/java/io/lettuce/core/ClientIntegrationTests.java @@ -29,6 +29,7 @@ /** * @author Will Glozer * @author Mark Paluch + * @author Hari Mani */ @Tag(INTEGRATION_TEST) @ExtendWith(LettuceExtension.class) @@ -36,19 +37,20 @@ class ClientIntegrationTests extends TestSupport { private final RedisClient client; + private final StatefulRedisConnection connection; + private final RedisCommands redis; @Inject - ClientIntegrationTests(RedisClient client, StatefulRedisConnection connection) { + ClientIntegrationTests(@New final RedisClient client, @New final StatefulRedisConnection connection) { this.client = client; + this.connection = connection; this.redis = connection.sync(); this.redis.flushall(); } @Test - @Inject - void close(@New StatefulRedisConnection connection) { - + void close() { connection.close(); assertThatThrownBy(() -> connection.sync().get(key)).isInstanceOf(RedisException.class); } @@ -72,32 +74,13 @@ public void afterChannelInitialized(Channel channel) { FastShutdown.shutdown(handshakeFailure); } - @Test - void statefulConnectionFromSync() { - assertThat(redis.getStatefulConnection().sync()).isSameAs(redis); - } - - @Test - void statefulConnectionFromAsync() { - RedisAsyncCommands async = client.connect().async(); - assertThat(async.getStatefulConnection().async()).isSameAs(async); - async.getStatefulConnection().close(); - } - - @Test - void statefulConnectionFromReactive() { - RedisAsyncCommands async = client.connect().async(); - assertThat(async.getStatefulConnection().reactive().getStatefulConnection()).isSameAs(async.getStatefulConnection()); - async.getStatefulConnection().close(); - } - @Test void timeout() { - redis.setTimeout(Duration.ofNanos(100)); + connection.setTimeout(Duration.ofNanos(100)); assertThatThrownBy(() -> redis.blpop(1, "unknown")).isInstanceOf(RedisCommandTimeoutException.class); - redis.setTimeout(Duration.ofSeconds(60)); + connection.setTimeout(Duration.ofSeconds(60)); } @Test @@ -139,11 +122,11 @@ public void onRedisDisconnected(RedisChannelHandler connection) { MyListener listener = new MyListener(); - redis.getStatefulConnection().addListener(listener); + connection.addListener(listener); redis.quit(); Thread.sleep(100); - Wait.untilTrue(redis::isOpen).waitOrTimeout(); + Wait.untilTrue(connection::isOpen).waitOrTimeout(); assertThat(listener.connect).hasValueGreaterThan(0); assertThat(listener.disconnect).hasValueGreaterThan(0); @@ -151,13 +134,9 @@ public void onRedisDisconnected(RedisChannelHandler connection) { @Test void interrupt() { - - StatefulRedisConnection connection = client.connect(); Thread.currentThread().interrupt(); assertThatThrownBy(() -> connection.sync().blpop(0, key)).isInstanceOf(RedisCommandInterruptedException.class); - Thread.interrupted(); - - connection.closeAsync(); + assertThat(Thread.interrupted()).isTrue(); } @Test @@ -213,40 +192,34 @@ void testExceptionWithCause() { @Test void standaloneConnectionShouldSetClientName() { - - RedisURI redisURI = RedisURI.create(host, port); + final RedisURI redisURI = RedisURI.create(host, port); redisURI.setClientName("my-client"); + try (StatefulRedisConnection connection = client.connect(redisURI)) { - StatefulRedisConnection connection = client.connect(redisURI); - - assertThat(connection.sync().clientGetname()).isEqualTo(redisURI.getClientName()); + assertThat(connection.sync().clientGetname()).isEqualTo(redisURI.getClientName()); - connection.sync().quit(); - Delay.delay(Duration.ofMillis(100)); - Wait.untilTrue(connection::isOpen).waitOrTimeout(); + connection.sync().quit(); + Delay.delay(Duration.ofMillis(100)); + Wait.untilTrue(connection::isOpen).waitOrTimeout(); - assertThat(connection.sync().clientGetname()).isEqualTo(redisURI.getClientName()); - - connection.close(); + assertThat(connection.sync().clientGetname()).isEqualTo(redisURI.getClientName()); + } } @Test void pubSubConnectionShouldSetClientName() { - - RedisURI redisURI = RedisURI.create(host, port); + final RedisURI redisURI = RedisURI.create(host, port); redisURI.setClientName("my-client"); + try (StatefulRedisConnection connection = client.connectPubSub(redisURI)) { - StatefulRedisConnection connection = client.connectPubSub(redisURI); - - assertThat(connection.sync().clientGetname()).isEqualTo(redisURI.getClientName()); + assertThat(connection.sync().clientGetname()).isEqualTo(redisURI.getClientName()); - connection.sync().quit(); - Delay.delay(Duration.ofMillis(100)); - Wait.untilTrue(connection::isOpen).waitOrTimeout(); + connection.sync().quit(); + Delay.delay(Duration.ofMillis(100)); + Wait.untilTrue(connection::isOpen).waitOrTimeout(); - assertThat(connection.sync().clientGetname()).isEqualTo(redisURI.getClientName()); - - connection.close(); + assertThat(connection.sync().clientGetname()).isEqualTo(redisURI.getClientName()); + } } } diff --git a/src/test/java/io/lettuce/core/ClientOptionsIntegrationTests.java b/src/test/java/io/lettuce/core/ClientOptionsIntegrationTests.java index 4463e76d6a..2e998c1125 100644 --- a/src/test/java/io/lettuce/core/ClientOptionsIntegrationTests.java +++ b/src/test/java/io/lettuce/core/ClientOptionsIntegrationTests.java @@ -61,11 +61,14 @@ * Integration tests for effects configured via {@link ClientOptions}. * * @author Mark Paluch + * @author Hari Mani */ @Tag(INTEGRATION_TEST) @ExtendWith(LettuceExtension.class) class ClientOptionsIntegrationTests extends TestSupport { + private static final TimeoutOptions DISABLE_COMMAND_TIMEOUT = TimeoutOptions.builder().timeoutCommands(false).build(); + private final RedisClient client; @Inject @@ -94,112 +97,100 @@ void variousClientOptions() { @Test void requestQueueSize() { + client.setOptions(ClientOptions.builder().requestQueueSize(10).timeoutOptions(DISABLE_COMMAND_TIMEOUT).build()); + try (StatefulRedisConnection connection = client.connect()) { + getConnectionWatchdog(connection).setListenOnChannelInactive(false); + final RedisAsyncCommands async = connection.async(); - client.setOptions(ClientOptions.builder().requestQueueSize(10) - .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - - StatefulRedisConnection connection = client.connect(); - getConnectionWatchdog(connection).setListenOnChannelInactive(false); + async.quit(); - connection.async().quit(); + Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); - Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); + for (int i = 0; i < 10; i++) { + connection.async().ping(); + } - for (int i = 0; i < 10; i++) { - connection.async().ping(); + assertThatThrownBy(() -> connection.async().ping().toCompletableFuture().join()) + .hasMessageContaining("Request queue size exceeded"); + assertThatThrownBy(() -> connection.sync().ping()).hasMessageContaining("Request queue size exceeded"); } - - assertThatThrownBy(() -> connection.async().ping().toCompletableFuture().join()) - .hasMessageContaining("Request queue size exceeded"); - assertThatThrownBy(() -> connection.sync().ping()).hasMessageContaining("Request queue size exceeded"); - - connection.close(); } @Test void requestQueueSizeAppliedForReconnect() { - - client.setOptions(ClientOptions.builder().requestQueueSize(10) - .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - - RedisAsyncCommands connection = client.connect().async(); - testHitRequestQueueLimit(connection); + client.setOptions(ClientOptions.builder().requestQueueSize(10).timeoutOptions(DISABLE_COMMAND_TIMEOUT).build()); + try (StatefulRedisConnection connection = client.connect()) { + final RedisAsyncCommands commands = connection.async(); + testHitRequestQueueLimit(commands, connection); + } } @Test void testHitRequestQueueLimitReconnectWithAuthCommand() { - WithPassword.run(client, () -> { - client.setOptions(ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false) - .requestQueueSize(10).timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - - RedisAsyncCommands connection = client.connect().async(); - connection.auth(passwd); - testHitRequestQueueLimit(connection); + .requestQueueSize(10).timeoutOptions(DISABLE_COMMAND_TIMEOUT).build()); + try (StatefulRedisConnection connection = client.connect()) { + final RedisAsyncCommands commands = connection.async(); + commands.auth(passwd); + testHitRequestQueueLimit(commands, connection); + } }); } @Test @EnabledOnCommand("ACL") void testHitRequestQueueLimitReconnectWithAuthUsernamePasswordCommand() { - WithPassword.run(client, () -> { - client.setOptions(ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false) - .requestQueueSize(10).timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - - RedisAsyncCommands connection = client.connect().async(); - connection.auth(username, passwd); - testHitRequestQueueLimit(connection); + .requestQueueSize(10).timeoutOptions(DISABLE_COMMAND_TIMEOUT).build()); + try (StatefulRedisConnection connection = client.connect()) { + final RedisAsyncCommands commands = connection.async(); + commands.auth(username, passwd); + testHitRequestQueueLimit(commands, connection); + } }); } @Test void testHitRequestQueueLimitReconnectWithUriAuth() { - WithPassword.run(client, () -> { - client.setOptions(ClientOptions.builder().requestQueueSize(10) - .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - ; - - RedisURI redisURI = RedisURI.create(host, port); + client.setOptions(ClientOptions.builder().requestQueueSize(10).timeoutOptions(DISABLE_COMMAND_TIMEOUT).build()); + final RedisURI redisURI = RedisURI.create(host, port); redisURI.setAuthentication(passwd); - - RedisAsyncCommands connection = client.connect(redisURI).async(); - testHitRequestQueueLimit(connection); + try (StatefulRedisConnection connection = client.connect(redisURI)) { + final RedisAsyncCommands commands = connection.async(); + testHitRequestQueueLimit(commands, connection); + } }); } @Test void testHitRequestQueueLimitReconnectWithUriAuthPingCommand() { - WithPassword.run(client, () -> { - - client.setOptions(ClientOptions.builder().requestQueueSize(10) - .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - - RedisURI redisURI = RedisURI.create(host, port); + client.setOptions(ClientOptions.builder().requestQueueSize(10).timeoutOptions(DISABLE_COMMAND_TIMEOUT).build()); + final RedisURI redisURI = RedisURI.create(host, port); redisURI.setAuthentication(passwd); - - RedisAsyncCommands connection = client.connect(redisURI).async(); - testHitRequestQueueLimit(connection); + try (StatefulRedisConnection connection = client.connect(redisURI)) { + final RedisAsyncCommands commands = connection.async(); + testHitRequestQueueLimit(commands, connection); + } }); } - private void testHitRequestQueueLimit(RedisAsyncCommands connection) { - - ConnectionWatchdog watchdog = getConnectionWatchdog(connection.getStatefulConnection()); + private void testHitRequestQueueLimit(final RedisAsyncCommands commands, + final StatefulRedisConnection connection) { + ConnectionWatchdog watchdog = getConnectionWatchdog(connection); watchdog.setListenOnChannelInactive(false); - connection.quit(); + commands.quit(); - Wait.untilTrue(() -> !connection.getStatefulConnection().isOpen()).waitOrTimeout(); + Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); List> pings = new ArrayList<>(); for (int i = 0; i < 10; i++) { - pings.add(connection.ping()); + pings.add(commands.ping()); } watchdog.setListenOnChannelInactive(true); @@ -208,112 +199,91 @@ private void testHitRequestQueueLimit(RedisAsyncCommands connect for (RedisFuture ping : pings) { assertThat(TestFutures.getOrTimeout(ping)).isEqualTo("PONG"); } - - connection.getStatefulConnection().close(); } @Test void requestQueueSizeOvercommittedReconnect() { + client.setOptions(ClientOptions.builder().requestQueueSize(10).timeoutOptions(DISABLE_COMMAND_TIMEOUT).build()); - client.setOptions(ClientOptions.builder().requestQueueSize(10) - .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); + try (StatefulRedisConnection connection = client.connect()) { + ConnectionWatchdog watchdog = getConnectionWatchdog(connection); - StatefulRedisConnection connection = client.connect(); - ConnectionWatchdog watchdog = getConnectionWatchdog(connection); + watchdog.setListenOnChannelInactive(false); - watchdog.setListenOnChannelInactive(false); + Queue buffer = getStack(connection); + List> pings = new ArrayList<>(); + for (int i = 0; i < 11; i++) { - Queue buffer = getStack(connection); - List> pings = new ArrayList<>(); - for (int i = 0; i < 11; i++) { + AsyncCommand command = new AsyncCommand<>( + new Command<>(CommandType.PING, new StatusOutput<>(StringCodec.UTF8))); + pings.add(command); + buffer.add(command); + } - AsyncCommand command = new AsyncCommand<>( - new Command<>(CommandType.PING, new StatusOutput<>(StringCodec.UTF8))); - pings.add(command); - buffer.add(command); - } + getChannel(connection).disconnect(); - getChannel(connection).disconnect(); + Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); - Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); + watchdog.setListenOnChannelInactive(true); + watchdog.scheduleReconnect(); - watchdog.setListenOnChannelInactive(true); - watchdog.scheduleReconnect(); + for (int i = 0; i < 10; i++) { + assertThat(TestFutures.getOrTimeout(pings.get(i))).isEqualTo("PONG"); + } - for (int i = 0; i < 10; i++) { - assertThat(TestFutures.getOrTimeout(pings.get(i))).isEqualTo("PONG"); + assertThatThrownBy(() -> TestFutures.awaitOrTimeout(pings.get(10))).hasCauseInstanceOf(IllegalStateException.class) + .hasMessage("java.lang.IllegalStateException: Queue full"); } - - assertThatThrownBy(() -> TestFutures.awaitOrTimeout(pings.get(10))).hasCauseInstanceOf(IllegalStateException.class) - .hasMessage("java.lang.IllegalStateException: Queue full"); - - connection.close(); } @Test void disconnectedWithoutReconnect() { - client.setOptions(ClientOptions.builder().autoReconnect(false).build()); - - RedisAsyncCommands connection = client.connect().async(); - - connection.quit(); - Wait.untilTrue(() -> !connection.getStatefulConnection().isOpen()).waitOrTimeout(); - try { - connection.get(key); + try (StatefulRedisConnection connection = client.connect()) { + RedisAsyncCommands commands = connection.async(); + commands.quit(); + Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); + commands.get(key); } catch (Exception e) { assertThat(e).isInstanceOf(RedisException.class).hasMessageContaining("not connected"); - } finally { - connection.getStatefulConnection().close(); } } @Test void disconnectedRejectCommands() { - client.setOptions(ClientOptions.builder().disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) - .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - - RedisAsyncCommands connection = client.connect().async(); - - getConnectionWatchdog(connection.getStatefulConnection()).setListenOnChannelInactive(false); - connection.quit(); - Wait.untilTrue(() -> !connection.getStatefulConnection().isOpen()).waitOrTimeout(); - try { - connection.get(key); + .timeoutOptions(DISABLE_COMMAND_TIMEOUT).build()); + try (StatefulRedisConnection connection = client.connect()) { + RedisAsyncCommands commands = connection.async(); + getConnectionWatchdog(connection).setListenOnChannelInactive(false); + commands.quit(); + Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); + commands.get(key); } catch (Exception e) { assertThat(e).isInstanceOf(RedisException.class).hasMessageContaining("not connected"); - } finally { - connection.getStatefulConnection().close(); } } @Test void disconnectedAcceptCommands() { - client.setOptions(ClientOptions.builder().autoReconnect(false) .disconnectedBehavior(ClientOptions.DisconnectedBehavior.ACCEPT_COMMANDS).build()); - - RedisAsyncCommands connection = client.connect().async(); - - connection.quit(); - Wait.untilTrue(() -> !connection.getStatefulConnection().isOpen()).waitOrTimeout(); - connection.get(key); - connection.getStatefulConnection().close(); + try (StatefulRedisConnection connection = client.connect()) { + RedisAsyncCommands commands = connection.async(); + commands.quit(); + Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); + commands.get(key); + } } @Test @Inject void pingBeforeConnect(StatefulRedisConnection sharedConnection) { - sharedConnection.sync().set(key, value); - RedisCommands connection = client.connect().sync(); - - try { - String result = connection.get(key); + try (StatefulRedisConnection connection = client.connect()) { + RedisCommands commands = connection.sync(); + String result = commands.get(key); assertThat(result).isEqualTo(value); - } finally { - connection.getStatefulConnection().close(); } } @@ -337,17 +307,12 @@ void connectTimeout() throws Exception { @Test void connectWithAuthentication() { - WithPassword.run(client, () -> { RedisURI redisURI = RedisURI.Builder.redis(host, port).withPassword(passwd).build(); - - RedisCommands connection = client.connect(redisURI).sync(); - - try { - String result = connection.info(); + try (StatefulRedisConnection connection = client.connect(redisURI)) { + final RedisCommands commands = connection.sync(); + String result = commands.info(); assertThat(result).contains("memory"); - } finally { - connection.getStatefulConnection().close(); } }); } @@ -375,21 +340,14 @@ void authenticationTimeout() { @Test void sslAndAuthentication() { - WithPassword.run(client, () -> { - RedisURI redisURI = RedisURI.Builder.redis(host, 6443).withPassword(passwd).withVerifyPeer(false).withSsl(true) .build(); - - RedisCommands connection = client.connect(redisURI).sync(); - - try { - String result = connection.info(); + try (StatefulRedisConnection connection = client.connect(redisURI)) { + final RedisCommands commands = connection.sync(); + String result = commands.info(); assertThat(result).contains("memory"); - } finally { - connection.getStatefulConnection().close(); } - }); } @@ -480,8 +438,7 @@ void timeoutExpiresBatchedCommands() { @Test void pingBeforeConnectWithQueuedCommandsAndReconnect() throws Exception { - client.setOptions( - ClientOptions.builder().timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); + client.setOptions(ClientOptions.builder().timeoutOptions(DISABLE_COMMAND_TIMEOUT).build()); StatefulRedisConnection controlConnection = client.connect(); StatefulRedisConnection redisConnection = client.connect(RedisURI.create("redis://localhost:6479/5")); @@ -523,8 +480,7 @@ void authenticatedPingBeforeConnectWithQueuedCommandsAndReconnect() { WithPassword.run(client, () -> { RedisURI redisURI = RedisURI.Builder.redis(host, port).withPassword(passwd).withDatabase(5).build(); - client.setOptions( - ClientOptions.builder().timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); + client.setOptions(ClientOptions.builder().timeoutOptions(DISABLE_COMMAND_TIMEOUT).build()); StatefulRedisConnection controlConnection = client.connect(redisURI); StatefulRedisConnection redisConnection = client.connect(redisURI); diff --git a/src/test/java/io/lettuce/core/ConnectMethodsIntegrationTests.java b/src/test/java/io/lettuce/core/ConnectMethodsIntegrationTests.java index 5eab339557..8654cb5d5c 100644 --- a/src/test/java/io/lettuce/core/ConnectMethodsIntegrationTests.java +++ b/src/test/java/io/lettuce/core/ConnectMethodsIntegrationTests.java @@ -2,6 +2,9 @@ import javax.inject.Inject; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import io.lettuce.core.sentinel.api.StatefulRedisSentinelConnection; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -15,6 +18,7 @@ /** * @author Mark Paluch + * @author Hari Mani */ @Tag(INTEGRATION_TEST) @ExtendWith(LettuceExtension.class) @@ -38,12 +42,16 @@ void standaloneSync() { @Test void standaloneAsync() { - redisClient.connect().async().getStatefulConnection().close(); + try (StatefulRedisConnection connection = redisClient.connect()) { + connection.async(); + } } @Test void standaloneReactive() { - redisClient.connect().reactive().getStatefulConnection().close(); + try (StatefulRedisConnection connection = redisClient.connect()) { + connection.reactive(); + } } @Test @@ -75,17 +83,23 @@ void pubsubStateful() { // Sentinel @Test void sentinelSync() { - redisClient.connectSentinel().sync().getStatefulConnection().close(); + try (StatefulRedisSentinelConnection connection = redisClient.connectSentinel()) { + connection.sync(); + } } @Test void sentinelAsync() { - redisClient.connectSentinel().async().getStatefulConnection().close(); + try (StatefulRedisSentinelConnection connection = redisClient.connectSentinel()) { + connection.async(); + } } @Test void sentinelReactive() { - redisClient.connectSentinel().reactive().getStatefulConnection().close(); + try (StatefulRedisSentinelConnection connection = redisClient.connectSentinel()) { + connection.reactive(); + } } @Test @@ -96,17 +110,23 @@ void sentinelStateful() { // Cluster @Test void clusterSync() { - clusterClient.connect().sync().getStatefulConnection().close(); + try (StatefulRedisClusterConnection connection = clusterClient.connect()) { + connection.sync(); + } } @Test void clusterAsync() { - clusterClient.connect().async().getStatefulConnection().close(); + try (StatefulRedisClusterConnection connection = clusterClient.connect()) { + connection.async(); + } } @Test void clusterReactive() { - clusterClient.connect().reactive().getStatefulConnection().close(); + try (StatefulRedisClusterConnection connection = clusterClient.connect()) { + connection.reactive(); + } } @Test @@ -116,17 +136,23 @@ void clusterStateful() { @Test void clusterPubSubSync() { - clusterClient.connectPubSub().sync().getStatefulConnection().close(); + try (StatefulRedisClusterPubSubConnection connection = clusterClient.connectPubSub()) { + connection.sync(); + } } @Test void clusterPubSubAsync() { - clusterClient.connectPubSub().async().getStatefulConnection().close(); + try (StatefulRedisClusterPubSubConnection connection = clusterClient.connectPubSub()) { + connection.async(); + } } @Test void clusterPubSubReactive() { - clusterClient.connectPubSub().reactive().getStatefulConnection().close(); + try (StatefulRedisClusterPubSubConnection connection = clusterClient.connectPubSub()) { + connection.reactive(); + } } @Test @@ -137,26 +163,26 @@ void clusterPubSubStateful() { // Advanced Cluster @Test void advancedClusterSync() { - StatefulRedisClusterConnection statefulConnection = clusterClient.connect(); - RedisURI uri = clusterClient.getPartitions().getPartition(0).getUri(); - statefulConnection.getConnection(uri.getHost(), uri.getPort()).sync(); - statefulConnection.close(); + try (StatefulRedisClusterConnection statefulConnection = clusterClient.connect()) { + RedisURI uri = clusterClient.getPartitions().getPartition(0).getUri(); + statefulConnection.getConnection(uri.getHost(), uri.getPort()).sync(); + } } @Test void advancedClusterAsync() { - StatefulRedisClusterConnection statefulConnection = clusterClient.connect(); - RedisURI uri = clusterClient.getPartitions().getPartition(0).getUri(); - statefulConnection.getConnection(uri.getHost(), uri.getPort()).sync(); - statefulConnection.close(); + try (StatefulRedisClusterConnection statefulConnection = clusterClient.connect()) { + RedisURI uri = clusterClient.getPartitions().getPartition(0).getUri(); + statefulConnection.getConnection(uri.getHost(), uri.getPort()).sync(); + } } @Test void advancedClusterReactive() { - StatefulRedisClusterConnection statefulConnection = clusterClient.connect(); - RedisURI uri = clusterClient.getPartitions().getPartition(0).getUri(); - statefulConnection.getConnection(uri.getHost(), uri.getPort()).reactive(); - statefulConnection.close(); + try (StatefulRedisClusterConnection statefulConnection = clusterClient.connect()) { + RedisURI uri = clusterClient.getPartitions().getPartition(0).getUri(); + statefulConnection.getConnection(uri.getHost(), uri.getPort()).reactive(); + } } @Test @@ -167,9 +193,9 @@ void advancedClusterStateful() { // Cluster node selection @Test void nodeSelectionClusterAsync() { - StatefulRedisClusterConnection statefulConnection = clusterClient.connect(); - AsyncNodeSelection masters = statefulConnection.async().masters(); - statefulConnection.close(); + try (StatefulRedisClusterConnection statefulConnection = clusterClient.connect()) { + AsyncNodeSelection masters = statefulConnection.async().masters(); + } } } diff --git a/src/test/java/io/lettuce/core/ConnectionCommandIntegrationTests.java b/src/test/java/io/lettuce/core/ConnectionCommandIntegrationTests.java index 8f8b906fce..ea2ce78066 100644 --- a/src/test/java/io/lettuce/core/ConnectionCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/ConnectionCommandIntegrationTests.java @@ -54,19 +54,26 @@ * @author Will Glozer * @author Mark Paluch * @author Tugdual Grall + * @author Hari Mani */ @Tag(INTEGRATION_TEST) @ExtendWith(LettuceExtension.class) class ConnectionCommandIntegrationTests extends TestSupport { + private static final ClientOptions RESP2_DO_NOT_PING_BEFORE_ACTIVATION = ClientOptions.builder() + .pingBeforeActivateConnection(false).protocolVersion(ProtocolVersion.RESP2).build(); + private final RedisClient client; + private final StatefulRedisConnection cnxn; + private final RedisCommands redis; @Inject ConnectionCommandIntegrationTests(RedisClient client, StatefulRedisConnection connection) { this.client = client; - this.redis = connection.sync(); + this.cnxn = connection; + this.redis = this.cnxn.sync(); } @BeforeEach @@ -76,83 +83,77 @@ void setUp() { @Test void auth() { - WithPassword.run(client, () -> { - client.setOptions( - ClientOptions.builder().pingBeforeActivateConnection(false).protocolVersion(ProtocolVersion.RESP2).build()); - RedisCommands connection = client.connect().sync(); - - assertThatThrownBy(connection::ping).isInstanceOf(RedisException.class).hasMessageContaining("NOAUTH"); - - assertThat(connection.auth(passwd)).isEqualTo("OK"); - assertThat(connection.set(key, value)).isEqualTo("OK"); - + client.setOptions(RESP2_DO_NOT_PING_BEFORE_ACTIVATION); + try (StatefulRedisConnection connection = client.connect()) { + RedisCommands commands = connection.sync(); + assertThatThrownBy(commands::ping).isInstanceOf(RedisException.class).hasMessageContaining("NOAUTH"); + assertThat(commands.auth(passwd)).isEqualTo("OK"); + assertThat(commands.set(key, value)).isEqualTo("OK"); + } RedisURI redisURI = RedisURI.Builder.redis(host, port).withDatabase(2).withPassword(passwd).build(); - RedisCommands authConnection = client.connect(redisURI).sync(); - authConnection.ping(); - authConnection.getStatefulConnection().close(); + try (StatefulRedisConnection connection = client.connect(redisURI)) { + RedisCommands authConnection = connection.sync(); + assertThat(authConnection.ping()).isEqualTo("PONG"); + } }); } @Test @EnabledOnCommand("ACL") void authWithUsername() { - WithPassword.run(client, () -> { - client.setOptions( - ClientOptions.builder().pingBeforeActivateConnection(false).protocolVersion(ProtocolVersion.RESP2).build()); - RedisCommands connection = client.connect().sync(); - - assertThatThrownBy(connection::ping).isInstanceOf(RedisException.class).hasMessageContaining("NOAUTH"); + client.setOptions(RESP2_DO_NOT_PING_BEFORE_ACTIVATION); + try (StatefulRedisConnection connection = client.connect()) { + RedisCommands commands = connection.sync(); - assertThat(connection.auth(passwd)).isEqualTo("OK"); - assertThat(connection.set(key, value)).isEqualTo("OK"); + assertThatThrownBy(commands::ping).isInstanceOf(RedisException.class).hasMessageContaining("NOAUTH"); - // Aut with the same user & password (default) - assertThat(connection.auth(username, passwd)).isEqualTo("OK"); - assertThat(connection.set(key, value)).isEqualTo("OK"); + assertThat(commands.auth(passwd)).isEqualTo("OK"); + assertThat(commands.set(key, value)).isEqualTo("OK"); - // Switch to another user - assertThat(connection.auth(aclUsername, aclPasswd)).isEqualTo("OK"); - assertThat(connection.set("cached:demo", value)).isEqualTo("OK"); - assertThatThrownBy(() -> connection.get(key)).isInstanceOf(RedisCommandExecutionException.class); - assertThat(connection.del("cached:demo")).isEqualTo(1); + // Aut with the same user & password (default) + assertThat(commands.auth(username, passwd)).isEqualTo("OK"); + assertThat(commands.set(key, value)).isEqualTo("OK"); + // Switch to another user + assertThat(commands.auth(aclUsername, aclPasswd)).isEqualTo("OK"); + assertThat(commands.set("cached:demo", value)).isEqualTo("OK"); + assertThatThrownBy(() -> commands.get(key)).isInstanceOf(RedisCommandExecutionException.class); + assertThat(commands.del("cached:demo")).isEqualTo(1); + } RedisURI redisURI = RedisURI.Builder.redis(host, port).withDatabase(2).withPassword(passwd).build(); - RedisCommands authConnection = client.connect(redisURI).sync(); - authConnection.ping(); - authConnection.getStatefulConnection().close(); + try (StatefulRedisConnection connection = client.connect(redisURI)) { + RedisCommands authConnection = connection.sync(); + assertThat(authConnection.ping()).isEqualTo("PONG"); + } }); } @Test @EnabledOnCommand("ACL") void changeAclPasswordWhileAuthenticated() { - WithPassword.run(client, () -> { - client.setOptions( - ClientOptions.builder().pingBeforeActivateConnection(false).protocolVersion(ProtocolVersion.RESP2).build()); + client.setOptions(RESP2_DO_NOT_PING_BEFORE_ACTIVATION); RedisURI redisURI = RedisURI.Builder.redis(host, port).withDatabase(2) .withAuthentication(TestSettings.aclUsername(), TestSettings.aclPassword()).build(); - StatefulRedisConnection connection = client.connect(redisURI); + try (StatefulRedisConnection connection = client.connect(redisURI)) { - Command> command = CliParser.parse("ACL SETUSER " + TestSettings.aclUsername() - + " on <" + TestSettings.aclPassword() + " >another-password ~cached:* +@all"); - connection.sync().dispatch(command.getType(), command.getOutput(), command.getArgs()); + Command> command = CliParser.parse("ACL SETUSER " + TestSettings.aclUsername() + + " on <" + TestSettings.aclPassword() + " >another-password ~cached:* +@all"); + connection.sync().dispatch(command.getType(), command.getOutput(), command.getArgs()); - connection.sync().ping(); - connection.close(); + connection.sync().ping(); + } }); } @Test @EnabledOnCommand("ACL") void changeAclPasswordDuringDisconnect() { - WithPassword.run(client, () -> { - client.setOptions( - ClientOptions.builder().pingBeforeActivateConnection(false).protocolVersion(ProtocolVersion.RESP2).build()); + client.setOptions(RESP2_DO_NOT_PING_BEFORE_ACTIVATION); AtomicReference passwd = new AtomicReference<>(TestSettings.aclPassword()); @@ -160,40 +161,34 @@ void changeAclPasswordDuringDisconnect() { .just(TestSettings.aclUsername(), passwd.get()); RedisURI redisURI = RedisURI.Builder.redis(host, port).withDatabase(2).withAuthentication(rcp).build(); - StatefulRedisConnection connection = client.connect(redisURI); + try (StatefulRedisConnection connection = client.connect(redisURI)) { - Command> command = CliParser.parse("ACL SETUSER " + TestSettings.aclUsername() - + " on <" + TestSettings.aclPassword() + " >another-password ~cached:* +@all"); - connection.sync().dispatch(command.getType(), command.getOutput(), command.getArgs()); + Command> command = CliParser.parse("ACL SETUSER " + TestSettings.aclUsername() + + " on <" + TestSettings.aclPassword() + " >another-password ~cached:* +@all"); + connection.sync().dispatch(command.getType(), command.getOutput(), command.getArgs()); - connection.async().quit().await(100, TimeUnit.MILLISECONDS); + connection.async().quit().await(100, TimeUnit.MILLISECONDS); - passwd.set("another-password"); + passwd.set("another-password"); - connection.sync().ping(); - connection.close(); + connection.sync().ping(); + } }); } @Test @EnabledOnCommand("ACL") void resp2HandShakeWithUsernamePassword() { - RedisURI redisURI = RedisURI.Builder.redis(host, port).withAuthentication(username, passwd).build(); RedisClient clientResp2 = RedisClient.create(redisURI); - clientResp2.setOptions( - ClientOptions.builder().pingBeforeActivateConnection(false).protocolVersion(ProtocolVersion.RESP2).build()); + clientResp2.setOptions(RESP2_DO_NOT_PING_BEFORE_ACTIVATION); RedisCommands connTestResp2 = null; - try { - connTestResp2 = clientResp2.connect().sync(); - assertThat(redis.ping()).isEqualTo("PONG"); - } catch (Exception e) { + try (StatefulRedisConnection resp2Connection = clientResp2.connect()) { + connTestResp2 = resp2Connection.sync(); + assertThat(connTestResp2.ping()).isEqualTo("PONG"); } finally { assertThat(connTestResp2).isNotNull(); - if (connTestResp2 != null) { - connTestResp2.getStatefulConnection().close(); - } } clientResp2.shutdown(); } @@ -224,18 +219,16 @@ void authNull() { @Test void authReconnect() { WithPassword.run(client, () -> { - - client.setOptions( - ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); - RedisCommands connection = client.connect().sync(); - assertThat(connection.auth(passwd)).isEqualTo("OK"); - assertThat(connection.set(key, value)).isEqualTo("OK"); - connection.quit(); - - Delay.delay(Duration.ofMillis(100)); - assertThat(connection.get(key)).isEqualTo(value); - - connection.getStatefulConnection().close(); + client.setOptions(RESP2_DO_NOT_PING_BEFORE_ACTIVATION); + try (StatefulRedisConnection connection = client.connect()) { + RedisCommands commands = connection.sync(); + assertThat(commands.auth(passwd)).isEqualTo("OK"); + assertThat(commands.set(key, value)).isEqualTo("OK"); + commands.quit(); + + Delay.delay(Duration.ofMillis(100)); + assertThat(commands.get(key)).isEqualTo(value); + } }); } @@ -243,26 +236,24 @@ void authReconnect() { @EnabledOnCommand("ACL") void authReconnectRedis6() { WithPassword.run(client, () -> { - - client.setOptions( - ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); - RedisCommands connection = client.connect().sync(); - assertThat(connection.auth(passwd)).isEqualTo("OK"); - assertThat(connection.set(key, value)).isEqualTo("OK"); - connection.quit(); - - Delay.delay(Duration.ofMillis(100)); - assertThat(connection.get(key)).isEqualTo(value); - - // reconnect with username/password - assertThat(connection.auth(username, passwd)).isEqualTo("OK"); - assertThat(connection.set(key, value)).isEqualTo("OK"); - connection.quit(); - - Delay.delay(Duration.ofMillis(100)); - assertThat(connection.get(key)).isEqualTo(value); - - connection.getStatefulConnection().close(); + client.setOptions(RESP2_DO_NOT_PING_BEFORE_ACTIVATION); + try (StatefulRedisConnection connection = client.connect()) { + RedisCommands commands = connection.sync(); + assertThat(commands.auth(passwd)).isEqualTo("OK"); + assertThat(commands.set(key, value)).isEqualTo("OK"); + commands.quit(); + + Delay.delay(Duration.ofMillis(100)); + assertThat(commands.get(key)).isEqualTo(value); + + // reconnect with username/password + assertThat(commands.auth(username, passwd)).isEqualTo("OK"); + assertThat(commands.set(key, value)).isEqualTo("OK"); + commands.quit(); + + Delay.delay(Duration.ofMillis(100)); + assertThat(commands.get(key)).isEqualTo(value); + } }); } @@ -272,7 +263,7 @@ void selectReconnect() { redis.set(key, value); redis.quit(); - Wait.untilTrue(redis::isOpen).waitOrTimeout(); + Wait.untilTrue(cnxn::isOpen).waitOrTimeout(); assertThat(redis.get(key)).isEqualTo(value); } @@ -280,76 +271,72 @@ void selectReconnect() { void getSetReconnect() { redis.set(key, value); redis.quit(); - Wait.untilTrue(redis::isOpen).waitOrTimeout(); + Wait.untilTrue(cnxn::isOpen).waitOrTimeout(); assertThat(redis.get(key)).isEqualTo(value); } @Test void authInvalidPassword() { - RedisAsyncCommands async = client.connect().async(); + final StatefulRedisConnection connection = client.connect(); try { + final RedisAsyncCommands async = connection.async(); TestFutures.awaitOrTimeout(async.auth("invalid")); fail("Authenticated with invalid password"); } catch (RedisException e) { assertThat(e.getMessage()).startsWith("ERR").contains("AUTH"); - StatefulRedisConnectionImpl statefulRedisCommands = (StatefulRedisConnectionImpl) async - .getStatefulConnection(); - assertThat(statefulRedisCommands.getConnectionState().getCredentialsProvider().resolveCredentials().block() - .getPassword()).isNull(); + StatefulRedisConnectionImpl connectionImpl = (StatefulRedisConnectionImpl) connection; + assertThat(connectionImpl.getConnectionState().getCredentialsProvider().resolveCredentials().block().getPassword()) + .isNull(); } finally { - async.getStatefulConnection().close(); + connection.close(); } } @Test @EnabledOnCommand("ACL") void authInvalidUsernamePassword() { - WithPassword.run(client, () -> { - client.setOptions( - ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); - RedisCommands connection = client.connect().sync(); - - assertThat(connection.auth(username, passwd)).isEqualTo("OK"); + client.setOptions(RESP2_DO_NOT_PING_BEFORE_ACTIVATION); + try (StatefulRedisConnection connection = client.connect()) { + RedisCommands commands = connection.sync(); - assertThatThrownBy(() -> connection.auth(username, "invalid")) - .hasMessageContaining("WRONGPASS invalid username-password pair"); + assertThat(commands.auth(username, passwd)).isEqualTo("OK"); - assertThat(connection.auth(aclUsername, aclPasswd)).isEqualTo("OK"); + assertThatThrownBy(() -> commands.auth(username, "invalid")) + .hasMessageContaining("WRONGPASS invalid username-password pair"); - assertThatThrownBy(() -> connection.auth(aclUsername, "invalid")) - .hasMessageContaining("WRONGPASS invalid username-password pair"); + assertThat(commands.auth(aclUsername, aclPasswd)).isEqualTo("OK"); - connection.getStatefulConnection().close(); + assertThatThrownBy(() -> commands.auth(aclUsername, "invalid")) + .hasMessageContaining("WRONGPASS invalid username-password pair"); + } }); } @Test @EnabledOnCommand("ACL") void authInvalidDefaultPasswordNoACL() { - RedisAsyncCommands async = client.connect().async(); // When the database is not secured the AUTH default invalid command returns OK - try { + try (StatefulRedisConnection connection = client.connect()) { + final RedisAsyncCommands async = connection.async(); Future auth = async.auth(username, "invalid"); assertThat(TestFutures.getOrTimeout(auth)).isEqualTo("OK"); - } finally { - async.getStatefulConnection().close(); } } @Test void authInvalidUsernamePasswordNoACL() { - RedisAsyncCommands async = client.connect().async(); + final StatefulRedisConnection connection = client.connect(); try { + final RedisAsyncCommands async = connection.async(); TestFutures.awaitOrTimeout(async.select(1024)); fail("Selected invalid db index"); } catch (RedisException e) { assertThat(e.getMessage()).startsWith("ERR"); - StatefulRedisConnectionImpl statefulRedisCommands = (StatefulRedisConnectionImpl) async - .getStatefulConnection(); - assertThat(statefulRedisCommands.getConnectionState()).extracting("db").isEqualTo(0); + final StatefulRedisConnectionImpl connectionImpl = (StatefulRedisConnectionImpl) connection; + assertThat(connectionImpl.getConnectionState()).extracting("db").isEqualTo(0); } finally { - async.getStatefulConnection().close(); + connection.close(); } } diff --git a/src/test/java/io/lettuce/core/ReactiveConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/ReactiveConnectionIntegrationTests.java index 940a181732..51ad974511 100644 --- a/src/test/java/io/lettuce/core/ReactiveConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/ReactiveConnectionIntegrationTests.java @@ -55,6 +55,7 @@ * @author Mark Paluch * @author Nikolai Perevozchikov * @author Tugdual Grall + * @author Hari Mani */ @Tag(INTEGRATION_TEST) @ExtendWith(LettuceExtension.class) @@ -100,12 +101,7 @@ void fireCommandAfterObserve() { @Test void isOpen() { - assertThat(reactive.isOpen()).isTrue(); - } - - @Test - void getStatefulConnection() { - assertThat(reactive.getStatefulConnection()).isSameAs(connection); + assertThat(connection.isOpen()).isTrue(); } @Test @@ -135,23 +131,20 @@ void multiSubscribe() throws Exception { void transactional(RedisClient client) throws Exception { final CountDownLatch sync = new CountDownLatch(1); + try (StatefulRedisConnection statefulRedisConnection = client.connect()) { + RedisReactiveCommands reactive = statefulRedisConnection.reactive(); - RedisReactiveCommands reactive = client.connect().reactive(); - - reactive.multi().subscribe(multiResponse -> { - reactive.set(key, "1").subscribe(); - reactive.incr(key).subscribe(getResponse -> { - sync.countDown(); + reactive.multi().subscribe(multiResponse -> { + reactive.set(key, "1").subscribe(); + reactive.incr(key).subscribe(getResponse -> sync.countDown()); + reactive.exec().subscribe(); }); - reactive.exec().subscribe(); - }); - - sync.await(5, TimeUnit.SECONDS); - String result = redis.get(key); - assertThat(result).isEqualTo("2"); + sync.await(5, TimeUnit.SECONDS); - reactive.getStatefulConnection().close(); + String result = redis.get(key); + assertThat(result).isEqualTo("2"); + } } @Test @@ -206,11 +199,10 @@ void subscribeWithDisconnectedClient(RedisClient client) { connection.async().quit(); Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); - StepVerifier.create(connection.reactive().ping()).consumeErrorWith(throwable -> { - assertThat(throwable).isInstanceOf(RedisException.class) - .hasMessageContaining("not connected. Commands are rejected"); - - }).verify(); + StepVerifier + .create(connection.reactive().ping()).consumeErrorWith(throwable -> assertThat(throwable) + .isInstanceOf(RedisException.class).hasMessageContaining("not connected. Commands are rejected")) + .verify(); connection.close(); } @@ -220,19 +212,18 @@ void subscribeWithDisconnectedClient(RedisClient client) { void publishOnSchedulerTest(RedisClient client) { client.setOptions(ClientOptions.builder().publishOnScheduler(true).build()); - - RedisReactiveCommands reactive = client.connect().reactive(); - - int counter = 0; - for (int i = 0; i < 1000; i++) { - if (reactive.eval("return 1", INTEGER).next().block() == null) { - counter++; + try (StatefulRedisConnection statefulRedisConnection = client.connect()) { + RedisReactiveCommands reactive = statefulRedisConnection.reactive(); + + int counter = 0; + for (int i = 0; i < 1000; i++) { + if (reactive.eval("return 1", INTEGER).next().block() == null) { + counter++; + } } - } - - assertThat(counter).isZero(); - reactive.getStatefulConnection().close(); + assertThat(counter).isZero(); + } } private static Subscriber createSubscriberWithExceptionOnComplete() { diff --git a/src/test/java/io/lettuce/core/cluster/AdvancedClusterClientIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/AdvancedClusterClientIntegrationTests.java index 5c1c6bd906..af2032100b 100644 --- a/src/test/java/io/lettuce/core/cluster/AdvancedClusterClientIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/AdvancedClusterClientIntegrationTests.java @@ -66,6 +66,7 @@ * * @author Mark Paluch * @author Jon Chambers + * @author Hari Mani */ @Tag(INTEGRATION_TEST) @SuppressWarnings("rawtypes") @@ -88,7 +89,6 @@ class AdvancedClusterClientIntegrationTests extends TestSupport { AdvancedClusterClientIntegrationTests(RedisClusterClient clusterClient, StatefulRedisClusterConnection clusterConnection) { this.clusterClient = clusterClient; - this.clusterConnection = clusterConnection; this.async = clusterConnection.async(); this.sync = clusterConnection.sync(); @@ -125,7 +125,7 @@ void invalidHost() { @Test void partitions() { - Partitions partitions = async.getStatefulConnection().getPartitions(); + Partitions partitions = clusterConnection.getPartitions(); assertThat(partitions).hasSize(4); } @@ -140,17 +140,16 @@ void differentConnections() { assertThat(nodeId).isNotSameAs(hostAndPort); } - StatefulRedisClusterConnection statefulConnection = async.getStatefulConnection(); for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) { - StatefulRedisConnection nodeId = statefulConnection.getConnection(redisClusterNode.getNodeId()); - StatefulRedisConnection hostAndPort = statefulConnection + StatefulRedisConnection nodeId = clusterConnection.getConnection(redisClusterNode.getNodeId()); + StatefulRedisConnection hostAndPort = clusterConnection .getConnection(redisClusterNode.getUri().getHost(), redisClusterNode.getUri().getPort()); assertThat(nodeId).isNotSameAs(hostAndPort); } - RedisAdvancedClusterCommands sync = statefulConnection.sync(); + RedisAdvancedClusterCommands sync = clusterConnection.sync(); for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) { RedisClusterCommands nodeId = sync.getConnection(redisClusterNode.getNodeId()); @@ -160,7 +159,7 @@ void differentConnections() { assertThat(nodeId).isNotSameAs(hostAndPort); } - RedisAdvancedClusterReactiveCommands rx = statefulConnection.reactive(); + RedisAdvancedClusterReactiveCommands rx = clusterConnection.reactive(); for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) { RedisClusterReactiveCommands nodeId = rx.getConnection(redisClusterNode.getNodeId()); @@ -235,7 +234,7 @@ void mgetCrossSlot() { expectation.add(kv(key, "value-" + key)); } - List> result = sync.mget(keys.toArray(new String[keys.size()])); + List> result = sync.mget(keys.toArray(new String[0])); assertThat(result).hasSize(keys.size()); assertThat(result).isEqualTo(expectation); @@ -257,7 +256,7 @@ void delCrossSlot() { List keys = prepareKeys(); - Long result = sync.del(keys.toArray(new String[keys.size()])); + Long result = sync.del(keys.toArray(new String[0])); assertThat(result).isEqualTo(25); @@ -284,7 +283,7 @@ void unlinkCrossSlot() { List keys = prepareKeys(); - Long result = sync.unlink(keys.toArray(new String[keys.size()])); + Long result = sync.unlink(keys.toArray(new String[0])); assertThat(result).isEqualTo(25); @@ -315,7 +314,7 @@ void clientSetname() { sync.clientSetname(name); for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) { - RedisClusterCommands nodeConnection = async.getStatefulConnection().sync() + RedisClusterCommands nodeConnection = clusterConnection.sync() .getConnection(redisClusterNode.getNodeId()); assertThat(nodeConnection.clientList()).contains(name); } @@ -464,15 +463,13 @@ void shutdown() { @Test void testSync() { - RedisAdvancedClusterCommands sync = async.getStatefulConnection().sync(); + RedisAdvancedClusterCommands sync = clusterConnection.sync(); sync.set(key, value); assertThat(sync.get(key)).isEqualTo(value); RedisClusterCommands node2Connection = sync.getConnection(ClusterTestSettings.host, ClusterTestSettings.port2); assertThat(node2Connection.get(key)).isEqualTo(value); - - assertThat(sync.getStatefulConnection()).isSameAs(async.getStatefulConnection()); } @Test @@ -555,7 +552,7 @@ void pipelining(@New StatefulRedisClusterConnection connectionUn TestFutures.awaitOrTimeout(async.get(key(0))); int iterations = 1000; - async.setAutoFlushCommands(false); + connectionUnderTest.setAutoFlushCommands(false); List> futures = new ArrayList<>(); for (int i = 0; i < iterations; i++) { futures.add(async.set(key(i), value(i))); @@ -565,7 +562,7 @@ void pipelining(@New StatefulRedisClusterConnection connectionUn assertThat(this.sync.get(key(i))).as("Key " + key(i) + " must be null").isNull(); } - async.flushCommands(); + connectionUnderTest.flushCommands(); boolean result = TestFutures.awaitOrTimeout(futures); assertThat(result).isTrue(); @@ -578,7 +575,7 @@ void pipelining(@New StatefulRedisClusterConnection connectionUn @Test void clusterScan() { - RedisAdvancedClusterCommands sync = async.getStatefulConnection().sync(); + RedisAdvancedClusterCommands sync = clusterConnection.sync(); sync.mset(KeysAndValues.MAP); Set allKeys = new HashSet<>(); @@ -600,7 +597,7 @@ void clusterScan() { @Test void clusterScanWithArgs() { - RedisAdvancedClusterCommands sync = async.getStatefulConnection().sync(); + RedisAdvancedClusterCommands sync = clusterConnection.sync(); sync.mset(KeysAndValues.MAP); Set allKeys = new HashSet<>(); @@ -623,7 +620,7 @@ void clusterScanWithArgs() { @Test void clusterScanStreaming() { - RedisAdvancedClusterCommands sync = async.getStatefulConnection().sync(); + RedisAdvancedClusterCommands sync = clusterConnection.sync(); sync.mset(KeysAndValues.MAP); ListStreamingAdapter adapter = new ListStreamingAdapter<>(); @@ -645,7 +642,7 @@ void clusterScanStreaming() { @Test void clusterScanStreamingWithArgs() { - RedisAdvancedClusterCommands sync = async.getStatefulConnection().sync(); + RedisAdvancedClusterCommands sync = clusterConnection.sync(); sync.mset(KeysAndValues.MAP); ListStreamingAdapter adapter = new ListStreamingAdapter<>(); diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterSetupIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/RedisClusterSetupIntegrationTests.java index 62ac7848a2..8a0e97aa1b 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterSetupIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterSetupIntegrationTests.java @@ -33,6 +33,7 @@ import java.util.stream.Collectors; import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.api.StatefulRedisConnection; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -50,7 +51,6 @@ import io.lettuce.core.RedisFuture; import io.lettuce.core.RedisURI; import io.lettuce.core.TestSupport; -import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; @@ -74,6 +74,7 @@ * * @author Mark Paluch * @author dengliming + * @author Hari Mani * @since 3.0 */ @Tag(INTEGRATION_TEST) @@ -88,10 +89,14 @@ public class RedisClusterSetupIntegrationTests extends TestSupport { private static RedisClusterClient clusterClient; - private static RedisClient client = DefaultRedisClient.get(); + private static final RedisClient client = DefaultRedisClient.get(); private static ClusterTestHelper clusterHelper; + private StatefulRedisConnection redisConnection1; + + private StatefulRedisConnection redisConnection2; + private RedisCommands redis1; private RedisCommands redis2; @@ -111,15 +116,17 @@ public static void shutdownClient() { @BeforeEach public void openConnection() { clusterHelper.flushdb(); - redis1 = client.connect(RedisURI.Builder.redis(ClusterTestSettings.host, ClusterTestSettings.port5).build()).sync(); - redis2 = client.connect(RedisURI.Builder.redis(ClusterTestSettings.host, ClusterTestSettings.port6).build()).sync(); + redisConnection1 = client.connect(RedisURI.Builder.redis(ClusterTestSettings.host, ClusterTestSettings.port5).build()); + redis1 = redisConnection1.sync(); + redisConnection2 = client.connect(RedisURI.Builder.redis(ClusterTestSettings.host, ClusterTestSettings.port6).build()); + redis2 = redisConnection2.sync(); clusterHelper.clusterReset(); } @AfterEach public void closeConnection() { - redis1.getStatefulConnection().close(); - redis2.getStatefulConnection().close(); + redisConnection1.close(); + redisConnection2.close(); } @Test @@ -222,21 +229,17 @@ public void clusterSlotMigrationImport() { @Test public void clusterTopologyRefresh() { - clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(PERIODIC_REFRESH_ENABLED).build()); clusterClient.reloadPartitions(); - - RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connect().async(); - assertThat(clusterClient.getPartitions()).hasSize(1); - - ClusterSetup.setup2Masters(clusterHelper); - assertThat(clusterClient.getPartitions()).hasSize(2); - - clusterConnection.getStatefulConnection().close(); + try (StatefulRedisClusterConnection clusterConnection = clusterClient.connect()) { + assertThat(clusterClient.getPartitions()).hasSize(1); + ClusterSetup.setup2Masters(clusterHelper); + assertThat(clusterClient.getPartitions()).hasSize(2); + } } @Test - public void changeTopologyWhileOperations() throws Exception { + public void changeTopologyWhileOperations() { ClusterSetup.setup2Masters(clusterHelper); @@ -244,63 +247,59 @@ public void changeTopologyWhileOperations() throws Exception { .enableAllAdaptiveRefreshTriggers().build(); clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(clusterTopologyRefreshOptions).build()); - StatefulRedisClusterConnection connection = clusterClient.connect(); - RedisAdvancedClusterCommands sync = connection.sync(); - RedisAdvancedClusterAsyncCommands async = connection.async(); - - Partitions partitions = connection.getPartitions(); - assertThat(partitions.getPartitionBySlot(0).getSlots().size()).isEqualTo(12000); - assertThat(partitions.getPartitionBySlot(16380).getSlots().size()).isEqualTo(4384); - assertRoutedExecution(async); - - sync.del("A"); - sync.del("t"); - sync.del("p"); - - shiftAllSlotsToNode1(); - assertRoutedExecution(async); - - Wait.untilTrue(() -> { - if (clusterClient.getPartitions().size() == 2) { - for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) { - if (redisClusterNode.getSlots().size() > 16380) { - return true; + try (StatefulRedisClusterConnection connection = clusterClient.connect()) { + RedisAdvancedClusterCommands sync = connection.sync(); + RedisAdvancedClusterAsyncCommands async = connection.async(); + + Partitions partitions = connection.getPartitions(); + assertThat(partitions.getPartitionBySlot(0).getSlots().size()).isEqualTo(12000); + assertThat(partitions.getPartitionBySlot(16380).getSlots().size()).isEqualTo(4384); + assertRoutedExecution(async); + + sync.del("A"); + sync.del("t"); + sync.del("p"); + + shiftAllSlotsToNode1(); + assertRoutedExecution(async); + + Wait.untilTrue(() -> { + if (clusterClient.getPartitions().size() == 2) { + for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) { + if (redisClusterNode.getSlots().size() > 16380) { + return true; + } } } - } - return false; - }).waitOrTimeout(); - - assertThat(partitions.getPartitionBySlot(0).getSlots().size()).isEqualTo(16384); + return false; + }).waitOrTimeout(); - assertThat(sync.get("A")).isEqualTo("value"); - assertThat(sync.get("t")).isEqualTo("value"); - assertThat(sync.get("p")).isEqualTo("value"); + assertThat(partitions.getPartitionBySlot(0).getSlots().size()).isEqualTo(16384); - async.getStatefulConnection().close(); + assertThat(sync.get("A")).isEqualTo("value"); + assertThat(sync.get("t")).isEqualTo("value"); + assertThat(sync.get("p")).isEqualTo("value"); + } } @Test public void slotMigrationShouldUseAsking() { - ClusterSetup.setup2Masters(clusterHelper); + try (StatefulRedisClusterConnection connection = clusterClient.connect()) { - StatefulRedisClusterConnection connection = clusterClient.connect(); + RedisAdvancedClusterCommands sync = connection.sync(); + RedisAdvancedClusterAsyncCommands async = connection.async(); - RedisAdvancedClusterCommands sync = connection.sync(); - RedisAdvancedClusterAsyncCommands async = connection.async(); + Partitions partitions = connection.getPartitions(); + assertThat(partitions.getPartitionBySlot(0).getSlots().size()).isEqualTo(12000); + assertThat(partitions.getPartitionBySlot(16380).getSlots().size()).isEqualTo(4384); - Partitions partitions = connection.getPartitions(); - assertThat(partitions.getPartitionBySlot(0).getSlots().size()).isEqualTo(12000); - assertThat(partitions.getPartitionBySlot(16380).getSlots().size()).isEqualTo(4384); + redis1.clusterSetSlotMigrating(3300, redis2.clusterMyId()); + redis2.clusterSetSlotImporting(3300, redis1.clusterMyId()); - redis1.clusterSetSlotMigrating(3300, redis2.clusterMyId()); - redis2.clusterSetSlotImporting(3300, redis1.clusterMyId()); - - assertThat(sync.get("b")).isNull(); - - async.getStatefulConnection().close(); + assertThat(sync.get("b")).isNull(); + } } @Test @@ -309,28 +308,29 @@ public void disconnectedConnectionRejectTest() throws Exception { clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(PERIODIC_REFRESH_ENABLED) .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connect().async(); - clusterClient.setOptions( - ClusterClientOptions.builder().disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) - .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - ClusterSetup.setup2Masters(clusterHelper); + try (StatefulRedisClusterConnection clusterConnection = clusterClient.connect()) { + RedisAdvancedClusterAsyncCommands clusterAsyncCommands = clusterConnection.async(); + clusterClient.setOptions( + ClusterClientOptions.builder().disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) + .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); + ClusterSetup.setup2Masters(clusterHelper); - assertRoutedExecution(clusterConnection); + assertRoutedExecution(clusterAsyncCommands); - RedisClusterNode partition1 = getOwnPartition(redis1); - RedisClusterAsyncCommands node1Connection = clusterConnection - .getConnection(partition1.getUri().getHost(), partition1.getUri().getPort()); + RedisClusterNode partition1 = getOwnPartition(redis1); + StatefulRedisConnection node1Connection = clusterConnection + .getConnection(partition1.getUri().getHost(), partition1.getUri().getPort()); - shiftAllSlotsToNode1(); + shiftAllSlotsToNode1(); - suspendConnection(node1Connection); + suspendConnection(node1Connection); - RedisFuture set = clusterConnection.set("t", "value"); // 15891 + RedisFuture set = clusterAsyncCommands.set("t", "value"); // 15891 - set.await(5, TimeUnit.SECONDS); + set.await(5, TimeUnit.SECONDS); - assertThatThrownBy(() -> TestFutures.awaitOrTimeout(set)).hasRootCauseInstanceOf(RedisException.class); - clusterConnection.getStatefulConnection().close(); + assertThatThrownBy(() -> TestFutures.awaitOrTimeout(set)).hasRootCauseInstanceOf(RedisException.class); + } } @Test @@ -338,50 +338,50 @@ public void atLeastOnceForgetNodeFailover() throws Exception { clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(PERIODIC_REFRESH_ENABLED) .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connect().async(); - clusterClient.setOptions( - ClusterClientOptions.builder().timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - ClusterSetup.setup2Masters(clusterHelper); - - assertRoutedExecution(clusterConnection); + try (StatefulRedisClusterConnection clusterConnection = clusterClient.connect()) { + RedisAdvancedClusterAsyncCommands clusterAsyncCommands = clusterConnection.async(); + clusterClient.setOptions(ClusterClientOptions.builder() + .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); + ClusterSetup.setup2Masters(clusterHelper); - RedisClusterNode partition1 = getOwnPartition(redis1); - RedisClusterNode partition2 = getOwnPartition(redis2); - RedisClusterAsyncCommands node1Connection = clusterConnection - .getConnection(partition1.getUri().getHost(), partition1.getUri().getPort()); + assertRoutedExecution(clusterAsyncCommands); - RedisClusterAsyncCommands node2Connection = clusterConnection - .getConnection(partition2.getUri().getHost(), partition2.getUri().getPort()); + RedisClusterNode partition1 = getOwnPartition(redis1); + RedisClusterNode partition2 = getOwnPartition(redis2); + RedisClusterAsyncCommands node1Connection = clusterAsyncCommands + .getConnection(partition1.getUri().getHost(), partition1.getUri().getPort()); - shiftAllSlotsToNode1(); + StatefulRedisConnection node2Connection = clusterConnection + .getConnection(partition2.getUri().getHost(), partition2.getUri().getPort()); - suspendConnection(node2Connection); + shiftAllSlotsToNode1(); - List> futures = new ArrayList<>(); + suspendConnection(node2Connection); - futures.add(clusterConnection.set("t", "value")); // 15891 - futures.add(clusterConnection.set("p", "value")); // 16023 + List> futures = new ArrayList<>(); - clusterConnection.set("A", "value").get(1, TimeUnit.SECONDS); // 6373 + futures.add(clusterAsyncCommands.set("t", "value")); // 15891 + futures.add(clusterAsyncCommands.set("p", "value")); // 16023 - for (RedisFuture future : futures) { - assertThat(future.isDone()).isFalse(); - assertThat(future.isCancelled()).isFalse(); - } - redis1.clusterForget(partition2.getNodeId()); - redis2.clusterForget(partition1.getNodeId()); + clusterAsyncCommands.set("A", "value").get(1, TimeUnit.SECONDS); // 6373 - Partitions partitions = clusterClient.getPartitions(); - partitions.remove(partition2); - partitions.getPartition(0).setSlots(Arrays.stream(createSlots(0, 16384)).boxed().collect(Collectors.toList())); - partitions.updateCache(); - clusterClient.updatePartitionsInConnections(); + for (RedisFuture future : futures) { + assertThat(future.isDone()).isFalse(); + assertThat(future.isCancelled()).isFalse(); + } + redis1.clusterForget(partition2.getNodeId()); + redis2.clusterForget(partition1.getNodeId()); - Wait.untilTrue(() -> TestFutures.areAllDone(futures)).waitOrTimeout(); + Partitions partitions = clusterClient.getPartitions(); + partitions.remove(partition2); + partitions.getPartition(0).setSlots(Arrays.stream(createSlots(0, 16384)).boxed().collect(Collectors.toList())); + partitions.updateCache(); + clusterClient.updatePartitionsInConnections(); - assertRoutedExecution(clusterConnection); + Wait.untilTrue(() -> TestFutures.areAllDone(futures)).waitOrTimeout(); - clusterConnection.getStatefulConnection().close(); + assertRoutedExecution(clusterAsyncCommands); + } } @Test @@ -454,37 +454,37 @@ public void expireStaleDefaultPubSubConnection() { public void expireStaleNodeIdConnections() { clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(PERIODIC_REFRESH_ENABLED).build()); - RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connect().async(); + try (StatefulRedisClusterConnection clusterConnection = clusterClient.connect()) { + RedisAdvancedClusterAsyncCommands clusterAsyncCommands = clusterConnection.async(); - ClusterSetup.setup2Masters(clusterHelper); + ClusterSetup.setup2Masters(clusterHelper); - PooledClusterConnectionProvider clusterConnectionProvider = getPooledClusterConnectionProvider( - clusterConnection); + PooledClusterConnectionProvider clusterConnectionProvider = getPooledClusterConnectionProvider( + clusterConnection); - assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(0); + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(0); - assertRoutedExecution(clusterConnection); + assertRoutedExecution(clusterAsyncCommands); - assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); - Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); - for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { - if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { - redis1.clusterForget(redisClusterNode.getNodeId()); + Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis1.clusterForget(redisClusterNode.getNodeId()); + } } - } - partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); - for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { - if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { - redis2.clusterForget(redisClusterNode.getNodeId()); + partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis2.clusterForget(redisClusterNode.getNodeId()); + } } - } - Wait.untilEquals(1, () -> clusterClient.getPartitions().size()).waitOrTimeout(); - Wait.untilEquals(1, () -> clusterConnectionProvider.getConnectionCount()).waitOrTimeout(); - - clusterConnection.getStatefulConnection().close(); + Wait.untilEquals(1, () -> clusterClient.getPartitions().size()).waitOrTimeout(); + Wait.untilEquals(1, clusterConnectionProvider::getConnectionCount).waitOrTimeout(); + } } private void assertRoutedExecution(RedisClusterAsyncCommands clusterConnection) { @@ -494,124 +494,120 @@ private void assertRoutedExecution(RedisClusterAsyncCommands clu } @Test - public void doNotExpireStaleNodeIdConnections() throws Exception { + public void doNotExpireStaleNodeIdConnections() { clusterClient.setOptions(ClusterClientOptions.builder() .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder().closeStaleConnections(false).build()).build()); - RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connect().async(); + try (StatefulRedisClusterConnection clusterConnection = clusterClient.connect()) { + RedisAdvancedClusterAsyncCommands clusterAsyncCommands = clusterConnection.async(); - ClusterSetup.setup2Masters(clusterHelper); + ClusterSetup.setup2Masters(clusterHelper); - PooledClusterConnectionProvider clusterConnectionProvider = getPooledClusterConnectionProvider( - clusterConnection); + PooledClusterConnectionProvider clusterConnectionProvider = getPooledClusterConnectionProvider( + clusterConnection); - assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(0); + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(0); - assertRoutedExecution(clusterConnection); + assertRoutedExecution(clusterAsyncCommands); - assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); - Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); - for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { - if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { - redis1.clusterForget(redisClusterNode.getNodeId()); + Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis1.clusterForget(redisClusterNode.getNodeId()); + } } - } - partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); - for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { - if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { - redis2.clusterForget(redisClusterNode.getNodeId()); + partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis2.clusterForget(redisClusterNode.getNodeId()); + } } - } - - clusterClient.reloadPartitions(); - assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); - - clusterConnection.getStatefulConnection().close(); + clusterClient.reloadPartitions(); + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); + } } @Test - public void expireStaleHostAndPortConnections() throws Exception { - + public void expireStaleHostAndPortConnections() { clusterClient.setOptions(ClusterClientOptions.builder().build()); - RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connect().async(); + try (StatefulRedisClusterConnection clusterConnection = clusterClient.connect()) { + RedisAdvancedClusterAsyncCommands clusterAsyncCommands = clusterConnection.async(); - ClusterSetup.setup2Masters(clusterHelper); + ClusterSetup.setup2Masters(clusterHelper); - final PooledClusterConnectionProvider clusterConnectionProvider = getPooledClusterConnectionProvider( - clusterConnection); + final PooledClusterConnectionProvider clusterConnectionProvider = getPooledClusterConnectionProvider( + clusterConnection); - assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(0); + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(0); - assertRoutedExecution(clusterConnection); - assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); + assertRoutedExecution(clusterAsyncCommands); + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(2); - for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) { - clusterConnection.getConnection(redisClusterNode.getUri().getHost(), redisClusterNode.getUri().getPort()); - clusterConnection.getConnection(redisClusterNode.getNodeId()); - } + for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) { + clusterAsyncCommands.getConnection(redisClusterNode.getUri().getHost(), redisClusterNode.getUri().getPort()); + clusterAsyncCommands.getConnection(redisClusterNode.getNodeId()); + } - assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(4); + assertThat(clusterConnectionProvider.getConnectionCount()).isEqualTo(4); - Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); - for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { - if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { - redis1.clusterForget(redisClusterNode.getNodeId()); + Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis1.clusterForget(redisClusterNode.getNodeId()); + } } - } - partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); - for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { - if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { - redis2.clusterForget(redisClusterNode.getNodeId()); + partitions = ClusterPartitionParser.parse(redis2.clusterNodes()); + for (RedisClusterNode redisClusterNode : partitions.getPartitions()) { + if (!redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + redis2.clusterForget(redisClusterNode.getNodeId()); + } } - } - - clusterClient.reloadPartitions(); - Wait.untilEquals(1, () -> clusterClient.getPartitions().size()).waitOrTimeout(); - Wait.untilEquals(2L, () -> clusterConnectionProvider.getConnectionCount()).waitOrTimeout(); + clusterClient.reloadPartitions(); - clusterConnection.getStatefulConnection().close(); + Wait.untilEquals(1, () -> clusterClient.getPartitions().size()).waitOrTimeout(); + Wait.untilEquals(2L, clusterConnectionProvider::getConnectionCount).waitOrTimeout(); + } } @Test public void readFromReplicaTest() { - ClusterSetup.setup2Masters(clusterHelper); - RedisAdvancedClusterAsyncCommands clusterConnection = clusterClient.connect().async(); - clusterConnection.getStatefulConnection().setReadFrom(ReadFrom.REPLICA); + try (StatefulRedisClusterConnection clusterConnection = clusterClient.connect()) { + RedisAdvancedClusterAsyncCommands clusterAsyncCommands = clusterConnection.async(); + clusterConnection.setReadFrom(ReadFrom.REPLICA); - TestFutures.awaitOrTimeout(clusterConnection.set(key, value)); + TestFutures.awaitOrTimeout(clusterAsyncCommands.set(key, value)); - try { - clusterConnection.get(key); - } catch (RedisException e) { - assertThat(e).hasMessageContaining("Cannot determine a partition to read for slot"); + try { + clusterAsyncCommands.get(key); + } catch (RedisException e) { + assertThat(e).hasMessageContaining("Cannot determine a partition to read for slot"); + } } - - clusterConnection.getStatefulConnection().close(); } @Test public void readFromNearestTest() { - ClusterSetup.setup2Masters(clusterHelper); - RedisAdvancedClusterCommands clusterConnection = clusterClient.connect().sync(); - clusterConnection.getStatefulConnection().setReadFrom(ReadFrom.NEAREST); + try (StatefulRedisClusterConnection clusterConnection = clusterClient.connect()) { + RedisAdvancedClusterCommands clusterCommands = clusterConnection.sync(); + clusterConnection.setReadFrom(ReadFrom.NEAREST); - clusterConnection.set(key, value); + clusterCommands.set(key, value); - assertThat(clusterConnection.get(key)).isEqualTo(value); - - clusterConnection.getStatefulConnection().close(); + assertThat(clusterCommands.get(key)).isEqualTo(value); + } } private PooledClusterConnectionProvider getPooledClusterConnectionProvider( - RedisAdvancedClusterAsyncCommands clusterAsyncConnection) { + StatefulRedisClusterConnection clusterAsyncConnection) { RedisChannelHandler channelHandler = getChannelHandler(clusterAsyncConnection); ClusterDistributionChannelWriter writer = (ClusterDistributionChannelWriter) channelHandler.getChannelWriter(); @@ -619,8 +615,8 @@ private PooledClusterConnectionProvider getPooledClusterConnecti } private RedisChannelHandler getChannelHandler( - RedisAdvancedClusterAsyncCommands clusterAsyncConnection) { - return (RedisChannelHandler) clusterAsyncConnection.getStatefulConnection(); + StatefulRedisClusterConnection clusterAsyncConnection) { + return (RedisChannelHandler) clusterAsyncConnection; } private void assertExecuted(RedisFuture set) { @@ -629,13 +625,10 @@ private void assertExecuted(RedisFuture set) { assertThat(TestFutures.getOrTimeout(set)).isEqualTo("OK"); } - private void suspendConnection(RedisClusterAsyncCommands asyncCommands) { - - ConnectionTestUtil.getConnectionWatchdog(((RedisAsyncCommands) asyncCommands).getStatefulConnection()) - .setReconnectSuspended(true); - asyncCommands.quit(); - - Wait.untilTrue(() -> !asyncCommands.isOpen()).waitOrTimeout(); + private void suspendConnection(final StatefulRedisConnection statefulRedisConnection) { + ConnectionTestUtil.getConnectionWatchdog(statefulRedisConnection).setReconnectSuspended(true); + statefulRedisConnection.sync().quit(); + Wait.untilTrue(() -> !statefulRedisConnection.isOpen()).waitOrTimeout(); } private void shiftAllSlotsToNode1() { @@ -658,7 +651,7 @@ public Boolean get() { removeRemaining(partition); } - return partition.getSlots().size() == 0; + return partition.getSlots().isEmpty(); } private void removeRemaining(RedisClusterNode partition) { diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterStressScenariosIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/RedisClusterStressScenariosIntegrationTests.java index 65aad4071b..f1bd64379b 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterStressScenariosIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterStressScenariosIntegrationTests.java @@ -25,6 +25,7 @@ import java.util.Collections; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.AfterAll; @@ -67,7 +68,7 @@ public class RedisClusterStressScenariosIntegrationTests extends TestSupport { private static ClusterTestHelper clusterHelper; - private Logger log = LogManager.getLogger(getClass()); + private final Logger log = LogManager.getLogger(getClass()); private StatefulRedisConnection redis5; @@ -113,9 +114,7 @@ public void before() { @AfterEach public void after() { redis5.close(); - - redissync5.getStatefulConnection().close(); - redissync6.getStatefulConnection().close(); + redis6.close(); } @Test @@ -173,12 +172,13 @@ public void testClusterFailoverWithTakeOver() { @Test public void testClusterConnectionStability() { - RedisAdvancedClusterAsyncCommandsImpl connection = (RedisAdvancedClusterAsyncCommandsImpl) clusterClient - .connect().async(); + StatefulRedisClusterConnection clusterConnection = clusterClient.connect(); + RedisAdvancedClusterAsyncCommandsImpl clusterAsyncCommands = (RedisAdvancedClusterAsyncCommandsImpl) clusterConnection + .async(); - RedisChannelHandler statefulConnection = (RedisChannelHandler) connection.getStatefulConnection(); + RedisChannelHandler statefulConnection = (RedisChannelHandler) clusterConnection; - connection.set("a", "b"); + clusterAsyncCommands.set("a", "b"); ClusterDistributionChannelWriter writer = (ClusterDistributionChannelWriter) statefulConnection.getChannelWriter(); StatefulRedisConnectionImpl statefulSlotConnection = (StatefulRedisConnectionImpl) writer @@ -187,24 +187,24 @@ public void testClusterConnectionStability() { final RedisAsyncCommands slotConnection = statefulSlotConnection.async(); slotConnection.set("a", "b"); - slotConnection.getStatefulConnection().close(); + statefulSlotConnection.close(); - Wait.untilTrue(() -> !slotConnection.isOpen()).waitOrTimeout(); + Wait.untilTrue(() -> !statefulSlotConnection.isOpen()).waitOrTimeout(); assertThat(statefulSlotConnection.isClosed()).isTrue(); assertThat(statefulSlotConnection.isOpen()).isFalse(); - assertThat(connection.isOpen()).isTrue(); + assertThat(clusterConnection.isOpen()).isTrue(); assertThat(statefulConnection.isOpen()).isTrue(); assertThat(statefulConnection.isClosed()).isFalse(); try { - connection.set("a", "b"); + clusterAsyncCommands.set("a", "b"); } catch (RedisException e) { assertThat(e).hasMessageContaining("Connection is closed"); } - connection.getStatefulConnection().close(); + clusterConnection.close(); } } diff --git a/src/test/java/io/lettuce/core/commands/RunOnlyOnceServerCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/RunOnlyOnceServerCommandIntegrationTests.java index ca8233e023..2f74e740fc 100644 --- a/src/test/java/io/lettuce/core/commands/RunOnlyOnceServerCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/RunOnlyOnceServerCommandIntegrationTests.java @@ -27,6 +27,7 @@ /** * @author Will Glozer * @author Mark Paluch + * @author Hari Mani */ @Tag(INTEGRATION_TEST) @ExtendWith(LettuceExtension.class) @@ -35,15 +36,11 @@ class RunOnlyOnceServerCommandIntegrationTests extends TestSupport { private final RedisClient client; - private final StatefulRedisConnection connection; - private final RedisCommands redis; @Inject RunOnlyOnceServerCommandIntegrationTests(RedisClient client, StatefulRedisConnection connection) { - this.client = client; - this.connection = connection; this.redis = connection.sync(); } @@ -55,18 +52,14 @@ class RunOnlyOnceServerCommandIntegrationTests extends TestSupport { @Disabled @Order(1) void debugSegfault() { - assumeTrue(CanConnect.to(host(), port(1))); - - final RedisAsyncCommands commands = client.connect(RedisURI.Builder.redis(host(), port(1)).build()) - .async(); - try { + final RedisURI redisURI = RedisURI.Builder.redis(host(), port(1)).build(); + try (StatefulRedisConnection connection = client.connect(redisURI)) { + final RedisAsyncCommands commands = connection.async(); commands.debugSegfault(); - Wait.untilTrue(() -> !commands.getStatefulConnection().isOpen()).waitOrTimeout(); - assertThat(commands.getStatefulConnection().isOpen()).isFalse(); - } finally { - commands.getStatefulConnection().close(); + Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); + assertThat(connection.isOpen()).isFalse(); } } @@ -113,21 +106,15 @@ void migrateCopyReplace() { @Test @Order(4) void shutdown() { - assumeTrue(CanConnect.to(host(), port(7))); - - final RedisAsyncCommands commands = client.connect(RedisURI.Builder.redis(host(), port(2)).build()) - .async(); - try { - + final RedisURI redisURI = RedisURI.Builder.redis(host(), port(2)).build(); + try (StatefulRedisConnection cnxn = client.connect(redisURI)) { + final RedisAsyncCommands commands = cnxn.async(); commands.shutdown(true); commands.shutdown(false); - Wait.untilTrue(() -> !commands.getStatefulConnection().isOpen()).waitOrTimeout(); - - assertThat(commands.getStatefulConnection().isOpen()).isFalse(); + Wait.untilTrue(() -> !cnxn.isOpen()).waitOrTimeout(); - } finally { - commands.getStatefulConnection().close(); + assertThat(cnxn.isOpen()).isFalse(); } } diff --git a/src/test/java/io/lettuce/core/protocol/ConnectionFailureIntegrationTests.java b/src/test/java/io/lettuce/core/protocol/ConnectionFailureIntegrationTests.java index 1793320a09..a57c7243f6 100644 --- a/src/test/java/io/lettuce/core/protocol/ConnectionFailureIntegrationTests.java +++ b/src/test/java/io/lettuce/core/protocol/ConnectionFailureIntegrationTests.java @@ -21,9 +21,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.platform.commons.util.ReflectionUtils; - -import io.lettuce.test.ReflectionTestUtils; import io.lettuce.core.*; import io.lettuce.core.api.StatefulRedisConnection; @@ -41,6 +38,7 @@ /** * @author Mark Paluch + * @author Hari Mani */ @Tag(INTEGRATION_TEST) @ExtendWith(LettuceExtension.class) @@ -101,10 +99,9 @@ void failOnReconnect() throws Exception { RedisURI redisUri = RedisURI.Builder.redis(TestSettings.host(), TestSettings.port()).build(); redisUri.setTimeout(Duration.ofSeconds(5)); - try { - RedisAsyncCommands connection = client.connect(redisUri).async(); - ConnectionWatchdog connectionWatchdog = ConnectionTestUtil - .getConnectionWatchdog(connection.getStatefulConnection()); + try (StatefulRedisConnection cnxn = client.connect(redisUri)) { + RedisAsyncCommands commands = cnxn.async(); + ConnectionWatchdog connectionWatchdog = ConnectionTestUtil.getConnectionWatchdog(cnxn); assertThat(connectionWatchdog.isListenOnChannelInactive()).isTrue(); assertThat(connectionWatchdog.isReconnectSuspended()).isFalse(); @@ -113,15 +110,13 @@ void failOnReconnect() throws Exception { redisUri.setPort(TestSettings.nonexistentPort()); - connection.quit(); - Wait.untilTrue(() -> connectionWatchdog.isReconnectSuspended()).waitOrTimeout(); + commands.quit(); + Wait.untilTrue(connectionWatchdog::isReconnectSuspended).waitOrTimeout(); assertThat(connectionWatchdog.isListenOnChannelInactive()).isTrue(); - assertThatThrownBy(() -> TestFutures.awaitOrTimeout(connection.info())).hasRootCauseInstanceOf(RedisException.class) + assertThatThrownBy(() -> TestFutures.awaitOrTimeout(commands.info())).hasRootCauseInstanceOf(RedisException.class) .hasMessageContaining("Invalid first byte"); - - connection.getStatefulConnection().close(); } finally { ts.shutdown(); } @@ -145,12 +140,11 @@ void failOnReconnectShouldSendEvents() throws Exception { RedisURI redisUri = RedisURI.create(defaultRedisUri.toURI()); redisUri.setTimeout(Duration.ofSeconds(5)); - try { + try (StatefulRedisConnection connection = client.connect(redisUri)) { final BlockingQueue events = new LinkedBlockingDeque<>(); - RedisAsyncCommands connection = client.connect(redisUri).async(); - ConnectionWatchdog connectionWatchdog = ConnectionTestUtil - .getConnectionWatchdog(connection.getStatefulConnection()); + RedisAsyncCommands commands = connection.async(); + ConnectionWatchdog connectionWatchdog = ConnectionTestUtil.getConnectionWatchdog(connection); ReconnectionListener reconnectionListener = events::offer; @@ -160,9 +154,8 @@ void failOnReconnectShouldSendEvents() throws Exception { redisUri.setPort(TestSettings.nonexistentPort()); - connection.quit(); + commands.quit(); Wait.untilTrue(() -> events.size() > 1).waitOrTimeout(); - connection.getStatefulConnection().close(); ConnectionEvents.Reconnect event1 = events.take(); assertThat(event1.getAttempt()).isEqualTo(1); @@ -188,24 +181,20 @@ void emitEventOnReconnectFailure() throws Exception { client.setOptions( ClientOptions.builder().timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); - try { - RedisAsyncCommandsImpl connection = (RedisAsyncCommandsImpl) client - .connect(redisUri).async(); - ConnectionWatchdog connectionWatchdog = ConnectionTestUtil - .getConnectionWatchdog(connection.getStatefulConnection()); + try (StatefulRedisConnection connection = client.connect(redisUri)) { + RedisAsyncCommandsImpl commands = (RedisAsyncCommandsImpl) connection.async(); + ConnectionWatchdog connectionWatchdog = ConnectionTestUtil.getConnectionWatchdog(connection); redisUri.setPort(TestSettings.nonexistentPort()); client.getResources().eventBus().get().subscribe(queue::add); - connection.quit(); - Wait.untilTrue(() -> !connection.getStatefulConnection().isOpen()).waitOrTimeout(); + commands.quit(); + Wait.untilTrue(() -> !connection.isOpen()).waitOrTimeout(); connectionWatchdog.run(0); Delay.delay(Duration.ofMillis(500)); - connection.getStatefulConnection().close(); - assertThat(queue).isNotEmpty(); List failures = queue.stream().filter(ReconnectFailedEvent.class::isInstance) diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java index 3472660281..7400e7f3a9 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java @@ -19,36 +19,16 @@ */ package io.lettuce.core.pubsub; -import static io.lettuce.TestTags.INTEGRATION_TEST; -import static org.assertj.core.api.Assertions.*; - -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; - import io.lettuce.core.AbstractRedisClientTest; import io.lettuce.core.ClientOptions; import io.lettuce.core.KillArgs; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisFuture; import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.api.push.PushMessage; import io.lettuce.core.internal.LettuceFactories; -import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.support.PubSubTestListener; import io.lettuce.test.Delay; @@ -59,6 +39,27 @@ import io.lettuce.test.resource.DefaultRedisClient; import io.lettuce.test.resource.FastShutdown; import io.lettuce.test.resource.TestClientResources; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static io.lettuce.core.protocol.ProtocolVersion.RESP2; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Pub/Sub Command tests using protocol version discovery. @@ -73,27 +74,32 @@ @Tag(INTEGRATION_TEST) class PubSubCommandIntegrationTests extends AbstractRedisClientTest { - RedisPubSubAsyncCommands pubsub; + private static final ClientOptions RESP2_DO_NO_PING_BEFORE_ACTIVATE = ClientOptions.builder().protocolVersion(RESP2) + .pingBeforeActivateConnection(false).build(); + + protected RedisPubSubAsyncCommands pubsub; + + protected StatefulRedisPubSubConnection pubSubConnection; - PubSubTestListener listener = new PubSubTestListener(); + private final PubSubTestListener listener = new PubSubTestListener(); - BlockingQueue channels = listener.getChannels(); + protected BlockingQueue channels = listener.getChannels(); - BlockingQueue shardChannels = listener.getShardChannels(); + protected BlockingQueue shardChannels = listener.getShardChannels(); - BlockingQueue patterns = listener.getPatterns(); + protected BlockingQueue patterns = listener.getPatterns(); - BlockingQueue messages = listener.getMessages(); + protected BlockingQueue messages = listener.getMessages(); - BlockingQueue counts = listener.getCounts(); + protected BlockingQueue counts = listener.getCounts(); - BlockingQueue shardCounts = listener.getShardCounts(); + protected BlockingQueue shardCounts = listener.getShardCounts(); String channel = "channel0"; String shardChannel = "shard-channel"; - private String pattern = "channel*"; + private static final String pattern = "channel*"; String message = "msg!"; @@ -102,10 +108,10 @@ class PubSubCommandIntegrationTests extends AbstractRedisClientTest { @BeforeEach void openPubSubConnection() { try { - client.setOptions(getOptions()); - pubsub = client.connectPubSub().async(); - pubsub.getStatefulConnection().addListener(listener); + pubSubConnection = client.connectPubSub(); + pubSubConnection.addListener(listener); + pubsub = pubSubConnection.async(); } finally { listener.clear(); } @@ -117,23 +123,22 @@ protected ClientOptions getOptions() { @AfterEach void closePubSubConnection() { - if (pubsub != null) { - pubsub.getStatefulConnection().close(); + if (pubSubConnection != null) { + pubSubConnection.close(); } } @Test void auth() { WithPassword.run(client, () -> { - - client.setOptions( - ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); - RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(listener); - connection.auth(passwd); - - connection.subscribe(channel); - assertThat(channels.take()).isEqualTo(channel); + client.setOptions(RESP2_DO_NO_PING_BEFORE_ACTIVATE); + try (StatefulRedisPubSubConnection connection = client.connectPubSub()) { + RedisPubSubAsyncCommands pubSubAsyncCommands = connection.async(); + connection.addListener(listener); + pubSubAsyncCommands.auth(passwd); + pubSubAsyncCommands.subscribe(channel); + assertThat(channels.take()).isEqualTo(channel); + } }); } @@ -141,71 +146,65 @@ void auth() { @EnabledOnCommand("ACL") void authWithUsername() { WithPassword.run(client, () -> { - - client.setOptions( - ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); - RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(listener); - connection.auth(username, passwd); - - connection.subscribe(channel); - assertThat(channels.take()).isEqualTo(channel); + client.setOptions(RESP2_DO_NO_PING_BEFORE_ACTIVATE); + try (StatefulRedisPubSubConnection connection = client.connectPubSub()) { + RedisPubSubAsyncCommands pubSubAsyncCommands = connection.async(); + connection.addListener(listener); + pubSubAsyncCommands.auth(username, passwd); + pubSubAsyncCommands.subscribe(channel); + assertThat(channels.take()).isEqualTo(channel); + } }); } @Test void authWithReconnect() { - WithPassword.run(client, () -> { + client.setOptions(RESP2_DO_NO_PING_BEFORE_ACTIVATE); + try (StatefulRedisPubSubConnection connection = client.connectPubSub()) { + RedisPubSubAsyncCommands pubSubAsyncCommands = connection.async(); + connection.addListener(listener); + pubSubAsyncCommands.auth(passwd); - client.setOptions( - ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); + pubSubAsyncCommands.clientSetname("authWithReconnect"); + pubSubAsyncCommands.subscribe(channel).get(); - RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(listener); - connection.auth(passwd); + assertThat(channels.take()).isEqualTo(channel); - connection.clientSetname("authWithReconnect"); - connection.subscribe(channel).get(); + // kill using shared client to trigger reconnect + redis.auth(passwd); + long id = findNamedClient("authWithReconnect"); + redis.clientKill(KillArgs.Builder.id(id)); - assertThat(channels.take()).isEqualTo(channel); - - redis.auth(passwd); - long id = findNamedClient("authWithReconnect"); - redis.clientKill(KillArgs.Builder.id(id)); - - Delay.delay(Duration.ofMillis(100)); - Wait.untilTrue(connection::isOpen).waitOrTimeout(); - - assertThat(channels.take()).isEqualTo(channel); + Delay.delay(Duration.ofMillis(100)); + Wait.untilTrue(connection::isOpen).waitOrTimeout(); + assertThat(channels.take()).isEqualTo(channel); + } }); } @Test @EnabledOnCommand("ACL") void authWithUsernameAndReconnect() { - WithPassword.run(client, () -> { - - client.setOptions( - ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); - - RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(listener); - connection.auth(username, passwd); - connection.clientSetname("authWithReconnect"); - connection.subscribe(channel).get(); - - assertThat(channels.take()).isEqualTo(channel); - - long id = findNamedClient("authWithReconnect"); - redis.auth(username, passwd); - redis.clientKill(KillArgs.Builder.id(id)); - - Delay.delay(Duration.ofMillis(100)); - Wait.untilTrue(connection::isOpen).waitOrTimeout(); - - assertThat(channels.take()).isEqualTo(channel); + client.setOptions(RESP2_DO_NO_PING_BEFORE_ACTIVATE); + try (StatefulRedisPubSubConnection connection = client.connectPubSub()) { + RedisPubSubAsyncCommands pubSubAsyncCommands = connection.async(); + connection.addListener(listener); + pubSubAsyncCommands.auth(username, passwd); + pubSubAsyncCommands.clientSetname("authWithReconnect"); + pubSubAsyncCommands.subscribe(channel).get(); + assertThat(channels.take()).isEqualTo(channel); + + // kill using shared client to trigger reconnect + long id = findNamedClient("authWithReconnect"); + redis.auth(username, passwd); + redis.clientKill(KillArgs.Builder.id(id)); + + Delay.delay(Duration.ofMillis(100)); + Wait.untilTrue(connection::isOpen).waitOrTimeout(); + assertThat(channels.take()).isEqualTo(channel); + } }); } @@ -234,7 +233,7 @@ void message() throws Exception { @Test @EnabledOnCommand("SSUBSCRIBE") - void messageToShardChannel() throws Exception { + void messageToShardChannel() { pubsub.ssubscribe(shardChannel); Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); @@ -245,14 +244,15 @@ void messageToShardChannel() throws Exception { @Test @EnabledOnCommand("SSUBSCRIBE") - void messageToShardChannelViaNewClient() throws Exception { + void messageToShardChannelViaNewClient() { pubsub.ssubscribe(shardChannel); Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); - - RedisPubSubAsyncCommands redis = DefaultRedisClient.get().connectPubSub().async(); - redis.spublish(shardChannel, shardMessage); - Wait.untilEquals(shardMessage, messages::poll).waitOrTimeout(); - Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); + try (StatefulRedisPubSubConnection connection = DefaultRedisClient.get().connectPubSub()) { + RedisPubSubAsyncCommands pubSubAsyncCommands = connection.async(); + pubSubAsyncCommands.spublish(shardChannel, shardMessage); + Wait.untilEquals(shardMessage, messages::poll).waitOrTimeout(); + Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); + } } @Test @@ -263,7 +263,7 @@ void messageAsPushMessage() throws Exception { assertThat(counts.take()).isNotNull(); AtomicReference messageRef = new AtomicReference<>(); - pubsub.getStatefulConnection().addListener(messageRef::set); + pubSubConnection.addListener(messageRef::set); redis.publish(channel, message); assertThat(messages.take()).isEqualTo(message); @@ -280,19 +280,16 @@ void messageAsPushMessage() throws Exception { void pipelinedMessage() throws Exception { pubsub.subscribe(channel); assertThat(channels.take()).isEqualTo(channel); - RedisAsyncCommands connection = client.connect().async(); - - connection.setAutoFlushCommands(false); - connection.publish(channel, message); - Delay.delay(Duration.ofMillis(100)); - - assertThat(channels).isEmpty(); - connection.flushCommands(); - - assertThat(channels.take()).isEqualTo(channel); - assertThat(messages.take()).isEqualTo(message); - - connection.getStatefulConnection().close(); + try (StatefulRedisConnection cnxn = client.connect()) { + RedisAsyncCommands connection = cnxn.async(); + cnxn.setAutoFlushCommands(false); + connection.publish(channel, message); + Delay.delay(Duration.ofMillis(100)); + assertThat(channels).isEmpty(); + cnxn.flushCommands(); + assertThat(channels.take()).isEqualTo(channel); + assertThat(messages.take()).isEqualTo(message); + } } @Test @@ -314,11 +311,11 @@ void pmessage() throws Exception { @Test void pipelinedSubscribe() throws Exception { - pubsub.setAutoFlushCommands(false); + pubSubConnection.setAutoFlushCommands(false); pubsub.subscribe(channel); Delay.delay(Duration.ofMillis(100)); assertThat(channels).isEmpty(); - pubsub.flushCommands(); + pubSubConnection.flushCommands(); assertThat(channels.take()).isEqualTo(channel); @@ -467,13 +464,10 @@ void unsubscribe() throws Exception { @Test void pubsubCloseOnClientShutdown() { - - RedisClient redisClient = RedisClient.create(TestClientResources.get(), RedisURI.Builder.redis(host, port).build()); - - RedisPubSubAsyncCommands connection = redisClient.connectPubSub().async(); - + final RedisURI uri = RedisURI.Builder.redis(host, port).build(); + final RedisClient redisClient = RedisClient.create(TestClientResources.get(), uri); + final StatefulRedisPubSubConnection connection = redisClient.connectPubSub(); FastShutdown.shutdown(redisClient); - assertThat(connection.isOpen()).isFalse(); } @@ -501,7 +495,7 @@ void resubscribeChannelsOnReconnect() throws Exception { assertThat(channels.take()).isEqualTo(channel); assertThat((long) counts.take()).isEqualTo(1); - Wait.untilTrue(pubsub::isOpen).waitOrTimeout(); + Wait.untilTrue(pubSubConnection::isOpen).waitOrTimeout(); redis.publish(channel, message); assertThat(channels.take()).isEqualTo(channel); @@ -519,7 +513,7 @@ void resubscribePatternsOnReconnect() throws Exception { assertThat(patterns.take()).isEqualTo(pattern); assertThat((long) counts.take()).isEqualTo(1); - Wait.untilTrue(pubsub::isOpen).waitOrTimeout(); + Wait.untilTrue(pubSubConnection::isOpen).waitOrTimeout(); redis.publish(channel, message); assertThat(channels.take()).isEqualTo(channel); @@ -537,7 +531,7 @@ void resubscribeShardChannelsOnReconnect() throws Exception { assertThat(shardChannels.take()).isEqualTo(shardChannel); assertThat((long) shardCounts.take()).isEqualTo(1); - Wait.untilTrue(pubsub::isOpen).waitOrTimeout(); + Wait.untilTrue(pubSubConnection::isOpen).waitOrTimeout(); redis.spublish(shardChannel, shardMessage); assertThat(shardChannels.take()).isEqualTo(shardChannel); @@ -573,7 +567,7 @@ public void unsubscribed(String channel, long count) { }; - pubsub.getStatefulConnection().addListener(adapter); + pubSubConnection.addListener(adapter); pubsub.subscribe(channel); pubsub.psubscribe(pattern); @@ -595,7 +589,7 @@ void removeListener() throws Exception { assertThat(channels.take()).isEqualTo(channel); assertThat(messages.take()).isEqualTo(message); - pubsub.getStatefulConnection().removeListener(listener); + pubSubConnection.removeListener(listener); redis.publish(channel, message); assertThat(channels.poll(10, TimeUnit.MILLISECONDS)).isNull(); diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveIntegrationTests.java b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveIntegrationTests.java index 6f3250db83..138d097889 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveIntegrationTests.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveIntegrationTests.java @@ -44,7 +44,6 @@ import io.lettuce.core.pubsub.api.reactive.ChannelMessage; import io.lettuce.core.pubsub.api.reactive.PatternMessage; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; -import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; import io.lettuce.test.Delay; import io.lettuce.test.Wait; import io.lettuce.test.condition.EnabledOnCommand; @@ -54,6 +53,7 @@ /** * @author Mark Paluch * @author Ali Takavci + * @author Hari Mani */ @Tag(INTEGRATION_TEST) class PubSubReactiveIntegrationTests extends AbstractRedisClientTest implements RedisPubSubListener { @@ -62,6 +62,10 @@ class PubSubReactiveIntegrationTests extends AbstractRedisClientTest implements private RedisPubSubReactiveCommands pubsub2; + private StatefulRedisPubSubConnection pubSubConnection; + + private StatefulRedisPubSubConnection pubSubConnection2; + private BlockingQueue channels; private BlockingQueue shardChannels; @@ -72,20 +76,21 @@ class PubSubReactiveIntegrationTests extends AbstractRedisClientTest implements private BlockingQueue counts; - private String shardChannel = "shard-channel"; + private final String shardChannel = "shard-channel"; - private String channel = "channel0"; + private final String channel = "channel0"; - private String pattern = "channel*"; + private final String pattern = "channel*"; - private String message = "msg!"; + private final String message = "msg!"; @BeforeEach void openPubSubConnection() { - - pubsub = client.connectPubSub().reactive(); - pubsub2 = client.connectPubSub().reactive(); - pubsub.getStatefulConnection().addListener(this); + pubSubConnection = client.connectPubSub(); + pubSubConnection2 = client.connectPubSub(); + pubsub = pubSubConnection.reactive(); + pubsub2 = pubSubConnection2.reactive(); + pubSubConnection.addListener(this); channels = LettuceFactories.newBlockingQueue(); shardChannels = LettuceFactories.newBlockingQueue(); patterns = LettuceFactories.newBlockingQueue(); @@ -95,8 +100,8 @@ void openPubSubConnection() { @AfterEach void closePubSubConnection() { - pubsub.getStatefulConnection().close(); - pubsub2.getStatefulConnection().close(); + pubSubConnection.close(); + pubSubConnection2.close(); } @Test @@ -134,8 +139,8 @@ void observeChannelsUnsubscribe() { pubsub.observeChannels().doOnNext(channelMessages::add).subscribe().dispose(); - block(redis.getStatefulConnection().reactive().publish(channel, message)); - block(redis.getStatefulConnection().reactive().publish(channel, message)); + block(statefulRedisConnection.reactive().publish(channel, message)); + block(statefulRedisConnection.reactive().publish(channel, message)); Delay.delay(Duration.ofMillis(500)); assertThat(channelMessages).isEmpty(); @@ -252,9 +257,9 @@ void pubsubMultipleChannels() { void pubsubChannelsWithArg() { StepVerifier.create(pubsub.subscribe(channel)).verifyComplete(); - Wait.untilTrue(() -> mono(pubsub2.pubsubChannels(pattern).filter(s -> channel.equals(s))) != null).waitOrTimeout(); + Wait.untilTrue(() -> mono(pubsub2.pubsubChannels(pattern).filter(channel::equals)) != null).waitOrTimeout(); - String result = mono(pubsub2.pubsubChannels(pattern).filter(s -> channel.equals(s))); + String result = mono(pubsub2.pubsubChannels(pattern).filter(channel::equals)); assertThat(result).isEqualToIgnoringCase(channel); } @@ -292,10 +297,10 @@ void pubsubShardMultipleChannels() { void pubsubShardChannelsWithArg() { StepVerifier.create(pubsub.ssubscribe(shardChannel)).verifyComplete(); - Wait.untilTrue(() -> mono(pubsub2.pubsubShardChannels(shardChannel).filter(s -> shardChannel.equals(s))) != null) + Wait.untilTrue(() -> mono(pubsub2.pubsubShardChannels(shardChannel).filter(shardChannel::equals)) != null) .waitOrTimeout(); - String result = mono(pubsub2.pubsubShardChannels(shardChannel).filter(s -> shardChannel.equals(s))); + String result = mono(pubsub2.pubsubShardChannels(shardChannel).filter(shardChannel::equals)); assertThat(result).isEqualToIgnoringCase(shardChannel); } @@ -369,12 +374,10 @@ void sunsubscribe() throws Exception { @Test void pubsubCloseOnClientShutdown() { - - RedisClient redisClient = RedisClient.create(TestClientResources.get(), RedisURI.Builder.redis(host, port).build()); - - RedisPubSubCommands connection = redisClient.connectPubSub().sync(); + final RedisURI uri = RedisURI.Builder.redis(host, port).build(); + RedisClient redisClient = RedisClient.create(TestClientResources.get(), uri); + final StatefulRedisPubSubConnection connection = redisClient.connectPubSub(); FastShutdown.shutdown(redisClient); - assertThat(connection.isOpen()).isFalse(); } @@ -403,7 +406,7 @@ void resubscribeChannelsOnReconnect() throws Exception { assertThat(channels.take()).isEqualTo(channel); assertThat((long) counts.take()).isEqualTo(1); - Wait.untilTrue(pubsub::isOpen).waitOrTimeout(); + Wait.untilTrue(pubSubConnection::isOpen).waitOrTimeout(); redis.publish(channel, message); assertThat(channels.take()).isEqualTo(channel); @@ -422,7 +425,7 @@ void resubscribePatternsOnReconnect() throws Exception { assertThat(patterns.take()).isEqualTo(pattern); assertThat((long) counts.take()).isEqualTo(1); - Wait.untilTrue(pubsub::isOpen).waitOrTimeout(); + Wait.untilTrue(pubSubConnection::isOpen).waitOrTimeout(); StepVerifier.create(pubsub2.publish(channel, message)).expectNextCount(1).verifyComplete(); assertThat(channels.take()).isEqualTo(channel); @@ -450,7 +453,7 @@ public void unsubscribed(String channel, long count) { }; - pubsub.getStatefulConnection().addListener(adapter); + pubSubConnection.addListener(adapter); StepVerifier.create(pubsub.subscribe(channel)).verifyComplete(); StepVerifier.create(pubsub.psubscribe(pattern)).verifyComplete(); @@ -473,7 +476,7 @@ void removeListener() throws Exception { assertThat(channels.take()).isEqualTo(channel); assertThat(messages.take()).isEqualTo(message); - pubsub.getStatefulConnection().removeListener(this); + pubSubConnection.removeListener(this); StepVerifier.create(pubsub2.publish(channel, message)).expectNextCount(1).verifyComplete(); assertThat(channels.poll(10, TimeUnit.MILLISECONDS)).isNull(); diff --git a/src/test/java/io/lettuce/core/reliability/AtMostOnceIntegrationTests.java b/src/test/java/io/lettuce/core/reliability/AtMostOnceIntegrationTests.java index 117dc24cea..f2874d5c6f 100644 --- a/src/test/java/io/lettuce/core/reliability/AtMostOnceIntegrationTests.java +++ b/src/test/java/io/lettuce/core/reliability/AtMostOnceIntegrationTests.java @@ -66,10 +66,11 @@ public AtMostOnceIntegrationTests() { @BeforeEach void before() { - RedisCommands connection = client.connect().sync(); - connection.flushall(); - connection.flushdb(); - connection.getStatefulConnection().close(); + try (StatefulRedisConnection connection = client.connect()) { + RedisCommands command = connection.sync(); + command.flushall(); + command.flushdb(); + } } @AfterEach @@ -99,13 +100,12 @@ void noReconnectHandler() { @Test void basicOperations() { + try (StatefulRedisConnection connection = client.connect()) { + RedisCommands command = connection.sync(); - RedisCommands connection = client.connect().sync(); - - connection.set(key, "1"); - assertThat(connection.get("key")).isEqualTo("1"); - - connection.getStatefulConnection().close(); + command.set(key, "1"); + assertThat(command.get("key")).isEqualTo("1"); + } } @Test @@ -294,13 +294,10 @@ void noCommandsExecutedAfterConnectionIsDisconnected() { @Test void commandsCancelledOnDisconnect() { - - StatefulRedisConnection connection = client.connect(); - - try { + try (StatefulRedisConnection connection = client.connect()) { RedisAsyncCommands async = connection.async(); - async.setAutoFlushCommands(false); + connection.setAutoFlushCommands(false); async.quit(); RedisFuture incr = async.incr(key); @@ -312,8 +309,6 @@ void commandsCancelledOnDisconnect() { } catch (Exception e) { assertThat(e).hasRootCauseInstanceOf(RedisException.class).hasMessageContaining("Connection disconnected"); } - - connection.close(); } private Throwable getException(RedisFuture command) { diff --git a/src/test/java/io/lettuce/core/sentinel/SentinelConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/sentinel/SentinelConnectionIntegrationTests.java index c1deefa942..313568b067 100644 --- a/src/test/java/io/lettuce/core/sentinel/SentinelConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/sentinel/SentinelConnectionIntegrationTests.java @@ -114,29 +114,6 @@ void testSyncAsyncConversion() { assertThat(statefulConnection.sync().getStatefulConnection().sync()).isSameAs(statefulConnection.sync()); } - @Test - void testSyncClose() { - - StatefulRedisSentinelConnection statefulConnection = sentinel.getStatefulConnection(); - statefulConnection.sync().getStatefulConnection().close(); - - Wait.untilTrue(() -> !sentinel.isOpen()).waitOrTimeout(); - - assertThat(sentinel.isOpen()).isFalse(); - assertThat(statefulConnection.isOpen()).isFalse(); - } - - @Test - void testAsyncClose() { - StatefulRedisSentinelConnection statefulConnection = sentinel.getStatefulConnection(); - statefulConnection.async().getStatefulConnection().close(); - - Wait.untilTrue(() -> !sentinel.isOpen()).waitOrTimeout(); - - assertThat(sentinel.isOpen()).isFalse(); - assertThat(statefulConnection.isOpen()).isFalse(); - } - @Test void connectToOneNode() { RedisSentinelCommands connection = redisClient.connectSentinel(SentinelTestSettings.SENTINEL_URI)