Pruning: add batched commits with metrics instrumentation #753
base: master
Conversation
        mem::take(&mut self.batch)
    }
}
I do support giving off these statistics, but these classes should be in a different file
Agree. These helper structs do not belong in this file. I will move CommitStats, PruningPhaseMetrics, PruneBatch, and the consts into a dedicated module to keep processor.rs focused.
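For illustration, a minimal sketch of what such a module could look like, using the struct names mentioned above; the file name, fields, and constant values are assumptions, not the actual implementation:

```rust
// consensus/src/pipeline/pruning_processor/metrics.rs -- hypothetical new module

use std::time::Duration;

/// Flush thresholds for batched pruning commits (placeholder values).
pub const PRUNE_BATCH_MAX_OPS: usize = 50_000;
pub const PRUNE_BATCH_MAX_BYTES: usize = 4 * 1024 * 1024;

/// Aggregated statistics for batch writes performed during a prune pass.
#[derive(Default, Debug)]
pub struct CommitStats {
    pub count: u64,
    pub total_ops: u64,
    pub total_bytes: u64,
    pub max_latency: Duration,
}

/// Metrics collected over a whole pruning phase and reported in the
/// [PRUNING METRICS] log summary.
#[derive(Default, Debug)]
pub struct PruningPhaseMetrics {
    pub batch_writes: CommitStats,
}

/// Accumulator for deletions that are committed together once a threshold is hit.
#[derive(Default)]
pub struct PruneBatch {
    pub ops: usize,
    pub bytes: usize,
}
```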
            .expect("reachability guard should be available")
            .is_dag_ancestor_of_result(new_pruning_point, h)
            .unwrap()
    })
Could you explain what the point of this change is?
The change is fallout of making reachability_read movable into staging. I will refactor to avoid the Option indirection and keep direct reads here. Update: refactored so the reachability guard stays direct; we now take a fresh upgradable read per batch iteration and drop the read-only guard before the staging commit so it is never held across the commit. Behavior is unchanged, and this fixes a deadlock we saw in the pruning tests.
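Roughly, the guard lifecycle now follows the sketch below (parking_lot types as used in the codebase; the store type and loop body are stand-ins, not the real code):

```rust
use parking_lot::RwLock;
use std::sync::Arc;

struct ReachabilityData; // stand-in for the real reachability store

fn prune_batches(reachability_store: Arc<RwLock<ReachabilityData>>, batches: usize) {
    for _ in 0..batches {
        // Take a fresh upgradable read at the start of each batch iteration.
        let reachability_read = reachability_store.upgradable_read();

        // ... read-only ancestry checks for this batch would run here ...

        // Drop the read guard before the staging commit so the commit can take
        // the write lock without deadlocking against this guard.
        drop(reachability_read);

        // ... staging commit / batch flush happens here, then the next
        // iteration re-acquires a fresh guard ...
    }
}
```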
    let mut queue = VecDeque::<Hash>::from_iter(reachability_read.get_children(ORIGIN).unwrap().iter().copied());
    let mut queue = VecDeque::<Hash>::from_iter(
        reachability_read.as_ref().expect("reachability guard should be available").get_children(ORIGIN).unwrap().iter().copied(),
    );
Same comment as above on why this is an Option.
Same as above; I will refactor so reachability_read stays non-optional. Update: done as described in the first thread (fresh upgradable read per batch iteration, guard dropped before the staging commit).
    use std::{str::FromStr, sync::Arc};

    macro_rules! from {
        // Response capture
I'm not sure how I feel about all these comment changes here, below, and above; they require a big context switch from me to review, and I don't understand how you got to them anyway.
To what extent are you sure about their veracity?
Anyway, I think you can submit them in a separate cosmetic PR.
Agreed; the cosmetic comment changes should be separate. I will revert this comment change in this PR.
    let mut staging_relations = StagingRelationsStore::new(&mut relations_write);
    let mut staging_reachability_relations = StagingRelationsStore::new(&mut reachability_relations_write);
    let mut staging_reachability =
        StagingReachabilityStore::new(reachability_read.take().expect("reachability guard should be available"));
same comment
Same as above; I will remove the Option/take pattern and pass a scoped guard. Update: done as described in the first thread.
        drop(pruning_point_write);
    }

    self.flush_prune_batch(&mut prune_batch, &mut metrics);
Here you finally flush the prune batch after the loop, but as mentioned, you are not protected by the prune_guard.
Agree; I will ensure the flush happens while prune_guard is held (along with the retention checkpoint).
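A self-contained sketch of the intended ordering, using toy types; the real prune guard and flush signature differ, so treat this only as an illustration of the scoping:

```rust
use parking_lot::Mutex;

struct PruneBatch {
    pending_ops: usize,
}

impl PruneBatch {
    fn flush(&mut self) {
        // In the real code this writes the accumulated RocksDB batch.
        self.pending_ops = 0;
    }
}

fn finish_prune_pass(prune_lock: &Mutex<()>, prune_batch: &mut PruneBatch) {
    let prune_guard = prune_lock.lock();

    // ... traversal loop filling `prune_batch` runs here ...

    // The final flush (and the retention checkpoint update) stay inside the guard's scope.
    prune_batch.flush();

    drop(prune_guard); // only now is the prune lock released
}
```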
    fn record_lock_reacquire(&mut self) {
        self.lock_reacquire_count += 1;
    }
I think the lock data is TMI though.
Agree; lock stats are too noisy. I will drop lock metrics and keep only batch-write stats.
    let bytes = batch.size_in_bytes();
    let commit_start = Instant::now();
    self.db.write(batch).unwrap();
    metrics.record_commit("ghostdag_adjust", ops, bytes, commit_start.elapsed());
I think the only thing we should collect statistics on is the batch writes.
Agree; I will remove per-commit metrics and keep only batch-write stats.
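Concretely, the call site could drop the per-commit label and feed a single batch-write statistic; a sketch under assumed names (only the ops/bytes/elapsed measurements come from the diff above):

```rust
use std::time::{Duration, Instant};

#[derive(Default, Debug)]
struct BatchWriteStats {
    count: u64,
    total_ops: u64,
    total_bytes: u64,
    max_latency: Duration,
}

impl BatchWriteStats {
    /// Single entry point: every flushed batch is recorded the same way,
    /// with no per-commit-type label such as "ghostdag_adjust".
    fn record(&mut self, ops: u64, bytes: u64, latency: Duration) {
        self.count += 1;
        self.total_ops += ops;
        self.total_bytes += bytes;
        self.max_latency = self.max_latency.max(latency);
    }
}

fn flush_and_record(stats: &mut BatchWriteStats, ops: u64, bytes: u64, write_batch: impl FnOnce()) {
    let start = Instant::now();
    write_batch(); // stand-in for self.db.write(batch).unwrap()
    stats.record(ops, bytes, start.elapsed());
}
```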
    let mut reachability_read = self.reachability_store.upgradable_read();
    lock_acquire_time = Instant::now();
    metrics.record_lock_reacquire();
    let mut reachability_read = Some(self.reachability_store.upgradable_read());
Why did we make it an Option?
Agree; I will refactor to avoid the Option for the reachability read. Update: done as described in the first thread.
    let bytes = batch.size_in_bytes();
    let commit_start = Instant::now();
    self.db.write(batch).unwrap();
    metrics.record_commit("tips_and_selected_chain", ops, bytes, commit_start.elapsed());
Same comment; we don't actually care about this statistic.
Agree; I will drop this per-commit metric and keep batch-write stats only.
    prune_guard.blocking_yield();
    lock_acquire_time = Instant::now();
    queue.push_front(current);
    continue 'prune_batch;
This structure of re-pushing to the queue and continuing another iteration is not unheard of, but it is a bit harder to follow. Come to think of it, I'm not sure we really need an outer loop at all; we can reacquire the write locks here.
    }
    let is_block_in_retention_root_future = {
        let reachability_read_only = self.reachability_store.read();
        reachability_read_only.is_dag_ancestor_of_result(retention_period_root, current).unwrap()
remove only
@freshair18 addressed both items. The traversal loop now avoids the requeue/outer loop: we peek …
    drop(reachability_relations_write);
    drop(relations_write);

    prune_batch.flush(&self.db, &mut metrics);
I noticed you measure how long a batch write executes; do you have some info on this? Unsure if I am missing something, but since you flush after measuring the elapsed time, and initiate a new timer afterwards, after the blocking yield, I believe the time it takes to flush is added to the prune lock time? That might be negligible, but it should maybe be considered, as this adds to the expected PRUNE_LOCK_MAX_DURATION_MS.
Good catch, I'd overlooked that. Flush latency is currently inside the lock window, so the limit is best-effort. We could either document it, or subtract a rolling average commit time from the check to keep holds closer to the bound (at the cost of potentially smaller batches and more churn). What do you prefer?
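For reference, the "subtract a rolling average commit time" option could be as small as the following hypothetical helper; this is not what ended up in the PR:

```rust
use std::time::{Duration, Instant};

/// Hypothetical: yield early enough that an average-sized flush still fits
/// inside the lock budget, instead of checking the raw elapsed time alone.
fn should_yield(lock_acquire_time: Instant, budget: Duration, avg_commit: Duration) -> bool {
    lock_acquire_time.elapsed() + avg_commit >= budget
}
```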
    drop(relations_write);

    reachability_read = self.reachability_store.upgradable_read();
    prune_batch.flush(&self.db, &mut metrics);
The same exceeding of PRUNE_LOCK_MAX_DURATION_MS as in the first closure applies here too, btw, though maybe in a more obscured worst-case scenario, since there are other triggers besides PRUNE_LOCK_MAX_DURATION_MS that can cause us to enter this closure. But the fact remains that exceeding PRUNE_LOCK_MAX_DURATION_MS is possible here.
Yep, same issue as your first comment. Other triggers can push us over; the budget is best-effort. We can handle it the same way: document it, or bias the check by subtracting the average commit time. What's your preference?
I think for now I would just add a comment over this line documenting this potential behavior. I would prefer if other reviewers could also assess the relevance of this behavior.
Taking just the recorded average times of a few milliseconds, or a fraction thereof on newer hardware, it seems like it shouldn't influence lock time much. But then again the worst case goes to a few hundred milliseconds, which seems relevant. I am assuming that removing block bodies from the batch might add some consistency to these times.
It might also be worth renaming PRUNE_LOCK_MAX_DURATION_MS to PRUNE_LOCK_TARGET_MAX_DURATION_MS; the current name suggests a hard upper bound, which is no longer a given.
Added a comment noting the lock budget is best‑effort and renamed to PRUNE_LOCK_TARGET_MAX_DURATION_MS.
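The renamed constant and its caveat could read roughly as follows; the 25 ms value is taken from the yield interval quoted in the summary below and may not match the final code:

```rust
/// Target upper bound on how long the prune loop holds its locks before yielding.
/// Best-effort rather than a hard cap: the batch flush itself runs under the prune
/// lock, so a slow commit can push an individual hold past this value.
const PRUNE_LOCK_TARGET_MAX_DURATION_MS: u64 = 25; // value assumed from the summary's yield interval
```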
    let mut staging_reachability = StagingReachabilityStore::new(self.reachability_store.upgradable_read());
    let mut statuses_write = self.statuses_store.write();

    loop {
I think this can be rewritten more succinctly as: while !queue.is_empty()
Np, happy to switch to the simpler loop.
    // If we have the lock for more than a few milliseconds, release and recapture to allow consensus progress during pruning
    if lock_acquire_time.elapsed() > Duration::from_millis(5) {
        drop(reachability_read);
    if lock_acquire_time.elapsed() > Duration::from_millis(PRUNE_LOCK_MAX_DURATION_MS) {
I actually don't really see the point of this closure. At the end of the loop you call the exact same things as in this closure, only with the should_flush condition; should_flush also implicitly does this same check, so I think you can just use should_flush right here and remove the closure at the end. So either (a) should_flush triggers at the end, you reset the timer, and you will not enter this closure, or (b) if should_flush didn't trigger, we don't expect this to trigger either (unless in some extreme edge case where the condition is met in the few nanoseconds between loop iterations).
Good point. I was aiming for a pre-work budget check so we can yield even when the batch is empty (e.g. the keep-only traversal). should_flush returns false on an empty batch, so moving everything there would drop that yield. If you want a single path, I can tweak should_flush or add a should_yield helper to cover both while keeping behaviour identical.
I think you should just go for an or operator, something like:

    if prune_batch.should_flush() || lock_acquire_time.elapsed() > Duration::from_millis(PRUNE_LOCK_MAX_DURATION_MS) {
        // Do work
    }

I think then you can also avoid sending lock_acquire_time into the should_flush condition, and we keep the lock_acquire condition separate and explicit.
Done. Lock budget check is now explicit (should_flush() + lock_elapsed check), and should_flush no longer takes lock_elapsed.
cli/src/modules/rpc.rs
Outdated
    // }
    RpcApiOps::GetMempoolEntries => {
        // TODO
        // TODO: expose filter flags in CLI args instead of hard-coded defaults.
Not sure how this pertains to the current PR; maybe remove it, or PR it separately?
Agreed, this is off-scope. I'll drop it.
cli/src/modules/rpc.rs
Outdated
    async fn display_help(self: Arc<Self>, ctx: Arc<KaspaCli>, _argv: Vec<String>) -> Result<()> {
        // RpcApiOps that do not contain docs are not displayed
        // Hide ops without docs so help output does not include empty entries.
Not sure how this pertains to the current PR, or why it was changed; maybe remove it?
Agreed, this is off-scope. I'll drop it.
    while !queue.is_empty() {
        // Lock budget is best-effort because batch flush happens under the prune lock.
        if lock_acquire_time.elapsed() > Duration::from_millis(PRUNE_LOCK_TARGET_MAX_DURATION_MS) {
So what I meant is: you can now move the closure at line 574, together with its condition, up here, and remove the whole closure at line 574.
Sorry, I misunderstood earlier; thanks for clarifying. I've now moved the flush/yield block to the top of the loop and removed the bottom one, so there is a single flush path with the same condition.
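As a self-contained illustration of the final loop shape (toy types; the guard handling, yield, and threshold values are simplified away and should not be read as the merged code):

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

const PRUNE_LOCK_TARGET_MAX_DURATION_MS: u64 = 25; // assumed value, as above

struct ToyBatch {
    ops: usize,
}

impl ToyBatch {
    fn should_flush(&self) -> bool {
        self.ops >= 1_000 // placeholder threshold
    }
    fn flush(&mut self) {
        self.ops = 0; // stand-in for the real DB write plus metrics recording
    }
}

fn traverse(mut queue: VecDeque<u64>) {
    let mut batch = ToyBatch { ops: 0 };
    let mut lock_acquire_time = Instant::now();

    while !queue.is_empty() {
        // Single flush/yield path, checked once at the top of the loop:
        // flush when the batch is full OR when the lock budget is exceeded.
        if batch.should_flush()
            || lock_acquire_time.elapsed() > Duration::from_millis(PRUNE_LOCK_TARGET_MAX_DURATION_MS)
        {
            batch.flush(); // real code also drops guards, yields the prune lock, and re-acquires
            lock_acquire_time = Instant::now(); // restart the lock budget timer
        }

        let current = queue.pop_front().unwrap();
        batch.ops += 1; // stand-in for staging deletions for `current`
        let _ = current;
    }
}
```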
Summary
Inspired by Michael Sutton’s pruning discussion during the June-2025 dev workshop (https://www.youtube.com/watch?v=cMFeijKSv1g), this implements the pruning-batching strategy in consensus/src/pipeline/pruning_processor/processor.rs. Batches flush when they hit 256 blocks, 50 000 ops, 4 MB, or 50 ms, and the lock yields every 25 ms so other consensus work continues smoothly.

Every prune pass now emits a [PRUNING METRICS] summary that includes the active config plus per-commit-type stats (count, avg/max ops, bytes, latency). Operators can confirm exactly how a run behaved just by inspecting the logs.

Benchmarking on my 2024 NVMe-backed M3 Air shows the expected behavior:
Hypothesis: slow SSD/HDD nodes should see dramatically larger gains because per-commit latency dominates there. I’m reaching out to community members with slower disks to capture additional runs before we merge, and I’ll update the PR description/thread with their numbers as they come in.
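A compact sketch of the flush thresholds quoted in this summary; the numbers come from the text above, while the struct and method names are illustrative rather than the actual implementation:

```rust
use std::time::{Duration, Instant};

const MAX_BATCH_BLOCKS: usize = 256;
const MAX_BATCH_OPS: usize = 50_000;
const MAX_BATCH_BYTES: usize = 4 * 1024 * 1024; // 4 MB
const MAX_BATCH_AGE: Duration = Duration::from_millis(50);

struct PruneBatch {
    blocks: usize,
    ops: usize,
    bytes: usize,
    started: Instant,
}

impl PruneBatch {
    /// Flush once any of the four thresholds from the summary is reached;
    /// the 25 ms lock yield is handled separately by the caller.
    fn should_flush(&self) -> bool {
        self.blocks >= MAX_BATCH_BLOCKS
            || self.ops >= MAX_BATCH_OPS
            || self.bytes >= MAX_BATCH_BYTES
            || self.started.elapsed() >= MAX_BATCH_AGE
    }
}
```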
Context & Motivation
Field Data
Run folders:
Remaining Follow-ups
Testing
cargo fmt --all -- --check
cargo test -p kaspa-consensus --lib