diff --git a/module/database/database-lettuce-redis/build.gradle.kts b/module/database/database-lettuce-redis/build.gradle.kts index ea877dd85..8d35c1a79 100644 --- a/module/database/database-lettuce-redis/build.gradle.kts +++ b/module/database/database-lettuce-redis/build.gradle.kts @@ -4,8 +4,10 @@ import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar // LettuceGithub: https://github.com/lettuce-io/lettuce-core dependencies { - compileOnly("io.lettuce:lettuce-core:6.6.0.RELEASE") + // 使用 api 传递依赖 + api("io.lettuce:lettuce-core:7.2.1.RELEASE") compileOnly("org.apache.commons:commons-pool2:2.12.1") + compileOnly(project(":common")) compileOnly(project(":common-env")) compileOnly(project(":common-util")) @@ -15,9 +17,11 @@ dependencies { tasks { withType { - relocate("org.reactivestreams.", "org.reactivestreams_1_0_4.") - relocate("reactor.", "reactor_3_6_6.") + relocate("io.netty.", "io.netty_4_2_5_final.") relocate("org.apache.commons.pool2.", "org.apache.commons.pool2_2_12_1.") - relocate("io.netty.", "io.netty._4_1_107_final.") + relocate("reactor.", "reactor_3_6_6.") + relocate("org.reactivestreams.", "org.reactivestreams_1_0_4.") + relocate("org.slf4j.", "org.slf4j_1_7_36.") + relocate("redis.clients.authentication.", "redis.clients.authentication_0_1_1_beta2.") } } \ No newline at end of file diff --git a/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceClusterRedisClient.kt b/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceClusterRedisClient.kt index e353165a6..4f2707f23 100644 --- a/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceClusterRedisClient.kt +++ b/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceClusterRedisClient.kt @@ -39,6 +39,7 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie lateinit var pubSubConnection: StatefulRedisClusterPubSubConnection lateinit var resources: DefaultClientResources + @OptIn(ExperimentalStdlibApi::class) override fun start(autoRelease: Boolean): CompletableFuture { val completableFuture = CompletableFuture() val resource = DefaultClientResources.builder() @@ -63,11 +64,25 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie val topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enablePeriodicRefresh(cluster.enablePeriodicRefresh) - .enableAdaptiveRefreshTrigger(*cluster.enableAdaptiveRefreshTrigger.toTypedArray()) .refreshTriggersReconnectAttempts(cluster.refreshTriggersReconnectAttempts) .dynamicRefreshSources(cluster.dynamicRefreshSources) .closeStaleConnections(cluster.closeStaleConnections) + // Lettuce 7.0+ 默认启用所有自适应触发器,需要禁用未配置的触发器 + val configuredTriggers = cluster.enableAdaptiveRefreshTrigger.toSet() + if (configuredTriggers.isEmpty()) { + // 如果未配置任何触发器,禁用所有 + topologyRefreshOptions.disableAllAdaptiveRefreshTriggers() + } else { + // 禁用未配置的触发器 + val triggersToDisable = ClusterTopologyRefreshOptions.RefreshTrigger.entries + .filter { it !in configuredTriggers } + .toTypedArray() + if (triggersToDisable.isNotEmpty()) { + topologyRefreshOptions.disableAdaptiveRefreshTrigger(*triggersToDisable) + } + } + cluster.adaptiveRefreshTriggersTimeout?.toJavaDuration()?.let { topologyRefreshOptions.adaptiveRefreshTriggersTimeout(it) } cluster.refreshPeriod?.toJavaDuration()?.let { topologyRefreshOptions.refreshPeriod(it) } clientOptions @@ -112,6 +127,89 @@ class LettuceClusterRedisClient(val redisConfig: LettuceRedisConfig): IRedisClie return completableFuture } + @OptIn(ExperimentalStdlibApi::class) + override fun startSync(autoRelease: Boolean) { + val resource = DefaultClientResources.builder() + + if (redisConfig.ioThreadPoolSize != 0) { + resource.ioThreadPoolSize(redisConfig.ioThreadPoolSize) + } + if (redisConfig.computationThreadPoolSize != 0) { + resource.computationThreadPoolSize(redisConfig.computationThreadPoolSize) + } + + val cluster = redisConfig.cluster + + val uris = cluster.nodes.map { + it.redisURIBuilder().build() + } + val clientOptions = ClusterClientOptions.builder() + + if (redisConfig.ssl) { + clientOptions.sslOptions(redisConfig.sslOptions) + } + + val topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() + .enablePeriodicRefresh(cluster.enablePeriodicRefresh) + .refreshTriggersReconnectAttempts(cluster.refreshTriggersReconnectAttempts) + .dynamicRefreshSources(cluster.dynamicRefreshSources) + .closeStaleConnections(cluster.closeStaleConnections) + + // Lettuce 7.0+ 默认启用所有自适应触发器,需要禁用未配置的触发器 + val configuredTriggers = cluster.enableAdaptiveRefreshTrigger.toSet() + if (configuredTriggers.isEmpty()) { + // 如果未配置任何触发器,禁用所有 + topologyRefreshOptions.disableAllAdaptiveRefreshTriggers() + } else { + // 禁用未配置的触发器 + val triggersToDisable = ClusterTopologyRefreshOptions.RefreshTrigger.entries + .filter { it !in configuredTriggers } + .toTypedArray() + if (triggersToDisable.isNotEmpty()) { + topologyRefreshOptions.disableAdaptiveRefreshTrigger(*triggersToDisable) + } + } + + cluster.adaptiveRefreshTriggersTimeout?.toJavaDuration()?.let { topologyRefreshOptions.adaptiveRefreshTriggersTimeout(it) } + cluster.refreshPeriod?.toJavaDuration()?.let { topologyRefreshOptions.refreshPeriod(it) } + clientOptions + .topologyRefreshOptions(topologyRefreshOptions.build()) + .autoReconnect(redisConfig.autoReconnect) + .maxRedirects(cluster.maxRedirects) + .validateClusterNodeMembership(cluster.validateClusterNodeMembership) + .pingBeforeActivateConnection(redisConfig.pingBeforeActivateConnection) + + resources = resource.build() + client = RedisClusterClient.create(resources, uris) + client.setOptions(clientOptions.build()) + + // 连接 pub/sub 通道 + pubSubConnection = client.connectPubSub() + // 连接同步 + pool = ConnectionPoolSupport.createGenericObjectPool( + { client.connect().apply { + if (redisConfig.enableSlaves) { + val slaves = redisConfig.slaves + readFrom = slaves.readFrom + } + } }, + redisConfig.pool.clusterPoolConfig() + ) + // 连接异步(同步方式创建) + asyncPool = AsyncConnectionPoolSupport.createBoundedObjectPool( + { client.connectAsync(StringCodec.UTF8).whenComplete { v, _ -> + if (redisConfig.enableSlaves) { + val slaves = redisConfig.slaves + v.readFrom = slaves.readFrom + } + } }, + redisConfig.asyncPool.poolConfig() + ) + if (autoRelease) { + LettuceRedis.clusterClients += this + } + } + override fun stop() { pubSubConnection.close() asyncPool.close() diff --git a/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceRedis.kt b/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceRedis.kt index 7e895a38c..dcb3768a7 100644 --- a/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceRedis.kt +++ b/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceRedis.kt @@ -9,72 +9,99 @@ import taboolib.common.platform.Awake @Inject @RuntimeDependencies( RuntimeDependency( - "!io.lettuce:lettuce-core:6.6.0.RELEASE", + "!io.lettuce:lettuce-core:7.2.1.RELEASE", test = "!io.lettuce.core.RedisURI", - relocate = ["!io.netty", "!io.netty_4_1_118_final", + relocate = ["!io.netty", "!io.netty_4_2_5_final", "!org.apache.commons.pool2", "!org.apache.commons.pool2_2_12_1", "!reactor", "!reactor_3_6_6", - "!org.reactivestreams", "!org.reactivestreams_1_0_4"], + "!org.reactivestreams", "!org.reactivestreams_1_0_4", + "!org.slf4j", "!org.slf4j_1_7_36", + "!redis.clients.authentication", "!redis.clients.authentication_0_1_1_beta2"], transitive = false ), RuntimeDependency( - "!org.reactivestreams:reactive-streams:1.0.4", - test = "!org.reactivestreams_1_0_4.Publisher", - relocate = ["!org.reactivestreams", "!org.reactivestreams_1_0_4"], + "!org.apache.commons:commons-pool2:2.12.1", + test = "!org.apache.commons.pool2_2_12_1.BaseObject", + relocate = ["!org.apache.commons.pool2", "!org.apache.commons.pool2_2_12_1"], transitive = false ), RuntimeDependency( - "!io.projectreactor:reactor-core:3.6.6", - test = "!reactor_3_6_6.core.CorePublisher", - relocate = ["!reactor", "!reactor_3_6_6", "!org.reactivestreams", "!org.reactivestreams_1_0_4"], + value = "!io.netty:netty-common:4.2.5.Final", + test = "!io.netty_4_2_5_final.util.AbstractConstant", + relocate = ["!io.netty", "!io.netty_4_2_5_final"], transitive = false ), RuntimeDependency( - "!org.apache.commons:commons-pool2:2.12.1", - test = "!org.apache.commons.pool2_2_12_1.BaseObject", - relocate = ["!org.apache.commons.pool2", "!org.apache.commons.pool2_2_12_1"], + value = "!io.netty:netty-handler:4.2.5.Final", + test = "!io.netty_4_2_5_final.handler.ssl.SslHandler", + relocate = ["!io.netty", "!io.netty_4_2_5_final"], + transitive = false + ), + RuntimeDependency( + value = "!io.netty:netty-resolver-dns:4.2.5.Final", + test = "!io.netty_4_2_5_final.resolver.dns.DnsNameResolver", + relocate = ["!io.netty", "!io.netty_4_2_5_final"], transitive = false ), RuntimeDependency( - value = "!io.netty:netty-common:4.1.118.Final", - test = "!io.netty_4_1_118_final.util.AbstractConstant", - relocate = ["!io.netty", "!io.netty_4_1_118_final"], + value = "!io.netty:netty-transport:4.2.5.Final", + test = "!io.netty_4_2_5_final.channel.Channel", + relocate = ["!io.netty", "!io.netty_4_2_5_final"], transitive = false ), RuntimeDependency( - value = "!io.netty:netty-buffer:4.1.118.Final", - test = "!io.netty_4_1_118_final.buffer.AbstractByteBuf", - relocate = ["!io.netty", "!io.netty_4_1_118_final"], + value = "!io.netty:netty-buffer:4.2.5.Final", + test = "!io.netty_4_2_5_final.buffer.ByteBuf", + relocate = ["!io.netty", "!io.netty_4_2_5_final"], transitive = false ), RuntimeDependency( - value = "!io.netty:netty-codec:4.1.118.Final", - test = "!io.netty_4_1_118_final.handler.codec.AsciiHeadersEncoder", - relocate = ["!io.netty", "!io.netty_4_1_118_final"], + value = "!io.netty:netty-codec-base:4.2.5.Final", + test = "!io.netty_4_2_5_final.handler.codec.ByteToMessageDecoder", + relocate = ["!io.netty", "!io.netty_4_2_5_final"], transitive = false ), RuntimeDependency( - value = "!io.netty:netty-handler:4.1.118.Final", - test = "!io.netty_4_1_118_final.handler.address.ResolveAddressHandler", - relocate = ["!io.netty", "!io.netty_4_1_118_final"], + value = "!io.netty:netty-resolver:4.2.5.Final", + test = "!io.netty_4_2_5_final.resolver.AddressResolver", + relocate = ["!io.netty", "!io.netty_4_2_5_final"], transitive = false ), RuntimeDependency( - value = "!io.netty:netty-resolver:4.1.118.Final", - test = "!io.netty_4_1_118_final.resolver.AbstractAddressResolver", - relocate = ["!io.netty", "!io.netty_4_1_118_final"], + value = "!io.netty:netty-transport-native-unix-common:4.2.5.Final", + test = "!io.netty_4_2_5_final.channel.unix.UnixChannel", + relocate = ["!io.netty", "!io.netty_4_2_5_final"], transitive = false ), RuntimeDependency( - value = "!io.netty:netty-transport:4.1.118.Final", - test = "!io.netty_4_1_118_final.bootstrap.Bootstrap", - relocate = ["!io.netty", "!io.netty_4_1_118_final"], + value = "!io.netty:netty-codec-dns:4.2.5.Final", + test = "!io.netty_4_2_5_final.handler.codec.dns.DnsRecord", + relocate = ["!io.netty", "!io.netty_4_2_5_final"], + transitive = false + ), + RuntimeDependency( + value = "!org.reactivestreams:reactive-streams:1.0.4", + test = "!org.reactivestreams_1_0_4.Publisher", + relocate = ["!org.reactivestreams", "!org.reactivestreams_1_0_4"], + transitive = false + ), + RuntimeDependency( + value = "!org.slf4j:slf4j-api:1.7.36", + test = "!org.slf4j_1_7_36.Logger", + relocate = ["!org.slf4j", "!org.slf4j_1_7_36"], + transitive = false + ), + RuntimeDependency( + value = "!io.projectreactor:reactor-core:3.6.6", + test = "!reactor_3_6_6.core.publisher.Flux", + relocate = ["!reactor", "!reactor_3_6_6", + "!org.reactivestreams", "!org.reactivestreams_1_0_4"], transitive = false ), RuntimeDependency( - value = "!io.netty:netty-transport-native-unix-common:4.1.118.Final", - test = "!io.netty_4_1_118_final.channel.unix.Buffer", - relocate = ["!io.netty", "!io.netty_4_1_118_final"], + value = "!redis.clients.authentication:redis-authx-core:0.1.1-beta2", + test = "!redis.clients.authentication_0_1_1_beta2.core.TokenManager", + relocate = ["!redis.clients.authentication", "!redis.clients.authentication_0_1_1_beta2"], transitive = false ) ) diff --git a/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceRedisClient.kt b/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceRedisClient.kt index 2c392ecc3..0cad9bd92 100644 --- a/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceRedisClient.kt +++ b/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/LettuceRedisClient.kt @@ -112,6 +112,67 @@ class LettuceRedisClient(val redisConfig: LettuceRedisConfig): IRedisClient, IRe return completableFuture } + override fun startSync(autoRelease: Boolean) { + val resource = DefaultClientResources.builder() + + if (redisConfig.ioThreadPoolSize != 0) { + resource.ioThreadPoolSize(redisConfig.ioThreadPoolSize) + } + if (redisConfig.computationThreadPoolSize != 0) { + resource.computationThreadPoolSize(redisConfig.computationThreadPoolSize) + } + + val clientOptions = ClientOptions.builder() + .autoReconnect(redisConfig.autoReconnect) + .pingBeforeActivateConnection(redisConfig.pingBeforeActivateConnection) + + if (redisConfig.ssl) { + clientOptions.sslOptions(redisConfig.sslOptions) + } + val uri = redisConfig.redisURIBuilder().build() + + resources = resource.build() + client = RedisClient.create(resources, uri).apply { + options = clientOptions.build() + } + // 连接 pub/sub 通道 + pubSubConnection = client.connectPubSub() + + if (redisConfig.enableSlaves) { + enabledSlaves = true + val slaves = redisConfig.slaves + + // 连接同步 + masterReplicaPool = ConnectionPoolSupport.createGenericObjectPool( + { MasterReplica.connect(client, StringCodec.UTF8, uri).apply { + readFrom = slaves.readFrom + } }, + redisConfig.pool.slavesPoolConfig() + ) + // 连接异步(同步方式创建) + masterAsyncReplicaPool = AsyncConnectionPoolSupport.createBoundedObjectPool( + { MasterReplica.connectAsync(client, StringCodec.UTF8, uri).whenComplete { v, _ -> + v.readFrom = slaves.readFrom + } }, + redisConfig.asyncPool.poolConfig() + ) + } else { + // 连接同步 + pool = ConnectionPoolSupport.createGenericObjectPool( + { client.connect() }, + redisConfig.pool.poolConfig() + ) + // 连接异步(同步方式创建) + asyncPool = AsyncConnectionPoolSupport.createBoundedObjectPool( + { client.connectAsync(StringCodec.UTF8, uri) }, + redisConfig.asyncPool.poolConfig() + ) + } + if (autoRelease) { + LettuceRedis.clients += this + } + } + override fun stop() { pubSubConnection.close() if (enabledSlaves) { diff --git a/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/lettuce/IRedisClient.kt b/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/lettuce/IRedisClient.kt index 77d2569c5..f352399f0 100644 --- a/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/lettuce/IRedisClient.kt +++ b/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/lettuce/IRedisClient.kt @@ -5,11 +5,17 @@ import java.util.concurrent.CompletableFuture interface IRedisClient { /** - * 启动 Redis 客户端 + * 启动 Redis 客户端(异步) * @param autoRelease 关服是否自动释放 * */ fun start(autoRelease: Boolean = true): CompletableFuture + /** + * 启动 Redis 客户端(同步) + * @param autoRelease 关服是否自动释放 + * */ + fun startSync(autoRelease: Boolean = true) + /** * 结束 Redis 客户端 * diff --git a/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/util/Creator.kt b/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/util/Creator.kt index 8e7be6fa3..c3be75afe 100644 --- a/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/util/Creator.kt +++ b/module/database/database-lettuce-redis/src/main/kotlin/taboolib/expansion/util/Creator.kt @@ -16,8 +16,6 @@ fun createLettuceRedisConfig(configurationSection: ConfigurationSection) = Lettu * 根据 [LettuceRedisConfig] 创建一个 集群 Redis 客户端 * * 需要使用 ```IRedisClient.start()``` 方法启动 - * - * 记得导入 ```compileOnly("io.lettuce:lettuce-core:6.6.0.RELEASE")``` * */ fun LettuceRedisConfig.createClusterClient(): LettuceClusterRedisClient = LettuceClusterRedisClient(this) @@ -25,7 +23,5 @@ fun LettuceRedisConfig.createClusterClient(): LettuceClusterRedisClient = Lettuc * 根据 [LettuceRedisConfig] 创建一个 Redis 客户端 * * 需要使用 ```IRedisClient.start()``` 方法启动 - * - * 记得导入 ```compileOnly("io.lettuce:lettuce-core:6.6.0.RELEASE")``` * */ fun LettuceRedisConfig.createClient(): LettuceRedisClient = LettuceRedisClient(this) \ No newline at end of file