Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,8 @@
<include>**/DriverInfo.java</include>
<include>**/ClientSetInfo*.java</include>
<include>**/ClientCommandsTest*.java</include>
<include>**/Delay*.java</include>
<include>**/SentineledConnectionProviderReconnectionTest.java</include>
</includes>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package redis.clients.jedis.builders;

import java.time.Duration;
import java.util.Set;
import redis.clients.jedis.*;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.providers.SentineledConnectionProvider;
import redis.clients.jedis.util.Delay;
import redis.clients.jedis.util.JedisAsserts;

/**
* Builder for creating JedisSentineled instances (Redis Sentinel connections).
Expand All @@ -16,11 +19,16 @@
public abstract class SentinelClientBuilder<C>
extends AbstractClientBuilder<SentinelClientBuilder<C>, C> {

private static final Delay DEFAULT_RESUBSCRIBE_DELAY = Delay.constant(Duration.ofMillis(5000));

// Sentinel-specific configuration fields
private String masterName = null;
private Set<HostAndPort> sentinels = null;
private JedisClientConfig sentinelClientConfig = null;

// delay between re-subscribing to sentinel nodes after a disconnection
private Delay sentinellReconnectDelay = DEFAULT_RESUBSCRIBE_DELAY;
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable name "sentinellReconnectDelay" contains a typo with double 'l'. It should be "sentinelReconnectDelay" (single 'l') to match the method name "sentinelReconnectDelay" and standard spelling.

Suggested change
private Delay sentinellReconnectDelay = DEFAULT_RESUBSCRIBE_DELAY;
private Delay sentinelReconnectDelay = DEFAULT_RESUBSCRIBE_DELAY;

Copilot uses AI. Check for mistakes.

/**
* Sets the master name for the Redis Sentinel configuration.
* <p>
Expand Down Expand Up @@ -60,6 +68,21 @@ public SentinelClientBuilder<C> sentinelClientConfig(JedisClientConfig sentinelC
return this;
}

/**
* Sets the delay between re-subscribing to sentinel node after a disconnection.*
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JavaDoc has an extra asterisk at the end of the sentence. The line should end with a period, not ".*"

Suggested change
* Sets the delay between re-subscribing to sentinel node after a disconnection.*
* Sets the delay between re-subscribing to sentinel node after a disconnection.

Copilot uses AI. Check for mistakes.
* <p>
* In case connection to sentinel nodes is lost, the client will try to reconnect to them. This
* method sets the delay between re-subscribing to sentinel nodes after a disconnection.
* </p>
* @param reconnectDelay
* @return
*/
public SentinelClientBuilder<C> sentinelReconnectDelay(Delay reconnectDelay) {
JedisAsserts.notNull(reconnectDelay, "reconnectDelay must not be null");
this.sentinellReconnectDelay = reconnectDelay;
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable name "sentinellReconnectDelay" contains a typo with double 'l'. It should be "sentinelReconnectDelay" (single 'l') to match the method name "sentinelReconnectDelay" and standard spelling.

Copilot uses AI. Check for mistakes.
return this;
}

@Override
protected SentinelClientBuilder<C> self() {
return this;
Expand All @@ -68,7 +91,7 @@ protected SentinelClientBuilder<C> self() {
@Override
protected ConnectionProvider createDefaultConnectionProvider() {
return new SentineledConnectionProvider(this.masterName, this.clientConfig, this.cache,
this.poolConfig, this.sentinels, this.sentinelClientConfig);
this.poolConfig, this.sentinels, this.sentinelClientConfig, sentinellReconnectDelay);
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable name "sentinellReconnectDelay" contains a typo with double 'l'. It should be "sentinelReconnectDelay" (single 'l') to match the method name "sentinelReconnectDelay" and standard spelling.

Copilot uses AI. Check for mistakes.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis.providers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -25,6 +26,7 @@
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.Delay;
import redis.clients.jedis.util.IOUtils;
import redis.clients.jedis.util.Pool;

Expand All @@ -34,6 +36,9 @@ public class SentineledConnectionProvider implements ConnectionProvider {

protected static final long DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS = 5000;

static final Delay DEFAULT_RESUBSCRIBE_DELAY = Delay
.constant(Duration.ofMillis(DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS));

private volatile HostAndPort currentMaster;

private volatile ConnectionPool pool;
Expand All @@ -50,10 +55,14 @@ public class SentineledConnectionProvider implements ConnectionProvider {

private final JedisClientConfig sentinelClientConfig;

private final long subscribeRetryWaitTimeMillis;
private final Delay resubscribeDelay;

private final Lock initPoolLock = new ReentrantLock(true);

private final SentinelConnectionFactory sentinelConnectionFactory;

private final Sleeper sleeper;

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, null, null, sentinels, sentinelClientConfig);
Expand All @@ -73,33 +82,62 @@ public SentineledConnectionProvider(String masterName, final JedisClientConfig m
}

@Experimental
public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Cache clientSideCache, final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig, Cache clientSideCache,
final GenericObjectPoolConfig<Connection> poolConfig, Set<HostAndPort> sentinels,
final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, clientSideCache, poolConfig, sentinels, sentinelClientConfig,
DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS);
DEFAULT_RESUBSCRIBE_DELAY);
}

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig<Connection> poolConfig,
final GenericObjectPoolConfig<Connection> poolConfig, Set<HostAndPort> sentinels,
final JedisClientConfig sentinelClientConfig, final long subscribeRetryWaitTimeMillis) {
this(masterName, masterClientConfig, null, poolConfig, sentinels, sentinelClientConfig,
Delay.constant(Duration.ofMillis(subscribeRetryWaitTimeMillis)));
}

/**
* @deprecated use
* {@link #SentineledConnectionProvider(String, JedisClientConfig, Cache, GenericObjectPoolConfig, Set, JedisClientConfig, Delay)}
*/
@Experimental
@Deprecated
public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Cache clientSideCache, final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig,
final long subscribeRetryWaitTimeMillis) {
this(masterName, masterClientConfig, null, poolConfig, sentinels, sentinelClientConfig, subscribeRetryWaitTimeMillis);

this(masterName, masterClientConfig, clientSideCache, poolConfig, sentinels,
sentinelClientConfig, Delay.constant(Duration.ofMillis(subscribeRetryWaitTimeMillis)));
}

@Experimental
public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Cache clientSideCache, final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig,
final long subscribeRetryWaitTimeMillis) {
final Delay resubscribeDelay) {
this(masterName, masterClientConfig, clientSideCache, poolConfig, sentinels,
sentinelClientConfig, resubscribeDelay, null, null);
}

SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Cache clientSideCache, final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig,
final Delay resubscribeDelay, SentinelConnectionFactory sentinelConnectionFactory,
Sleeper sleeper) {

this.masterName = masterName;
this.masterClientConfig = masterClientConfig;
this.clientSideCache = clientSideCache;
this.masterPoolConfig = poolConfig;

this.sentinelClientConfig = sentinelClientConfig;
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
this.resubscribeDelay = resubscribeDelay;

this.sentinelConnectionFactory = sentinelConnectionFactory != null ? sentinelConnectionFactory
: defaultSentinelConnectionFactory();

this.sleeper = sleeper != null ? sleeper : Thread::sleep;

HostAndPort master = initSentinels(sentinels);
initMaster(master);
Expand Down Expand Up @@ -191,7 +229,8 @@ private HostAndPort initSentinels(Set<HostAndPort> sentinels) {

LOG.debug("Connecting to Sentinel {}...", sentinel);

try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) {
try (Jedis jedis = sentinelConnectionFactory.createConnection(sentinel,
sentinelClientConfig)) {

List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

Expand Down Expand Up @@ -254,6 +293,7 @@ protected class SentinelListener extends Thread {
protected final HostAndPort node;
protected volatile Jedis sentinelJedis;
protected AtomicBoolean running = new AtomicBoolean(false);
protected long subscribeAttempt = 0;

public SentinelListener(HostAndPort node) {
super(String.format("%s-SentinelListener-[%s]", masterName, node.toString()));
Expand All @@ -266,14 +306,13 @@ public void run() {
running.set(true);

while (running.get()) {

try {
// double check that it is not being shutdown
if (!running.get()) {
break;
}

sentinelJedis = new Jedis(node, sentinelClientConfig);
sentinelJedis = sentinelConnectionFactory.createConnection(node, sentinelClientConfig);

// code for active refresh
List<String> masterAddr = sentinelJedis.sentinelGetMasterAddrByName(masterName);
Expand All @@ -284,6 +323,14 @@ public void run() {
}

sentinelJedis.subscribe(new JedisPubSub() {
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// Successfully subscribed - reset attempt counter
subscribeAttempt = 0;
LOG.debug("Successfully subscribed to {} on Sentinel {}. Reset attempt counter.",
channel, node);
}

@Override
public void onMessage(String channel, String message) {
LOG.debug("Sentinel {} published: {}.", node, message);
Expand All @@ -295,25 +342,26 @@ public void onMessage(String channel, String message) {
if (masterName.equals(switchMasterMsg[0])) {
initMaster(toHostAndPort(switchMasterMsg[3], switchMasterMsg[4]));
} else {
LOG.debug(
"Ignoring message on +switch-master for master {}. Our master is {}.",
switchMasterMsg[0], masterName);
LOG.debug("Ignoring message on +switch-master for master {}. Our master is {}.",
switchMasterMsg[0], masterName);
}

} else {
LOG.error("Invalid message received on sentinel {} on channel +switch-master: {}.",
node, message);
node, message);
}
}
}, "+switch-master");

} catch (JedisException e) {

if (running.get()) {
LOG.error("Lost connection to Sentinel {}. Sleeping {}ms and retrying.", node,
subscribeRetryWaitTimeMillis, e);
long subscribeRetryWaitTimeMillis = resubscribeDelay.delay(subscribeAttempt).toMillis();
subscribeAttempt++;
LOG.warn("Lost connection to Sentinel {}. Sleeping {}ms and retrying.", node,
subscribeRetryWaitTimeMillis, e);
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
sleeper.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException se) {
LOG.error("Sleep interrupted.", se);
}
Expand All @@ -340,4 +388,22 @@ public void shutdown() {
}
}
}

protected SentinelConnectionFactory defaultSentinelConnectionFactory() {
return (node, config) -> new Jedis(node, config);
}

@FunctionalInterface
protected interface Sleeper {

void sleep(long millis) throws InterruptedException;

}

@FunctionalInterface
protected interface SentinelConnectionFactory {

Jedis createConnection(HostAndPort node, JedisClientConfig config);

}
}
84 changes: 84 additions & 0 deletions src/main/java/redis/clients/jedis/util/Delay.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package redis.clients.jedis.util;

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

public abstract class Delay {

protected Delay() {
}

/**
* Calculate a specific delay based on the attempt.
* @param attempt the attempt to calculate the delay from.
* @return the calculated delay.
*/
public abstract Duration delay(long attempt);

/**
* Creates a constant delay.
* @param delay the constant delay duration
* @return a Delay that always returns the same duration
*/
public static Delay constant(Duration delay) {
return new ConstantDelay(delay);
}

/**
* Creates an exponential delay with equal jitter. Based on AWS exponential backoff strategy:
* https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ Formula: temp =
* min(upper, base * 2^attempt) sleep = temp/2 + random_between(0, temp/2) result = max(lower,
* sleep)
* @param lower the minimum delay duration (lower bound)
* @param upper the maximum delay duration (upper bound)
* @param base the base delay duration
* @return a Delay with exponential backoff and equal jitter
*/
public static Delay exponentialWithJitter(Duration lower, Duration upper, Duration base) {
return new EqualJitterDelay(lower, upper, base);
}

static class ConstantDelay extends Delay {

private final Duration delay;

ConstantDelay(Duration delay) {
this.delay = delay;
}

@Override
public Duration delay(long attempt) {
return delay;
}
}

static class EqualJitterDelay extends Delay {

private final long lowerMillis;
private final long upperMillis;
private final long baseMillis;

EqualJitterDelay(Duration lower, Duration upper, Duration base) {
this.lowerMillis = lower.toMillis();
this.upperMillis = upper.toMillis();
this.baseMillis = base.toMillis();
}

@Override
public Duration delay(long attempt) {
// temp = min(upper, base * 2^attempt)
long exponential = baseMillis * (1L << Math.min(attempt, 62));
long temp = Math.min(upperMillis, exponential);

// sleep = temp/2 + random_between(0, temp/2)
long half = temp / 2;
long jitter = ThreadLocalRandom.current().nextLong(half + 1);
long delayMillis = half + jitter;

// Apply lower bound
delayMillis = Math.max(lowerMillis, delayMillis);

return Duration.ofMillis(delayMillis);
}
}
}
Loading
Loading