Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The `@socket.io/redis-adapter` package allows broadcasting packets between multi
- [With the `ioredis` package](#with-the-ioredis-package)
- [With the `ioredis` package and a Redis cluster](#with-the-ioredis-package-and-a-redis-cluster)
- [With Redis sharded Pub/Sub](#with-redis-sharded-pubsub)
- [With the `ioredis` package and a Redis cluster](#with-the-ioredis-package-and-a-redis-cluster-1)
- [Options](#options)
- [Default adapter](#default-adapter)
- [Sharded adapter](#sharded-adapter)
Expand Down Expand Up @@ -186,7 +187,44 @@ Minimum requirements:
- Redis 7.0
- [`redis@4.6.0`](https://github.com/redis/node-redis/commit/3b1bad229674b421b2bc6424155b20d4d3e45bd1)

Note: it is not currently possible to use the sharded adapter with the `ioredis` package and a Redis cluster ([reference](https://github.com/luin/ioredis/issues/1759)).
#### With the `ioredis` package and a Redis cluster

Starting with `ioredis@5.9.0`, you can use the sharded adapter with an ioredis Cluster by enabling the `shardedSubscribers` option:

```js
import { Cluster } from "ioredis";
import { Server } from "socket.io";
import { createShardedAdapter } from "@socket.io/redis-adapter";

const pubClient = new Cluster(
[
{
host: "localhost",
port: 7000,
},
{
host: "localhost",
port: 7001,
},
{
host: "localhost",
port: 7002,
},
],
{
shardedSubscribers: true,
}
);
const subClient = pubClient.duplicate();

const io = new Server({
adapter: createShardedAdapter(pubClient, subClient)
});

io.listen(3000);
```

Reference: https://github.com/redis/ioredis/pull/1956

## Options

Expand Down
19 changes: 18 additions & 1 deletion lib/sharded-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@ import {
Offset,
} from "socket.io-adapter";
import { decode, encode } from "notepack.io";
import { hasBinary, PUBSUB, SPUBLISH, SSUBSCRIBE, SUNSUBSCRIBE } from "./util";
import {
hasBinary,
hasShardedSubscribers,
isIoRedisCluster,
PUBSUB,
SPUBLISH,
SSUBSCRIBE,
SUNSUBSCRIBE,
} from "./util";
import debugModule from "debug";

const debug = debugModule("socket.io-redis");
Expand Down Expand Up @@ -85,6 +93,15 @@ class ShardedRedisAdapter extends ClusterAdapter {
opts
);

// Validate ioredis Cluster configuration
if (isIoRedisCluster(subClient) && !hasShardedSubscribers(subClient)) {
throw new Error(
"When using the sharded adapter with an ioredis Cluster, " +
"you must enable the 'shardedSubscribers' option. " +
"See https://github.com/redis/ioredis/pull/1956"
);
}

this.channel = `${this.opts.channelPrefix}#${nsp.name}#`;
this.responseChannel = `${this.opts.channelPrefix}#${nsp.name}#${this.uid}#`;

Expand Down
23 changes: 23 additions & 0 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,29 @@ function isRedisV4Client(redisClient: any) {
return typeof redisClient.sSubscribe === "function";
}

/**
* Whether the client is an ioredis Cluster instance
*
* @param redisClient
*/
export function isIoRedisCluster(redisClient: any) {
return redisClient.constructor.name === "Cluster" || redisClient.isCluster;
}

/**
* Whether the ioredis Cluster has shardedSubscribers enabled
*
* @param redisClient
*
* @see https://github.com/redis/ioredis/pull/1956
*/
export function hasShardedSubscribers(redisClient: any) {
return (
isIoRedisCluster(redisClient) &&
redisClient.options?.shardedSubscribers === true
);
}

const kHandlers = Symbol("handlers");

export function SSUBSCRIBE(
Expand Down
48 changes: 29 additions & 19 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"@types/mocha": "^8.2.1",
"@types/node": "^14.14.7",
"expect.js": "0.3.1",
"ioredis": "^5.3.2",
"ioredis": "^5.9.1",
"mocha": "^10.1.0",
"nyc": "^15.1.0",
"prettier": "^2.8.7",
Expand Down
23 changes: 19 additions & 4 deletions test/test-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ const clusterNodes = [
},
];

// NAT mapping for ioredis Cluster (Docker container returns internal IP)
const ioredisNatMap = {
"172.20.0.3:7000": { host: "localhost", port: 7000 },
"172.20.0.3:7001": { host: "localhost", port: 7001 },
"172.20.0.3:7002": { host: "localhost", port: 7002 },
"172.20.0.3:7003": { host: "localhost", port: 7003 },
"172.20.0.3:7004": { host: "localhost", port: 7004 },
"172.20.0.3:7005": { host: "localhost", port: 7005 },
};

function testSuite(
createAdapter: any,
redisPackage: string = "redis@4",
Expand Down Expand Up @@ -139,7 +149,9 @@ describe("@socket.io/redis-adapter", () => {

describe("ioredis cluster", () =>
testSuite(async () => {
const pubClient = new Cluster(clusterNodes);
const pubClient = new Cluster(clusterNodes, {
natMap: ioredisNatMap,
});
const subClient = pubClient.duplicate();

return [
Expand Down Expand Up @@ -259,11 +271,14 @@ describe("@socket.io/redis-adapter", () => {
true
));

// FIXME see https://github.com/luin/ioredis/issues/1759
describe.skip("[sharded] ioredis cluster", () =>
// Fixed in ioredis 5.9.0, see https://github.com/redis/ioredis/pull/1956
describe("[sharded] ioredis cluster", () =>
testSuite(
async () => {
const pubClient = new Cluster(clusterNodes);
const pubClient = new Cluster(clusterNodes, {
shardedSubscribers: true,
natMap: ioredisNatMap,
});
const subClient = pubClient.duplicate();

return [
Expand Down