Skip to content

Commit 252c29a

Browse files
committed
using a callback id indirection
1 parent f783620 commit 252c29a

File tree

2 files changed

+91
-24
lines changed

2 files changed

+91
-24
lines changed

chitchat/src/listener.rs

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ use std::ops::Bound;
33
use std::sync::atomic::{AtomicUsize, Ordering};
44
use std::sync::{Arc, RwLock, Weak};
55

6+
use tracing::error;
7+
68
use crate::KeyChangeEvent;
79

810
pub struct ListenerHandle {
9-
prefix: String,
11+
// The prefix and listener_id are used for removal of the listener.
1012
listener_id: usize,
1113
listeners: Weak<RwLock<InnerListeners>>,
1214
}
@@ -25,7 +27,7 @@ impl Drop for ListenerHandle {
2527
fn drop(&mut self) {
2628
if let Some(listeners) = self.listeners.upgrade() {
2729
let mut listeners_guard = listeners.write().unwrap();
28-
listeners_guard.remove_listener(&self.prefix, self.listener_id);
30+
listeners_guard.remove_listener(self.listener_id);
2931
}
3032
}
3133
}
@@ -60,9 +62,15 @@ impl Listeners {
6062
let new_idx = inner_listener_guard
6163
.listener_idx
6264
.fetch_add(1, Ordering::Relaxed);
63-
inner_listener_guard.subscribe_event(&key_prefix, new_idx, boxed_listener);
65+
let callback_entry = CallbackEntry {
66+
prefix: key_prefix.clone(),
67+
callback: boxed_listener,
68+
};
69+
inner_listener_guard
70+
.callbacks
71+
.insert(new_idx, callback_entry);
72+
inner_listener_guard.subscribe_event(&key_prefix, new_idx);
6473
ListenerHandle {
65-
prefix: key_prefix,
6674
listener_id: new_idx,
6775
listeners: weak_listeners,
6876
}
@@ -73,33 +81,50 @@ impl Listeners {
7381
}
7482
}
7583

84+
struct CallbackEntry {
85+
prefix: String,
86+
callback: BoxedListener,
87+
}
88+
89+
type CallbackId = usize;
90+
7691
#[derive(Default)]
7792
struct InnerListeners {
7893
// A trie would have been more efficient, but in reality we don't have
7994
// that many listeners.
80-
listeners: BTreeMap<String, HashMap<usize, BoxedListener>>,
95+
listeners: BTreeMap<String, Vec<CallbackId>>,
8196
listener_idx: AtomicUsize,
97+
// Callbacks is a hashmap because as we delete listeners, we create "holes" in the
98+
// callback_id -> callback mapping
99+
callbacks: HashMap<usize, CallbackEntry>,
82100
}
83101

84102
impl InnerListeners {
85103
// We don't inline this to make sure monomorphization generates as little code as possible.
86-
fn subscribe_event(&mut self, key_prefix: &str, idx: usize, callback: BoxedListener) {
87-
if let Some(callbacks) = self.listeners.get_mut(key_prefix) {
88-
callbacks.insert(idx, callback);
89-
} else {
90-
let mut listener_map = HashMap::new();
91-
listener_map.insert(idx, callback);
92-
self.listeners.insert(key_prefix.to_string(), listener_map);
93-
}
104+
fn subscribe_event(&mut self, key_prefix: &str, idx: CallbackId) {
105+
self.listeners
106+
.entry(key_prefix.to_string())
107+
.or_default()
108+
.push(idx);
109+
}
110+
111+
fn call(&self, callback_id: CallbackId, key_change_event: KeyChangeEvent) {
112+
let Some(CallbackEntry { callback, .. }) = self.callbacks.get(&callback_id) else {
113+
error!(
114+
"callback {callback_id} not found upon call. this should not happen, please report"
115+
);
116+
return;
117+
};
118+
(*callback)(key_change_event);
94119
}
95120

96121
fn trigger_event(&self, key_change_event: KeyChangeEvent) {
97122
// We treat the empty prefix a tiny bit separately to get able to at least
98123
// use the first character as a range bound, as if we were going to the first level of
99124
// a trie.
100-
if let Some(listeners) = self.listeners.get("") {
101-
for listener in listeners.values() {
102-
(*listener)(key_change_event);
125+
if let Some(callback_ids) = self.listeners.get("") {
126+
for &callback_id in callback_ids {
127+
self.call(callback_id, key_change_event);
103128
}
104129
}
105130
if key_change_event.key.is_empty() {
@@ -115,17 +140,32 @@ impl InnerListeners {
115140
break;
116141
}
117142
if let Some(stripped_key_change_event) = key_change_event.strip_key_prefix(prefix_key) {
118-
for listener in listeners.values() {
119-
(*listener)(stripped_key_change_event);
143+
for &callback_id in listeners {
144+
self.call(callback_id, stripped_key_change_event)
120145
}
121146
}
122147
}
123148
}
124149

125-
fn remove_listener(&mut self, key_prefix: &str, idx: usize) {
126-
if let Some(callbacks) = self.listeners.get_mut(key_prefix) {
127-
callbacks.remove(&idx);
128-
}
150+
fn remove_listener(&mut self, callback_id: CallbackId) {
151+
let Some(CallbackEntry { prefix, .. }) = self.callbacks.remove(&callback_id) else {
152+
error!(
153+
"callback {callback_id} not found upon remove. this should not happen, please \
154+
report"
155+
);
156+
return;
157+
};
158+
let Some(callbacks) = self.listeners.get_mut(&prefix) else {
159+
error!(
160+
"callback prefix not foudn upon remove. this should never happen, please report"
161+
);
162+
return;
163+
};
164+
let position = callbacks
165+
.iter()
166+
.position(|x| *x == callback_id)
167+
.expect("callback not found");
168+
callbacks.swap_remove(position);
129169
}
130170
}
131171

chitchat/src/state.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use tracing::{info, warn};
1717
use crate::delta::{Delta, DeltaSerializer, NodeDelta};
1818
use crate::digest::{Digest, NodeDigest};
1919
use crate::listener::Listeners;
20-
use crate::types::{DeletionStatus, DeletionStatusMutation};
20+
use crate::types::{DeletionStatus, DeletionStatusMutation, KeyValueMutation};
2121
use crate::{ChitchatId, Heartbeat, KeyChangeEvent, Version, VersionedValue};
2222

2323
#[derive(Clone, Serialize, Deserialize)]
@@ -110,6 +110,7 @@ impl NodeState {
110110
}
111111

112112
/// Returns the node's max version.
113+
#[inline]
113114
pub fn max_version(&self) -> Version {
114115
self.max_version
115116
}
@@ -210,7 +211,11 @@ impl NodeState {
210211
current_last_gc_version=self.last_gc_version,
211212
"resetting node");
212213
*self = NodeState::new(node_delta.chitchat_id.clone(), self.listeners.clone());
213-
// The node_delta max_version whe
214+
// It is possible for the node delta to not contain any KVs.
215+
// (for instance they all have been GCed.)
216+
//
217+
// In that case, no KV are here to tell us what the max version is, so the
218+
// node_delta itself holds a max_version.
214219
if let Some(max_version) = node_delta.max_version {
215220
if node_delta.key_values.is_empty() {
216221
self.max_version = max_version;
@@ -226,6 +231,28 @@ impl NodeState {
226231
true
227232
}
228233

234+
fn applicable_key_value_mutations(
235+
&self,
236+
node_delta: NodeDelta,
237+
) -> impl Iterator<Item = KeyValueMutation> + '_ {
238+
node_delta
239+
.key_values
240+
.into_iter()
241+
.filter(move |key_value_mutation| {
242+
if key_value_mutation.version <= self.max_version() {
243+
// We already know about this KV.
244+
return false;
245+
}
246+
if key_value_mutation.status.scheduled_for_deletion() {
247+
// This KV has already been GCed.
248+
if key_value_mutation.version <= self.last_gc_version {
249+
return false;
250+
}
251+
}
252+
true
253+
})
254+
}
255+
229256
fn apply_delta(&mut self, node_delta: NodeDelta, now: Instant) {
230257
if !self.prepare_apply_delta(&node_delta) {
231258
return;

0 commit comments

Comments
 (0)