11 changes: 11 additions & 0 deletions lib/cluster/ClusterOptions.ts
@@ -120,6 +120,16 @@ export interface IClusterOptions {
*/
slotsRefreshInterval?: number;

/**
* Whether to refresh the cluster slot cache when a node connection is closed.
*
* Useful when slotsRefreshInterval is disabled (-1) but you still want
* topology to be updated promptly on node disconnect/failover.
*
* @default false
*/
slotsRefreshOnDisconnect?: boolean;

/**
* Use sharded subscribers instead of a single subscriber.
*
@@ -211,6 +221,7 @@ export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = {
retryDelayOnTryAgain: 100,
slotsRefreshTimeout: 1000,
slotsRefreshInterval: 5000,
slotsRefreshOnDisconnect: false,
useSRVRecords: false,
resolveSrv: resolveSrv,
dnsLookup: lookup,
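As a usage sketch (not part of this diff), the new option pairs naturally with a disabled periodic refresh, assuming the standard ioredis Cluster constructor; the endpoint below is a placeholder:

import { Cluster } from "ioredis";

// Turn off the periodic slot refresh, but still refresh the slot cache
// promptly whenever a node connection closes (e.g. during a failover).
const cluster = new Cluster([{ host: "127.0.0.1", port: 6379 }], {
  slotsRefreshInterval: -1,
  slotsRefreshOnDisconnect: true,
});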
222 changes: 115 additions & 107 deletions lib/cluster/ClusterSubscriberGroup.ts
@@ -1,26 +1,23 @@
import { Debug } from "../utils";
import ClusterSubscriber from "./ClusterSubscriber";
import Cluster from "./index";
import ConnectionPool from "./ConnectionPool";
import { getNodeKey } from "./util";
import * as calculateSlot from "cluster-key-slot";
import ShardedSubscriber from "./ShardedSubscriber";
import * as EventEmitter from "events";
const debug = Debug("cluster:subscriberGroup");

/**
* Redis differs between "normal" and sharded PubSub. If using the "normal" PubSub feature, exactly one
* ClusterSubscriber exists per cluster instance. This works because the Redis cluster bus forwards
* messages between shards. However, this has scalability limitations, which is the reason why the sharded
* PubSub feature was added to Redis. With sharded PubSub, each shard is responsible for its own messages.
* Given that, we need at least one ClusterSubscriber per master endpoint/node.
* Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature,
* exactly one subscriber exists per cluster instance because the Redis cluster bus forwards
* messages between shards. Sharded PubSub removes this limitation by making each shard
* responsible for its own messages.
*
* This class leverages the previously existing ClusterSubscriber by adding support for multiple such subscribers
* in alignment with the master nodes of the cluster. The ClusterSubscriber class was extended in a non-breaking way
* to support this feature.
* This class coordinates one ShardedSubscriber per master node in the cluster, providing
* sharded PubSub support while keeping the public API backward compatible.
*/
export default class ClusterSubscriberGroup {
private shardedSubscribers: Map<string, ClusterSubscriber> = new Map();
private shardedSubscribers: Map<string, ShardedSubscriber> = new Map();
private clusterSlots: string[][] = [];
//Simple [min, max] slot ranges aren't enough because you can migrate single slots
// Simple [min, max] slot ranges aren't enough because you can migrate single slots
private subscriberToSlotsIndex: Map<string, number[]> = new Map();
private channels: Map<number, Array<string | Buffer>> = new Map();

@@ -29,32 +26,14 @@
*
* @param subscriberGroupEmitter
*/
constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) {
cluster.on("+node", (redis) => {
this._addSubscriber(redis);
});

cluster.on("-node", (redis) => {
this._removeSubscriber(redis);
});

cluster.on("refresh", () => {
this._refreshSlots(cluster);
});

cluster.on("forceRefresh", () => {
refreshSlotsCacheCallback();
});
}
constructor(private readonly subscriberGroupEmitter: EventEmitter) {}

/**
* Get the responsible subscriber.
*
* Returns undefined if no subscriber was found.
*
* @param slot
*/
getResponsibleSubscriber(slot: number): ClusterSubscriber {
getResponsibleSubscriber(slot: number): ShardedSubscriber | undefined {
const nodeKey = this.clusterSlots[slot]?.[0];
return nodeKey ? this.shardedSubscribers.get(nodeKey) : undefined;
}
@@ -67,10 +46,12 @@
addChannels(channels: (string | Buffer)[]): number {
const slot = calculateSlot(channels[0]);

//Check if the all channels belong to the same slot and otherwise reject the operation
channels.forEach((c: string) => {
if (calculateSlot(c) != slot) return -1;
});
// Check that all channels belong to the same slot; otherwise reject the operation
for (const c of channels) {
if (calculateSlot(c) !== slot) {
return -1;
}
}

const currChannels = this.channels.get(slot);

@@ -93,10 +74,12 @@
removeChannels(channels: (string | Buffer)[]): number {
const slot = calculateSlot(channels[0]);

//Check if the all channels belong to the same slot and otherwise reject the operation
channels.forEach((c: string) => {
if (calculateSlot(c) != slot) return -1;
});
// Check that all channels belong to the same slot; otherwise reject the operation
for (const c of channels) {
if (calculateSlot(c) !== slot) {
return -1;
}
}

const slotChannels = this.channels.get(slot);

@@ -124,96 +107,123 @@
* Start all subscribers that have not been started yet
*/
start() {
const startPromises = [];
for (const s of this.shardedSubscribers.values()) {
if (!s.isStarted()) {
s.start();
startPromises.push(s.start());
}
}
return Promise.all(startPromises);
}

/**
* Add a subscriber to the group of subscribers
*
* @param redis
* Resets the subscriber group by disconnecting all subscribers that are no longer needed and connecting new ones.
*/
private _addSubscriber(redis: any): ClusterSubscriber {
const pool: ConnectionPool = new ConnectionPool(redis.options);
public async reset(
clusterSlots: string[][],
clusterNodes: any[]
): Promise<void> {
// Update the slots cache and continue if there was a change
if (!this._refreshSlots(clusterSlots)) {
return;
}

if (pool.addMasterNode(redis)) {
const sub = new ClusterSubscriber(pool, this.cluster, true);
const nodeKey = getNodeKey(redis.options);
this.shardedSubscribers.set(nodeKey, sub);
sub.start();
// For each of the sharded subscribers
for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) {
if (
// If the subscriber is still responsible for a slot range and is running then keep it
this.subscriberToSlotsIndex.has(nodeKey) &&
shardedSubscriber.isStarted()
) {
continue;
}

// We need to attempt to resubscribe them in case the new node serves their slot
this._resubscribe();
this.cluster.emit("+subscriber");
return sub;
// Otherwise stop the subscriber and remove it
shardedSubscriber.stop();
this.shardedSubscribers.delete(nodeKey);

this.subscriberGroupEmitter.emit("-subscriber");
}

return null;
}
const startPromises = [];
// For each node in the slots cache
for (const nodeKey of this.subscriberToSlotsIndex.keys()) {

// If we already have a subscriber for this node then keep it
if (this.shardedSubscribers.has(nodeKey)) {
continue;
}

/**
* Removes a subscriber from the group
* @param redis
*/
private _removeSubscriber(redis: any): Map<string, ClusterSubscriber> {
const nodeKey = getNodeKey(redis.options);
const sub = this.shardedSubscribers.get(nodeKey);
// Otherwise create a new subscriber
const redis = clusterNodes.find((node) => {
return getNodeKey(node.options) === nodeKey;
});

if (sub) {
sub.stop();
this.shardedSubscribers.delete(nodeKey);
if (!redis) {
debug("Failed to find node for key %s", nodeKey);
continue;
}

// Even though the subscriber to this node is going down, we might have another subscriber
// handling the same slots, so we need to attempt to subscribe the orphaned channels
this._resubscribe();
this.cluster.emit("-subscriber");
const sub = new ShardedSubscriber(
this.subscriberGroupEmitter,
redis.options
);

this.shardedSubscribers.set(nodeKey, sub);

startPromises.push(sub.start());

this.subscriberGroupEmitter.emit("+subscriber");
}

return this.shardedSubscribers;
// It's vital to await the start promises before resubscribing
// Otherwise we might try to resubscribe to a subscriber that is not yet connected
// This can cause a race condition
try {
await Promise.all(startPromises);
} catch (err) {
debug("Error while starting subscribers: %s", err);
this.subscriberGroupEmitter.emit("error", err);
}

this._resubscribe();
this.subscriberGroupEmitter.emit("subscribersReady");
}

/**
* Refreshes the subscriber-related slot ranges
*
* Returns false if no refresh was needed
*
* @param cluster
* @param targetSlots
*/
private _refreshSlots(cluster: Cluster): boolean {
private _refreshSlots(targetSlots: string[][]): boolean {
// If there was an actual change, then reassign the slot ranges
if (this._slotsAreEqual(cluster.slots)) {
if (this._slotsAreEqual(targetSlots)) {
debug(
"Nothing to refresh because the new cluster map is equal to the previous one."
);
} else {
debug("Refreshing the slots of the subscriber group.");

//Rebuild the slots index
this.subscriberToSlotsIndex = new Map();

for (let slot = 0; slot < cluster.slots.length; slot++) {
const node: string = cluster.slots[slot][0];
return false;
}

if (!this.subscriberToSlotsIndex.has(node)) {
this.subscriberToSlotsIndex.set(node, []);
}
this.subscriberToSlotsIndex.get(node).push(Number(slot));
}
debug("Refreshing the slots of the subscriber group.");

//Update the subscribers from the index
this._resubscribe();
// Rebuild the slots index
this.subscriberToSlotsIndex = new Map();

//Update the cached slots map
this.clusterSlots = JSON.parse(JSON.stringify(cluster.slots));
for (let slot = 0; slot < targetSlots.length; slot++) {
const node: string = targetSlots[slot][0];

this.cluster.emit("subscribersReady");
return true;
if (!this.subscriberToSlotsIndex.has(node)) {
this.subscriberToSlotsIndex.set(node, []);
}
this.subscriberToSlotsIndex.get(node).push(Number(slot));
}

return false;
// Update the cached slots map
this.clusterSlots = JSON.parse(JSON.stringify(targetSlots));

return true;
}

/**
@@ -224,12 +234,9 @@
private _resubscribe() {
if (this.shardedSubscribers) {
this.shardedSubscribers.forEach(
(s: ClusterSubscriber, nodeKey: string) => {
(s: ShardedSubscriber, nodeKey: string) => {
const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey);
if (subscriberSlots) {
//More for debugging purposes
s.associateSlotRange(subscriberSlots);

//Resubscribe on the underlying connection
subscriberSlots.forEach((ss) => {
//Might return null if being disconnected
@@ -238,12 +245,10 @@

if (channels && channels.length > 0) {
//Try to subscribe now
if (redis) {
redis.ssubscribe(channels);

//If the instance isn't ready yet, then register the re-subscription for later
redis.on("ready", () => {
redis.ssubscribe(channels);
if (redis && redis.status !== "end") {
redis.ssubscribe(channels).catch((err) => {
// TODO: Should we emit an error event here?
debug("Failed to ssubscribe on node %s: %s", nodeKey, err);
});
}
}
@@ -261,7 +266,10 @@
* @private
*/
private _slotsAreEqual(other: string[][]) {
if (this.clusterSlots === undefined) return false;
else return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
if (this.clusterSlots === undefined) {
return false;
} else {
return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
}
}
}
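For reviewers who want to exercise the group end to end, a hedged sketch of sharded PubSub usage follows. It assumes the cluster-level ssubscribe/smessage surface that this subscriber group backs; the option name shardedSubscribers is illustrative only, since the diff above shows the option's doc comment but not its name.

import { Cluster } from "ioredis";
import * as calculateSlot from "cluster-key-slot";

// Hash tags ({room1}) force both channels into the same slot, which
// addChannels/removeChannels require: a single ssubscribe call is routed
// to exactly one shard's subscriber.
const a = "chat:{room1}:events";
const b = "chat:{room1}:alerts";
console.log(calculateSlot(a) === calculateSlot(b)); // true

async function main() {
  const cluster = new Cluster([{ host: "127.0.0.1", port: 6379 }], {
    shardedSubscribers: true, // assumed option name, for illustration only
  });

  await cluster.ssubscribe(a, b);
  cluster.on("smessage", (channel, message) => {
    console.log("received on %s: %s", channel, message);
  });
}

main().catch(console.error);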