Skip to content

Commit f09f842

Browse files
committed
Makes it possible to register batching listeners
Upon reset_node and apply_update, we want to be able to react to a batch of changes rather than individual changes. In quickwit for instance, we want to be able to react to the reception of a batch of deleted shard and group this reaction into a single metastore call.
1 parent 2e63747 commit f09f842

File tree

6 files changed

+950
-492
lines changed

6 files changed

+950
-492
lines changed

chitchat-test/src/main.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ impl Api {
3737
#[oai(path = "/set_kv/", method = "get")]
3838
async fn set_kv(&self, key: Query<String>, value: Query<String>) -> Json<serde_json::Value> {
3939
let mut chitchat_guard = self.chitchat.lock().await;
40-
41-
let cc_state = chitchat_guard.self_node_state();
40+
let mut cc_state = chitchat_guard.self_node_state();
4241
cc_state.set(key.as_str(), value.as_str());
4342

4443
Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap())
@@ -48,8 +47,7 @@ impl Api {
4847
#[oai(path = "/mark_for_deletion/", method = "get")]
4948
async fn mark_for_deletion(&self, key: Query<String>) -> Json<serde_json::Value> {
5049
let mut chitchat_guard = self.chitchat.lock().await;
51-
52-
let cc_state = chitchat_guard.self_node_state();
50+
let mut cc_state = chitchat_guard.self_node_state();
5351
cc_state.delete(key.as_str());
5452
Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap())
5553
}

chitchat/src/delta.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,30 @@ impl NodeDelta {
330330
}
331331
}
332332

333+
impl NodeDelta {
334+
pub(crate) fn applicable_key_value_mutations(
335+
self,
336+
max_version: u64,
337+
last_gc_version: u64,
338+
) -> impl Iterator<Item = KeyValueMutation> {
339+
self.key_values
340+
.into_iter()
341+
.filter(move |key_value_mutation| {
342+
if key_value_mutation.version <= max_version {
343+
// We already know about this KV.
344+
return false;
345+
}
346+
if key_value_mutation.status.scheduled_for_deletion() {
347+
// This KV has already been GCed.
348+
if key_value_mutation.version <= last_gc_version {
349+
return false;
350+
}
351+
}
352+
true
353+
})
354+
}
355+
}
356+
333357
#[derive(Default)]
334358
struct DeltaBuilder {
335359
existing_nodes: HashSet<ChitchatId>,

0 commit comments

Comments
 (0)