Skip to content

Fix engine crash on module client blocking during keyspace events #1819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/adlist.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ list *listCreate(void) {

/* Remove all the elements from the list without destroying the list itself. */
void listEmpty(list *list) {
if (!list) return;
unsigned long len;
listNode *current, *next;

Expand All @@ -72,7 +73,6 @@ void listEmpty(list *list) {
*
* This function can't fail. */
void listRelease(list *list) {
if (!list) return;
listEmpty(list);
zfree(list);
}
Expand Down
2 changes: 2 additions & 0 deletions src/bitops.c
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,7 @@ void bitfieldGeneric(client *c, int flags) {
}
}

initDeferredReplyBuffer(c);
addReplyArrayLen(c, numops);

/* Actually process the operations. */
Expand Down Expand Up @@ -1364,6 +1365,7 @@ void bitfieldGeneric(client *c, int flags) {
notifyKeyspaceEvent(NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
server.dirty += changes;
}
commitDeferredReplyBuffer(c, 1);
zfree(ops);
}

Expand Down
3 changes: 3 additions & 0 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ void freeClientBlockingState(client *c) {
dictRelease(c->bstate->keys);
zfree(c->bstate);
c->bstate = NULL;

debugServerAssert(!isDeferredReplyEnabled(c));
commitDeferredReplyBuffer(c, 0);
}

/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
Expand Down
7 changes: 3 additions & 4 deletions src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,9 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
return;
} else {
obj = setExpire(c, c->db, key, when);
signalModifiedKey(c, c->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
server.dirty++;
addReply(c, shared.cone);
/* Propagate as PEXPIREAT millisecond-timestamp
* Only rewrite the command arg if not already PEXPIREAT */
Expand All @@ -695,10 +698,6 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
rewriteClientCommandArgument(c, 2, when_obj);
decrRefCount(when_obj);
}

signalModifiedKey(c, c->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
server.dirty++;
return;
}
}
Expand Down
91 changes: 78 additions & 13 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ typedef struct ValkeyModuleCtx ValkeyModuleCtx;
#define VALKEYMODULE_CTX_NEW_CLIENT (1 << 7) /* Free client object when the \
context is destroyed */
#define VALKEYMODULE_CTX_CHANNELS_POS_REQUEST (1 << 8)
#define VALKEYMODULE_CTX_COMMAND (1 << 9) /* Context created to serve a command from call() or AOF (which calls cmd->proc directly) */

#define VALKEYMODULE_CTX_COMMAND (1 << 9) /* Context created to serve a command from call() or AOF (which calls cmd->proc directly) */
#define VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION (1 << 10) /* Context created a keyspace notification event */

/* This represents a key opened with VM_OpenKey(). */
struct ValkeyModuleKey {
Expand Down Expand Up @@ -7795,6 +7795,8 @@ void unblockClientFromModule(client *c) {
* in that case the privdata argument is disregarded, because we pass the
* reply callback the privdata that is set here while blocking.
*
* For details on return values and error codes, see the comment block for
* VM_BlockClient.
*/
ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx,
ValkeyModuleCmdFunc reply_callback,
Expand All @@ -7807,8 +7809,27 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx,
void *privdata,
int flags) {
client *c = ctx->client;
if (c->flag.blocked || getClientType(c) != CLIENT_TYPE_NORMAL || c->flag.deny_blocking) {
/* Early return if duplicate block attempt or client is not normal or
* client is set to deny blocking. */
errno = ENOTSUP;
return NULL;
}

if (ctx->flags & (VALKEYMODULE_CTX_TEMP_CLIENT | VALKEYMODULE_CTX_NEW_CLIENT)) {
/* Temporary clients can't be blocked */
errno = EINVAL;
return NULL;
}
int is_keyspace_notification = ctx->flags & (VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION);
int islua = scriptIsRunning();
int ismulti = server.in_exec;
if ((islua || ismulti) && is_keyspace_notification) {
/* Avoid blocking within transactions when context initiated by
* keyspace notification. */
errno = EINVAL;
return NULL;
}
initClientBlockingState(c);

c->bstate->module_blocked_handle = zmalloc(sizeof(ValkeyModuleBlockedClient));
Expand Down Expand Up @@ -7864,6 +7885,11 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx,
c->bstate->timeout = timeout;
blockClient(c, BLOCKED_MODULE);
}
/* Defer response until after being unblocked for a context originated from
* keyspace notification events */
if (is_keyspace_notification) {
initDeferredReplyBuffer(c);
}
}
return bc;
}
Expand Down Expand Up @@ -8091,14 +8117,27 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
* free_privdata: called in order to free the private data that is passed
* by ValkeyModule_UnblockClient() call.
*
* Note: ValkeyModule_UnblockClient should be called for every blocked client,
* even if client was killed, timed-out or disconnected. Failing to do so
* will result in memory leaks.
* Notes:
* 1. ValkeyModule_UnblockClient should be called for every blocked client,
* even if client was killed, timed-out or disconnected. Failing to do so
* will result in memory leaks.
* 2. Attempting to block the client on keyspace event notification in versions
* prior to 8.1.1 leads to a crash.
*
* There are some cases where ValkeyModule_BlockClient() cannot be used:
*
* 1. If the client is a Lua script.
* 2. If the client is executing a MULTI block.
* 3. If the client is a temporary module client.
* 4. If the client is already blocked.
*
* In cases 1 and 2, a call to ValkeyModule_BlockClient() will **not** block the
* client, but instead produce a specific error reply. Note that if the
* BlockClient call originated from within a keyspace notification, no error
* reply is generated but nullptr is returned while the errno is set to EINVAL.
*
* In case 3 and 4, a call to ValkeyModule_BlockClient() are no-op, returning
* nullptr. errno is set to EINVAL for case 3 while ENOTSUP for case 4.
*
* In these cases, a call to ValkeyModule_BlockClient() will **not** block the
* client, but instead produce a specific error reply.
Expand Down Expand Up @@ -8290,6 +8329,12 @@ int moduleClientIsBlockedOnKeys(client *c) {
* needs to be passed to the client, included but not limited some slow
* to compute reply or some reply obtained via networking.
*
* Returns VALKEYMODULE_OK on success. On failure, VALKEYMODULE_ERR is returned
* and `errno` is set as follows:
*
* - EINVAL if bc is NULL.
* - ENOTSUP if bc contains `blocked on keys` but its timeout callback is NULL.
*
* Note 1: this function can be called from threads spawned by the module.
*
* Note 2: when we unblock a client that is blocked for keys using the API
Expand All @@ -8300,10 +8345,17 @@ int moduleClientIsBlockedOnKeys(client *c) {
* ValkeyModule_BlockClientOnKeys() is accessible from the timeout
* callback via VM_GetBlockedClientPrivateData). */
int VM_UnblockClient(ValkeyModuleBlockedClient *bc, void *privdata) {
if (!bc) {
errno = EINVAL;
return VALKEYMODULE_ERR;
}
if (bc->blocked_on_keys) {
/* In theory the user should always pass the timeout handler as an
* argument, but better to be safe than sorry. */
if (bc->timeout_callback == NULL) return VALKEYMODULE_ERR;
if (bc->timeout_callback == NULL) {
errno = ENOTSUP;
return VALKEYMODULE_ERR;
}
if (bc->unblocked) return VALKEYMODULE_OK;
if (bc->client) moduleBlockedClientTimedOut(bc->client, 1);
}
Expand Down Expand Up @@ -8392,11 +8444,17 @@ void moduleHandleBlockedClients(void) {
moduleInvokeFreePrivDataCallback(c, bc);
}

/* It is possible that this blocked client object accumulated
* replies to send to the client in a thread safe context.
* We need to glue such replies to the client output buffer and
* free the temporary client we just used for the replies. */
if (c) AddReplyFromClient(c, bc->reply_client);
if (c) {
/* Replies which were added after the client is blocked by a module
* are accumulated separately. We need to transmit those replies
* to the client. */
commitDeferredReplyBuffer(c, 0);
/* It is possible that this blocked client object accumulated
* replies to send to the client in a thread safe context.
* We need to glue such replies to the client output buffer and
* free the temporary client we just used for the replies. */
AddReplyFromClient(c, bc->reply_client);
}
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);

Expand Down Expand Up @@ -8492,9 +8550,10 @@ void moduleBlockedClientTimedOut(client *c, int from_module) {

moduleFreeContext(&ctx);

if (!from_module)
if (!from_module) {
updateStatsOnUnblock(c, bc->background_duration, 0,
((server.stat_total_error_replies != prev_error_replies) ? ERROR_COMMAND_FAILED : 0));
}

/* For timeout events, we do not want to call the disconnect callback,
* because the blocked client will be automatically disconnected in
Expand Down Expand Up @@ -8880,8 +8939,14 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
if ((sub->event_mask & type) &&
(sub->active == 0 || (sub->module->options & VALKEYMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS))) {
ValkeyModuleCtx ctx;
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_TEMP_CLIENT);
if (server.executing_client == NULL) {
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_TEMP_CLIENT);
} else {
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_NONE);
ctx.client = server.executing_client;
}
selectDb(ctx.client, dbid);
ctx.flags |= VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION;

/* mark the handler as active to avoid reentrant loops.
* If the subscriber performs an action triggering itself,
Expand Down
Loading
Loading