135 changes: 83 additions & 52 deletions crates/optimism/trie/src/db/store.rs
@@ -31,7 +31,15 @@ use reth_trie::{
BranchNodeCompact, HashedPostState, Nibbles,
};
use std::{cmp::max, ops::RangeBounds, path::Path};
use tracing::info;

/// Preprocessed delete work for a prune range
#[derive(Debug, Default, Clone)]
struct HistoryDeleteBatch {
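// Each entry pairs a table key with the `u64` block-number subkey at which the
// versioned row is deleted.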
account_trie: Vec<(<AccountTrieHistory as Table>::Key, u64)>,
storage_trie: Vec<(<StorageTrieHistory as Table>::Key, u64)>,
hashed_account: Vec<(<HashedAccountHistory as Table>::Key, u64)>,
hashed_storage: Vec<(<HashedStorageHistory as Table>::Key, u64)>,
}

/// MDBX implementation of [`OpProofsStore`].
#[derive(Debug)]
@@ -175,7 +183,10 @@ impl MdbxProofsStorage {
}

// Delete tombstones after updates to avoid overwrites
self.delete_dup_sorted::<T, _, V>(tx, block_number, to_delete.into_iter().map(|(k, _)| k))?;
self.delete_dup_sorted::<T, _, V>(
tx,
to_delete.into_iter().map(|(k, _)| (k, block_number)),
)?;

Ok(keys)
}
@@ -185,20 +196,19 @@
fn delete_dup_sorted<T, I, V>(
&self,
tx: &(impl DbTxMut + DbTx),
block_number: u64,
items: I,
) -> OpProofsStorageResult<()>
where
T: Table<Value = VersionedValue<V>> + DupSort<SubKey = u64>,
T::Key: Clone,
T::SubKey: PartialEq + Clone,
I: IntoIterator<Item = T::Key>,
I: IntoIterator<Item = (T::Key, T::SubKey)>,
{
let mut cur = tx.cursor_dup_write::<T>()?;
for key in items {
if let Some(vv) = cur.seek_by_key_subkey(key.clone(), block_number)? {
for (key, subkey) in items {
if let Some(vv) = cur.seek_by_key_subkey(key, subkey)? {
// ensure we didn't land on a greater subkey
if vv.block_number == block_number {
if vv.block_number == subkey {
cur.delete_current()?;
}
}
@@ -235,55 +245,67 @@ impl MdbxProofsStorage {
Ok(keys)
}

/// Delete versioned history over `block_range` using `BlockChangeSet`.
/// For each block: delete referenced rows at that block and drop the changeset entry.
/// Collect versioned history over `block_range` using `BlockChangeSet`.
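/// Runs against a read-only transaction; the prune/unwind callers invoke it under
/// `self.env.view(..)` so the subsequent write transaction only performs deletions.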
fn collect_history_ranged(
&self,
tx: &impl DbTx,
block_range: impl RangeBounds<u64>,
) -> OpProofsStorageResult<HistoryDeleteBatch> {
let mut history = HistoryDeleteBatch::default();
let mut change_set_cursor = tx.cursor_read::<BlockChangeSet>()?;
let mut walker = change_set_cursor.walk_range(block_range)?;

while let Some(Ok((block_number, change_set))) = walker.next() {
// Push (key, subkey=block_number) pairs
history
.account_trie
.extend(change_set.account_trie_keys.into_iter().map(|k| (k, block_number)));
history
.storage_trie
.extend(change_set.storage_trie_keys.into_iter().map(|k| (k, block_number)));
history
.hashed_account
.extend(change_set.hashed_account_keys.into_iter().map(|k| (k, block_number)));
history
.hashed_storage
.extend(change_set.hashed_storage_keys.into_iter().map(|k| (k, block_number)));
}

// Sort each batch by key first, then by block_number (the order a tuple sort would produce).
history.account_trie.sort_by(|(k1, b1), (k2, b2)| k1.cmp(k2).then_with(|| b1.cmp(b2)));
history.storage_trie.sort_by(|(k1, b1), (k2, b2)| k1.cmp(k2).then_with(|| b1.cmp(b2)));
history.hashed_account.sort_by(|(k1, b1), (k2, b2)| k1.cmp(k2).then_with(|| b1.cmp(b2)));
history.hashed_storage.sort_by(|(k1, b1), (k2, b2)| k1.cmp(k2).then_with(|| b1.cmp(b2)));

Ok(history)
}

/// Delete versioned history over `block_range` using the precollected `HistoryDeleteBatch`.
fn delete_history_ranged(
&self,
tx: &(impl DbTxMut + DbTx),
block_range: impl RangeBounds<u64>,
history: HistoryDeleteBatch,
) -> OpProofsStorageResult<WriteCounts> {
let mut write_count = WriteCounts::default();
let mut change_set_cursor = tx.cursor_write::<BlockChangeSet>()?;
let mut walker = change_set_cursor.walk_range(block_range)?;
let mut blocks_deleted = 0;
while let Some(Ok((block_number, change_set))) = walker.next() {
write_count += WriteCounts::new(
change_set.account_trie_keys.len() as u64,
change_set.storage_trie_keys.len() as u64,
change_set.hashed_account_keys.len() as u64,
change_set.hashed_storage_keys.len() as u64,
);

self.delete_dup_sorted::<AccountTrieHistory, _, _>(
tx,
block_number,
change_set.account_trie_keys,
)?;
self.delete_dup_sorted::<StorageTrieHistory, _, _>(
tx,
block_number,
change_set.storage_trie_keys,
)?;
self.delete_dup_sorted::<HashedAccountHistory, _, _>(
tx,
block_number,
change_set.hashed_account_keys,
)?;
self.delete_dup_sorted::<HashedStorageHistory, _, _>(
tx,
block_number,
change_set.hashed_storage_keys,
)?;

while let Some(Ok((_, _))) = walker.next() {
walker.delete_current()?;

blocks_deleted += 1;
// Progress log: only every 1000 blocks, and only once at least 1000 have been deleted
if blocks_deleted >= 1000 && blocks_deleted % 1000 == 0 {
info!(target: "optimism.trie", %blocks_deleted, "Deleting Proofs History");
}
}
Ok(write_count)

// Delete using the simplified API: iterator of (key, subkey)
self.delete_dup_sorted::<AccountTrieHistory, _, _>(tx, history.clone().account_trie)?;
self.delete_dup_sorted::<StorageTrieHistory, _, _>(tx, history.clone().storage_trie)?;
self.delete_dup_sorted::<HashedAccountHistory, _, _>(tx, history.clone().hashed_account)?;
self.delete_dup_sorted::<HashedStorageHistory, _, _>(tx, history.clone().hashed_storage)?;

Ok(WriteCounts {
account_trie_updates_written_total: history.account_trie.len() as u64,
storage_trie_updates_written_total: history.storage_trie.len() as u64,
hashed_accounts_written_total: history.hashed_account.len() as u64,
hashed_storages_written_total: history.hashed_storage.len() as u64,
})
}

/// Write trie/state history for `block_number` from `block_state_diff`.
@@ -708,6 +730,11 @@ impl OpProofsStore for MdbxProofsStorage {
return Ok(write_counts); // Nothing to prune
}

// Collect the history to delete under a read-only view before opening the write transaction
let history_range = max(old_earliest_block_number, 1)..=new_earliest_block_number;
let history_to_delete =
self.env.view(|tx| self.collect_history_ranged(tx, history_range.clone()))??;

self.env.update(|tx| {
// Update the initial state (block zero)
let change_set = self.store_trie_updates_for_block(tx, 0, diff, false)?;
@@ -719,10 +746,7 @@
);

// Delete the old entries for the block range excluding block 0
let delete_counts = self.delete_history_ranged(
tx,
max(old_earliest_block_number, 1)..=new_earliest_block_number,
)?;
let delete_counts = self.delete_history_ranged(tx, history_range, history_to_delete)?;
write_counts += delete_counts;

// Set the earliest block number to the new value
@@ -739,6 +763,9 @@
/// starting from provided block. Also updates the `ProofWindow::LatestBlock` to parent of
/// `unwind_upto_block`.
async fn unwind_history(&self, to: BlockWithParent) -> OpProofsStorageResult<()> {
let history_to_delete =
self.env.view(|tx| self.collect_history_ranged(tx, to.block.number..))??;

self.env.update(|tx| {
let proof_window = match self.inner_get_proof_window(tx)? {
Some(pw) => pw,
@@ -756,7 +783,7 @@
});
}

self.delete_history_ranged(tx, (to.block.number)..)?;
self.delete_history_ranged(tx, to.block.number.., history_to_delete)?;

let new_latest_block =
BlockNumberHash::new(to.block.number.saturating_sub(1), to.parent);
@@ -772,8 +799,12 @@
latest_common_block_number: u64,
blocks_to_add: HashMap<BlockWithParent, BlockStateDiff>,
) -> OpProofsStorageResult<()> {
let history_to_delete = self
.env
.view(|tx| self.collect_history_ranged(tx, latest_common_block_number + 1..))??;

self.env.update(|tx| {
self.delete_history_ranged(tx, latest_common_block_number + 1..)?;
self.delete_history_ranged(tx, latest_common_block_number + 1.., history_to_delete)?;

// Sort by block number: HashMap does not guarantee iteration order
// todo: use a sorted vec instead
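
For illustration only, not part of the diff above: a minimal, self-contained sketch of the key-then-block ordering that `collect_history_ranged` produces before `delete_history_ranged` hands the (key, subkey) pairs to the dup-sorted cursor. Plain byte vectors stand in for the reth table key types, and all names below are hypothetical.

fn main() {
    // Hypothetical (key, block_number) pairs, as collected from the changesets.
    let mut to_delete: Vec<(Vec<u8>, u64)> = vec![
        (vec![0x0b], 7),
        (vec![0x0a], 9),
        (vec![0x0a], 3),
        (vec![0x0b], 2),
    ];

    // Same ordering as the comparator in collect_history_ranged:
    // key first, then block number (identical to sorting the tuples directly).
    to_delete.sort_by(|(k1, b1), (k2, b2)| k1.cmp(k2).then_with(|| b1.cmp(b2)));

    // A dup-sorted cursor would now be visited in ascending key/subkey order:
    // (0x0a, 3), (0x0a, 9), (0x0b, 2), (0x0b, 7).
    for (key, block) in &to_delete {
        println!("delete key={key:02x?} block_number={block}");
    }
}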