Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
de67837
refactor: add fees to epoch_active_counter state to track fees from b…
golddydev Aug 20, 2025
09ea431
feat: add EpochState for pools history data in SPO state.
golddydev Aug 20, 2025
9d45285
refactor: enhance SPOState to manage vrf vkey hash epoch data
golddydev Aug 21, 2025
d886444
Merge branch 'main' into golddydev/prepare-state-for-pools-history
golddydev Aug 21, 2025
5068acf
refactor: optimize stake calculation in generate_spdd function
golddydev Aug 21, 2025
3b20eff
fix: format issue
golddydev Aug 21, 2025
b0ad202
feat: add SPO reward state management
golddydev Aug 21, 2025
d010b1c
refactor: remove logging from SPO stake distribution and epoch activi…
golddydev Aug 21, 2025
c8ed01d
feat: implement pool epoch history endpoint
golddydev Aug 22, 2025
4e7fdb7
refactor: remove reward state wronly implemented in account state
golddydev Aug 22, 2025
2bc48f5
chore: streamline SPDD generation in handle_spdd function
golddydev Aug 22, 2025
a79ebf4
Merge branch 'main' into golddydev/prepare-state-for-pools-history
golddydev Aug 24, 2025
9843b89
refactor: add delegators count to DelegatedState in SPDD message and …
golddydev Aug 24, 2025
6532ab0
refactor: add vrf_key_hashes mapping to BlockState
golddydev Aug 24, 2025
c862de1
feat: calculate and store active size in epoch state
golddydev Aug 24, 2025
22f6f69
feat: add PoolEpochStateRest struct and update pool history handling
golddydev Aug 24, 2025
d2c487c
chore: remove logs
golddydev Aug 24, 2025
39f0f8d
feat: add SPO rewards publisher and handler
golddydev Aug 26, 2025
751d6bb
fix: update documentation for SPO rewards publisher
golddydev Aug 26, 2025
bc74093
docs: update handle_spo_rewards documentation
golddydev Aug 26, 2025
74b50e5
feat: enhance pool history handling with latest epoch filtering
golddydev Aug 26, 2025
29cb3fd
refactor: rename variables for clarity in accounts state and SPO rewa…
golddydev Aug 27, 2025
b575ec3
chore: update spors to spo_rewards
golddydev Aug 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions common/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,15 @@ pub struct SPOStakeDistributionMessage {
pub spos: Vec<(KeyHash, DelegatedStake)>,
}

#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct SPORewardsMessage {
/// Epoch which has ended
pub epoch: u64,

/// SPO rewards by operator ID (total rewards before distribution, pool operator's rewards)
pub spos: Vec<(KeyHash, SPORewards)>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProtocolParamsMessage {
pub params: ProtocolParams,
Expand Down Expand Up @@ -224,6 +233,7 @@ pub enum CardanoMessage {
// Stake distribution info
DRepStakeDistribution(DRepStakeDistributionMessage), // Info about drep stake
SPOStakeDistribution(SPOStakeDistributionMessage), // SPO delegation distribution (SPDD)
SPORewards(SPORewardsMessage), // SPO rewards distribution (SPRD)
StakeAddressDeltas(StakeAddressDeltasMessage), // Stake part of address deltas
}

Expand Down
6 changes: 4 additions & 2 deletions common/src/queries/pools.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::PoolRegistration;
use crate::{PoolEpochState, PoolRegistration};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum PoolsStateQuery {
Expand Down Expand Up @@ -94,7 +94,9 @@ pub struct PoolsTotalBlocksMinted {
pub struct PoolInfo {}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolHistory {}
pub struct PoolHistory {
pub history: Vec<PoolEpochState>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolMetadata {}
Expand Down
25 changes: 25 additions & 0 deletions common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,18 @@ pub struct PoolRetirement {
pub epoch: u64,
}

/// Pool Epoch History Data
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolEpochState {
pub epoch: u64,
pub blocks_minted: u64,
pub active_stake: u64,
pub active_size: RationalNumber,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

active_size could be computed in the REST handler if total_active_stake was stored by epoch.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, that is why I used RationalNumber, decimal is only calculated on demand (rest handler)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current design is storing total_active_stake for each PoolEpochState as the denominator when this RationalNumber could be constructed in the REST handler.

pub delegators_count: u64,
pub pool_reward: u64,
pub spo_reward: u64,
}

/// Stake delegation data
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StakeDelegation {
Expand All @@ -487,10 +499,23 @@ pub struct DelegatedStake {
/// Active stake - UTXO values only (used for reward calcs)
pub active: Lovelace,

/// Active delegators count - delegators making active stakes (used for pool history)
pub active_delegators_count: u64,

/// Total 'live' stake - UTXO values and rewards (used for VRF)
pub live: Lovelace,
}

/// SPO rewards data (for SPORewardsMessage)
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SPORewards {
/// Total rewards before distribution
pub total_rewards: Lovelace,

/// Pool operator's rewards
pub operator_rewards: Lovelace,
}

/// Genesis key delegation
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GenesisKeyDelegation {
Expand Down
21 changes: 20 additions & 1 deletion modules/accounts_state/src/accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ mod drep_distribution_publisher;
use drep_distribution_publisher::DRepDistributionPublisher;
mod spo_distribution_publisher;
use spo_distribution_publisher::SPODistributionPublisher;
mod spo_rewards_publisher;
use spo_rewards_publisher::SPORewardsPublisher;
mod state;
use state::State;
mod monetary;
Expand All @@ -39,6 +41,7 @@ const DEFAULT_STAKE_DELTAS_TOPIC: &str = "cardano.stake.deltas";
const DEFAULT_DREP_STATE_TOPIC: &str = "cardano.drep.state";
const DEFAULT_DREP_DISTRIBUTION_TOPIC: &str = "cardano.drep.distribution";
const DEFAULT_SPO_DISTRIBUTION_TOPIC: &str = "cardano.spo.distribution";
const DEFAULT_SPO_REWARDS_TOPIC: &str = "cardano.spo.rewards";
const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters";

const DEFAULT_HANDLE_SPDD_TOPIC: (&str, &str) = ("handle-topic-spdd", "rest.get.spdd");
Expand All @@ -61,6 +64,7 @@ impl AccountsState {
history: Arc<Mutex<StateHistory<State>>>,
mut drep_publisher: DRepDistributionPublisher,
mut spo_publisher: SPODistributionPublisher,
mut spo_rewards_publisher: SPORewardsPublisher,
mut spos_subscription: Box<dyn Subscription<Message>>,
mut ea_subscription: Box<dyn Subscription<Message>>,
mut certs_subscription: Box<dyn Subscription<Message>>,
Expand Down Expand Up @@ -196,11 +200,20 @@ impl AccountsState {
);
async {
Self::check_sync(&current_block, &block_info);
state
let spo_rewards = state
.handle_epoch_activity(ea_msg)
.await
.inspect_err(|e| error!("EpochActivity handling error: {e:#}"))
.ok();
// SPO rewards is for previous epoch
if let Some(spo_rewards) = spo_rewards {
if let Err(e) = spo_rewards_publisher
.publish_spo_rewards(block_info, spo_rewards)
.await
{
error!("Error publishing SPO rewards: {e:#}")
}
}
}
.instrument(span)
.await;
Expand Down Expand Up @@ -374,6 +387,10 @@ impl AccountsState {
.get_string("publish-spo-distribution-topic")
.unwrap_or(DEFAULT_SPO_DISTRIBUTION_TOPIC.to_string());

let spo_rewards_topic = config
.get_string("publish-spo-rewards-topic")
.unwrap_or(DEFAULT_SPO_REWARDS_TOPIC.to_string());

// REST handler topics
let handle_spdd_topic = config
.get_string(DEFAULT_HANDLE_SPDD_TOPIC.0)
Expand Down Expand Up @@ -489,6 +506,7 @@ impl AccountsState {
let drep_publisher =
DRepDistributionPublisher::new(context.clone(), drep_distribution_topic);
let spo_publisher = SPODistributionPublisher::new(context.clone(), spo_distribution_topic);
let spo_rewards_publisher = SPORewardsPublisher::new(context.clone(), spo_rewards_topic);

// Subscribe
let spos_subscription = context.subscribe(&spo_state_topic).await?;
Expand All @@ -506,6 +524,7 @@ impl AccountsState {
history,
drep_publisher,
spo_publisher,
spo_rewards_publisher,
spos_subscription,
ea_subscription,
certs_subscription,
Expand Down
12 changes: 11 additions & 1 deletion modules/accounts_state/src/rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::snapshot::{Snapshot, SnapshotSPO};
use acropolis_common::{
protocol_params::ShelleyParams, rational_number::RationalNumber, KeyHash, Lovelace,
RewardAccount,
RewardAccount, SPORewards,
};
use anyhow::{bail, Result};
use bigdecimal::{BigDecimal, One, ToPrimitive, Zero};
Expand All @@ -19,6 +19,9 @@ pub struct RewardsResult {

/// Rewards to be paid
pub rewards: Vec<(RewardAccount, Lovelace)>,

/// SPO rewards
pub spo_rewards: Vec<(KeyHash, SPORewards)>,
}

/// State for rewards calculation
Expand Down Expand Up @@ -241,6 +244,13 @@ impl RewardsState {
costs.to_u64().unwrap_or(0)
};
result.rewards.push((spo.reward_account.clone(), spo_benefit));
result.spo_rewards.push((
operator_id.clone(),
SPORewards {
total_rewards: pool_rewards.to_u64().unwrap_or(0),
operator_rewards: spo_benefit,
},
));
result.total_paid += spo_benefit;
*total_paid_to_pools += spo_benefit;
}
Expand Down
41 changes: 41 additions & 0 deletions modules/accounts_state/src/spo_rewards_publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use acropolis_common::messages::{CardanoMessage, Message, SPORewardsMessage};
use acropolis_common::{BlockInfo, KeyHash, SPORewards};
use caryatid_sdk::Context;
use std::sync::Arc;

/// Message publisher for Stake Pool Delegation Distribution (SPDD)
pub struct SPORewardsPublisher {
/// Module context
context: Arc<Context<Message>>,

/// Topic to publish on
topic: String,
}

impl SPORewardsPublisher {
/// Construct with context and topic to publish on
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
Self { context, topic }
}

/// Publish the SPO rewards
pub async fn publish_spo_rewards(
&mut self,
block: &BlockInfo,
spo_rewards: Vec<(KeyHash, SPORewards)>,
) -> anyhow::Result<()> {
self.context
.message_bus
.publish(
&self.topic,
Arc::new(Message::Cardano((
block.clone(),
CardanoMessage::SPORewards(SPORewardsMessage {
epoch: block.epoch - 1, // End of previous epoch
spos: spo_rewards.into_iter().collect(),
}),
))),
)
.await
}
}
46 changes: 29 additions & 17 deletions modules/accounts_state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use crate::monetary::calculate_monetary_change;
use crate::rewards::{RewardsResult, RewardsState};
use crate::snapshot::Snapshot;
use acropolis_common::SPORewards;
use acropolis_common::{
messages::{
DRepStateMessage, EpochActivityMessage, PotDeltasMessage, ProtocolParamsMessage,
Expand Down Expand Up @@ -382,10 +383,11 @@ impl State {

/// Derive the Stake Pool Delegation Distribution (SPDD) - a map of total stake values
/// (both with and without rewards) for each active SPO
/// And Stake Pool Reward State (rewards and delegators_count for each pool)
/// Key of returned map is the SPO 'operator' ID
pub fn generate_spdd(&self) -> BTreeMap<KeyHash, DelegatedStake> {
// Shareable Dashmap with referenced keys
let spo_stakes = Arc::new(DashMap::<KeyHash, DelegatedStake>::new());
let spo_stakes = DashMap::<KeyHash, DelegatedStake>::new();

// Total stake across all addresses in parallel, first collecting into a vector
// because imbl::OrdMap doesn't work in Rayon
Expand All @@ -402,20 +404,20 @@ impl State {
// Parallel sum all the stakes into the spo_stake map
sas_data
.par_iter() // Rayon multi-threaded iterator
.for_each_init(
|| Arc::clone(&spo_stakes),
|map, (spo, (utxo_value, rewards))| {
map.entry(spo.clone())
.and_modify(|v| {
v.active += *utxo_value;
v.live += *utxo_value + *rewards;
})
.or_insert(DelegatedStake {
active: *utxo_value,
live: *utxo_value + *rewards,
});
},
);
.for_each(|(spo, (utxo_value, rewards))| {
spo_stakes
.entry(spo.clone())
.and_modify(|v| {
v.active += *utxo_value;
v.active_delegators_count += 1;
v.live += *utxo_value + *rewards;
})
.or_insert(DelegatedStake {
active: *utxo_value,
active_delegators_count: 1,
live: *utxo_value + *rewards,
});
});

// Collect into a plain BTreeMap, so that it is ordered on output
spo_stakes.iter().map(|entry| (entry.key().clone(), entry.value().clone())).collect()
Expand Down Expand Up @@ -491,7 +493,12 @@ impl State {

/// Handle an EpochActivityMessage giving total fees and block counts by VRF key for
/// the just-ended epoch
pub async fn handle_epoch_activity(&mut self, ea_msg: &EpochActivityMessage) -> Result<()> {
/// This also returns SPO rewards for publishing to the SPDD topic (For epoch N)
pub async fn handle_epoch_activity(
&mut self,
ea_msg: &EpochActivityMessage,
) -> Result<Vec<(KeyHash, SPORewards)>> {
let mut spo_rewards: Vec<(KeyHash, SPORewards)> = Vec::new();
// Reverse map of VRF key to SPO operator ID
let vrf_to_operator: HashMap<KeyHash, KeyHash> =
self.spos.iter().map(|(id, spo)| (spo.vrf_key_hash.clone(), id.clone())).collect();
Expand Down Expand Up @@ -524,14 +531,19 @@ impl State {
self.add_to_reward(&account, amount);
}

// save SPO rewards
spo_rewards = reward_result.spo_rewards.into_iter().collect();

// Adjust the reserves for next time with amount actually paid
self.pots.reserves -= reward_result.total_paid;
}
_ => (),
}
};
// Enter epoch - note the message specifies the epoch that has just *ended*
self.enter_epoch(ea_msg.epoch + 1, ea_msg.total_fees, spo_block_counts)
self.enter_epoch(ea_msg.epoch + 1, ea_msg.total_fees, spo_block_counts)?;

Ok(spo_rewards)
}

/// Handle an SPOStateMessage with the full set of SPOs valid at the end of the last
Expand Down
Loading