From e40a33db070ebc5606cf464f1c9add075156100b Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 29 Jul 2025 21:43:02 +0000 Subject: [PATCH 1/6] feat: SPDD module for historical distribution lookup (WIP) Signed-off-by: William Hankins --- Cargo.lock | 16 +++ common/src/rest_helper.rs | 31 ++++- modules/accounts_state/src/accounts_state.rs | 73 +++++++----- modules/accounts_state/src/rest.rs | 42 ------- .../src/epoch_activity_counter.rs | 22 +++- .../parameters_state/src/parameters_state.rs | 10 +- .../rest_blockfrost/src/rest_blockfrost.rs | 4 +- modules/spdd_state/Cargo.toml | 23 ++++ modules/spdd_state/src/rest.rs | 49 ++++++++ modules/spdd_state/src/spdd_state.rs | 110 ++++++++++++++++++ modules/spdd_state/src/state.rs | 47 ++++++++ modules/spo_state/src/spo_state.rs | 12 +- modules/utxo_state/src/utxo_state.rs | 12 +- processes/omnibus/Cargo.toml | 2 +- processes/omnibus/omnibus.toml | 2 + processes/omnibus/src/main.rs | 6 +- 16 files changed, 370 insertions(+), 91 deletions(-) create mode 100644 modules/spdd_state/Cargo.toml create mode 100644 modules/spdd_state/src/rest.rs create mode 100644 modules/spdd_state/src/spdd_state.rs create mode 100644 modules/spdd_state/src/state.rs diff --git a/Cargo.lock b/Cargo.lock index c2e5a458..1c3f9cc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,6 +201,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_spdd_state" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "hex", + "imbl", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_spo_state" version = "0.1.0" @@ -307,6 +322,7 @@ dependencies = [ "acropolis_module_mithril_snapshot_fetcher", "acropolis_module_parameters_state", "acropolis_module_rest_blockfrost", + "acropolis_module_spdd_state", "acropolis_module_spo_state", "acropolis_module_stake_delta_filter", "acropolis_module_tx_unpacker", diff --git a/common/src/rest_helper.rs b/common/src/rest_helper.rs index 5940ad95..00353903 100644 --- a/common/src/rest_helper.rs +++ b/common/src/rest_helper.rs @@ -44,7 +44,7 @@ where } /// Handle a REST request with path parameters -pub fn handle_rest_with_parameter( +pub fn handle_rest_with_path_parameter( context: Arc>, topic: &str, handler: F, @@ -82,6 +82,35 @@ where } }) } +/* +// Handle a REST request with query parameters +pub fn handle_rest_with_query_parameter( + context: Arc>, + topic: &str, + handler: F, +) -> JoinHandle<()> +where + F: Fn(&[&str]) -> Fut + Send + Sync + Clone + 'static, + Fut: Future> + Send + 'static, +{ + context.handle(topic, move |message: Arc| { + let handler = handler.clone(); + async move { + let response = match message.as_ref() { + Message::RESTRequest(request) => { + let params = request.query_parameters.clone(); + match handler(params).await { + Ok(response) => response, + Err(error) => RESTResponse::with_text(500, &format!("{error:?}")), + } + } + _ => RESTResponse::with_text(500, "Unexpected message in REST request"), + }; + + Arc::new(Message::RESTResponse(response)) + } + }) +}*/ /// Extract parameters from the request path based on the topic pattern. /// Skips the first 3 parts of the topic as these are never parameters diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 8d436cb6..d7bd2251 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -24,7 +24,7 @@ mod rest; use acropolis_common::queries::accounts::{ AccountInfo, AccountsStateQuery, AccountsStateQueryResponse, }; -use rest::{handle_drdd, handle_pots, handle_spdd}; +use rest::{handle_drdd, handle_pots}; const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state"; const DEFAULT_EPOCH_ACTIVITY_TOPIC: &str = "cardano.epoch.activity"; @@ -37,7 +37,6 @@ const DEFAULT_DREP_DISTRIBUTION_TOPIC: &str = "cardano.drep.distribution"; const DEFAULT_SPO_DISTRIBUTION_TOPIC: &str = "cardano.spo.distribution"; const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters"; -const DEFAULT_HANDLE_SPDD_TOPIC: (&str, &str) = ("handle-topic-spdd", "rest.get.spdd"); const DEFAULT_HANDLE_POTS_TOPIC: (&str, &str) = ("handle-topic-pots", "rest.get.pots"); const DEFAULT_HANDLE_DRDD_TOPIC: (&str, &str) = ("handle-topic-drdd", "rest.get.drdd"); @@ -127,7 +126,9 @@ impl AccountsState { new_epoch = true; } current_block = Some(block_info.clone()); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -137,7 +138,10 @@ impl AccountsState { let (_, message) = withdrawals_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::Withdrawals(withdrawals_msg))) => { - let span = info_span!("account_state.handle_withdrawals", block = block_info.number); + let span = info_span!( + "account_state.handle_withdrawals", + block = block_info.number + ); async { if let Some(ref block) = current_block { if block.number != block_info.number { @@ -153,7 +157,9 @@ impl AccountsState { .handle_withdrawals(withdrawals_msg) .inspect_err(|e| error!("Withdrawals handling error: {e:#}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -163,7 +169,10 @@ impl AccountsState { let (_, message) = stake_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::StakeAddressDeltas(deltas_msg))) => { - let span = info_span!("account_state.handle_stake_deltas", block = block_info.number); + let span = info_span!( + "account_state.handle_stake_deltas", + block = block_info.number + ); async { if let Some(ref block) = current_block { if block.number != block_info.number { @@ -179,7 +188,9 @@ impl AccountsState { .handle_stake_deltas(deltas_msg) .inspect_err(|e| error!("StakeAddressDeltas handling error: {e:#}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -196,7 +207,10 @@ impl AccountsState { let (_, message) = dreps_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::DRepState(dreps_msg))) => { - let span = info_span!("account_state.handle_drep_state", block = block_info.number); + let span = info_span!( + "account_state.handle_drep_state", + block = block_info.number + ); async { state.handle_drep_state(&dreps_msg); @@ -204,7 +218,9 @@ impl AccountsState { if let Err(e) = drep_publisher.publish_drdd(block_info, drdd).await { error!("Error publishing drep voting stake distribution: {e:#}") } - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -214,7 +230,8 @@ impl AccountsState { let (_, message) = spos_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::SPOState(spo_msg))) => { - let span = info_span!("account_state.handle_spo_state", block = block_info.number); + let span = + info_span!("account_state.handle_spo_state", block = block_info.number); async { if let Some(ref block) = current_block { if block.number != block_info.number { @@ -235,7 +252,9 @@ impl AccountsState { if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await { error!("Error publishing SPO stake distribution: {e:#}") } - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -245,7 +264,10 @@ impl AccountsState { let (_, message) = ea_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::EpochActivity(ea_msg))) => { - let span = info_span!("account_state.handle_epoch_activity", block = block_info.number); + let span = info_span!( + "account_state.handle_epoch_activity", + block = block_info.number + ); async { if let Some(ref block) = current_block { if block.number != block_info.number { @@ -261,7 +283,9 @@ impl AccountsState { .handle_epoch_activity(ea_msg) .inspect_err(|e| error!("EpochActivity handling error: {e:#}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -272,7 +296,10 @@ impl AccountsState { let (_, message) = params_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::ProtocolParams(params_msg))) => { - let span = info_span!("account_state.handle_parameters", block = block_info.number); + let span = info_span!( + "account_state.handle_parameters", + block = block_info.number + ); async { if let Some(ref block) = current_block { if block.number != block_info.number { @@ -288,7 +315,9 @@ impl AccountsState { .handle_parameters(params_msg) .inspect_err(|e| error!("Messaging handling error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -352,11 +381,6 @@ impl AccountsState { .unwrap_or(DEFAULT_SPO_DISTRIBUTION_TOPIC.to_string()); // REST handler topics - let handle_spdd_topic = config - .get_string(DEFAULT_HANDLE_SPDD_TOPIC.0) - .unwrap_or(DEFAULT_HANDLE_SPDD_TOPIC.1.to_string()); - info!("Creating request handler on '{}'", handle_spdd_topic); - let handle_pots_topic = config .get_string(DEFAULT_HANDLE_POTS_TOPIC.0) .unwrap_or(DEFAULT_HANDLE_POTS_TOPIC.1.to_string()); @@ -370,7 +394,6 @@ impl AccountsState { // Create history let history = Arc::new(Mutex::new(StateHistory::::new("AccountsState"))); let history_account_single = history.clone(); - let history_spdd = history.clone(); let history_pots = history.clone(); let history_drdd = history.clone(); let history_tick = history.clone(); @@ -422,10 +445,6 @@ impl AccountsState { } }); - handle_rest(context.clone(), &handle_spdd_topic, move || { - handle_spdd(history_spdd.clone()) - }); - handle_rest(context.clone(), &handle_pots_topic, move || { handle_pots(history_pots.clone()) }); @@ -448,7 +467,9 @@ impl AccountsState { if let Some(state) = history_tick.lock().await.current() { state.tick().await.inspect_err(|e| error!("Tick error: {e}")).ok(); } - }.instrument(span).await; + } + .instrument(span) + .await; } } } diff --git a/modules/accounts_state/src/rest.rs b/modules/accounts_state/src/rest.rs index 57a31594..e730bbcc 100644 --- a/modules/accounts_state/src/rest.rs +++ b/modules/accounts_state/src/rest.rs @@ -1,8 +1,6 @@ //! REST handlers for Acropolis Accounts State module -use std::collections::HashMap; use std::sync::Arc; -use acropolis_common::serialization::Bech32WithHrp; use anyhow::Result; use tokio::sync::Mutex; @@ -10,20 +8,6 @@ use crate::state::State; use acropolis_common::state_history::StateHistory; use acropolis_common::{messages::RESTResponse, Lovelace}; -/// REST response structure for /accounts/{stake_address} -#[derive(serde::Serialize)] -pub struct APIStakeAccount { - pub utxo_value: u64, - pub rewards: u64, - pub delegated_spo: Option, - pub delegated_drep: Option, -} -#[derive(serde::Serialize)] -pub struct APIDRepChoice { - pub drep_type: String, - pub value: Option, -} - /// Handles /drdd #[derive(serde::Serialize, serde::Deserialize)] struct APIDRepDelegationDistribution { @@ -32,32 +16,6 @@ struct APIDRepDelegationDistribution { pub dreps: Vec<(String, u64)>, } -/// Handles /spdd -pub async fn handle_spdd(history: Arc>>) -> Result { - let locked = history.lock().await; - let state = match locked.current() { - Some(state) => state, - None => return Ok(RESTResponse::with_json(200, "{}")), - }; - - let spdd: HashMap = state - .generate_spdd() - .iter() - .map(|(k, v)| { - let bech32 = k.to_bech32_with_hrp("pool").unwrap_or_else(|_| hex::encode(k)); - (bech32, *v) - }) - .collect(); - - match serde_json::to_string(&spdd) { - Ok(body) => Ok(RESTResponse::with_json(200, &body)), - Err(e) => Ok(RESTResponse::with_text( - 500, - &format!("Internal server error retrieving stake pool delegation distribution: {e}"), - )), - } -} - /// Handles /pots pub async fn handle_pots(history: Arc>>) -> Result { let locked = history.lock().await; diff --git a/modules/epoch_activity_counter/src/epoch_activity_counter.rs b/modules/epoch_activity_counter/src/epoch_activity_counter.rs index a9d9c07c..889e695b 100644 --- a/modules/epoch_activity_counter/src/epoch_activity_counter.rs +++ b/modules/epoch_activity_counter/src/epoch_activity_counter.rs @@ -3,7 +3,7 @@ use acropolis_common::{ messages::{CardanoMessage, Message}, - rest_helper::{handle_rest, handle_rest_with_parameter}, + rest_helper::{handle_rest, handle_rest_with_path_parameter}, Era, }; use anyhow::Result; @@ -57,7 +57,10 @@ impl EpochActivityCounter { let (_, message) = headers_message_f.await?; match message.as_ref() { Message::Cardano((block, CardanoMessage::BlockHeader(header_msg))) => { - let span = info_span!("epoch_activity_counter.handle_block_header", block = block.number); + let span = info_span!( + "epoch_activity_counter.handle_block_header", + block = block.number + ); async { // End of epoch? if block.new_epoch && block.epoch > 0 { @@ -93,7 +96,9 @@ impl EpochActivityCounter { Err(e) => error!("Can't decode header {}: {e}", block.slot), } - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -103,11 +108,16 @@ impl EpochActivityCounter { let (_, message) = fees_message_f.await?; match message.as_ref() { Message::Cardano((block, CardanoMessage::BlockFees(fees_msg))) => { - let span = info_span!("epoch_activity_counter.handle_block_fees", block = block.number); + let span = info_span!( + "epoch_activity_counter.handle_block_fees", + block = block.number + ); async { let mut state = state.lock().await; state.handle_fees(&block, fees_msg.total_fees); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -158,7 +168,7 @@ impl EpochActivityCounter { } }); - handle_rest_with_parameter(context.clone(), &handle_historical_topic, { + handle_rest_with_path_parameter(context.clone(), &handle_historical_topic, { let state = state.clone(); move |param| handle_historical_epoch(state.clone(), param[0].to_string()) }); diff --git a/modules/parameters_state/src/parameters_state.rs b/modules/parameters_state/src/parameters_state.rs index 50ab94f0..18b4e88f 100644 --- a/modules/parameters_state/src/parameters_state.rs +++ b/modules/parameters_state/src/parameters_state.rs @@ -3,7 +3,7 @@ use acropolis_common::{ messages::{CardanoMessage, Message, ProtocolParamsMessage}, - rest_helper::{handle_rest, handle_rest_with_parameter}, + rest_helper::{handle_rest, handle_rest_with_path_parameter}, BlockInfo, }; use anyhow::Result; @@ -13,11 +13,11 @@ use std::sync::Arc; use tokio::sync::Mutex; use tracing::{error, info, info_span, Instrument}; +mod alonzo_genesis; mod genesis_params; mod parameters_updater; mod rest; mod state; -mod alonzo_genesis; use parameters_updater::ParametersUpdater; use rest::handle_current; @@ -118,7 +118,9 @@ impl ParametersState { let new_params = locked.handle_enact_state(&block, &gov).await?; Self::publish_update(&config, &block, new_params)?; Ok::<(), anyhow::Error>(()) - }.instrument(span).await?; + } + .instrument(span) + .await?; } msg => error!("Unexpected message {msg:?} for enact state topic"), } @@ -139,7 +141,7 @@ impl ParametersState { }); let state_handle_historical = state.clone(); - handle_rest_with_parameter( + handle_rest_with_path_parameter( cfg.context.clone(), &cfg.handle_historical_topic, move |param| handle_historical(state_handle_historical.clone(), param[0].to_string()), diff --git a/modules/rest_blockfrost/src/rest_blockfrost.rs b/modules/rest_blockfrost/src/rest_blockfrost.rs index f39249b3..349e020d 100644 --- a/modules/rest_blockfrost/src/rest_blockfrost.rs +++ b/modules/rest_blockfrost/src/rest_blockfrost.rs @@ -4,7 +4,7 @@ use std::{future::Future, sync::Arc}; use acropolis_common::{ messages::{Message, RESTResponse}, - rest_helper::handle_rest_with_parameter, + rest_helper::handle_rest_with_path_parameter, }; use anyhow::Result; use caryatid_sdk::{module, Context, Module}; @@ -282,7 +282,7 @@ where tracing::info!("Creating request handler on '{}'", topic_name); - handle_rest_with_parameter(context.clone(), &topic_name, move |params| { + handle_rest_with_path_parameter(context.clone(), &topic_name, move |params| { let context = context.clone(); let handler_fn = handler_fn.clone(); let params: Vec = params.iter().map(|s| s.to_string()).collect(); diff --git a/modules/spdd_state/Cargo.toml b/modules/spdd_state/Cargo.toml new file mode 100644 index 00000000..0241b90e --- /dev/null +++ b/modules/spdd_state/Cargo.toml @@ -0,0 +1,23 @@ +# Acropolis DRep state module + +[package] +name = "acropolis_module_spdd_state" +version = "0.1.0" +edition = "2021" +authors = ["William Hankins "] +description = "Stake Pool Delegation Distribution State Tracker" +license = "Apache-2.0" + +[dependencies] +caryatid_sdk = "0.12" +config = "0.15.11" +anyhow = "1.0" +acropolis_common = { path = "../../common" } +tracing = "0.1.40" +tokio = { version = "1", features = ["full"] } +imbl = { version = "5.0.0", features = ["serde"] } +serde_json = "1.0.132" +hex = "0.4.3" + +[lib] +path = "src/spdd_state.rs" diff --git a/modules/spdd_state/src/rest.rs b/modules/spdd_state/src/rest.rs new file mode 100644 index 00000000..6f658542 --- /dev/null +++ b/modules/spdd_state/src/rest.rs @@ -0,0 +1,49 @@ +use crate::state::State; +use acropolis_common::messages::RESTResponse; +use acropolis_common::serialization::Bech32WithHrp; +use anyhow::Result; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::Mutex; + +/// Handles /spdd +pub async fn handle_spdd(state: Arc>, params: Vec) -> Result { + let locked = state.lock().await; + + let spdd_opt = if params.len() == 1 { + match params[0].parse::() { + Ok(epoch) => locked.get_epoch(epoch), + Err(_) => { + return Ok(RESTResponse::with_text( + 400, + "Invalid epoch query parameter: must be a number", + )); + } + } + } else { + locked.get_latest() + }; + + if let Some(spdd) = spdd_opt { + let spdd: HashMap<_, _> = spdd + .iter() + .map(|(k, v)| { + ( + k.to_bech32_with_hrp("pool").unwrap_or_else(|_| hex::encode(k)), + *v, + ) + }) + .collect(); + + match serde_json::to_string(&spdd) { + Ok(body) => Ok(RESTResponse::with_json(200, &body)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!( + "Internal server error retrieving stake pool delegation distribution: {e}" + ), + )), + } + } else { + Ok(RESTResponse::with_json(200, "{}")) + } +} diff --git a/modules/spdd_state/src/spdd_state.rs b/modules/spdd_state/src/spdd_state.rs new file mode 100644 index 00000000..5d0ed1bc --- /dev/null +++ b/modules/spdd_state/src/spdd_state.rs @@ -0,0 +1,110 @@ +//! Acropolis SPDD state module for Caryatid +//! Stores historical stake pool delegation distributions +use acropolis_common::{ + messages::{CardanoMessage, Message}, + rest_helper::handle_rest_with_path_parameter, + KeyHash, +}; +use anyhow::Result; +use caryatid_sdk::{module, Context, Module}; +use config::Config; +use std::{collections::BTreeMap, sync::Arc}; +use tokio::sync::Mutex; +use tracing::{error, info, info_span, Instrument}; +mod state; +use state::State; +mod rest; +use rest::handle_spdd; + +const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.spo.distribution"; +const DEFAULT_HANDLE_SPDD_TOPIC: (&str, &str) = ("handle-topic-spdd", "rest.get.spdd"); + +/// SPDD State module +#[module( + message_type(Message), + name = "spdd-state", + description = "Stake Pool Delegation Distribution State Tracker" +)] + +pub struct SPDDState; + +impl SPDDState { + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + // Get configuration + let subscribe_topic = + config.get_string("subscribe-topic").unwrap_or(DEFAULT_SUBSCRIBE_TOPIC.to_string()); + info!("Creating subscriber on '{subscribe_topic}'"); + + let handle_spdd_topic = config + .get_string(DEFAULT_HANDLE_SPDD_TOPIC.0) + .unwrap_or(DEFAULT_HANDLE_SPDD_TOPIC.1.to_string()); + info!("Creating request handler on '{}'", handle_spdd_topic); + + let state = Arc::new(Mutex::new(State::new())); + + // Subscribe for spdd messages from accounts_state + let state_handler = state.clone(); + let mut message_subscription = context.subscribe(&subscribe_topic).await?; + context.run(async move { + loop { + let Ok((_, message)) = message_subscription.read().await else { + return; + }; + match message.as_ref() { + Message::Cardano((_, CardanoMessage::SPOStakeDistribution(msg))) => { + let span = info_span!("spdd_state.handle", epoch = msg.epoch); + async { + let mut state = state_handler.lock().await; + + let spdd: BTreeMap = + msg.spos.iter().map(|(k, v)| (k.clone(), *v)).collect(); + + state.insert_spdd(msg.epoch, spdd); + } + .instrument(span) + .await; + } + + _ => error!("Unexpected message type: {message:?}"), + } + } + }); + + // Register /spdd REST endpoint + let state_rest = state.clone(); + handle_rest_with_path_parameter(context.clone(), &handle_spdd_topic, move |params| { + let params: Vec = params.iter().map(|s| s.to_string()).collect(); + handle_spdd(state_rest.clone(), params) + }); + + // Ticker to log stats + let mut tick_subscription = context.subscribe("clock.tick").await?; + let state_logger = state.clone(); + context.run(async move { + loop { + let Ok((_, message)) = tick_subscription.read().await else { + return; + }; + + if let Message::Clock(clock) = message.as_ref() { + if clock.number % 60 == 0 { + let span = info_span!("spdd_state.tick", number = clock.number); + async { + state_logger + .lock() + .await + .tick() + .await + .inspect_err(|e| error!("SPDD tick error: {e}")) + .ok(); + } + .instrument(span) + .await; + } + } + } + }); + + Ok(()) + } +} diff --git a/modules/spdd_state/src/state.rs b/modules/spdd_state/src/state.rs new file mode 100644 index 00000000..396e511c --- /dev/null +++ b/modules/spdd_state/src/state.rs @@ -0,0 +1,47 @@ +use acropolis_common::KeyHash; +use imbl::OrdMap; +use std::collections::BTreeMap; +use tracing::info; + +pub struct State { + historical_distributions: OrdMap>, +} + +impl State { + pub fn new() -> Self { + Self { + historical_distributions: OrdMap::new(), + } + } + + pub fn insert_spdd(&mut self, epoch: u64, spdd: BTreeMap) { + self.historical_distributions.insert(epoch, spdd); + } + + pub fn get_latest(&self) -> Option> { + self.historical_distributions.iter().last().map(|(_, map)| map.clone()) + } + + pub fn get_epoch(&self, epoch: u64) -> Option> { + self.historical_distributions.get(&epoch).cloned() + } + + pub async fn tick(&self) -> anyhow::Result<()> { + let num_epochs = self.historical_distributions.len(); + let latest = self.historical_distributions.iter().last(); + + if let Some((epoch, spo_map)) = latest { + let spo_count = spo_map.len(); + info!( + num_epochs, + latest_epoch = *epoch, + spo_count, + "SPDD state: tracking {num_epochs} epochs, latest is {epoch} with {spo_count} SPOs" + ); + } else { + info!("SPDD state: no data yet"); + } + + Ok(()) + } +} diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index 75eab791..a9b4f988 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -5,7 +5,7 @@ use acropolis_common::{ messages::{ CardanoMessage, Message, SnapshotDumpMessage, SnapshotMessage, SnapshotStateMessage, }, - rest_helper::{handle_rest, handle_rest_with_parameter}, + rest_helper::{handle_rest, handle_rest_with_path_parameter}, }; use anyhow::Result; use caryatid_sdk::{module, Context, Module}; @@ -138,7 +138,9 @@ impl SPOState { .handle_tx_certs(block, tx_certs_msg) .inspect_err(|e| error!("Messaging handling error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), } @@ -153,7 +155,7 @@ impl SPOState { // Handle REST requests for single SPO state and retiring pools let state_single = state.clone(); - handle_rest_with_parameter(context.clone(), &handle_single_topic, move |param| { + handle_rest_with_path_parameter(context.clone(), &handle_single_topic, move |param| { handle_spo(state_single.clone(), param[0].to_string()) }); @@ -176,7 +178,9 @@ impl SPOState { .await .inspect_err(|e| error!("Tick error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } } } diff --git a/modules/utxo_state/src/utxo_state.rs b/modules/utxo_state/src/utxo_state.rs index ed5151b6..eb193047 100644 --- a/modules/utxo_state/src/utxo_state.rs +++ b/modules/utxo_state/src/utxo_state.rs @@ -3,7 +3,7 @@ use acropolis_common::{ messages::{CardanoMessage, Message}, - rest_helper::handle_rest_with_parameter, + rest_helper::handle_rest_with_path_parameter, }; use caryatid_sdk::{module, Context, Module}; @@ -100,7 +100,9 @@ impl UTXOState { .await .inspect_err(|e| error!("Messaging handling error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), @@ -127,13 +129,15 @@ impl UTXOState { .await .inspect_err(|e| error!("Tick error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } } } }); - handle_rest_with_parameter(context.clone(), &single_utxo_topic, move |param| { + handle_rest_with_path_parameter(context.clone(), &single_utxo_topic, move |param| { handle_single_utxo(state.clone(), param[0].to_string()) }); diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index 4ed26065..2d15db7f 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -32,7 +32,7 @@ acropolis_module_stake_delta_filter = { path = "../../modules/stake_delta_filter acropolis_module_epoch_activity_counter = { path = "../../modules/epoch_activity_counter" } acropolis_module_accounts_state = { path = "../../modules/accounts_state" } acropolis_module_rest_blockfrost = { path = "../../modules/rest_blockfrost" } - +acropolis_module_spdd_state = { path = "../../modules/spdd_state" } anyhow = "1.0" config = "0.15.11" tracing = "0.1.40" diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index a2deedaf..1389a8c4 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -48,6 +48,8 @@ store-history = false [module.accounts-state] +[module.spdd-state] + [module.clock] [module.rest-server] diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index e9f6165d..bc6fd341 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -18,6 +18,7 @@ use acropolis_module_governance_state::GovernanceState; use acropolis_module_mithril_snapshot_fetcher::MithrilSnapshotFetcher; use acropolis_module_parameters_state::ParametersState; use acropolis_module_rest_blockfrost::BlockfrostREST; +use acropolis_module_spdd_state::SPDDState; use acropolis_module_spo_state::SPOState; use acropolis_module_stake_delta_filter::StakeDeltaFilter; use acropolis_module_tx_unpacker::TxUnpacker; @@ -59,7 +60,9 @@ pub async fn main() -> Result<()> { .build() .tracer("rust-otel-otlp"); let otel_layer = OpenTelemetryLayer::new(otel_tracer) - .with_filter(EnvFilter::from_default_env().add_directive(filter::LevelFilter::INFO.into())) + .with_filter( + EnvFilter::from_default_env().add_directive(filter::LevelFilter::INFO.into()), + ) .with_filter(filter::filter_fn(|meta| meta.is_span())); Registry::default().with(fmt_layer).with(otel_layer).init(); } else { @@ -95,6 +98,7 @@ pub async fn main() -> Result<()> { EpochActivityCounter::register(&mut process); AccountsState::register(&mut process); BlockfrostREST::register(&mut process); + SPDDState::register(&mut process); Clock::::register(&mut process); RESTServer::::register(&mut process); From e395ac7593a1643337153001c38e8854555bca27 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 30 Jul 2025 19:39:43 +0000 Subject: [PATCH 2/6] feat: add spdd module with historical distribution REST endpoint Signed-off-by: William Hankins --- Cargo.lock | 5 +- common/Cargo.toml | 2 +- common/src/rest_helper.rs | 8 +- modules/rest_blockfrost/Cargo.toml | 2 +- modules/spdd_state/Cargo.toml | 1 - modules/spdd_state/src/rest.rs | 43 ++++++++-- modules/spdd_state/src/spdd_state.rs | 114 ++++++++++++++------------- modules/spdd_state/src/state.rs | 9 +-- processes/golden_tests/Cargo.toml | 2 +- processes/omnibus/Cargo.toml | 2 +- processes/omnibus/omnibus.toml | 1 + processes/replayer/Cargo.toml | 2 +- 12 files changed, 113 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1c3f9cc6..774e73ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -210,7 +210,6 @@ dependencies = [ "caryatid_sdk", "config", "hex", - "imbl", "serde_json", "tokio", "tracing", @@ -1101,9 +1100,9 @@ dependencies = [ [[package]] name = "caryatid_module_rest_server" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a545635a30944ab96b41397be9ae36cf15d48d398db2aaff268067a08c31d8" +checksum = "04b3369fd37cc7c395feea110e0fc60075cb9fbc3c3470a4d3b0988da1606b72" dependencies = [ "anyhow", "axum", diff --git a/common/Cargo.toml b/common/Cargo.toml index 5ba92abb..39d55c6e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -17,7 +17,7 @@ blake2 = "0.10" bs58 = "0.5" caryatid_sdk = "0.12" caryatid_module_clock = "0.12" -caryatid_module_rest_server = "0.13" +caryatid_module_rest_server = "0.14" chrono = "0.4" gcd = "2.3" fraction = "0.15" diff --git a/common/src/rest_helper.rs b/common/src/rest_helper.rs index 00353903..bc607d09 100644 --- a/common/src/rest_helper.rs +++ b/common/src/rest_helper.rs @@ -5,7 +5,7 @@ use anyhow::{anyhow, Result}; use caryatid_sdk::Context; use futures::future::Future; use num_traits::ToPrimitive; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use tokio::task::JoinHandle; use tracing::{error, info}; @@ -82,7 +82,7 @@ where } }) } -/* + // Handle a REST request with query parameters pub fn handle_rest_with_query_parameter( context: Arc>, @@ -90,7 +90,7 @@ pub fn handle_rest_with_query_parameter( handler: F, ) -> JoinHandle<()> where - F: Fn(&[&str]) -> Fut + Send + Sync + Clone + 'static, + F: Fn(HashMap) -> Fut + Send + Sync + Clone + 'static, Fut: Future> + Send + 'static, { context.handle(topic, move |message: Arc| { @@ -110,7 +110,7 @@ where Arc::new(Message::RESTResponse(response)) } }) -}*/ +} /// Extract parameters from the request path based on the topic pattern. /// Skips the first 3 parts of the topic as these are never parameters diff --git a/modules/rest_blockfrost/Cargo.toml b/modules/rest_blockfrost/Cargo.toml index b2460107..2147750f 100644 --- a/modules/rest_blockfrost/Cargo.toml +++ b/modules/rest_blockfrost/Cargo.toml @@ -14,7 +14,7 @@ anyhow = "1.0" async-trait = "0.1" bech32 = "0.11" caryatid_sdk = "0.12" -caryatid_module_rest_server = "0.13" +caryatid_module_rest_server = "0.14" config = "0.15.11" hex = "0.4.3" serde = { version = "1.0.214", features = ["derive"] } diff --git a/modules/spdd_state/Cargo.toml b/modules/spdd_state/Cargo.toml index 0241b90e..475f67f7 100644 --- a/modules/spdd_state/Cargo.toml +++ b/modules/spdd_state/Cargo.toml @@ -15,7 +15,6 @@ anyhow = "1.0" acropolis_common = { path = "../../common" } tracing = "0.1.40" tokio = { version = "1", features = ["full"] } -imbl = { version = "5.0.0", features = ["serde"] } serde_json = "1.0.132" hex = "0.4.3" diff --git a/modules/spdd_state/src/rest.rs b/modules/spdd_state/src/rest.rs index 6f658542..e38ae893 100644 --- a/modules/spdd_state/src/rest.rs +++ b/modules/spdd_state/src/rest.rs @@ -6,12 +6,38 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; /// Handles /spdd -pub async fn handle_spdd(state: Arc>, params: Vec) -> Result { - let locked = state.lock().await; +pub async fn handle_spdd( + state: Option>>, + params: HashMap, +) -> Result { + let locked = match state.as_ref() { + Some(state) => state.lock().await, + None => { + return Ok(RESTResponse::with_text( + 503, + "SPDD storage is disabled by configuration", + )); + } + }; + + let spdd_opt = if let Some(epoch_str) = params.get("epoch") { + if params.len() > 1 { + return Ok(RESTResponse::with_text( + 400, + "Only 'epoch' is a valid query parameter", + )); + } - let spdd_opt = if params.len() == 1 { - match params[0].parse::() { - Ok(epoch) => locked.get_epoch(epoch), + match epoch_str.parse::() { + Ok(epoch) => match locked.get_epoch(epoch) { + Some(spdd) => Some(spdd), + None => { + return Ok(RESTResponse::with_text( + 404, + &format!("SPDD not found for epoch {}", epoch), + )); + } + }, Err(_) => { return Ok(RESTResponse::with_text( 400, @@ -19,8 +45,13 @@ pub async fn handle_spdd(state: Arc>, params: Vec) -> Resul )); } } - } else { + } else if params.is_empty() { locked.get_latest() + } else { + return Ok(RESTResponse::with_text( + 400, + "Unexpected query parameter: only 'epoch' is allowed", + )); }; if let Some(spdd) = spdd_opt { diff --git a/modules/spdd_state/src/spdd_state.rs b/modules/spdd_state/src/spdd_state.rs index 5d0ed1bc..49a032bf 100644 --- a/modules/spdd_state/src/spdd_state.rs +++ b/modules/spdd_state/src/spdd_state.rs @@ -2,7 +2,7 @@ //! Stores historical stake pool delegation distributions use acropolis_common::{ messages::{CardanoMessage, Message}, - rest_helper::handle_rest_with_path_parameter, + rest_helper::handle_rest_with_query_parameter, KeyHash, }; use anyhow::Result; @@ -18,6 +18,7 @@ use rest::handle_spdd; const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.spo.distribution"; const DEFAULT_HANDLE_SPDD_TOPIC: (&str, &str) = ("handle-topic-spdd", "rest.get.spdd"); +const DEFAULT_STORE_SPDD: (&str, bool) = ("store-spdd", false); /// SPDD State module #[module( @@ -40,69 +41,74 @@ impl SPDDState { .unwrap_or(DEFAULT_HANDLE_SPDD_TOPIC.1.to_string()); info!("Creating request handler on '{}'", handle_spdd_topic); - let state = Arc::new(Mutex::new(State::new())); + let store_spdd = config.get_bool(DEFAULT_STORE_SPDD.0).unwrap_or(DEFAULT_STORE_SPDD.1); - // Subscribe for spdd messages from accounts_state - let state_handler = state.clone(); - let mut message_subscription = context.subscribe(&subscribe_topic).await?; - context.run(async move { - loop { - let Ok((_, message)) = message_subscription.read().await else { - return; - }; - match message.as_ref() { - Message::Cardano((_, CardanoMessage::SPOStakeDistribution(msg))) => { - let span = info_span!("spdd_state.handle", epoch = msg.epoch); - async { - let mut state = state_handler.lock().await; + let state_opt = if store_spdd { + let state = Arc::new(Mutex::new(State::new())); - let spdd: BTreeMap = - msg.spos.iter().map(|(k, v)| (k.clone(), *v)).collect(); + // Subscribe for spdd messages from accounts_state + let state_handler = state.clone(); + let mut message_subscription = context.subscribe(&subscribe_topic).await?; + context.run(async move { + loop { + let Ok((_, message)) = message_subscription.read().await else { + return; + }; + match message.as_ref() { + Message::Cardano((_, CardanoMessage::SPOStakeDistribution(msg))) => { + let span = info_span!("spdd_state.handle", epoch = msg.epoch); + async { + let mut state = state_handler.lock().await; - state.insert_spdd(msg.epoch, spdd); + let spdd: BTreeMap = + msg.spos.iter().map(|(k, v)| (k.clone(), *v)).collect(); + + state.insert_spdd(msg.epoch, spdd); + } + .instrument(span) + .await; } - .instrument(span) - .await; - } - _ => error!("Unexpected message type: {message:?}"), + _ => error!("Unexpected message type: {message:?}"), + } } - } - }); - - // Register /spdd REST endpoint - let state_rest = state.clone(); - handle_rest_with_path_parameter(context.clone(), &handle_spdd_topic, move |params| { - let params: Vec = params.iter().map(|s| s.to_string()).collect(); - handle_spdd(state_rest.clone(), params) - }); - - // Ticker to log stats - let mut tick_subscription = context.subscribe("clock.tick").await?; - let state_logger = state.clone(); - context.run(async move { - loop { - let Ok((_, message)) = tick_subscription.read().await else { - return; - }; + }); + // Ticker to log stats + let mut tick_subscription = context.subscribe("clock.tick").await?; + let state_logger = state.clone(); + context.run(async move { + loop { + let Ok((_, message)) = tick_subscription.read().await else { + return; + }; - if let Message::Clock(clock) = message.as_ref() { - if clock.number % 60 == 0 { - let span = info_span!("spdd_state.tick", number = clock.number); - async { - state_logger - .lock() - .await - .tick() - .await - .inspect_err(|e| error!("SPDD tick error: {e}")) - .ok(); + if let Message::Clock(clock) = message.as_ref() { + if clock.number % 60 == 0 { + let span = info_span!("spdd_state.tick", number = clock.number); + async { + state_logger + .lock() + .await + .tick() + .await + .inspect_err(|e| error!("SPDD tick error: {e}")) + .ok(); + } + .instrument(span) + .await; } - .instrument(span) - .await; } } - } + }); + Some(state) + } else { + None + }; + + // Register /spdd REST endpoint + handle_rest_with_query_parameter(context.clone(), &handle_spdd_topic, move |params| { + let state_rest = state_opt.clone(); + handle_spdd(state_rest.clone(), params) }); Ok(()) diff --git a/modules/spdd_state/src/state.rs b/modules/spdd_state/src/state.rs index 396e511c..629fef55 100644 --- a/modules/spdd_state/src/state.rs +++ b/modules/spdd_state/src/state.rs @@ -1,16 +1,15 @@ use acropolis_common::KeyHash; -use imbl::OrdMap; use std::collections::BTreeMap; use tracing::info; pub struct State { - historical_distributions: OrdMap>, + historical_distributions: BTreeMap>, } impl State { pub fn new() -> Self { Self { - historical_distributions: OrdMap::new(), + historical_distributions: BTreeMap::new(), } } @@ -19,7 +18,7 @@ impl State { } pub fn get_latest(&self) -> Option> { - self.historical_distributions.iter().last().map(|(_, map)| map.clone()) + self.historical_distributions.last_key_value().map(|(_, map)| map.clone()) } pub fn get_epoch(&self, epoch: u64) -> Option> { @@ -36,7 +35,7 @@ impl State { num_epochs, latest_epoch = *epoch, spo_count, - "SPDD state: tracking {num_epochs} epochs, latest is {epoch} with {spo_count} SPOs" + "Tracking {num_epochs} epochs, latest is {epoch} with {spo_count} SPOs" ); } else { info!("SPDD state: no data yet"); diff --git a/processes/golden_tests/Cargo.toml b/processes/golden_tests/Cargo.toml index 341fd479..07388863 100644 --- a/processes/golden_tests/Cargo.toml +++ b/processes/golden_tests/Cargo.toml @@ -11,7 +11,7 @@ license = "Apache-2.0" caryatid_process = "0.12" caryatid_sdk = "0.12" caryatid_module_clock = "0.12" -caryatid_module_rest_server = "0.13" +caryatid_module_rest_server = "0.14" caryatid_module_spy = "0.12" diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index 2d15db7f..cb1f1fbe 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -11,7 +11,7 @@ license = "Apache-2.0" caryatid_process = "0.12" caryatid_sdk = "0.12" caryatid_module_clock = "0.12" -caryatid_module_rest_server = "0.13" +caryatid_module_rest_server = "0.14" caryatid_module_spy = "0.12" # Core message definition diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 1389a8c4..7283b4a9 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -49,6 +49,7 @@ store-history = false [module.accounts-state] [module.spdd-state] +store-spdd = false [module.clock] diff --git a/processes/replayer/Cargo.toml b/processes/replayer/Cargo.toml index b04ef4b7..7303f014 100644 --- a/processes/replayer/Cargo.toml +++ b/processes/replayer/Cargo.toml @@ -11,7 +11,7 @@ license = "Apache-2.0" caryatid_process = "0.12" caryatid_sdk = "0.12" caryatid_module_clock = "0.12" -caryatid_module_rest_server = "0.13" +caryatid_module_rest_server = "0.14" caryatid_module_spy = "0.12" # Core message definition From a8f86a0ce24211370e9cfdab9914be1fe0918519 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 31 Jul 2025 23:48:05 +0000 Subject: [PATCH 3/6] feat: add drdd module with historical distribution REST endpoint Signed-off-by: William Hankins --- Cargo.lock | 16 +++ modules/accounts_state/src/accounts_state.rs | 13 +- modules/accounts_state/src/rest.rs | 52 +------- modules/drdd_state/Cargo.toml | 23 ++++ modules/drdd_state/src/drdd_state.rs | 120 +++++++++++++++++++ modules/drdd_state/src/rest.rs | 107 +++++++++++++++++ modules/drdd_state/src/state.rs | 53 ++++++++ modules/spdd_state/Cargo.toml | 2 +- processes/omnibus/Cargo.toml | 2 + processes/omnibus/omnibus.toml | 9 +- processes/omnibus/src/main.rs | 2 + 11 files changed, 332 insertions(+), 67 deletions(-) create mode 100644 modules/drdd_state/Cargo.toml create mode 100644 modules/drdd_state/src/drdd_state.rs create mode 100644 modules/drdd_state/src/rest.rs create mode 100644 modules/drdd_state/src/state.rs diff --git a/Cargo.lock b/Cargo.lock index 774e73ab..640eb26d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,6 +63,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_drdd_state" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "hex", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_drep_state" version = "0.1.0" @@ -314,6 +329,7 @@ dependencies = [ "acropolis_common", "acropolis_module_accounts_state", "acropolis_module_block_unpacker", + "acropolis_module_drdd_state", "acropolis_module_drep_state", "acropolis_module_epoch_activity_counter", "acropolis_module_genesis_bootstrapper", diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index d7bd2251..d7bbed87 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -24,7 +24,7 @@ mod rest; use acropolis_common::queries::accounts::{ AccountInfo, AccountsStateQuery, AccountsStateQueryResponse, }; -use rest::{handle_drdd, handle_pots}; +use rest::handle_pots; const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state"; const DEFAULT_EPOCH_ACTIVITY_TOPIC: &str = "cardano.epoch.activity"; @@ -38,7 +38,6 @@ const DEFAULT_SPO_DISTRIBUTION_TOPIC: &str = "cardano.spo.distribution"; const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters"; const DEFAULT_HANDLE_POTS_TOPIC: (&str, &str) = ("handle-topic-pots", "rest.get.pots"); -const DEFAULT_HANDLE_DRDD_TOPIC: (&str, &str) = ("handle-topic-drdd", "rest.get.drdd"); /// Accounts State module #[module( @@ -386,16 +385,10 @@ impl AccountsState { .unwrap_or(DEFAULT_HANDLE_POTS_TOPIC.1.to_string()); info!("Creating request handler on '{}'", handle_pots_topic); - let handle_drdd_topic = config - .get_string(DEFAULT_HANDLE_DRDD_TOPIC.0) - .unwrap_or(DEFAULT_HANDLE_DRDD_TOPIC.1.to_string()); - info!("Creating request handler on '{}'", handle_drdd_topic); - // Create history let history = Arc::new(Mutex::new(StateHistory::::new("AccountsState"))); let history_account_single = history.clone(); let history_pots = history.clone(); - let history_drdd = history.clone(); let history_tick = history.clone(); context.handle("accounts-state", move |message| { @@ -449,10 +442,6 @@ impl AccountsState { handle_pots(history_pots.clone()) }); - handle_rest(context.clone(), &handle_drdd_topic, move || { - handle_drdd(history_drdd.clone()) - }); - // Ticker to log stats let mut tick_subscription = context.subscribe("clock.tick").await?; context.clone().run(async move { diff --git a/modules/accounts_state/src/rest.rs b/modules/accounts_state/src/rest.rs index e730bbcc..ef3b63a8 100644 --- a/modules/accounts_state/src/rest.rs +++ b/modules/accounts_state/src/rest.rs @@ -5,16 +5,8 @@ use anyhow::Result; use tokio::sync::Mutex; use crate::state::State; +use acropolis_common::messages::RESTResponse; use acropolis_common::state_history::StateHistory; -use acropolis_common::{messages::RESTResponse, Lovelace}; - -/// Handles /drdd -#[derive(serde::Serialize, serde::Deserialize)] -struct APIDRepDelegationDistribution { - pub abstain: Lovelace, - pub no_confidence: Lovelace, - pub dreps: Vec<(String, u64)>, -} /// Handles /pots pub async fn handle_pots(history: Arc>>) -> Result { @@ -34,45 +26,3 @@ pub async fn handle_pots(history: Arc>>) -> Result>>) -> Result { - let locked = history.lock().await; - let state = match locked.current() { - Some(state) => state, - None => return Ok(RESTResponse::with_json(200, "{}")), - }; - - let drdd = state.generate_drdd(); - - let dreps = { - let mut dreps = Vec::with_capacity(drdd.dreps.len()); - for (cred, amount) in drdd.dreps { - let bech32 = match cred.to_drep_bech32() { - Ok(val) => val, - Err(e) => { - return Ok(RESTResponse::with_text( - 500, - &format!("Internal server error while retrieving DRep delegation distribution: {e}"), - )); - } - }; - dreps.push((bech32, amount)); - } - dreps - }; - - let response = APIDRepDelegationDistribution { - abstain: drdd.abstain, - no_confidence: drdd.no_confidence, - dreps, - }; - - match serde_json::to_string(&response) { - Ok(json) => Ok(RESTResponse::with_json(200, &json)), - Err(e) => Ok(RESTResponse::with_text( - 500, - &format!("Internal server error while retrieving DRep delegation distribution: {e}"), - )), - } -} diff --git a/modules/drdd_state/Cargo.toml b/modules/drdd_state/Cargo.toml new file mode 100644 index 00000000..c2243ddd --- /dev/null +++ b/modules/drdd_state/Cargo.toml @@ -0,0 +1,23 @@ +# Acropolis DRDD state module + +[package] +name = "acropolis_module_drdd_state" +version = "0.1.0" +edition = "2021" +authors = ["William Hankins "] +description = "DRep Pool Delegation Distribution State Tracker" +license = "Apache-2.0" + +[dependencies] +caryatid_sdk = "0.12" +config = "0.15.11" +anyhow = "1.0" +acropolis_common = { path = "../../common" } +tracing = "0.1.40" +tokio = { version = "1", features = ["full"] } +serde = { version = "1.0.214", features = ["derive"] } +serde_json = "1.0.132" +hex = "0.4.3" + +[lib] +path = "src/drdd_state.rs" \ No newline at end of file diff --git a/modules/drdd_state/src/drdd_state.rs b/modules/drdd_state/src/drdd_state.rs new file mode 100644 index 00000000..c0c9a7e6 --- /dev/null +++ b/modules/drdd_state/src/drdd_state.rs @@ -0,0 +1,120 @@ +//! Acropolis DRDD state module for Caryatid +//! Stores historical DRep delegation distributions +use acropolis_common::{ + messages::{CardanoMessage, Message}, + rest_helper::handle_rest_with_query_parameter, +}; +use anyhow::Result; +use caryatid_sdk::{module, Context, Module}; +use config::Config; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::{error, info, info_span, Instrument}; +mod state; +use state::State; +mod rest; +use rest::handle_drdd; + +use crate::state::DRepDistribution; + +const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.drep.distribution"; +const DEFAULT_HANDLE_DRDD_TOPIC: (&str, &str) = ("handle-topic-drdd", "rest.get.drdd"); +const DEFAULT_STORE_DRDD: (&str, bool) = ("store-drdd", false); + +/// DRDD State module +#[module( + message_type(Message), + name = "drdd-state", + description = "DRep Delegation Distribution State Tracker" +)] + +pub struct DRDDState; + +impl DRDDState { + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + // Get configuration + let subscribe_topic = + config.get_string("subscribe-topic").unwrap_or(DEFAULT_SUBSCRIBE_TOPIC.to_string()); + info!("Creating subscriber on '{subscribe_topic}'"); + + let handle_drdd_topic = config + .get_string(DEFAULT_HANDLE_DRDD_TOPIC.0) + .unwrap_or(DEFAULT_HANDLE_DRDD_TOPIC.1.to_string()); + info!("Creating request handler on '{}'", handle_drdd_topic); + + let store_drdd = config.get_bool(DEFAULT_STORE_DRDD.0).unwrap_or(DEFAULT_STORE_DRDD.1); + + let state_opt = if store_drdd { + let state = Arc::new(Mutex::new(State::new())); + + // Subscribe for drdd messages from accounts_state + let state_handler = state.clone(); + let mut message_subscription = context.subscribe(&subscribe_topic).await?; + context.run(async move { + loop { + let Ok((_, message)) = message_subscription.read().await else { + return; + }; + match message.as_ref() { + Message::Cardano((_, CardanoMessage::DRepStakeDistribution(msg))) => { + let span = info_span!("spdd_state.handle", epoch = msg.epoch); + async { + let mut state = state_handler.lock().await; + + let drdd = DRepDistribution { + dreps: msg.dreps.iter().map(|(k, v)| (k.clone(), *v)).collect(), + abstain: msg.abstain, + no_confidence: msg.no_confidence, + }; + + state.insert_drdd(msg.epoch, drdd); + } + .instrument(span) + .await; + } + + _ => error!("Unexpected message type: {message:?}"), + } + } + }); + // Ticker to log stats + let mut tick_subscription = context.subscribe("clock.tick").await?; + let state_logger = state.clone(); + context.run(async move { + loop { + let Ok((_, message)) = tick_subscription.read().await else { + return; + }; + + if let Message::Clock(clock) = message.as_ref() { + if clock.number % 60 == 0 { + let span = info_span!("drdd_state.tick", number = clock.number); + async { + state_logger + .lock() + .await + .tick() + .await + .inspect_err(|e| error!("DRDD tick error: {e}")) + .ok(); + } + .instrument(span) + .await; + } + } + } + }); + Some(state) + } else { + None + }; + + // Register /drdd REST endpoint + handle_rest_with_query_parameter(context.clone(), &handle_drdd_topic, move |params| { + let state_rest = state_opt.clone(); + handle_drdd(state_rest.clone(), params) + }); + + Ok(()) + } +} diff --git a/modules/drdd_state/src/rest.rs b/modules/drdd_state/src/rest.rs new file mode 100644 index 00000000..97ba6985 --- /dev/null +++ b/modules/drdd_state/src/rest.rs @@ -0,0 +1,107 @@ +use crate::state::State; +use acropolis_common::{messages::RESTResponse, DRepCredential}; +use anyhow::Result; +use serde::Serialize; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::Mutex; + +// Response struct for DRDD +#[derive(Serialize)] +struct DRDDResponse { + dreps: HashMap, + abstain: u64, + no_confidence: u64, +} + +/// Handles /drdd +pub async fn handle_drdd( + state: Option>>, + params: HashMap, +) -> Result { + let locked = match state.as_ref() { + Some(state) => state.lock().await, + None => { + return Ok(RESTResponse::with_text( + 503, + "DRDD storage is disabled by configuration", + )); + } + }; + + let drdd_opt = if let Some(epoch_str) = params.get("epoch") { + if params.len() > 1 { + return Ok(RESTResponse::with_text( + 400, + "Only 'epoch' is a valid query parameter", + )); + } + + match epoch_str.parse::() { + Ok(epoch) => match locked.get_epoch(epoch) { + Some(drdd) => Some(drdd), + None => { + return Ok(RESTResponse::with_text( + 404, + &format!("DRDD not found for epoch {}", epoch), + )); + } + }, + Err(_) => { + return Ok(RESTResponse::with_text( + 400, + "Invalid epoch query parameter: must be a number", + )); + } + } + } else if params.is_empty() { + locked.get_latest() + } else { + return Ok(RESTResponse::with_text( + 400, + "Unexpected query parameter: only 'epoch' is allowed", + )); + }; + + if let Some(drdd) = drdd_opt { + let dreps: HashMap = drdd + .dreps + .iter() + .map(|(k, v)| { + let key = k.to_drep_bech32().unwrap_or_else(|_| match k { + DRepCredential::AddrKeyHash(bytes) | DRepCredential::ScriptHash(bytes) => { + hex::encode(bytes) + } + }); + (key, *v) + }) + .collect(); + + let response = DRDDResponse { + dreps, + abstain: drdd.abstain, + no_confidence: drdd.no_confidence, + }; + + match serde_json::to_string(&response) { + Ok(body) => Ok(RESTResponse::with_json(200, &body)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error retrieving DRep delegation distribution: {e}"), + )), + } + } else { + let response = DRDDResponse { + dreps: HashMap::new(), + abstain: 0, + no_confidence: 0, + }; + + match serde_json::to_string(&response) { + Ok(body) => Ok(RESTResponse::with_json(200, &body)), + Err(_) => Ok(RESTResponse::with_text( + 500, + "Internal server error serializing empty DRDD response", + )), + } + } +} diff --git a/modules/drdd_state/src/state.rs b/modules/drdd_state/src/state.rs new file mode 100644 index 00000000..27bb1d5f --- /dev/null +++ b/modules/drdd_state/src/state.rs @@ -0,0 +1,53 @@ +use acropolis_common::DRepCredential; +use std::collections::BTreeMap; +use tracing::info; + +pub struct State { + historical_distributions: BTreeMap, +} + +#[derive(Clone)] +pub struct DRepDistribution { + pub dreps: BTreeMap, + pub abstain: u64, + pub no_confidence: u64, +} + +impl State { + pub fn new() -> Self { + Self { + historical_distributions: BTreeMap::new(), + } + } + + pub fn insert_drdd(&mut self, epoch: u64, drdd: DRepDistribution) { + self.historical_distributions.insert(epoch, drdd); + } + + pub fn get_latest(&self) -> Option { + self.historical_distributions.last_key_value().map(|(_, map)| map.clone()) + } + + pub fn get_epoch(&self, epoch: u64) -> Option { + self.historical_distributions.get(&epoch).cloned() + } + + pub async fn tick(&self) -> anyhow::Result<()> { + let num_epochs = self.historical_distributions.len(); + let latest = self.historical_distributions.iter().last(); + + if let Some((epoch, drep_map)) = latest { + let drep_count = drep_map.dreps.len(); + info!( + num_epochs, + latest_epoch = *epoch, + drep_count, + "Tracking {num_epochs} epochs, latest is {epoch} with {drep_count} DReps" + ); + } else { + info!("DRDD state: no data yet"); + } + + Ok(()) + } +} diff --git a/modules/spdd_state/Cargo.toml b/modules/spdd_state/Cargo.toml index 475f67f7..cde68d26 100644 --- a/modules/spdd_state/Cargo.toml +++ b/modules/spdd_state/Cargo.toml @@ -1,4 +1,4 @@ -# Acropolis DRep state module +# Acropolis SPDD state module [package] name = "acropolis_module_spdd_state" diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index cb1f1fbe..957bdcdf 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -33,6 +33,8 @@ acropolis_module_epoch_activity_counter = { path = "../../modules/epoch_activity acropolis_module_accounts_state = { path = "../../modules/accounts_state" } acropolis_module_rest_blockfrost = { path = "../../modules/rest_blockfrost" } acropolis_module_spdd_state = { path = "../../modules/spdd_state" } +acropolis_module_drdd_state = { path = "../../modules/drdd_state" } + anyhow = "1.0" config = "0.15.11" tracing = "0.1.40" diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 7283b4a9..54ed7903 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -32,8 +32,14 @@ address-delta-topic = "cardano.address.delta" [module.spo-state] +[module.spdd-state] +store-spdd = false + [module.drep-state] +[module.drdd-state] +store-drdd = false + [module.governance-state] [module.parameters-state] @@ -48,9 +54,6 @@ store-history = false [module.accounts-state] -[module.spdd-state] -store-spdd = false - [module.clock] [module.rest-server] diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index bc6fd341..69d14be4 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -11,6 +11,7 @@ use tracing_subscriber; // External modules use acropolis_module_accounts_state::AccountsState; use acropolis_module_block_unpacker::BlockUnpacker; +use acropolis_module_drdd_state::DRDDState; use acropolis_module_drep_state::DRepState; use acropolis_module_epoch_activity_counter::EpochActivityCounter; use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; @@ -99,6 +100,7 @@ pub async fn main() -> Result<()> { AccountsState::register(&mut process); BlockfrostREST::register(&mut process); SPDDState::register(&mut process); + DRDDState::register(&mut process); Clock::::register(&mut process); RESTServer::::register(&mut process); From b535bd63570aa91a230c6d79fa2e78595864ff20 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Wed, 6 Aug 2025 22:47:27 +0000 Subject: [PATCH 4/6] fix: Use OrdMap instead of HashMap for spdd/drdd, implement query param macro Signed-off-by: William Hankins --- Cargo.lock | 2 ++ common/src/rest_helper.rs | 36 +++++++++++++++++++++++- modules/drdd_state/Cargo.toml | 1 + modules/drdd_state/src/drdd_state.rs | 6 ++-- modules/drdd_state/src/rest.rs | 41 ++++++++-------------------- modules/drdd_state/src/state.rs | 17 ++++++------ modules/spdd_state/Cargo.toml | 1 + modules/spdd_state/src/rest.rs | 41 ++++++++-------------------- modules/spdd_state/src/spdd_state.rs | 9 +++--- modules/spdd_state/src/state.rs | 16 +++++------ 10 files changed, 87 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bae1eaf1..fdab2f49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,6 +74,7 @@ dependencies = [ "caryatid_sdk", "config", "hex", + "imbl", "serde", "serde_json", "tokio", @@ -227,6 +228,7 @@ dependencies = [ "caryatid_sdk", "config", "hex", + "imbl", "serde_json", "tokio", "tracing", diff --git a/common/src/rest_helper.rs b/common/src/rest_helper.rs index bc607d09..c3ea34e5 100644 --- a/common/src/rest_helper.rs +++ b/common/src/rest_helper.rs @@ -84,7 +84,7 @@ where } // Handle a REST request with query parameters -pub fn handle_rest_with_query_parameter( +pub fn handle_rest_with_query_parameters( context: Arc>, topic: &str, handler: F, @@ -142,3 +142,37 @@ impl ToCheckedF64 for T { self.to_f64().ok_or_else(|| anyhow!("Failed to convert {name} to f64")) } } + +// Macros for extracting and validating REST query parameters +#[macro_export] +macro_rules! extract_strict_query_params { + ($params:expr, { $($key:literal => $var:ident : Option<$type:ty>,)* }) => { + $( + let mut $var: Option<$type> = None; + )* + + for (k, v) in &$params { + match k.as_str() { + $( + $key => { + $var = match v.parse::<$type>() { + Ok(val) => Some(val), + Err(_) => { + return Ok($crate::messages::RESTResponse::with_text( + 400, + concat!("Invalid ", $key, " query parameter: must be a valid type"), + )); + } + }; + } + )* + _ => { + return Ok($crate::messages::RESTResponse::with_text( + 400, + concat!("Unexpected query parameter: only allowed keys are: ", $( $key, " ", )*) + )); + } + } + } + }; +} diff --git a/modules/drdd_state/Cargo.toml b/modules/drdd_state/Cargo.toml index c2243ddd..1531c04a 100644 --- a/modules/drdd_state/Cargo.toml +++ b/modules/drdd_state/Cargo.toml @@ -18,6 +18,7 @@ tokio = { version = "1", features = ["full"] } serde = { version = "1.0.214", features = ["derive"] } serde_json = "1.0.132" hex = "0.4.3" +imbl = { version = "5.0.0", features = ["serde"] } [lib] path = "src/drdd_state.rs" \ No newline at end of file diff --git a/modules/drdd_state/src/drdd_state.rs b/modules/drdd_state/src/drdd_state.rs index c0c9a7e6..3b21a9a2 100644 --- a/modules/drdd_state/src/drdd_state.rs +++ b/modules/drdd_state/src/drdd_state.rs @@ -2,7 +2,7 @@ //! Stores historical DRep delegation distributions use acropolis_common::{ messages::{CardanoMessage, Message}, - rest_helper::handle_rest_with_query_parameter, + rest_helper::handle_rest_with_query_parameters, }; use anyhow::Result; use caryatid_sdk::{module, Context, Module}; @@ -57,7 +57,7 @@ impl DRDDState { }; match message.as_ref() { Message::Cardano((_, CardanoMessage::DRepStakeDistribution(msg))) => { - let span = info_span!("spdd_state.handle", epoch = msg.epoch); + let span = info_span!("drdd_state.handle", epoch = msg.epoch); async { let mut state = state_handler.lock().await; @@ -110,7 +110,7 @@ impl DRDDState { }; // Register /drdd REST endpoint - handle_rest_with_query_parameter(context.clone(), &handle_drdd_topic, move |params| { + handle_rest_with_query_parameters(context.clone(), &handle_drdd_topic, move |params| { let state_rest = state_opt.clone(); handle_drdd(state_rest.clone(), params) }); diff --git a/modules/drdd_state/src/rest.rs b/modules/drdd_state/src/rest.rs index 97ba6985..37822569 100644 --- a/modules/drdd_state/src/rest.rs +++ b/modules/drdd_state/src/rest.rs @@ -1,5 +1,5 @@ use crate::state::State; -use acropolis_common::{messages::RESTResponse, DRepCredential}; +use acropolis_common::{extract_strict_query_params, messages::RESTResponse, DRepCredential}; use anyhow::Result; use serde::Serialize; use std::{collections::HashMap, sync::Arc}; @@ -28,38 +28,21 @@ pub async fn handle_drdd( } }; - let drdd_opt = if let Some(epoch_str) = params.get("epoch") { - if params.len() > 1 { - return Ok(RESTResponse::with_text( - 400, - "Only 'epoch' is a valid query parameter", - )); - } + extract_strict_query_params!(params, { + "epoch" => epoch: Option, + }); - match epoch_str.parse::() { - Ok(epoch) => match locked.get_epoch(epoch) { - Some(drdd) => Some(drdd), - None => { - return Ok(RESTResponse::with_text( - 404, - &format!("DRDD not found for epoch {}", epoch), - )); - } - }, - Err(_) => { + let drdd_opt = match epoch { + Some(epoch) => match locked.get_epoch(epoch) { + Some(drdd) => Some(drdd), + None => { return Ok(RESTResponse::with_text( - 400, - "Invalid epoch query parameter: must be a number", + 404, + &format!("DRDD not found for epoch {}", epoch), )); } - } - } else if params.is_empty() { - locked.get_latest() - } else { - return Ok(RESTResponse::with_text( - 400, - "Unexpected query parameter: only 'epoch' is allowed", - )); + }, + None => locked.get_latest(), }; if let Some(drdd) = drdd_opt { diff --git a/modules/drdd_state/src/state.rs b/modules/drdd_state/src/state.rs index 27bb1d5f..50fbe3ba 100644 --- a/modules/drdd_state/src/state.rs +++ b/modules/drdd_state/src/state.rs @@ -1,14 +1,14 @@ use acropolis_common::DRepCredential; -use std::collections::BTreeMap; +use imbl::OrdMap; use tracing::info; pub struct State { - historical_distributions: BTreeMap, + historical_distributions: OrdMap, } #[derive(Clone)] pub struct DRepDistribution { - pub dreps: BTreeMap, + pub dreps: OrdMap, pub abstain: u64, pub no_confidence: u64, } @@ -16,7 +16,7 @@ pub struct DRepDistribution { impl State { pub fn new() -> Self { Self { - historical_distributions: BTreeMap::new(), + historical_distributions: OrdMap::new(), } } @@ -24,12 +24,11 @@ impl State { self.historical_distributions.insert(epoch, drdd); } - pub fn get_latest(&self) -> Option { - self.historical_distributions.last_key_value().map(|(_, map)| map.clone()) + pub fn get_latest(&self) -> Option<&DRepDistribution> { + self.historical_distributions.iter().next_back().map(|(_, map)| map) } - - pub fn get_epoch(&self, epoch: u64) -> Option { - self.historical_distributions.get(&epoch).cloned() + pub fn get_epoch(&self, epoch: u64) -> Option<&DRepDistribution> { + self.historical_distributions.get(&epoch) } pub async fn tick(&self) -> anyhow::Result<()> { diff --git a/modules/spdd_state/Cargo.toml b/modules/spdd_state/Cargo.toml index cde68d26..5d2acdc6 100644 --- a/modules/spdd_state/Cargo.toml +++ b/modules/spdd_state/Cargo.toml @@ -17,6 +17,7 @@ tracing = "0.1.40" tokio = { version = "1", features = ["full"] } serde_json = "1.0.132" hex = "0.4.3" +imbl = { version = "5.0.0", features = ["serde"] } [lib] path = "src/spdd_state.rs" diff --git a/modules/spdd_state/src/rest.rs b/modules/spdd_state/src/rest.rs index e38ae893..217afd59 100644 --- a/modules/spdd_state/src/rest.rs +++ b/modules/spdd_state/src/rest.rs @@ -1,6 +1,6 @@ use crate::state::State; -use acropolis_common::messages::RESTResponse; use acropolis_common::serialization::Bech32WithHrp; +use acropolis_common::{extract_strict_query_params, messages::RESTResponse}; use anyhow::Result; use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; @@ -20,38 +20,21 @@ pub async fn handle_spdd( } }; - let spdd_opt = if let Some(epoch_str) = params.get("epoch") { - if params.len() > 1 { - return Ok(RESTResponse::with_text( - 400, - "Only 'epoch' is a valid query parameter", - )); - } + extract_strict_query_params!(params, { + "epoch" => epoch: Option, + }); - match epoch_str.parse::() { - Ok(epoch) => match locked.get_epoch(epoch) { - Some(spdd) => Some(spdd), - None => { - return Ok(RESTResponse::with_text( - 404, - &format!("SPDD not found for epoch {}", epoch), - )); - } - }, - Err(_) => { + let spdd_opt = match epoch { + Some(epoch) => match locked.get_epoch(epoch) { + Some(spdd) => Some(spdd), + None => { return Ok(RESTResponse::with_text( - 400, - "Invalid epoch query parameter: must be a number", + 404, + &format!("SPDD not found for epoch {}", epoch), )); } - } - } else if params.is_empty() { - locked.get_latest() - } else { - return Ok(RESTResponse::with_text( - 400, - "Unexpected query parameter: only 'epoch' is allowed", - )); + }, + None => locked.get_latest(), }; if let Some(spdd) = spdd_opt { diff --git a/modules/spdd_state/src/spdd_state.rs b/modules/spdd_state/src/spdd_state.rs index 33adf0f3..9cf07239 100644 --- a/modules/spdd_state/src/spdd_state.rs +++ b/modules/spdd_state/src/spdd_state.rs @@ -2,13 +2,14 @@ //! Stores historical stake pool delegation distributions use acropolis_common::{ messages::{CardanoMessage, Message}, - rest_helper::handle_rest_with_query_parameter, + rest_helper::handle_rest_with_query_parameters, DelegatedStake, KeyHash, }; use anyhow::Result; use caryatid_sdk::{module, Context, Module}; use config::Config; -use std::{collections::BTreeMap, sync::Arc}; +use imbl::OrdMap; +use std::sync::Arc; use tokio::sync::Mutex; use tracing::{error, info, info_span, Instrument}; mod state; @@ -60,7 +61,7 @@ impl SPDDState { async { let mut state = state_handler.lock().await; - let spdd: BTreeMap = + let spdd: OrdMap = msg.spos.iter().map(|(k, v)| (k.clone(), *v)).collect(); state.insert_spdd(msg.epoch, spdd); @@ -106,7 +107,7 @@ impl SPDDState { }; // Register /spdd REST endpoint - handle_rest_with_query_parameter(context.clone(), &handle_spdd_topic, move |params| { + handle_rest_with_query_parameters(context.clone(), &handle_spdd_topic, move |params| { let state_rest = state_opt.clone(); handle_spdd(state_rest.clone(), params) }); diff --git a/modules/spdd_state/src/state.rs b/modules/spdd_state/src/state.rs index b9976265..207e10c2 100644 --- a/modules/spdd_state/src/state.rs +++ b/modules/spdd_state/src/state.rs @@ -1,28 +1,28 @@ use acropolis_common::{DelegatedStake, KeyHash}; -use std::collections::BTreeMap; +use imbl::OrdMap; use tracing::info; pub struct State { - historical_distributions: BTreeMap>, + historical_distributions: OrdMap>, } impl State { pub fn new() -> Self { Self { - historical_distributions: BTreeMap::new(), + historical_distributions: OrdMap::new(), } } - pub fn insert_spdd(&mut self, epoch: u64, spdd: BTreeMap) { + pub fn insert_spdd(&mut self, epoch: u64, spdd: OrdMap) { self.historical_distributions.insert(epoch, spdd); } - pub fn get_latest(&self) -> Option> { - self.historical_distributions.last_key_value().map(|(_, map)| map.clone()) + pub fn get_latest(&self) -> Option<&OrdMap> { + self.historical_distributions.iter().next_back().map(|(_, map)| map) } - pub fn get_epoch(&self, epoch: u64) -> Option> { - self.historical_distributions.get(&epoch).cloned() + pub fn get_epoch(&self, epoch: u64) -> Option<&OrdMap> { + self.historical_distributions.get(&epoch) } pub async fn tick(&self) -> anyhow::Result<()> { From c1e71b6cb62a3ddd26397dc122c3e2c0cf2e50bf Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 12 Aug 2025 18:28:44 +0000 Subject: [PATCH 5/6] fix: optimize spdd/drdd updates to avoid unnecessary clones Signed-off-by: William Hankins --- common/src/types.rs | 2 +- modules/drdd_state/src/drdd_state.rs | 17 +++++------ modules/drdd_state/src/state.rs | 45 ++++++++++++++++++++++++++-- modules/spdd_state/src/spdd_state.rs | 12 ++++---- modules/spdd_state/src/state.rs | 33 ++++++++++++++++++-- 5 files changed, 87 insertions(+), 22 deletions(-) diff --git a/common/src/types.rs b/common/src/types.rs index 66915d13..bc3a0522 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -460,7 +460,7 @@ pub struct StakeDelegation { } /// SPO total delegation data (for SPDD) -#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq)] pub struct DelegatedStake { /// Active stake - UTXO values only (used for reward calcs) pub active: Lovelace, diff --git a/modules/drdd_state/src/drdd_state.rs b/modules/drdd_state/src/drdd_state.rs index 3b21a9a2..e8486cbc 100644 --- a/modules/drdd_state/src/drdd_state.rs +++ b/modules/drdd_state/src/drdd_state.rs @@ -15,8 +15,6 @@ use state::State; mod rest; use rest::handle_drdd; -use crate::state::DRepDistribution; - const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.drep.distribution"; const DEFAULT_HANDLE_DRDD_TOPIC: (&str, &str) = ("handle-topic-drdd", "rest.get.drdd"); const DEFAULT_STORE_DRDD: (&str, bool) = ("store-drdd", false); @@ -59,15 +57,14 @@ impl DRDDState { Message::Cardano((_, CardanoMessage::DRepStakeDistribution(msg))) => { let span = info_span!("drdd_state.handle", epoch = msg.epoch); async { - let mut state = state_handler.lock().await; - - let drdd = DRepDistribution { - dreps: msg.dreps.iter().map(|(k, v)| (k.clone(), *v)).collect(), - abstain: msg.abstain, - no_confidence: msg.no_confidence, - }; + let mut guard = state_handler.lock().await; - state.insert_drdd(msg.epoch, drdd); + guard.apply_drdd_snapshot( + msg.epoch, + msg.dreps.iter().map(|(k, v)| (k.clone(), *v)), + msg.abstain, + msg.no_confidence, + ); } .instrument(span) .await; diff --git a/modules/drdd_state/src/state.rs b/modules/drdd_state/src/state.rs index 50fbe3ba..9232b17c 100644 --- a/modules/drdd_state/src/state.rs +++ b/modules/drdd_state/src/state.rs @@ -1,5 +1,5 @@ use acropolis_common::DRepCredential; -use imbl::OrdMap; +use imbl::{OrdMap, OrdSet}; use tracing::info; pub struct State { @@ -20,6 +20,47 @@ impl State { } } + pub fn apply_drdd_snapshot( + &mut self, + epoch: u64, + snapshot_dreps: I, + abstain: u64, + no_confidence: u64, + ) where + I: IntoIterator, + { + let mut next = self.get_latest().cloned().unwrap_or_else(|| DRepDistribution { + dreps: OrdMap::new(), + abstain: 0, + no_confidence: 0, + }); + + next.abstain = abstain; + next.no_confidence = no_confidence; + + // Update new or changed entries + let mut present = OrdSet::new(); + for (k, v_new) in snapshot_dreps { + let changed = match next.dreps.get(&k) { + Some(v_old) => *v_old != v_new, + None => true, + }; + if changed { + next.dreps.insert(k.clone(), v_new); + } + present.insert(k); + } + + // Remove keys that disappeared. + let to_remove: Vec<_> = + next.dreps.keys().filter(|k| !present.contains(k)).cloned().collect(); + for k in to_remove { + next.dreps.remove(&k); + } + + self.insert_drdd(epoch, next); + } + pub fn insert_drdd(&mut self, epoch: u64, drdd: DRepDistribution) { self.historical_distributions.insert(epoch, drdd); } @@ -33,7 +74,7 @@ impl State { pub async fn tick(&self) -> anyhow::Result<()> { let num_epochs = self.historical_distributions.len(); - let latest = self.historical_distributions.iter().last(); + let latest = self.historical_distributions.iter().next_back(); if let Some((epoch, drep_map)) = latest { let drep_count = drep_map.dreps.len(); diff --git a/modules/spdd_state/src/spdd_state.rs b/modules/spdd_state/src/spdd_state.rs index 9cf07239..0ec49898 100644 --- a/modules/spdd_state/src/spdd_state.rs +++ b/modules/spdd_state/src/spdd_state.rs @@ -3,12 +3,10 @@ use acropolis_common::{ messages::{CardanoMessage, Message}, rest_helper::handle_rest_with_query_parameters, - DelegatedStake, KeyHash, }; use anyhow::Result; use caryatid_sdk::{module, Context, Module}; use config::Config; -use imbl::OrdMap; use std::sync::Arc; use tokio::sync::Mutex; use tracing::{error, info, info_span, Instrument}; @@ -59,12 +57,12 @@ impl SPDDState { Message::Cardano((_, CardanoMessage::SPOStakeDistribution(msg))) => { let span = info_span!("spdd_state.handle", epoch = msg.epoch); async { - let mut state = state_handler.lock().await; + let mut guard = state_handler.lock().await; - let spdd: OrdMap = - msg.spos.iter().map(|(k, v)| (k.clone(), *v)).collect(); - - state.insert_spdd(msg.epoch, spdd); + guard.apply_spdd_snapshot( + msg.epoch, + msg.spos.iter().map(|(k, v)| (k.clone(), *v)), + ); } .instrument(span) .await; diff --git a/modules/spdd_state/src/state.rs b/modules/spdd_state/src/state.rs index 207e10c2..13a11805 100644 --- a/modules/spdd_state/src/state.rs +++ b/modules/spdd_state/src/state.rs @@ -1,5 +1,5 @@ use acropolis_common::{DelegatedStake, KeyHash}; -use imbl::OrdMap; +use imbl::{OrdMap, OrdSet}; use tracing::info; pub struct State { @@ -13,6 +13,35 @@ impl State { } } + pub fn apply_spdd_snapshot(&mut self, epoch: u64, snapshot: I) + where + I: IntoIterator, + { + let mut next = self.get_latest().cloned().unwrap_or_else(OrdMap::new); + + // Update new or changed entries + let mut present = OrdSet::new(); + for (k, v_new) in snapshot { + let changed = match next.get(&k) { + Some(v_old) => *v_old != v_new, + None => true, + }; + if changed { + next.insert(k.clone(), v_new); + } + present.insert(k); + } + + // Remove keys that disappeared. + let to_remove: Vec<_> = + next.keys().filter(|k| !present.contains::<[u8]>((**k).as_slice())).cloned().collect(); + for k in to_remove { + next.remove(&k); + } + + self.insert_spdd(epoch, next); + } + pub fn insert_spdd(&mut self, epoch: u64, spdd: OrdMap) { self.historical_distributions.insert(epoch, spdd); } @@ -27,7 +56,7 @@ impl State { pub async fn tick(&self) -> anyhow::Result<()> { let num_epochs = self.historical_distributions.len(); - let latest = self.historical_distributions.iter().last(); + let latest = self.historical_distributions.iter().next_back(); if let Some((epoch, spo_map)) = latest { let spo_count = spo_map.len(); From aa1f495e66ac3493afbf03cb714bef35e8928702 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Mon, 25 Aug 2025 17:17:29 +0000 Subject: [PATCH 6/6] fix: disabled storing spdd by default and update drep_state to use refactored StateHistory initalization Signed-off-by: William Hankins --- modules/drep_state/src/drep_state.rs | 11 +++++++---- processes/omnibus/omnibus.toml | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/modules/drep_state/src/drep_state.rs b/modules/drep_state/src/drep_state.rs index 860026d0..ac008c1e 100644 --- a/modules/drep_state/src/drep_state.rs +++ b/modules/drep_state/src/drep_state.rs @@ -6,7 +6,7 @@ use acropolis_common::{ queries::governance::{ DRepInfo, DRepsList, GovernanceStateQuery, GovernanceStateQueryResponse, }, - state_history::StateHistory, + state_history::{HistoryKind, StateHistory}, BlockInfo, BlockStatus, }; use anyhow::Result; @@ -86,7 +86,7 @@ impl DRepState { Message::Cardano((ref block_info, _)) => { // rollback only on certs if block_info.status == BlockStatus::RolledBack { - state = history.lock().await.get_rolled_back_state(&block_info); + state = history.lock().await.get_rolled_back_state(block_info.number); } current_block = Some(block_info.clone()); block_info.new_epoch && block_info.epoch > 0 @@ -178,7 +178,7 @@ impl DRepState { // Commit the new state if let Some(block_info) = current_block { - history.lock().await.commit(&block_info, state); + history.lock().await.commit(block_info.number, state); } } } @@ -249,7 +249,10 @@ impl DRepState { info!("Creating DRep query handler on '{drep_query_topic}'"); // Initalize state history - let history = Arc::new(Mutex::new(StateHistory::::new("DRepState"))); + let history = Arc::new(Mutex::new(StateHistory::::new( + "DRepState", + HistoryKind::BlockState, + ))); let history_run = history.clone(); let query_history = history.clone(); let ticker_history = history.clone(); diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 4b4268fa..b68f5b61 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -33,7 +33,7 @@ address-delta-topic = "cardano.address.delta" [module.spo-state] [module.spdd-state] -store-spdd = true +store-spdd = false [module.drep-state] # Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled)