
Commit 97c9d7a

Implement Retention Period support
- new CLI flag retention_period_days allows the user to retain more days of data
1 parent cdd4379 commit 97c9d7a
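
A usage sketch based on the arg definition below: since the flag is registered with require_equals, it would presumably be invoked as `kaspad --retention-period-days=7` to keep roughly a week of data; the default stays at 3.0 days, matching the config and args defaults added in this commit.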

File tree

consensus/core/src/config/mod.rs
consensus/src/model/stores/pruning.rs
consensus/src/pipeline/pruning_processor/processor.rs
consensus/src/pipeline/virtual_processor/processor.rs
consensus/src/processes/pruning_proof/mod.rs
database/src/registry.rs
kaspad/src/args.rs

7 files changed: +94 -8 lines changed

consensus/core/src/config/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -68,6 +68,9 @@ pub struct Config {
 
     /// A scale factor to apply to memory allocation bounds
     pub ram_scale: f64,
+
+    /// The number of days to keep data for
+    pub retention_period_days: f64,
 }
 
 impl Config {
@@ -95,6 +98,7 @@ impl Config {
             initial_utxo_set: Default::default(),
             disable_upnp: false,
             ram_scale: 1.0,
+            retention_period_days: 3.0,
         }
     }
 

consensus/src/model/stores/pruning.rs

Lines changed: 12 additions & 1 deletion
@@ -42,6 +42,7 @@ pub trait PruningStoreReader {
     /// This is usually the pruning point, though it might lag a bit behind until data prune completes (and for archival
     /// nodes it will remain the initial syncing point or the last pruning point before turning to an archive)
     fn history_root(&self) -> StoreResult<Hash>;
+    fn retention_period_root(&self) -> StoreResult<Hash>;
 }
 
 pub trait PruningStore: PruningStoreReader {
@@ -54,14 +55,16 @@ pub struct DbPruningStore {
     db: Arc<DB>,
     access: CachedDbItem<PruningPointInfo>,
     history_root_access: CachedDbItem<Hash>,
+    retention_period_root_access: CachedDbItem<Hash>,
 }
 
 impl DbPruningStore {
     pub fn new(db: Arc<DB>) -> Self {
         Self {
             db: Arc::clone(&db),
             access: CachedDbItem::new(db.clone(), DatabaseStorePrefixes::PruningPoint.into()),
-            history_root_access: CachedDbItem::new(db, DatabaseStorePrefixes::HistoryRoot.into()),
+            history_root_access: CachedDbItem::new(db.clone(), DatabaseStorePrefixes::HistoryRoot.into()),
+            retention_period_root_access: CachedDbItem::new(db, DatabaseStorePrefixes::RetentionPeriodRoot.into()),
         }
     }
 
@@ -76,6 +79,10 @@ impl DbPruningStore {
     pub fn set_history_root(&mut self, batch: &mut WriteBatch, history_root: Hash) -> StoreResult<()> {
         self.history_root_access.write(BatchDbWriter::new(batch), &history_root)
     }
+
+    pub fn set_retention_period_root(&mut self, batch: &mut WriteBatch, retention_period_root: Hash) -> StoreResult<()> {
+        self.retention_period_root_access.write(BatchDbWriter::new(batch), &retention_period_root)
+    }
 }
 
 impl PruningStoreReader for DbPruningStore {
@@ -98,6 +105,10 @@ impl PruningStoreReader for DbPruningStore {
     fn history_root(&self) -> StoreResult<Hash> {
         self.history_root_access.read()
     }
+
+    fn retention_period_root(&self) -> StoreResult<Hash> {
+        self.retention_period_root_access.read()
+    }
 }
 
 impl PruningStore for DbPruningStore {

consensus/src/pipeline/pruning_processor/processor.rs

Lines changed: 62 additions & 7 deletions
@@ -34,7 +34,7 @@ use kaspa_consensus_core::{
     BlockHashMap, BlockHashSet, BlockLevel,
 };
 use kaspa_consensusmanager::SessionLock;
-use kaspa_core::{debug, info, warn};
+use kaspa_core::{debug, info, time::unix_now, trace, warn};
 use kaspa_database::prelude::{BatchDbWriter, MemoryWriter, StoreResultExtensions, DB};
 use kaspa_hashes::Hash;
 use kaspa_muhash::MuHash;
@@ -134,6 +134,7 @@ impl PruningProcessor {
         let pruning_point_read = self.pruning_point_store.read();
         let pruning_point = pruning_point_read.pruning_point().unwrap();
         let history_root = pruning_point_read.history_root().unwrap_option();
+        let retention_period_root = pruning_point_read.retention_period_root().unwrap_or(pruning_point);
         let pruning_utxoset_position = self.pruning_utxoset_stores.read().utxoset_position().unwrap_option();
         drop(pruning_point_read);
 
@@ -153,11 +154,17 @@ impl PruningProcessor {
            }
        }
 
+        trace!(
+            "history_root: {:?} | retention_period_root: {} | pruning_point: {}",
+            history_root,
+            retention_period_root,
+            pruning_point
+        );
        if let Some(history_root) = history_root {
            // This indicates the node crashed or was forced to stop during a former data prune operation hence
            // we need to complete it
-           if history_root != pruning_point {
-               self.prune(pruning_point);
+           if history_root != retention_period_root {
+               self.prune(pruning_point, retention_period_root);
            }
        }
 
@@ -175,6 +182,15 @@ impl PruningProcessor {
        );
 
        if !new_pruning_points.is_empty() {
+            let retention_period_root = if let Ok(retention_period_root) = pruning_point_read.retention_period_root() {
+                retention_period_root
+            } else if let Ok(history_root) = pruning_point_read.history_root() {
+                history_root
+            } else {
+                // pruning point always exists
+                pruning_point_read.pruning_point().unwrap()
+            };
+
            // Update past pruning points and pruning point stores
            let mut batch = WriteBatch::default();
            let mut pruning_point_write = RwLockUpgradableReadGuard::upgrade(pruning_point_read);
@@ -183,10 +199,14 @@ impl PruningProcessor {
            }
            let new_pp_index = current_pruning_info.index + new_pruning_points.len() as u64;
            let new_pruning_point = *new_pruning_points.last().unwrap();
+            let adjusted_retention_period_root = self.advance_retention_period_root(retention_period_root, new_pruning_point);
            pruning_point_write.set_batch(&mut batch, new_pruning_point, new_candidate, new_pp_index).unwrap();
+            pruning_point_write.set_retention_period_root(&mut batch, adjusted_retention_period_root).unwrap();
            self.db.write(batch).unwrap();
            drop(pruning_point_write);
 
+            trace!("New Pruning Point: {} | New Retention Period Root: {}", new_pruning_point, adjusted_retention_period_root);
+
            // Inform the user
            info!("Periodic pruning point movement: advancing from {} to {}", current_pruning_info.pruning_point, new_pruning_point);
 
@@ -198,7 +218,7 @@ impl PruningProcessor {
            info!("Updated the pruning point UTXO set");
 
            // Finally, prune data in the new pruning point past
-           self.prune(new_pruning_point);
+           self.prune(new_pruning_point, adjusted_retention_period_root);
        } else if new_candidate != current_pruning_info.candidate {
            let mut pruning_point_write = RwLockUpgradableReadGuard::upgrade(pruning_point_read);
            pruning_point_write.set(current_pruning_info.pruning_point, new_candidate, current_pruning_info.index).unwrap();
@@ -238,7 +258,7 @@ impl PruningProcessor {
        info!("Pruning point UTXO commitment was verified correctly (sanity test)");
    }
 
-   fn prune(&self, new_pruning_point: Hash) {
+   fn prune(&self, new_pruning_point: Hash, retention_period_root: Hash) {
        if self.config.is_archival {
            warn!("The node is configured as an archival node -- avoiding data pruning. Note this might lead to heavy disk usage.");
            return;
@@ -384,7 +404,7 @@ impl PruningProcessor {
        let (mut counter, mut traversed) = (0, 0);
        info!("Header and Block pruning: starting traversal from: {} (genesis: {})", queue.iter().reusable_format(", "), genesis);
        while let Some(current) = queue.pop_front() {
-           if reachability_read.is_dag_ancestor_of_result(new_pruning_point, current).unwrap() {
+           if reachability_read.is_dag_ancestor_of_result(retention_period_root, current).unwrap() {
                continue;
            }
            traversed += 1;
@@ -517,12 +537,47 @@ impl PruningProcessor {
            // Set the history root to the new pruning point only after we successfully pruned its past
            let mut pruning_point_write = self.pruning_point_store.write();
            let mut batch = WriteBatch::default();
-           pruning_point_write.set_history_root(&mut batch, new_pruning_point).unwrap();
+           pruning_point_write.set_history_root(&mut batch, retention_period_root).unwrap();
            self.db.write(batch).unwrap();
            drop(pruning_point_write);
        }
    }
 
+    /// Adjusts the retention period root forward until the maximum chain block is reached that covers the retention period.
+    /// This is the last chain block B such that B.timestamp < retention_period_days_ago. This may return the old hash if
+    /// the retention period cannot be covered yet with the node's current history.
+    ///
+    /// This function is expected to be called only when a new pruning point is determined and right before
+    /// doing any pruning.
+    ///
+    /// retention_period_root is guaranteed to be in the past(pruning_point)
+    fn advance_retention_period_root(&self, retention_period_root: Hash, pruning_point: Hash) -> Hash {
+        // The retention period in milliseconds we need to cover
+        let retention_period_ms = (self.config.retention_period_days * 86400.0 * 1000.0).ceil() as u64;
+
+        let retention_period_root_ts_target = unix_now().saturating_sub(retention_period_ms);
+        let mut new_retention_period_root = retention_period_root;
+
+        trace!(
+            "Adjusting the retention period root to cover the required retention period. Target timestamp: {}",
+            retention_period_root_ts_target,
+        );
+
+        for block in self.reachability_service.forward_chain_iterator(retention_period_root, pruning_point, true) {
+            let timestamp = self.headers_store.get_compact_header_data(block).unwrap().timestamp;
+            trace!("block | timestamp = {} | {}", block, timestamp);
+            if timestamp >= retention_period_root_ts_target {
+                trace!("block {} timestamp {} >= {}", block, timestamp, retention_period_root_ts_target);
+                // We are now at a chain block that is at or above our retention period target
+                // Break here so we can return the hash from the previous iteration
+                break;
+            }
+            new_retention_period_root = block;
+        }
+
+        new_retention_period_root
+    }
+
    fn past_pruning_points(&self) -> BlockHashSet {
        (0..self.pruning_point_store.read().get().unwrap().index)
            .map(|index| self.past_pruning_points_store.get(index).unwrap())
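
Below is a minimal, standalone sketch of the cutoff arithmetic used by advance_retention_period_root, with kaspa_core::time::unix_now replaced by a std-only stand-in (unix_now_ms, a name assumed here for illustration). The real method then walks the selected chain from the current retention period root toward the new pruning point and keeps the last block whose timestamp is still below this target.

use std::time::{SystemTime, UNIX_EPOCH};

/// Stand-in for kaspa_core::time::unix_now (milliseconds since the UNIX epoch).
fn unix_now_ms() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
}

/// Target timestamp: chain blocks with timestamp >= this value fall inside the retention window.
fn retention_cutoff_ms(retention_period_days: f64) -> u64 {
    // Days -> milliseconds, rounded up; e.g. the default 3.0 days -> 259_200_000 ms.
    let retention_period_ms = (retention_period_days * 86400.0 * 1000.0).ceil() as u64;
    unix_now_ms().saturating_sub(retention_period_ms)
}

fn main() {
    println!("retention cutoff (ms since epoch) for 3 days: {}", retention_cutoff_ms(3.0));
}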

consensus/src/pipeline/virtual_processor/processor.rs

Lines changed: 1 addition & 0 deletions
@@ -1081,6 +1081,7 @@ impl VirtualStateProcessor {
         self.past_pruning_points_store.insert_batch(&mut batch, 0, self.genesis.hash).unwrap_or_exists();
         pruning_point_write.set_batch(&mut batch, self.genesis.hash, self.genesis.hash, 0).unwrap();
         pruning_point_write.set_history_root(&mut batch, self.genesis.hash).unwrap();
+        pruning_point_write.set_retention_period_root(&mut batch, self.genesis.hash).unwrap();
         pruning_utxoset_write.set_utxoset_position(&mut batch, self.genesis.hash).unwrap();
         self.db.write(batch).unwrap();
         drop(pruning_point_write);

consensus/src/processes/pruning_proof/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -201,6 +201,7 @@ impl PruningProofManager {
         let mut batch = WriteBatch::default();
         pruning_point_write.set_batch(&mut batch, new_pruning_point, new_pruning_point, (pruning_points.len() - 1) as u64).unwrap();
         pruning_point_write.set_history_root(&mut batch, new_pruning_point).unwrap();
+        pruning_point_write.set_retention_period_root(&mut batch, new_pruning_point).unwrap();
         self.db.write(batch).unwrap();
         drop(pruning_point_write);
     }

database/src/registry.rs

Lines changed: 3 additions & 0 deletions
@@ -45,6 +45,9 @@ pub enum DatabaseStorePrefixes {
     TempGhostdag = 40,
     TempGhostdagCompact = 41,
 
+    // ---- Retention Period Root ----
+    RetentionPeriodRoot = 50,
+
     // ---- Metadata ----
     MultiConsensusMetadata = 124,
     ConsensusEntries = 125,

kaspad/src/args.rs

Lines changed: 11 additions & 0 deletions
@@ -90,6 +90,7 @@ pub struct Args {
     #[serde(rename = "nogrpc")]
     pub disable_grpc: bool,
     pub ram_scale: f64,
+    pub retention_period_days: f64,
 }
 
 impl Default for Args {
@@ -140,6 +141,7 @@ impl Default for Args {
             disable_dns_seeding: false,
             disable_grpc: false,
             ram_scale: 1.0,
+            retention_period_days: 3.0,
         }
     }
 }
@@ -159,6 +161,7 @@ impl Args {
         config.p2p_listen_address = self.listen.unwrap_or(ContextualNetAddress::unspecified());
         config.externalip = self.externalip.map(|v| v.normalize(config.default_p2p_port()));
         config.ram_scale = self.ram_scale;
+        config.retention_period_days = self.retention_period_days;
 
         #[cfg(feature = "devnet-prealloc")]
         if let Some(num_prealloc_utxos) = self.num_prealloc_utxos {
@@ -369,6 +372,13 @@ Setting to 0 prevents the preallocation and sets the maximum to {}, leading to 0
                 .help("Apply a scale factor to memory allocation bounds. Nodes with limited RAM (~4-8GB) should set this to ~0.3-0.5 respectively. Nodes with
a large RAM (~64GB) can set this value to ~3.0-4.0 and gain superior performance especially for syncing peers faster"),
         )
+        .arg(
+            Arg::new("retention-period-days")
+                .long("retention-period-days")
+                .require_equals(true)
+                .value_parser(clap::value_parser!(f64))
+                .help("The number of total days of data to keep.")
+        )
         ;
 
     #[cfg(feature = "devnet-prealloc")]
@@ -448,6 +458,7 @@ impl Args {
             disable_dns_seeding: arg_match_unwrap_or::<bool>(&m, "nodnsseed", defaults.disable_dns_seeding),
             disable_grpc: arg_match_unwrap_or::<bool>(&m, "nogrpc", defaults.disable_grpc),
             ram_scale: arg_match_unwrap_or::<f64>(&m, "ram-scale", defaults.ram_scale),
+            retention_period_days: arg_match_unwrap_or::<f64>(&m, "retention-period-days", defaults.retention_period_days),
 
             #[cfg(feature = "devnet-prealloc")]
             num_prealloc_utxos: m.get_one::<u64>("num-prealloc-utxos").cloned(),
