Conversation

@jjuleslasarte

This draft PR is a POC for a dirty-key blocking and tracking mechanism to support durability, implemented with the WBL approach outlined in the durability HLD. It implements 'blocking' of:

  • acknowledgement of a write until there's consensus
  • dirty reads in the primary node.

Dirty key tracking implementation

Blocking dirty reads and delaying the ack of writes both center around the ability to determine the set of keys that are dirty, i.e. not yet committed by the "quorum" of nodes in the cluster.

This can be realized by tracking the dirty keys in some data structure (a radix tree in this draft), where each key maps to the byte offset it is currently awaiting replica acknowledgements for. An incoming read operation can quickly determine whether the keys it tries to access are dirty and, if so, what offset must be acknowledged before its response can be released. This mechanism could also serve if we want to allow a configurable "tail data loss" (i.e. allow up to x seconds / x amount of data to be acked before consensus).

The downside of this approach is that the response message gets buffered in memory on the valkey server. This creates a certain amount of memory pressure on the server in two categories of scenarios:

  • The incoming reader client tries to read a very large item (i.e. a composite item that contains many fields), thus generating a large response buffer in memory
  • Many incoming clients try to access dirty items (i.e. hot items) and accumulate a large amount of responses buffered in memory.

For those cases, we could implement a throttling mechanism in the valkey server.

Hooks/touchpoints

This POC hooks into the processing of the command in the following places. The existing flow for most commands is

processCommand -> call() -> c->cmd->proc()

This PR adds the following hooks:

  • processCommand
    • preCommandExec: track the pre-execution positions in the reply cob of the client (and, eventually, connected monitors)
    • call
      • preCall: record the starting replication offset of the command about to be executed.
      • c->cmd->proc()
      • postCall: here, we detect whether we have a command that has made a key dirty (by comparing the replication offset from preCall with the current one), or if the command has accessed a dirty key.
        • In the write path, the blocking offset is the current offset.
        • In the read path, we get the offset by searching for the key in the rax tree.
    • postCommandExec
      • block the client for the given offset

The separation of preCall/postCall from preCommandExec/postCommandExec is so that we can handle MULTI and Lua.

In multi/exec, the individual commands in a transaction

MULTI 
SET a b
SET b c
EXEC

don't go through processCommand; they are call()ed directly inside the EXEC command, but form part of the same logical command, so the client needs to block at the reply position recorded before MULTI executed. I had included support for multi (you can see it here) in this PR to better show this, but it was a bit too long for one draft, so I decided to leave that for the next PR.

example

client sends SET FOO BAR

primary

27121:M 23 Nov 2025 15:59:42.820 b postCommandExec hook entered for command 'set'
27121:M 23 Nov 2025 15:59:42.820 b client should be blocked at offset 5509,
// ...
// replica one acks
27121:M 23 Nov 2025 15:59:44.404 b preCall hook: pre_call_replication_offset=5523, pre_call_num_ops_pending_propagation=0

// replica two acks
27121:M 23 Nov 2025 15:59:44.802 b preCall hook: pre_call_replication_offset=5523, pre_call_num_ops_pending_propagation=0

// ...client unblocked
27121:M 23 Nov 2025 15:59:44.802 b unblocking clients for consensus offset 5523,

TODO

  • testing
  • handling of multi/exec (see here)
  • handling of lua
  • handling of db level commands
  • performance testing
  • configs and handling of turning on / off
  • telemetry

Looking for feedback on the general approach, the touchpoints with the major codepaths, and naming. I will edit this PR

@zuiderkwast zuiderkwast left a comment


Very nice to see this PoC!

I'm posting some random comments. I know it's not really ready for review yet so there's no need to reply to them at this point.

There are many TODOs throughout the code, which is nice to see. We obviously don't need everything in the PoC, but it's still good to have them written down. We can convert them to issues or something later.

When we do raft-based replication later, each replica will need to be added to the quorum in some explicit way and only these replicas in the quorum actually count when we count acks. We can keep this in mind and modify how we count what's "committed" and not, later.

#include <math.h>

// TODO: handle PSYNC
// TODO: handle durability on/off?
Contributor


I think it's fine to have this as immutable configs.

// TODO: handle lua & multi
// TODO: handle blocking commands
// TODO: handle DB level commands (swap flushall etc)
// TODO: handle monitors
Contributor


It's great that you've captured these TODOs!

I feel monitors can be lower prio than the other of these todos, at least initially.

Author


ack, that makes sense to me as well.

Comment on lines +177 to +183
// make sure we have enough space for the replicas. Resize only if the required
// replica count is larger. No need to downsize.
if(durability->replica_offsets_size < numReplicas) {
durability->replica_offsets = zrealloc(durability->replica_offsets, numReplicas * sizeof(long long));
durability->replica_offsets_size = numReplicas;
}
populateReplicaOffsets(durability->replica_offsets, numReplicas);
Contributor


It looks like durability->replica_offsets is only used within this function. Instead of a global, heap-allocated buffer, we can turn this into a variable-length array on the stack.

Suggested change
// make sure we have enough space for the replicas. Resize only if the required
// replica count is larger. No need to downsize.
if(durability->replica_offsets_size < numReplicas) {
durability->replica_offsets = zrealloc(durability->replica_offsets, numReplicas * sizeof(long long));
durability->replica_offsets_size = numReplicas;
}
populateReplicaOffsets(durability->replica_offsets, numReplicas);
long long replica_offsets[numReplicas];
populateReplicaOffsets(replica_offsets, numReplicas);

#include <sys/socket.h>
#include <signal.h>

#include "durable_write.h"
Contributor


The other #include "..." are above the #include <...> ones. Put this new one with the other ones of this kind.

Author


gotcha! will do and keep it in mind for other revisions/prs


/* fields related to dirty key tracking
* for consistent writes with durability */
rax *uncommitted_keys; /* Map of dirty keys to the offset required by replica acknowledgement */
Contributor


A RAX is probably fine for the PoC. It's similar to the RAX that we used in the past to track the slot-to-key mapping. We replaced that one with one hashtable per slot instead. This RAX tracking is costly because it duplicates the key names and there are multiple pointers to follow. A hashtable is flat.

In the long term, I think we should store the offset within the reference-counted key-value object itself (robj metadata of some sort). If we need a flag to mark it as dirty, we can steal one of the bits from the reference counter.

To find all the dirty keys, we can use a hashtable, probably one per slot in cluster mode, so we can probably use a kvstore.

Collaborator


We had some discussion on this and would prefer porting it to a hashtable as well.

Thing to note, this isn't permanent data and is ephemeral in nature.

Contributor


Thing to note, this isn't permanent data and is ephemeral in nature.

Yes, so a middle way is to only add a bit flag in the robj so we can avoid looking up the RAX every time a non-dirty key is accessed.

or client::buf. */
uint64_t keyspace_notified : 1; /* Indicates that a keyspace notification was triggered during the execution of the
current command. */
uint64_t durable_blocked_client: 1; /* This is a durable blocked client that is waiting for the server to
Contributor


Is this the 65th flag? It means this bit field grows from 64 bits to 128 and we get 63 unused bits in the end.

I think it doesn't go well with the union in the client struct, which requires the flags to be 64 bits, so that we can do things like clearing all flags by setting raw_flag to 0.

/* Client flags and state indicators */
union {
    uint64_t raw_flag;
    struct ClientFlags flag;
};

Perhaps we can consider adding a new, separate, set of flags, like ClientFlags2 or something.

Author


ah, fair enough. Maybe a separate set of durability-related flags? (though it may not help much unless we plan to have several here)

Comment on lines 3991 to +3994
/* Do some maintenance job and cleanup */
// TODO: could the blocking in postCall be moved into afterCommand?
afterCommand(c);
postCall(c);
Contributor


The names "after command" and "post call" basically mean the same thing. Let's rename both of them to better explain what they do. For example something like afterCommandCleanup and afterCommandTrackReplOffset.

struct ClientFlags client_old_flags = c->flag;

struct serverCommand *real_cmd = c->realcmd;
preCall();
Contributor


Maybe beforeCommandTrackReplOffset or something like that better explains what it does.

(Functions in some other areas use prefixes like cluster or repl. I don't know if we should consider this WBL mechanism an integrated part of replication or if we should consider a new prefix like wbl.)

Contributor


What the code in this file does is to implement the WBL and block/unblock client replies. Durably replicated writes is more than just this. A name like wbl.c or replyblocking.c would be more exact for this file.


@hpatro hpatro left a comment


Few things @jjuleslasarte and I had discussed and would need to revisit at a later point:

  1. Move from offset tracking mechanism to raft's term/index/commit for consensus (modify getConsensusOffset).
  2. Figure out throttling mechanism on accumulation of buffer. Dropping COB on overrun isn't feasible.
  3. RAX to be replaced with a hashtable (if deemed not efficient). Note: this is only ephemeral data, i.e. it exists only during the blocking phase.

Hook points to discuss:

  • server.c:3794: preCall(); - Capture replication offset (pre execution)
  • server.c:3994: postCall(); - Determine if write command got executed and handles special blocking (scripts)
  • server.c:4528: preCommandExec(); - Prepares individual client for blocking
  • server.c:4533: postCommandExec(); - Blocks individual client
  • networking.c:1680: isClientReplyBufferLimited(c) - Response buffering and limit check
  • replication.c:1426: postReplicaAck(); - Processes acknowledgement and allows clients to progress to a certain offset.


@hpatro hpatro Nov 25, 2025


@ranshid Do we remove the read handler callback while the client is blocked?


hpatro commented Nov 25, 2025

@allenss-amazon @yairgott @madolson @rjd15372 Could you folks take a look and provide your feedback?

// Describes a pre-execution COB offset for a client
typedef struct preExecutionOffsetPosition {
// True if the pre execution offset/reply block are initialized
bool recorded;
Collaborator


When would it not be recorded?

serverLog(LOG_DEBUG, "preCall hook entered");
if (!isPrimaryDurabilityEnabled()) return;

pre_call_replication_offset = server.primary_repl_offset;
Collaborator


Touch point for the consensus layer to provide the metadata (term + index) before any command execution. And why is this not in the durable_t?

Collaborator


Maybe define a cb like recordIndex(void *);

Comment on lines +160 to +171
long long getConsensusOffset(const unsigned long numAcksNeeded) {
const unsigned long numReplicas = listLength(server.replicas);
if (numAcksNeeded == 0) {
// If no ack is needed, then the consensus offset is the one primary is at.
return server.primary_repl_offset;
}

// If the number of connected replicas is less than the number of required replicas,
// return -1 because we don't have enough number of replicas for the ACK.
if (numReplicas < numAcksNeeded) {
return -1;
}
Collaborator


This might as well be a callback

size_t getConsensusOffset();
