Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flush_all_blocks option to also write intermediate blocks #2101

Merged
merged 3 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions fvm/src/blockstore/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,25 @@ where
pub fn into_inner(self) -> BS {
self.base
}

/// Flushes all blocks from the write cache to the provided blockstore,
/// regardless of whether they're reachable from any state root.
/// Unlike the standard flush() operation which only writes blocks connected
/// to the final state tree, this writes every block created during execution.
pub fn flush_all(&self) -> Result<()> {
log::debug!(
"Flushing all ({}) cache blocks to blockstore",
self.buffer_len()
);

self.base.put_many_keyed(self.write.borrow_mut().drain())?;

Ok(())
}

pub fn buffer_len(&self) -> usize {
self.write.borrow().len()
}
}

impl<BS> Buffered for BufferedBlockstore<BS>
Expand Down Expand Up @@ -329,4 +348,99 @@ mod tests {
assert_eq!(buf_store.get(&sealed_comm_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<u8>(&unconnected).unwrap(), None);
}

#[test]
fn test_flush_vs_flush_all() {
fn setup(
mem: &MemoryBlockstore,
buf_store: &BufferedBlockstore<&MemoryBlockstore>,
) -> (Cid, Cid, Cid, Cid) {
// A DAG of 2 blocks
let value1 = 42u8;
let value2 = 84u8;
let value1_cid = buf_store.put_cbor(&value1, Code::Blake2b256).unwrap();
let root_cid = buf_store
.put_cbor(&(value1_cid, value2), Code::Blake2b256)
.unwrap();

// Two additional disconnected blocks
let disconnected1 = 100u8;
let disconnected2 = 200u8;
let disconnected1_cid = buf_store
.put_cbor(&disconnected1, Code::Blake2b256)
.unwrap();
let disconnected2_cid = buf_store
.put_cbor(&disconnected2, Code::Blake2b256)
.unwrap();

// Verify initial state - everything in buffer, nothing in backing store
assert_eq!(buf_store.get_cbor::<u8>(&value1_cid).unwrap(), Some(value1));
assert_eq!(
buf_store.get_cbor::<(Cid, u8)>(&root_cid).unwrap(),
Some((value1_cid, value2))
);
assert_eq!(
buf_store.get_cbor::<u8>(&disconnected1_cid).unwrap(),
Some(disconnected1)
);
assert_eq!(
buf_store.get_cbor::<u8>(&disconnected2_cid).unwrap(),
Some(disconnected2)
);
assert_eq!(mem.get_cbor::<u8>(&value1_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<(Cid, u8)>(&root_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<u8>(&disconnected1_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<u8>(&disconnected2_cid).unwrap(), None);

(root_cid, value1_cid, disconnected1_cid, disconnected2_cid)
}

// Case 1: flush operation only writes connected blocks
{
let mem = MemoryBlockstore::default();
let buf_store = BufferedBlockstore::new(&mem);
let (root_cid, value1_cid, disconnected1_cid, disconnected2_cid) =
setup(&mem, &buf_store);

// flush() should write only he DAG
buf_store.flush(&root_cid).unwrap();

// DAG should be in backing store
assert_eq!(mem.get_cbor::<u8>(&value1_cid).unwrap(), Some(42u8));
assert_eq!(
mem.get_cbor::<(Cid, u8)>(&root_cid).unwrap(),
Some((value1_cid, 84u8))
);

// Disconnected blocks should NOT be in backing store
assert_eq!(mem.get_cbor::<u8>(&disconnected1_cid).unwrap(), None);
assert_eq!(mem.get_cbor::<u8>(&disconnected2_cid).unwrap(), None);

// Verify that the buffer still contains the disconnected blocks
assert_eq!(buf_store.buffer_len(), 2);
}

// Case 2: flush_all operation writes all blocks
{
let mem = MemoryBlockstore::default();
let buf_store = BufferedBlockstore::new(&mem);
let (root_cid, value1_cid, disconnected1_cid, disconnected2_cid) =
setup(&mem, &buf_store);

// flush_all() should write all blocks
buf_store.flush_all().unwrap();

// All blocks should be in backing store
assert_eq!(mem.get_cbor::<u8>(&value1_cid).unwrap(), Some(42u8));
assert_eq!(
mem.get_cbor::<(Cid, u8)>(&root_cid).unwrap(),
Some((value1_cid, 84u8))
);
assert_eq!(mem.get_cbor::<u8>(&disconnected1_cid).unwrap(), Some(100u8));
assert_eq!(mem.get_cbor::<u8>(&disconnected2_cid).unwrap(), Some(200u8));

// Verify that all blocks are removed from the buffer
assert_eq!(buf_store.buffer_len(), 0);
}
}
}
9 changes: 8 additions & 1 deletion fvm/src/machine/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,16 @@ where
/// This method also flushes all new blocks (reachable from this new root CID) from the write
/// buffer into the underlying blockstore (the blockstore with which the machine was
/// constructed).
///
/// If an intermediate blockstore was provided, all intermediate blocks created during message
/// execution are flushed to it.
fn flush(&mut self) -> Result<Cid> {
let root = self.state_tree_mut().flush()?;
self.blockstore().flush(&root).or_fatal()?;
if self.context.flush_all_blocks {
self.blockstore().flush_all().or_fatal()?;
} else {
self.blockstore().flush(&root).or_fatal()?;
}
Ok(root)
}

Expand Down
11 changes: 11 additions & 0 deletions fvm/src/machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ impl NetworkConfig {
initial_state_root: initial_state,
circ_supply: fvm_shared::TOTAL_FILECOIN.clone(),
tracing: false,
flush_all_blocks: false,
}
}

Expand Down Expand Up @@ -240,6 +241,10 @@ pub struct MachineContext {
/// Whether or not to produce execution traces in the returned result.
/// Not consensus-critical, but has a performance impact.
pub tracing: bool,

/// When true, flush() will write all blocks created during execution to the
/// blockstore, not just those reachable from the final state root.
pub flush_all_blocks: bool,
}

impl MachineContext {
Expand All @@ -266,4 +271,10 @@ impl MachineContext {
self.tracing = true;
self
}

/// Enable flushing all blocks. [`MachineContext::flush_all_blocks`].
pub fn enable_flush_all_blocks(&mut self) -> &mut Self {
self.flush_all_blocks = true;
self
}
}