diff --git a/README.md b/README.md index c3ec47b3..e64dc47c 100644 --- a/README.md +++ b/README.md @@ -798,6 +798,18 @@ const redis = new Redis({ Set maxRetriesPerRequest to `null` to disable this behavior, and every command will wait forever until the connection is alive again (which is the default behavior before ioredis v4). +### Blocking Command Timeout + +Blocking commands (such as `blpop`, `brpop`, `bzpopmin`, `bzpopmax`, `xread`, `xreadgroup`, etc.) can sometimes hang indefinitely if a connection issue occurs while the command is blocked. To prevent this, you can set the `blockingTimeout` option: + +```javascript +const redis = new Redis({ + blockingTimeout: 30000, // 30 seconds +}); +``` + +If a blocking command does not return a reply within the specified time, the connection will be destroyed and re-established, and the command's promise will be rejected with a "Blocking command timed out" error. + ### Reconnect on Error Besides auto-reconnect when the connection is closed, ioredis supports reconnecting on certain Redis errors using the `reconnectOnError` option. Here's an example that will reconnect when receiving `READONLY` error: diff --git a/lib/Command.ts b/lib/Command.ts index b225d453..70990b7f 100644 --- a/lib/Command.ts +++ b/lib/Command.ts @@ -59,6 +59,19 @@ export interface CommandNameFlags { HANDSHAKE_COMMANDS: ["auth", "select", "client", "readonly", "info"]; // Commands that should not trigger a reconnection when errors occur IGNORE_RECONNECT_ON_ERROR: ["client"]; + // Commands that are blocking or could block optionally + BLOCKING_COMMANDS: [ + "blpop", + "brpop", + "brpoplpush", + "blmove", + "bzpopmin", + "bzpopmax", + "bzmpop", + "blmpop", + "xread", + "xreadgroup" + ]; } /** @@ -101,6 +114,18 @@ export default class Command implements Respondable { WILL_DISCONNECT: ["quit"], HANDSHAKE_COMMANDS: ["auth", "select", "client", "readonly", "info"], IGNORE_RECONNECT_ON_ERROR: ["client"], + BLOCKING_COMMANDS: [ + "blpop", + "brpop", + "brpoplpush", + "blmove", + "bzpopmin", + "bzpopmax", + "bzmpop", + "blmpop", + "xread", + "xreadgroup", + ], }; private static flagMap?: FlagMap; @@ -165,6 +190,7 @@ export default class Command implements Respondable { private callback: Callback; private transformed = false; private _commandTimeoutTimer?: NodeJS.Timeout; + private _blockingTimeoutTimer?: NodeJS.Timeout; private slot?: number | null; private keys?: Array; @@ -326,6 +352,38 @@ export default class Command implements Respondable { }, ms); } } + /** + * Set the wait time before terminating a blocking command + * and generating an error and reconnecting. + */ + setBlockingTimeout(ms: number, onTimeout?: () => void) { + if (!this._blockingTimeoutTimer) { + this._blockingTimeoutTimer = setTimeout(() => { + if (!this.isResolved) { + const err = new Error("Blocking command timed out"); + this.reject(err); + onTimeout?.(); + } + }, ms); + } + } + + /** + * Clear the command and blocking timers + */ + private _clearTimers() { + const existingTimer = this._commandTimeoutTimer; + if (existingTimer) { + clearTimeout(existingTimer); + delete this._commandTimeoutTimer; + } + + const blockingTimer = this._blockingTimeoutTimer; + if (blockingTimer) { + clearTimeout(blockingTimer); + delete this._blockingTimeoutTimer; + } + } private initPromise() { const promise = new Promise((resolve, reject) => { @@ -339,13 +397,14 @@ export default class Command implements Respondable { } this.resolve = this._convertValue(resolve); - if (this.errorStack) { - this.reject = (err) => { + this.reject = (err: Error) => { + this._clearTimers(); + if (this.errorStack) { reject(optimizeErrorStack(err, this.errorStack.stack, __dirname)); - }; - } else { - this.reject = reject; - } + } else { + reject(err); + } + }; }); this.promise = asCallback(promise, this.callback); @@ -379,12 +438,7 @@ export default class Command implements Respondable { private _convertValue(resolve: Function): (result: any) => void { return (value) => { try { - const existingTimer = this._commandTimeoutTimer; - if (existingTimer) { - clearTimeout(existingTimer); - delete this._commandTimeoutTimer; - } - + this._clearTimers(); resolve(this.transformReply(value)); this.isResolved = true; } catch (err) { diff --git a/lib/Redis.ts b/lib/Redis.ts index 00176582..a1984074 100644 --- a/lib/Redis.ts +++ b/lib/Redis.ts @@ -445,6 +445,19 @@ class Redis extends Commander implements DataHandledable { command.setTimeout(this.options.commandTimeout); } + if ( + typeof this.options.blockingTimeout === "number" && + this.options.blockingTimeout > 0 && + Command.checkFlag("BLOCKING_COMMANDS", command.name) + ) { + command.setBlockingTimeout(this.options.blockingTimeout, () => { + // Destroy stream to force reconnection + this.stream?.destroy( + new Error("Blocking command timed out - reconnecting") + ); + }); + } + let writable = this.status === "ready" || (!stream && diff --git a/lib/redis/RedisOptions.ts b/lib/redis/RedisOptions.ts index 1fa9f351..fddb4b63 100644 --- a/lib/redis/RedisOptions.ts +++ b/lib/redis/RedisOptions.ts @@ -15,6 +15,12 @@ export interface CommonRedisOptions extends CommanderOptions { */ commandTimeout?: number; + /** + * If a blocking command does not return a reply within a set number of milliseconds, + * the connection will be reestablished. + */ + blockingTimeout?: number; + /** * If the socket does not receive data within a set number of milliseconds: * 1. the socket is considered "dead" and will be destroyed diff --git a/test/functional/blocking_timeout.ts b/test/functional/blocking_timeout.ts new file mode 100644 index 00000000..840d19ac --- /dev/null +++ b/test/functional/blocking_timeout.ts @@ -0,0 +1,222 @@ +import { expect } from "chai"; +import Redis from "../../lib/Redis"; + +// Example structure for mock-based blocking timeout tests +import * as sinon from "sinon"; +import MockServer from "../helpers/mock_server"; + +describe("blockingTimeout with MockServer", () => { + describe("timeout behavior", () => { + it("rejects with 'Blocking command timed out' when server hangs", (done) => { + let connectionCount = 0; + + const server = new MockServer(30001, async (argv, socket, flags) => { + if (argv[0] === "blpop") { + flags.hang = true; + return; + } + }); + + const redis = new Redis({ + port: 30001, + blockingTimeout: 50, + }); + + redis.on("connect", () => { + connectionCount++; + }); + + redis.on("error", () => {}); + + redis.blpop("test-list", 0).catch((err) => { + expect(err.message).to.include("Blocking command timed out"); + redis.once("ready", () => { + expect(connectionCount).to.equal(2); // Should reconnect + redis.disconnect(); + server.disconnect(() => done()); + }); + }); + }); + + it("non-blocking commands are not affected by blockingTimeout", async () => { + const server = new MockServer(30001, async (argv, socket, flags) => { + if (argv[0] === "get") { + flags.hang = true; + return; + } + }); + + const redis = new Redis({ + port: 30001, + blockingTimeout: 10, + }); + + const result = await Promise.race([ + redis.get("test-key"), + new Promise((resolve) => setTimeout(() => resolve("timeout"), 50)), + ]); + + expect(result).to.equal("timeout"); + redis.disconnect(); + server.disconnect(); + }); + + it("does not leak timers when blocking command succeeds", async () => { + const server = new MockServer(30001, (argv) => { + if (argv[0] === "blpop") { + return ["test-list", "value"]; // Respond immediately + } + }); + + const redis = new Redis({ port: 30001, blockingTimeout: 1000 }); + const clock = sinon.useFakeTimers(); + + await redis.blpop("test-list", 0); + + expect(clock.countTimers()).to.equal(0); // No lingering timers + clock.restore(); + redis.disconnect(); + server.disconnect(); + }); + + it("each blocking command has independent timeout", (done) => { + let blpopCount = 0; + const server = new MockServer(30001, (argv, socket, flags) => { + if (argv[0] === "blpop") { + blpopCount++; + flags.hang = true; + } + }); + + const clock = sinon.useFakeTimers(); + + const redis = new Redis({ port: 30001, blockingTimeout: 50 }); + redis.on("error", () => {}); + + redis.blpop("list1", 0).catch(() => {}); + redis.blpop("list2", 0).catch(() => {}); + + expect(clock.countTimers()).to.equal(2); + + clock.restore(); + redis.disconnect(); + server.disconnect(() => done()); + }); + + it("server error does not trigger blockingTimeout reconnect", async () => { + let connectionCount = 0; + const server = new MockServer(30001, (argv) => { + if (argv[0] === "blpop") { + return new Error("WRONGTYPE Operation against a key"); + } + }); + + const redis = new Redis({ port: 30001, blockingTimeout: 100 }); + redis.on("connect", () => connectionCount++); + + try { + await redis.blpop("not-a-list", 0); + } catch (err: any) { + expect(err.message).to.include("WRONGTYPE"); + } + + expect(connectionCount).to.equal(1); // No reconnection + expect(redis.status).to.equal("ready"); + redis.disconnect(); + server.disconnect(); + }); + + it("destroys stream with correct error on timeout", (done) => { + const server = new MockServer(30001, (argv, socket, flags) => { + if (argv[0] === "blpop") { + flags.hang = true; + } + }); + + const redis = new Redis({ port: 30001, blockingTimeout: 50 }); + + redis.on("error", (err) => { + expect(err.message).to.equal( + "Blocking command timed out - reconnecting" + ); + redis.disconnect(); + server.disconnect(() => done()); + }); + + redis.blpop("list", 0).catch(() => {}); // Handle rejection + }); + }); +}); + +describe("blockingTimeout with Redis", function () { + this.timeout(15000); + + let redis: Redis; + let pusher: Redis; + + beforeEach(() => { + redis = new Redis({ lazyConnect: true }); + pusher = new Redis(); + }); + + afterEach(() => { + redis.disconnect(); + pusher.disconnect(); + }); + + describe("basic functionality", () => { + it("does not timeout when data arrives before blockingTimeout", async () => { + redis = new Redis({ blockingTimeout: 5000 }); + + // Push data after a short delay + setTimeout(() => { + pusher.lpush("test-list", "value"); + }, 100); + + const result = await redis.blpop("test-list", 10); + expect(result).to.deep.equal(["test-list", "value"]); + }); + + it("does not timeout when command has its own timeout and resolves", async () => { + redis = new Redis({ blockingTimeout: 5000 }); + + // Push data after a short delay + setTimeout(() => { + pusher.lpush("test-list-2", "value"); + }, 100); + + // blpop with 2 second timeout + const result = await redis.blpop("test-list-2", 2); + expect(result).to.deep.equal(["test-list-2", "value"]); + }); + + it("returns null when Redis command timeout expires (not blockingTimeout)", async () => { + redis = new Redis({ blockingTimeout: 5000 }); + + // blpop with 1 second timeout - should return null, not throw + const result = await redis.blpop("nonexistent-list", 1); + expect(result).to.be.null; + }); + }); + describe("reconnection behavior", () => { + it("reconnects after blockingTimeout", (done) => { + let connectCount = 0; + + redis = new Redis({ + blockingTimeout: 10, + }); + + redis.on("connect", () => { + connectCount++; + if (connectCount === 2) { + redis.disconnect(); + done(); + } + }); + + redis.on("error", () => {}); + + redis.blpop("nonexistent-list", 0).catch(() => {}); + }); + }); +}); diff --git a/test/unit/command.ts b/test/unit/command.ts index 99c6e1e3..6397b4c4 100644 --- a/test/unit/command.ts +++ b/test/unit/command.ts @@ -1,5 +1,6 @@ import { expect } from "chai"; import Command from "../../lib/Command"; +import * as sinon from "sinon"; describe("Command", () => { describe("constructor()", () => { @@ -166,7 +167,7 @@ describe("Command", () => { expect(Command.checkFlag("WILL_DISCONNECT", "quit")).to.eql(true); }); - it('should be case insensitive for command name', () => { + it("should be case insensitive for command name", () => { expect(Command.checkFlag("VALID_IN_SUBSCRIBER_MODE", "PING")).to.eql( true ); @@ -176,4 +177,64 @@ describe("Command", () => { expect(Command.checkFlag("WILL_DISCONNECT", "QuIt")).to.eql(true); }); }); + + describe("#setBlockingTimeout()", () => { + it("should reject command when blocking timeout expires", (done) => { + const command = new Command("blpop", ["key", "0"]); + command.setBlockingTimeout(50); // 50ms timeout + + command.promise.catch((err) => { + expect(err.message).to.include("Blocking command timed out"); + done(); + }); + }); + + it("should not reject if command resolves before timeout", async () => { + const command = new Command("blpop", ["key", "0"]); + command.setBlockingTimeout(100); + + // Resolve immediately + setTimeout(() => command.resolve(["key", "value"]), 10); + + const result = await command.promise; + expect(result).to.deep.equal(["key", "value"]); + }); + + it("should call onTimeout callback when timeout expires", (done) => { + const command = new Command("bzpopmin", ["key", "0"]); + let callbackCalled = false; + + command.setBlockingTimeout(50, () => { + callbackCalled = true; + }); + + command.promise.catch(() => { + expect(callbackCalled).to.be.true; + done(); + }); + }); + + it("should clear timer when command is resolved", (done) => { + const clock = sinon.useFakeTimers(); + const command = new Command("blpop", ["key", "0"]); + command.setBlockingTimeout(100); + + command.resolve(["key", "value"]); + + command.promise.then(() => { + clock.tick(150); // Advance past timeout + // Should not throw/reject since already resolved + clock.restore(); + done(); + }); + }); + + it("should not set timer if already resolved", () => { + const command = new Command("blpop", ["key", "0"]); + command.resolve(["key", "value"]); + expect(() => { + command.setBlockingTimeout(100); + }).to.not.throw(); + }); + }); });