Skip to content

Commit 7d39156

Browse files
authored
fix: [breaking] correct lexicographical ordering between topics for the topics index (#55)
Also: improved idx_context_key_range_end implementation plus some tests
1 parent 271a94a commit 7d39156

File tree

3 files changed

+159
-25
lines changed

3 files changed

+159
-25
lines changed

src/api.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ pub async fn serve(
358358
store: Store,
359359
engine: nu::Engine,
360360
expose: Option<String>,
361-
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
361+
) -> Result<(), BoxError> {
362362
if let Err(e) = store.append(
363363
Frame::builder("xs.start", store::ZERO_CONTEXT)
364364
.maybe_meta(expose.as_ref().map(|e| serde_json::json!({"expose": e})))
@@ -397,7 +397,7 @@ async fn listener_loop(
397397
mut listener: Listener,
398398
store: Store,
399399
engine: nu::Engine,
400-
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
400+
) -> Result<(), BoxError> {
401401
loop {
402402
let (stream, _) = listener.accept().await?;
403403
let io = TokioIo::new(stream);
@@ -506,9 +506,7 @@ async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPRe
506506
Err(e) => return response_400(format!("Invalid frame JSON: {}", e)),
507507
};
508508

509-
store
510-
.insert_frame(&frame)
511-
.map_err(|e| Box::new(e) as BoxError)?;
509+
store.insert_frame(&frame).map_err(BoxError::from)?;
512510

513511
Ok(Response::builder()
514512
.status(StatusCode::OK)

src/store/mod.rs

+37-20
Original file line numberDiff line numberDiff line change
@@ -414,15 +414,18 @@ impl Store {
414414
}
415415

416416
#[tracing::instrument(skip(self), fields(id = %id.to_string()))]
417-
pub fn remove(&self, id: &Scru128Id) -> Result<(), fjall::Error> {
417+
pub fn remove(&self, id: &Scru128Id) -> Result<(), crate::error::Error> {
418418
let Some(frame) = self.get(id) else {
419419
// Already deleted
420420
return Ok(());
421421
};
422422

423+
// Get the index topic key
424+
let topic_key = idx_topic_key_from_frame(&frame)?;
425+
423426
let mut batch = self.keyspace.batch();
424427
batch.remove(&self.frame_partition, id.as_bytes());
425-
batch.remove(&self.idx_topic, idx_topic_key_from_frame(&frame));
428+
batch.remove(&self.idx_topic, topic_key);
426429
batch.remove(&self.idx_context, idx_context_key_from_frame(&frame));
427430

428431
// If this is a context frame, remove it from the contexts set
@@ -431,7 +434,8 @@ impl Store {
431434
}
432435

433436
batch.commit()?;
434-
self.keyspace.persist(fjall::PersistMode::SyncAll)
437+
self.keyspace.persist(fjall::PersistMode::SyncAll)?;
438+
Ok(())
435439
}
436440

437441
pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
@@ -469,14 +473,19 @@ impl Store {
469473
}
470474

471475
#[tracing::instrument(skip(self))]
472-
pub fn insert_frame(&self, frame: &Frame) -> Result<(), fjall::Error> {
476+
pub fn insert_frame(&self, frame: &Frame) -> Result<(), crate::error::Error> {
473477
let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
478+
479+
// Get the index topic key
480+
let topic_key = idx_topic_key_from_frame(frame)?;
481+
474482
let mut batch = self.keyspace.batch();
475483
batch.insert(&self.frame_partition, frame.id.as_bytes(), encoded);
476-
batch.insert(&self.idx_topic, idx_topic_key_from_frame(frame), b"");
484+
batch.insert(&self.idx_topic, topic_key, b"");
477485
batch.insert(&self.idx_context, idx_context_key_from_frame(frame), b"");
478486
batch.commit()?;
479-
self.keyspace.persist(fjall::PersistMode::SyncAll)
487+
self.keyspace.persist(fjall::PersistMode::SyncAll)?;
488+
Ok(())
480489
}
481490

482491
pub fn append(&self, mut frame: Frame) -> Result<Frame, crate::error::Error> {
@@ -497,6 +506,9 @@ impl Store {
497506
}
498507
}
499508

509+
// Check for null byte in topic (in case we're not storing the frame)
510+
idx_topic_key_from_frame(&frame)?;
511+
500512
// only store the frame if it's not ephemeral
501513
if frame.ttl != Some(TTL::Ephemeral) {
502514
self.insert_frame(&frame)?;
@@ -609,18 +621,28 @@ fn is_expired(id: &Scru128Id, ttl: &Duration) -> bool {
609621
now_ms >= expires_ms
610622
}
611623

624+
const NULL_DELIMITER: u8 = 0;
625+
612626
fn idx_topic_key_prefix(context_id: Scru128Id, topic: &str) -> Vec<u8> {
613627
let mut v = Vec::with_capacity(16 + topic.len() + 1); // context_id (16) + topic bytes + delimiter
614628
v.extend(context_id.as_bytes()); // binary context_id (16 bytes)
615629
v.extend(topic.as_bytes()); // topic string as UTF-8 bytes
616-
v.push(0xFF); // delimiter
630+
v.push(NULL_DELIMITER); // Delimiter for variable-sized keys
617631
v
618632
}
619633

620-
fn idx_topic_key_from_frame(frame: &Frame) -> Vec<u8> {
634+
pub(crate) fn idx_topic_key_from_frame(frame: &Frame) -> Result<Vec<u8>, crate::error::Error> {
635+
// Check if the topic contains a null byte when encoded as UTF-8
636+
if frame.topic.as_bytes().contains(&NULL_DELIMITER) {
637+
return Err(
638+
"Topic cannot contain null byte (0x00) as it's used as a delimiter"
639+
.to_string()
640+
.into(),
641+
);
642+
}
621643
let mut v = idx_topic_key_prefix(frame.context_id, &frame.topic);
622644
v.extend(frame.id.as_bytes());
623-
v
645+
Ok(v)
624646
}
625647

626648
fn idx_topic_frame_id_from_key(key: &[u8]) -> Scru128Id {
@@ -638,17 +660,12 @@ fn idx_context_key_from_frame(frame: &Frame) -> Vec<u8> {
638660

639661
// Returns the key prefix for the next context after the given one
640662
fn idx_context_key_range_end(context_id: Scru128Id) -> Vec<u8> {
641-
let mut bytes = context_id.as_bytes().to_vec();
642-
for i in (0..bytes.len()).rev() {
643-
if bytes[i] == 0xFF {
644-
bytes[i] = 0;
645-
} else {
646-
bytes[i] += 1;
647-
return bytes;
648-
}
649-
}
650-
bytes.push(0);
651-
bytes
663+
let mut i = context_id.to_u128();
664+
665+
// NOTE: Reaching u128::MAX is probably not gonna happen...
666+
i = i.saturating_add(1);
667+
668+
Scru128Id::from(i).as_bytes().to_vec()
652669
}
653670

654671
fn deserialize_frame<B1: AsRef<[u8]>, B2: AsRef<[u8]>>(record: (B1, B2)) -> Frame {

src/store/tests.rs

+119
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,37 @@ mod tests_read_options {
2323
reencoded: Option<&'a str>,
2424
}
2525

26+
#[tokio::test]
27+
async fn test_topic_index_order() {
28+
let folder = tempfile::tempdir().unwrap();
29+
30+
let store = Store::new(folder.path().to_path_buf());
31+
32+
let frame1 = Frame {
33+
id: scru128::new(),
34+
topic: "ab".to_owned(),
35+
..Default::default()
36+
};
37+
let frame1 = store.append(frame1).unwrap();
38+
39+
let frame2 = Frame {
40+
id: scru128::new(),
41+
topic: "abc".to_owned(),
42+
..Default::default()
43+
};
44+
let frame2 = store.append(frame2).unwrap();
45+
46+
let keys = store.idx_topic.keys().flatten().collect::<Vec<_>>();
47+
48+
assert_eq!(
49+
&[
50+
fjall::Slice::from(idx_topic_key_from_frame(&frame1).unwrap()),
51+
fjall::Slice::from(idx_topic_key_from_frame(&frame2).unwrap()),
52+
],
53+
&*keys,
54+
);
55+
}
56+
2657
#[tokio::test]
2758
async fn test_topic_index() {
2859
let folder = tempfile::tempdir().unwrap();
@@ -493,6 +524,32 @@ mod tests_context {
493524
use super::*;
494525
use tempfile::TempDir;
495526

527+
#[tokio::test]
528+
async fn test_reject_null_byte_in_topic() {
529+
let temp_dir = tempfile::tempdir().unwrap();
530+
let store = Store::new(temp_dir.path().to_path_buf());
531+
532+
// Try to create a frame with a topic containing a null byte
533+
let frame = Frame {
534+
id: scru128::new(),
535+
topic: "test\0topic".to_owned(),
536+
context_id: ZERO_CONTEXT,
537+
hash: None,
538+
meta: None,
539+
ttl: None,
540+
};
541+
542+
// Creating the index key should fail
543+
let result = idx_topic_key_from_frame(&frame);
544+
assert!(result.is_err());
545+
assert!(result.unwrap_err().to_string().contains("null byte"));
546+
547+
// Trying to append the frame should also fail
548+
let result = store.append(frame);
549+
assert!(result.is_err());
550+
assert!(result.unwrap_err().to_string().contains("null byte"));
551+
}
552+
496553
#[tokio::test]
497554
async fn test_context_operations() {
498555
let temp_dir = TempDir::new().unwrap();
@@ -891,6 +948,68 @@ mod tests_context {
891948
let frames_ctx2: Vec<_> = store.iter_frames(Some(ctx2), None).collect();
892949
assert_eq!(frames_ctx2, vec![ctx2_frame1, ctx2_frame2]);
893950
}
951+
952+
#[test]
953+
fn test_idx_context_key_range_end() {
954+
// Test 1: Normal case - verify basic increment works
955+
let context_id = Scru128Id::from_u128(100);
956+
let next_id = Scru128Id::from_u128(101);
957+
let result = idx_context_key_range_end(context_id);
958+
assert_eq!(result, next_id.as_bytes().to_vec());
959+
960+
// Test 2: Test with a complex key that's not all 0xFF
961+
let complex_id = Scru128Id::from_u128(0x8000_FFFF_0000_AAAA_1234_5678_9ABC_DEF0);
962+
let expected_next = Scru128Id::from_u128(0x8000_FFFF_0000_AAAA_1234_5678_9ABC_DEF1);
963+
assert_eq!(
964+
idx_context_key_range_end(complex_id),
965+
expected_next.as_bytes().to_vec()
966+
);
967+
968+
// Test 3: Boundary case - near maximum value
969+
let near_max = Scru128Id::from_u128(u128::MAX - 1);
970+
let max = Scru128Id::from_u128(u128::MAX);
971+
assert_eq!(idx_context_key_range_end(near_max), max.as_bytes().to_vec());
972+
973+
// Test 4: Boundary case - at maximum value (saturating_add should prevent overflow)
974+
let at_max = Scru128Id::from_u128(u128::MAX);
975+
assert_eq!(
976+
idx_context_key_range_end(at_max),
977+
at_max.as_bytes().to_vec(),
978+
"When at u128::MAX, saturating_add should keep the same value"
979+
);
980+
981+
// Test 5: Integration test - make sure it works in range queries
982+
let temp_dir = tempfile::tempdir().unwrap();
983+
let store = Store::new(temp_dir.path().to_path_buf());
984+
985+
// Create first context normally
986+
let ctx1_frame = store
987+
.append(Frame::builder("xs.context", ZERO_CONTEXT).build())
988+
.unwrap();
989+
let ctx1 = ctx1_frame.id;
990+
991+
// For ctx2, we need to manually create and register it
992+
let ctx2 = Scru128Id::from_u128(ctx1.to_u128() + 1);
993+
let ctx2_frame = Frame::builder("xs.context", ZERO_CONTEXT)
994+
.id(ctx2)
995+
.ttl(TTL::Forever)
996+
.build();
997+
998+
// Manually insert the frame and register the context
999+
store.insert_frame(&ctx2_frame).unwrap();
1000+
store.contexts.write().unwrap().insert(ctx2);
1001+
1002+
// Add frames to both contexts
1003+
let frame1 = store.append(Frame::builder("test", ctx1).build()).unwrap();
1004+
let frame2 = store.append(Frame::builder("test", ctx2).build()).unwrap();
1005+
1006+
// Test that range query correctly separates the contexts
1007+
let frames1: Vec<_> = store.read_sync(None, None, Some(ctx1)).collect();
1008+
assert_eq!(frames1, vec![frame1], "Should only return frames from ctx1");
1009+
1010+
let frames2: Vec<_> = store.read_sync(None, None, Some(ctx2)).collect();
1011+
assert_eq!(frames2, vec![frame2], "Should only return frames from ctx2");
1012+
}
8941013
}
8951014

8961015
mod tests_ttl_expire {

0 commit comments

Comments
 (0)