Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/ae.c
${CMAKE_SOURCE_DIR}/src/anet.c
${CMAKE_SOURCE_DIR}/src/dict.c
${CMAKE_SOURCE_DIR}/src/durable_write.c
${CMAKE_SOURCE_DIR}/src/hashtable.c
${CMAKE_SOURCE_DIR}/src/kvstore.c
${CMAKE_SOURCE_DIR}/src/sds.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_TRACE_OBJ=trace/trace.o trace/trace_commands.o trace/trace_db.o trace/trace_cluster.o trace/trace_server.o trace/trace_rdb.o trace/trace_aof.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o cluster_migrateslots.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut9.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o vset.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o cluster_migrateslots.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut9.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o durable_write.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o vset.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ)
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o sds.o util.o sha256.o
Expand Down
993 changes: 993 additions & 0 deletions src/durable_write.c
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the code in this file does is implement the WBL and block/unblock client replies. Durably replicated writes involve more than just this. A name like wbl.c or replyblocking.c would be more exact for this file.

Large diffs are not rendered by default.

116 changes: 116 additions & 0 deletions src/durable_write.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#ifndef DURABLE_WRITE_H
#define DURABLE_WRITE_H

#include <inttypes.h>
#include <sys/types.h>
#include <stdbool.h>
#include "expire.h"
#include "sds.h"

/* Message text for the case where the accessed data cannot be served.
 * NOTE(review): the use site is not visible from this header; presumably this
 * is sent to the client as an error reply -- confirm in durable_write.c. */
#define DURABLE_ACCESSED_DATA_UNAVAILABLE "Accessed data unavailable to be served"
/* Command filter codes that are used in the pre-execution stage of a command.
 * processCommand() compares the result of preCommandExec() against
 * CMD_FILTER_REJECT and, on a match, returns without executing the command. */
#define CMD_FILTER_ALLOW 0
#define CMD_FILTER_REJECT 1

struct client;
struct serverObject;
struct serverDb;
struct list;
struct listNode;

typedef long long mstime_t;

/**
 * Durability container to house all the durability related fields.
 *
 * A single instance is embedded in `struct valkeyServer` as the `durability`
 * member.
 */
typedef struct durable_t {
/* Time limit, in milliseconds, configured for cleaning up uncommitted keys. */
unsigned int keys_cleanup_time_limit_ms;
/* Index of the database currently being scanned, starting from 0. */
int curr_db_scan_idx;

/* Number of replicas that must ack an update for it to be considered committed. */
long long num_replicas_to_ack;

/* Clients waiting for a replication offset ack/quorum. */
struct list *clients_waiting_replica_ack;

/* Cached allocation of replica offsets, kept to avoid allocating per command.
 * NOTE(review): replica_offsets_size is presumably the capacity of
 * replica_offsets in elements rather than bytes -- confirm in durable_write.c. */
unsigned long replica_offsets_size;
long long *replica_offsets;
/* Replication offset previously acknowledged by replicas. */
long long previous_acked_offset;
} durable_t;

/* Blocked response structure used by a client to record the blocking
 * information associated with each response. Instances are stored as the node
 * values of the client's blocked_responses list. */
typedef struct blockedResponse {
/* Pointer to the client's reply node where the blocked response starts.
 * NULL if the blocked response starts in the 16KB initial buffer.
 * This pointer is not owned here, so the memory it points to is never
 * released through this structure. */
struct listNode *disallowed_reply_block;
/* The boundary in the reply buffer where the blocked response starts.
 * Data from this point onwards is not written to the client socket. */
size_t disallowed_byte_offset;
/* The replication offset to wait for ACK from replicas. */
long long primary_repl_offset;
} blockedResponse;

// Describes a pre-execution COB offset for a client
typedef struct preExecutionOffsetPosition {
// True if the pre execution offset/reply block are initialized
bool recorded;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would it not be recorded?

// Track initial client COB position for client blocking
// Pointer to the pre-execution reply node, NULL for initial buffer
struct listNode *reply_block;
// Byte position boundary within the pre-execution reply block
size_t byte_offset;
} preExecutionOffsetPosition;

/* Per-client durability state. Embedded in `struct client` as the
 * `clientDurabilityInfo` member.
 *
 * NOTE(review): the struct tag is `clientDurabilityInfo` but the typedef name
 * is `clientDurableInfo`; confirm whether the two spellings are intentional. */
typedef struct clientDurabilityInfo {
/* List of blocked client responses, kept for consistency. The node values
 * are blockedResponse entries. */
struct list *blocked_responses;

/* Pre-execution data recorded before a command is executed
 * to record the boundaries of the COB. */
preExecutionOffsetPosition offset;

/* Replication offset used to block the response of the current command. */
long long current_command_repl_offset;
} clientDurableInfo;

/**
 * Init
 *
 * The implementations live in durable_write.c; the notes below describe only
 * where each function is invoked from.
 */
/* Called once from initServer(). */
void durableInit(void);
/* Called from createClient() on the newly created client, just before it is
 * returned. */
void durableClientInit(struct client *c);
/* Called from unlinkClient() after pending client IO has been waited for. */
void durableClientReset(struct client *c);
/*
 * Command processing hooks for offset and COB (client output buffer) tracking.
 */
/* Invoked at the start of call(), before server.executing_client is switched. */
void preCall(void);
/* Invoked in call() right after afterCommand(c). */
void postCall(struct client *c);
/* Invoked by processCommand() before call(). When the result equals
 * CMD_FILTER_REJECT, processCommand() returns C_OK without executing the
 * command. */
int preCommandExec(struct client *c);
/* Invoked by processCommand() right after call() returns. */
void postCommandExec(struct client *c);
/* Invoked from replconfCommand() to process all clients waiting for an ACK
 * from a quorum. */
void postReplicaAck(void);

/*
 * Utils
 */
int isPrimaryDurabilityEnabled(void);
/* Checked first in clientHasPendingReplies(); when it returns true, that
 * function reads the first node of the client's blocked_responses list.
 * NOTE(review): this presumably implies the list is non-empty -- confirm. */
bool isClientReplyBufferLimited(struct client *c);
/* NOTE(review): judging by the name, purges 'key' from the uncommitted keys of
 * 'db' and returns its replication offset -- confirm in durable_write.c. */
long long durablePurgeAndGetUncommittedKeyOffset(const sds key, struct serverDb *db);
// TODO: naming of these flags.
int isDurabilityEnabled(void);
/* Run from serverCron() with a period of 1000 ms. */
void clearUncommittedKeysAcknowledged(void);
// TODO:
// preReplyToBlockedClient
// For streams and timeouts: when a blocked client is being unblocked,
// the command will not be reprocessed via processCommand() before a reply is added,
// so we should hook this path to get the pre-execution offsets.




#endif /* DURABLE_WRITE_H */
23 changes: 23 additions & 0 deletions src/networking.c
Copy link
Collaborator

@hpatro hpatro Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ranshid Do we remove the read handler callback while the client is blocked?

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "durable_write.h"
#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
Expand Down Expand Up @@ -367,6 +368,14 @@ client *createClient(connection *conn) {
c->io_last_written.buf = NULL;
c->io_last_written.bufpos = 0;
c->io_last_written.data_len = 0;

// init durability info like
// key blocking on primary
// TODO: this probably doesn't need to be a separate function
// just makes it a bit easier to review the POC with all related functionality
// together
durableClientInit(c);

return c;
}

Expand Down Expand Up @@ -1668,6 +1677,18 @@ void copyReplicaOutputBuffer(client *dst, client *src) {
/* Return true if the specified client has pending reply buffers to write to
* the socket. */
int clientHasPendingReplies(client *c) {
if (isClientReplyBufferLimited(c)) {
// Check if our first allowed reply boundary is in a position that comes
// after the current position that valkey has written up to in the COB.
const blockedResponse *n = listNodeValue(listFirst(c->clientDurabilityInfo.blocked_responses));
if ((c->bufpos && n->disallowed_reply_block == NULL) ||
(c->bufpos == 0 && n->disallowed_reply_block != NULL && listFirst(c->reply) == n->disallowed_reply_block)) {
// Both positions point at either the initial 16KB buffer or the
// first reply block; compare the sentlen with the last allowed byte offset
return c->io_last_written.data_len < n->disallowed_byte_offset;
}
}

if (getClientType(c) == CLIENT_TYPE_REPLICA) {
/* Replicas use global shared replication buffer instead of
* private output buffer. */
Expand Down Expand Up @@ -1866,6 +1887,8 @@ void unlinkClient(client *c) {
/* Wait for IO operations to be done before unlinking the client. */
waitForClientIO(c);

durableClientReset(c);

/* If this is marked as current client unset it. */
if (c->conn && server.current_client == c) server.current_client = NULL;

Expand Down
10 changes: 10 additions & 0 deletions src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,16 @@ int getPositiveLongFromObjectOrReply(client *c, robj *o, long *target, const cha
}
}

/* Read 'o' as a long long via getLongLongFromObject() and, when the result
 * fits in an int, store it in '*target'.
 *
 * Returns C_OK on success. Returns C_ERR, leaving '*target' untouched, when
 * getLongLongFromObject() fails or the value lies outside [INT_MIN, INT_MAX]. */
int getIntFromObject(robj *o, int *target) {
    long long parsed;

    if (getLongLongFromObject(o, &parsed) == C_OK) {
        int fits_in_int = (parsed >= INT_MIN && parsed <= INT_MAX);
        if (fits_in_int) {
            /* The range check above makes this narrowing conversion lossless. */
            *target = (int)parsed;
            return C_OK;
        }
    }
    return C_ERR;
}

int getIntFromObjectOrReply(client *c, robj *o, int *target, const char *msg) {
long value;

Expand Down
3 changes: 3 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,9 @@ void replconfCommand(client *c) {
if (c->repl_data->repl_state == REPLICA_STATE_BG_RDB_LOAD) {
replicaPutOnline(c);
}

// Process all clients waiting ACK from a quorum
postReplicaAck();
/* Note: this command does not reply anything! */
return;
} else if (!strcasecmp(c->argv[j]->ptr, "getack")) {
Expand Down
15 changes: 14 additions & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
*/
#include "server.h"
#include "connection.h"
#include "durable_write.h"
#include "monotonic.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
Expand Down Expand Up @@ -1694,6 +1695,7 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa
run_with_period(100) modulesCron();
}

run_with_period(1000) clearUncommittedKeysAcknowledged();
/* Fire the cron loop modules event. */
ValkeyModuleCronLoopV1 ei = {VALKEYMODULE_CRON_LOOP_VERSION, server.hz};
moduleFireServerEvent(VALKEYMODULE_EVENT_CRON_LOOP, 0, &ei);
Expand Down Expand Up @@ -2815,6 +2817,10 @@ serverDb *createDatabase(int id) {
db->ready_keys = dictCreate(&objectKeyPointerValueDictType);
db->watched_keys = dictCreate(&keylistDictType);
db->id = id;

db->uncommitted_keys = raxNew();
db->dirty_repl_offset = -1;
db->scan_in_progress = 0;
resetDbExpiryState(db);
return db;
}
Expand Down Expand Up @@ -3035,7 +3041,7 @@ void initServer(void) {
commandlogInit();
latencyMonitorInit();
initSharedQueryBuf();

durableInit();
/* Initialize ACL default password if it exists */
ACLUpdateDefaultUserPassword(server.requirepass);

Expand Down Expand Up @@ -3785,6 +3791,7 @@ void call(client *c, int flags) {
struct ClientFlags client_old_flags = c->flag;

struct serverCommand *real_cmd = c->realcmd;
preCall();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe beforeCommandTrackReplOffset or something like that better explains what it does.

(Functions in some other areas use prefixes like cluster or repl. I don't know if we should consider this WBL mechanism an integrated part of replication or if we should consider a new prefix like wbl.)

client *prev_client = server.executing_client;
server.executing_client = c;

Expand Down Expand Up @@ -3982,7 +3989,9 @@ void call(client *c, int flags) {
if (zmalloc_used > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used;

/* Do some maintenance job and cleanup */
// TODO: could the blocking postCall be moved into afterCommand?
afterCommand(c);
postCall(c);
Comment on lines 3991 to +3994
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The names "after command" and "post call" basically mean the same thing. Let's rename both of them to better explain what they do. For example something like afterCommandCleanup and afterCommandTrackReplOffset.


/* Remember the replication offset of the client, right after its last
* command that resulted in propagation. */
Expand Down Expand Up @@ -4516,8 +4525,12 @@ int processCommand(client *c) {
queueMultiCommand(c, cmd_flags);
addReply(c, shared.queued);
} else {
if (preCommandExec(c) == CMD_FILTER_REJECT) {
return C_OK;
}
int flags = CMD_CALL_FULL;
call(c, flags);
postCommandExec(c);
if (listLength(server.ready_keys) && !isInsideYieldingLongCommand()) handleClientsBlockedOnKeys();
}
return C_OK;
Expand Down
18 changes: 17 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "commands.h"
#include "allocator_defrag.h"

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
Expand All @@ -52,7 +53,7 @@
#include <netinet/in.h>
#include <sys/socket.h>
#include <signal.h>

#include "durable_write.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other #include "..." are above the #include <...> ones. Put this new one with the other ones of this kind.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha! will do and keep it in mind for other revisions/prs

#ifdef HAVE_LIBSYSTEMD
#include <systemd/sd-daemon.h>
#endif
Expand Down Expand Up @@ -868,6 +869,7 @@ typedef struct replBufBlock {
char buf[];
} replBufBlock;


/* Database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
Expand All @@ -886,6 +888,13 @@ typedef struct serverDb {
long long avg_ttl; /* Average TTL, just for stats */
unsigned long cursor; /* Cursor of the active expire cycle. */
} expiry[ACTIVE_EXPIRY_TYPE_COUNT];

/* fields related to dirty key tracking
* for consistent writes with durability */
rax *uncommitted_keys; /* Map of dirty keys to the offset required by replica acknowledgement */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A RAX is probably fine for the PoC. It's similar to the RAX that we used in the past to track the slot-to-key mapping. We replaced that one with one hashtable per slot instead. This RAX tracking is costly because it duplicates the key names and there are multiple pointers to follow. A hashtable is flat.

In the long term, I think we should store the offset within the reference-counted key-value object itself (robj metadata of some sort). If we need a flag to mark it as dirty, we can steal one of the bits from the reference counter.

To find all the dirty keys, we can use a hashtable, probably one per slot in cluster mode, so we can probably use a kvstore.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had some discussion on this and would prefer porting it to a hashtable as well.

Thing to note, this isn't permanent data and is ephemeral in nature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thing to note, this isn't permanent data and is ephemeral in nature.

Yes, so a middle way is to only add a bit flag in the robj so we can avoid looking up the RAX every time a non-dirty key is accessed.

long long dirty_repl_offset; /* Replication offset for a dirty DB */
raxIterator next_scan_iter; /* The next iterator for db scan */
int scan_in_progress; /* Flag of showing whether db is in scan or not */
} serverDb;

/* forward declaration for functions ctx */
Expand Down Expand Up @@ -1168,6 +1177,8 @@ typedef struct ClientFlags {
or client::buf. */
uint64_t keyspace_notified : 1; /* Indicates that a keyspace notification was triggered during the execution of the
current command. */
uint64_t durable_blocked_client: 1; /* This is a durable blocked client that is waiting for the server to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the 65th flag? It means this bit field grows from 64 bits to 128 and we get 63 unused bits in the end.

I think it doesn't go well with the union in the client struct, which requires it to be 64 bits, so we can do things like clearing all flags by setting raw_flags to 0 and things like that.

/* Client flags and state indicators */
union {
    uint64_t raw_flag;
    struct ClientFlags flag;
};

Perhaps we can consider adding a new, separate, set of flags, like ClientFlags2 or something.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, fair enough. Maybe a separate set of durability-related flags? (maybe it doesn't help much unless we plan to have several here)

* acknowledge the write of the command that caused it to be blocked. */
} ClientFlags;

typedef struct ClientPubSubData {
Expand Down Expand Up @@ -1366,6 +1377,7 @@ typedef struct client {
#ifdef LOG_REQ_RES
clientReqResInfo reqres;
#endif
struct clientDurabilityInfo clientDurabilityInfo;
} client;

/* When a command generates a lot of discrete elements to the client output buffer, it is much faster to
Expand Down Expand Up @@ -1665,6 +1677,7 @@ typedef enum childInfoType {
} childInfoType;

struct valkeyServer {
durable_t durability;
/* General */
pid_t pid; /* Main process pid. */
pthread_t main_thread_id; /* Main thread id */
Expand Down Expand Up @@ -2927,6 +2940,9 @@ int processIOThreadsWriteDone(void);
void releaseReplyReferences(client *c);
void resetLastWrittenBuf(client *c);

//TODO:jules move this elsewhere
int getIntFromObject(robj *o, int *target);

int parseExtendedCommandArgumentsOrReply(client *c, int *flags, int *unit, robj **expire, robj **compare_val, int command_type, int max_args);

/* logreqres.c - logging of requests and responses */
Expand Down
Loading