diff --git a/keepers/stakenet-keeper/src/main.rs b/keepers/stakenet-keeper/src/main.rs
index bdb327fb..bdf20943 100644
--- a/keepers/stakenet-keeper/src/main.rs
+++ b/keepers/stakenet-keeper/src/main.rs
@@ -6,7 +6,6 @@ It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is se
 use clap::Parser;
 use dotenvy::dotenv;
 use log::*;
-use rand::Rng;
 use rusqlite::Connection;
 use solana_client::nonblocking::rpc_client::RpcClient;
 use solana_metrics::set_host_id;
@@ -15,12 +14,12 @@ use stakenet_keeper::{
     operations::{
         self,
         block_metadata::db::create_sqlite_tables,
-        keeper_operations::{set_flag, KeeperCreates, KeeperOperations},
+        keeper_operations::{set_flag, KeeperOperations},
     },
     state::{
         keeper_config::{Args, KeeperConfig},
         keeper_state::{KeeperFlag, KeeperState},
-        update_state::{create_missing_accounts, post_create_update, pre_create_update},
+        operation::{OperationQueue, OperationState},
     },
 };
 use std::{process::Command, sync::Arc, time::Duration};
@@ -30,6 +29,10 @@ use tokio::time::sleep;
 fn set_run_flags(args: &Args) -> u32 {
     let mut run_flags = 0;
 
+    run_flags = set_flag(run_flags, KeeperOperations::PreCreateUpdate);
+    run_flags = set_flag(run_flags, KeeperOperations::CreateMissingAccounts);
+    run_flags = set_flag(run_flags, KeeperOperations::PostCreateUpdate);
+
     if args.run_cluster_history {
         run_flags = set_flag(run_flags, KeeperOperations::ClusterHistory);
     }
@@ -60,9 +63,6 @@ fn set_run_flags(args: &Args) -> u32 {
     if args.run_priority_fee_commission {
         run_flags = set_flag(run_flags, KeeperOperations::PriorityFeeCommission);
     }
-    if args.run_directed_staking {
-        run_flags = set_flag(run_flags, KeeperOperations::DirectedStaking);
-    }
 
     run_flags
 }
@@ -72,18 +72,6 @@ fn should_clear_startup_flag(tick: u64, intervals: &[u64]) -> bool {
     tick % (max_interval + 1) == 0
 }
 
-fn should_emit(tick: u64, intervals: &[u64]) -> bool {
-    intervals.iter().any(|interval| tick % (interval + 1) == 0)
-}
-
-fn should_update(tick: u64, intervals: &[u64]) -> bool {
-    intervals.iter().any(|interval| tick % interval == 0)
-}
-
-fn should_fire(tick: u64, interval: u64) -> bool {
-    tick % interval == 0
-}
-
 fn advance_tick(tick: &mut u64) {
     *tick += 1;
 }
@@ -93,15 +81,6 @@ async fn sleep_and_tick(tick: &mut u64) {
     advance_tick(tick);
 }
 
-/// To reduce transaction collisions, we sleep a random amount after any emit
-async fn random_cooldown(range: u8) {
-    let mut rng = rand::thread_rng();
-    let sleep_duration = rng.gen_range(0..=60 * (range as u64 + 1));
-
-    info!("\n\n⏰ Cooldown for {} seconds\n", sleep_duration);
-    sleep(Duration::from_secs(sleep_duration)).await;
-}
-
 async fn run_keeper(keeper_config: KeeperConfig) {
     // Intervals
     let metrics_interval = keeper_config.metrics_interval;
@@ -120,6 +99,24 @@ async fn run_keeper(keeper_config: KeeperConfig) {
     let mut keeper_state = KeeperState::default();
     keeper_state.set_cluster_name(&keeper_config.cluster_name);
 
+    let mut operation_queue = OperationQueue::new(
+        keeper_config.validator_history_interval,
+        keeper_config.steward_interval,
+        keeper_config.block_metadata_interval,
+        keeper_config.metrics_interval,
+        keeper_config.run_flags,
+    );
+
+    info!(
+        "Operations: {}",
+        operation_queue
+            .tasks
+            .iter()
+            .map(|o| o.operation.to_string())
+            .collect::<Vec<String>>()
+            .join(",")
+    );
+
     let smallest_interval = intervals.iter().min().unwrap();
     let mut tick: u64 = *smallest_interval; // 1 second ticks - start at metrics interval
@@ -128,187 +125,56 @@
     }
 
     loop {
-        // ---------------------- FETCH -----------------------------------
-        // The fetch ( update ) functions fetch everything we need for the operations from the blockchain
-        // Additionally, this function will update the keeper state. If update fails - it will skip the fire functions.
-        if should_update(tick, &intervals) {
-            info!("Pre-fetching data for update...({})", tick);
-            match pre_create_update(&keeper_config, &mut keeper_state).await {
-                Ok(_) => {
-                    keeper_state.increment_update_run_for_epoch(KeeperOperations::PreCreateUpdate);
-                }
-                Err(e) => {
-                    error!("Failed to pre create update: {:?}", e);
-
-                    keeper_state
-                        .increment_update_error_for_epoch(KeeperOperations::PreCreateUpdate);
-
-                    advance_tick(&mut tick);
-                    continue;
-                }
-            }
-
-            if keeper_config.pay_for_new_accounts {
-                info!("Creating missing accounts...({})", tick);
-                match create_missing_accounts(&keeper_config, &keeper_state).await {
-                    Ok(new_accounts_created) => {
-                        keeper_state.increment_update_run_for_epoch(
-                            KeeperOperations::CreateMissingAccounts,
-                        );
-
-                        let total_txs: usize =
-                            new_accounts_created.iter().map(|(_, txs)| txs).sum();
-                        keeper_state.increment_update_txs_for_epoch(
-                            KeeperOperations::CreateMissingAccounts,
-                            total_txs as u64,
-                        );
-
-                        new_accounts_created
-                            .iter()
-                            .for_each(|(operation, created_accounts)| {
-                                keeper_state.increment_creations_for_epoch((
-                                    operation.clone(),
-                                    *created_accounts as u64,
-                                ));
-                            });
-                    }
-                    Err(e) => {
-                        error!("Failed to create missing accounts: {:?}", e);
-
-                        keeper_state.increment_update_error_for_epoch(
-                            KeeperOperations::CreateMissingAccounts,
-                        );
-
-                        advance_tick(&mut tick);
-                        continue;
-                    }
-                }
-            }
-
-            info!("Post-fetching data for update...({})", tick);
-            match post_create_update(&keeper_config, &mut keeper_state).await {
-                Ok(_) => {
-                    keeper_state.increment_update_run_for_epoch(KeeperOperations::PostCreateUpdate);
-                }
-                Err(e) => {
-                    error!("Failed to post create update: {:?}", e);
-
-                    keeper_state
-                        .increment_update_error_for_epoch(KeeperOperations::PostCreateUpdate);
-
-                    advance_tick(&mut tick);
-                    continue;
-                }
-            }
-        }
-
-        // ---------------------- FIRE ------------------------------------
-
-        // VALIDATOR HISTORY
-        if should_fire(tick, validator_history_interval) {
-            info!("Firing operations...");
-
-            info!("Updating cluster history...");
-            keeper_state.set_runs_errors_and_txs_for_epoch(
-                operations::cluster_history::fire(&keeper_config, &keeper_state).await,
-            );
+        if keeper_state
+            .check_last_update_epoch(keeper_config.client.clone())
+            .await
+        {
+            info!("Epoch transition Steward cranking");
 
-            info!("Updating copy vote accounts...");
             keeper_state.set_runs_errors_txs_and_flags_for_epoch(
-                operations::vote_account::fire(&keeper_config, &keeper_state).await,
+                operations::steward::fire(&keeper_config, &keeper_state).await,
             );
 
-            info!("Updating mev commission...");
-            keeper_state.set_runs_errors_and_txs_for_epoch(
-                operations::mev_commission::fire(&keeper_config, &keeper_state).await,
-            );
+            info!("Epoch transition Steward crank completed");
+        }
 
-            info!("Updating mev earned...");
-            keeper_state.set_runs_errors_and_txs_for_epoch(
-                operations::mev_earned::fire(&keeper_config, &keeper_state).await,
-            );
+        operation_queue.mark_should_fire(tick);
 
-            if keeper_config.oracle_authority_keypair.is_some() {
-                info!("Updating stake accounts...");
-                keeper_state.set_runs_errors_and_txs_for_epoch(
-                    operations::stake_upload::fire(&keeper_config, &keeper_state).await,
-                );
-            }
+        while let Some(task) = operation_queue.get_next_pending() {
+            let operation = task.operation;
 
-            if keeper_config.oracle_authority_keypair.is_some()
-                && keeper_config.gossip_entrypoints.is_some()
+            if let Err(e) = operation
+                .execute(&keeper_config, &mut keeper_state, &mut operation_queue)
+                .await
             {
-                info!("Updating gossip accounts...");
-                keeper_state.set_runs_errors_and_txs_for_epoch(
-                    operations::gossip_upload::fire(&keeper_config, &keeper_state).await,
-                );
+                error!("Operation {operation:?} failed, stopping execution: {e:?}");
+                break;
             }
 
-            info!("Updating priority fee commission...");
-            keeper_state.set_runs_errors_and_txs_for_epoch(
-                operations::priority_fee_commission::fire(&keeper_config, &keeper_state).await,
-            );
-
-            if !keeper_state.keeper_flags.check_flag(KeeperFlag::Startup) {
-                random_cooldown(keeper_config.cool_down_range).await;
-            }
-        }
+            // After sending many transactions, check `last_update_epoch` on the stake pool
+            if operation.is_heavy_operation()
+                && keeper_state
+                    .check_last_update_epoch(keeper_config.client.clone())
+                    .await
+            {
+                info!("Epoch transition Steward cranking");
 
-        // STEWARD
-        if should_fire(tick, steward_interval) {
-            info!("Cranking Steward...");
-            keeper_state.set_runs_errors_txs_and_flags_for_epoch(
-                operations::steward::fire(&keeper_config, &keeper_state).await,
-            );
+                keeper_state.set_runs_errors_txs_and_flags_for_epoch(
+                    operations::steward::fire(&keeper_config, &keeper_state).await,
+                );
 
-            if !keeper_state.keeper_flags.check_flag(KeeperFlag::Startup) {
-                random_cooldown(keeper_config.cool_down_range).await;
+                info!("Epoch transition Steward crank completed");
             }
         }
 
-        // PRIORITY FEE BLOCK METADATA
-        if should_fire(tick, block_metadata_interval)
-            && keeper_config
-                .priority_fee_oracle_authority_keypair
-                .is_some()
-        {
-            info!("Updating priority fee block metadata...");
-            keeper_state.set_runs_errors_and_txs_for_epoch(
-                operations::block_metadata::operations::fire(&keeper_config, &keeper_state).await,
-            );
-        }
-
-        // ---------------------- EMIT ---------------------------------
-
-        if should_fire(tick, metrics_interval) {
-            keeper_state.set_runs_errors_and_txs_for_epoch(
-                operations::metrics_emit::fire(
-                    &keeper_config,
-                    &keeper_state,
-                    keeper_config.cluster_name.as_str(),
-                )
-                .await,
-            );
+        for task in operation_queue.tasks.iter() {
+            if matches!(task.state, OperationState::Failed) {
+                error!("Operation failed: {}", task.operation);
+            }
         }
 
-        if should_emit(tick, &intervals) {
-            info!("Emitting metrics...");
-            keeper_state.emit();
-
-            KeeperOperations::emit(
-                &keeper_state.runs_for_epoch,
-                &keeper_state.errors_for_epoch,
-                &keeper_state.txs_for_epoch,
-                keeper_config.cluster_name.as_str(),
-            );
-
-            KeeperCreates::emit(
-                &keeper_state.created_accounts_for_epoch,
-                &keeper_state.cluster_name,
-            );
-        }
+        operation_queue.reset_for_next_cycle();
 
-        // ---------- CLEAR STARTUP ----------
         if should_clear_startup_flag(tick, &intervals) {
             keeper_state.keeper_flags.unset_flag(KeeperFlag::Startup);
         }
diff --git a/keepers/stakenet-keeper/src/operations/keeper_operations.rs b/keepers/stakenet-keeper/src/operations/keeper_operations.rs
index ad2c7439..e996e2b5 100644
--- a/keepers/stakenet-keeper/src/operations/keeper_operations.rs
+++ b/keepers/stakenet-keeper/src/operations/keeper_operations.rs
@@ -1,4 +1,19 @@
+use std::time::Duration;
+
+use log::*;
+use rand::Rng;
 use solana_metrics::datapoint_info;
+use tokio::time::sleep;
+
+use crate::{
+    operations,
+    state::{
+        keeper_config::KeeperConfig,
+        keeper_state::{KeeperFlag, KeeperState},
+        operation::OperationQueue,
+        update_state::{create_missing_accounts, post_create_update, pre_create_update},
+    },
+};
 
 #[derive(Clone)]
 pub enum KeeperCreates {
@@ -26,7 +41,7 @@ impl KeeperCreates {
     }
 }
 
-#[derive(Clone, Copy, Debug)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub enum KeeperOperations {
     PreCreateUpdate,
     CreateMissingAccounts,
@@ -41,7 +56,26 @@ pub enum KeeperOperations {
     EmitMetrics,
     BlockMetadataKeeper,
     PriorityFeeCommission,
-    DirectedStaking,
+}
+
+impl std::fmt::Display for KeeperOperations {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            KeeperOperations::PreCreateUpdate => write!(f, "create_update"),
+            KeeperOperations::CreateMissingAccounts => write!(f, "create_missing_accounts"),
+            KeeperOperations::PostCreateUpdate => write!(f, "post_create_update"),
+            KeeperOperations::ClusterHistory => write!(f, "cluster_history"),
+            KeeperOperations::GossipUpload => write!(f, "gossip_upload"),
+            KeeperOperations::StakeUpload => write!(f, "stake_upload"),
+            KeeperOperations::VoteAccount => write!(f, "vote_account"),
+            KeeperOperations::MevEarned => write!(f, "mev_earned"),
+            KeeperOperations::MevCommission => write!(f, "mev_commission"),
+            KeeperOperations::Steward => write!(f, "steward"),
+            KeeperOperations::EmitMetrics => write!(f, "emit_metrics"),
+            KeeperOperations::BlockMetadataKeeper => write!(f, "block_metadata_keeper"),
+            KeeperOperations::PriorityFeeCommission => write!(f, "priority_fee_commission"),
+        }
+    }
 }
 
 pub fn set_flag(run_flags: u32, flag: KeeperOperations) -> u32 {
@@ -59,6 +93,17 @@ pub fn check_flag(run_flags: u32, flag: KeeperOperations) -> bool {
 impl KeeperOperations {
     pub const LEN: usize = 13;
 
+    /// Returns true if the operation sends many transactions.
+    pub fn is_heavy_operation(&self) -> bool {
+        matches!(
+            self,
+            KeeperOperations::VoteAccount
+                | KeeperOperations::StakeUpload
+                | KeeperOperations::GossipUpload
+                | KeeperOperations::CreateMissingAccounts
+        )
+    }
+
     pub fn emit(
         runs_for_epoch: &[u64; KeeperOperations::LEN],
         errors_for_epoch: &[u64; KeeperOperations::LEN],
@@ -270,4 +315,263 @@ impl KeeperOperations {
             "cluster" => cluster,
         );
     }
+
+    /// Execute operation
+    pub async fn execute(
+        &self,
+        keeper_config: &KeeperConfig,
+        keeper_state: &mut KeeperState,
+        operation_queue: &mut OperationQueue,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        let operation = *self;
+
+        log::info!("Executing operation: {operation:?}");
+
+        // Execute the operation
+        match self {
+            KeeperOperations::PreCreateUpdate => {
+                match pre_create_update(keeper_config, keeper_state).await {
+                    Ok(_) => {
+                        keeper_state.increment_update_run_for_epoch(operation);
+                        operation_queue.mark_completed(operation);
+                    }
+                    Err(e) => {
+                        error!("Failed to pre create update: {:?}", e);
+                        keeper_state.increment_update_error_for_epoch(operation);
+                        operation_queue.mark_failed(operation);
+                    }
+                }
+            }
+
+            KeeperOperations::CreateMissingAccounts => {
+                if keeper_config.pay_for_new_accounts {
+                    match create_missing_accounts(keeper_config, keeper_state).await {
+                        Ok(new_accounts_created) => {
+                            keeper_state.increment_update_run_for_epoch(operation);
+                            let total_txs: usize =
+                                new_accounts_created.iter().map(|(_, txs)| txs).sum();
+                            keeper_state
+                                .increment_update_txs_for_epoch(operation, total_txs as u64);
+                            new_accounts_created.iter().for_each(|(op, created)| {
+                                keeper_state
+                                    .increment_creations_for_epoch((op.clone(), *created as u64));
+                            });
+                            operation_queue.mark_completed(operation);
+                        }
+                        Err(e) => {
+                            error!("Failed to create missing accounts: {e:?}");
+                            keeper_state.increment_update_error_for_epoch(operation);
+                            operation_queue.mark_failed(operation);
+                        }
+                    }
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+            }
+
+            KeeperOperations::PostCreateUpdate => {
+                match post_create_update(keeper_config, keeper_state).await {
+                    Ok(_) => {
+                        keeper_state.increment_update_run_for_epoch(operation);
+                        operation_queue.mark_completed(operation);
+                    }
+                    Err(e) => {
+                        error!("Failed to post create update: {e:?}");
+                        keeper_state.increment_update_error_for_epoch(operation);
+                        operation_queue.mark_failed(operation);
+                    }
+                }
+            }
+
+            KeeperOperations::ClusterHistory => {
+                let (operation, runs, errors, txs) =
+                    operations::cluster_history::fire(keeper_config, keeper_state).await;
+                keeper_state.set_runs_errors_and_txs_for_epoch((operation, runs, errors, txs));
+
+                if errors > 0 {
+                    operation_queue.mark_failed(operation);
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+            }
+
+            KeeperOperations::VoteAccount => {
+                let (operation, runs, errors, txs, keeper_flags) =
+                    operations::vote_account::fire(keeper_config, keeper_state).await;
+                keeper_state.set_runs_errors_txs_and_flags_for_epoch((
+                    operation,
+                    runs,
+                    errors,
+                    txs,
+                    keeper_flags,
+                ));
+
+                if errors > 0 {
+                    operation_queue.mark_failed(operation);
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+            }
+
+            KeeperOperations::MevCommission => {
+                let (operation, runs, errors, txs) =
+                    operations::mev_commission::fire(keeper_config, keeper_state).await;
+                keeper_state.set_runs_errors_and_txs_for_epoch((operation, runs, errors, txs));
+
+                if errors > 0 {
+                    operation_queue.mark_failed(operation);
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+            }
+
+            KeeperOperations::MevEarned => {
+                let (operation, runs, errors, txs) =
+                    operations::mev_earned::fire(keeper_config, keeper_state).await;
+                keeper_state.set_runs_errors_and_txs_for_epoch((operation, runs, errors, txs));
+
+                if errors > 0 {
+                    operation_queue.mark_failed(operation);
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+            }
+
+            KeeperOperations::StakeUpload => {
+                if keeper_config.oracle_authority_keypair.is_some() {
+                    let (operation, runs, errors, txs) =
+                        operations::stake_upload::fire(keeper_config, keeper_state).await;
+                    keeper_state.set_runs_errors_and_txs_for_epoch((operation, runs, errors, txs));
+
+                    if errors > 0 {
+                        operation_queue.mark_failed(operation);
+                    } else {
+                        operation_queue.mark_completed(operation);
+                    }
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+            }
+
+            KeeperOperations::GossipUpload => {
+                if keeper_config.oracle_authority_keypair.is_some()
+                    && keeper_config.gossip_entrypoints.is_some()
+                {
+                    let (operation, runs, errors, txs) =
+                        operations::gossip_upload::fire(keeper_config, keeper_state).await;
+                    keeper_state.set_runs_errors_and_txs_for_epoch((operation, runs, errors, txs));
+
+                    if errors > 0 {
+                        operation_queue.mark_failed(operation);
+                    } else {
+                        operation_queue.mark_completed(operation);
+                    }
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+            }
+
+            KeeperOperations::PriorityFeeCommission => {
+                let (operation, runs, errors, txs) =
+                    operations::priority_fee_commission::fire(keeper_config, keeper_state).await;
+                keeper_state.set_runs_errors_and_txs_for_epoch((operation, runs, errors, txs));
+
+                if errors > 0 {
+                    operation_queue.mark_failed(operation);
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+
+                // Cooldown after validator history operations complete
+                if !keeper_state.keeper_flags.check_flag(KeeperFlag::Startup) {
+                    random_cooldown(keeper_config.cool_down_range).await;
+                }
+            }
+
+            KeeperOperations::Steward => {
+                info!("Cranking Steward (normal interval)...");
+
+                let (operation, runs, errors, txs, keeper_flags) =
+                    operations::steward::fire(keeper_config, keeper_state).await;
+                keeper_state.set_runs_errors_txs_and_flags_for_epoch((
+                    operation,
+                    runs,
+                    errors,
+                    txs,
+                    keeper_flags,
+                ));
+
+                if errors > 0 {
+                    operation_queue.mark_failed(operation);
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+
+                if !keeper_state.keeper_flags.check_flag(KeeperFlag::Startup) {
+                    random_cooldown(keeper_config.cool_down_range).await;
+                }
+            }
+
+            KeeperOperations::BlockMetadataKeeper => {
+                if keeper_config
+                    .priority_fee_oracle_authority_keypair
+                    .is_some()
+                {
+                    let (operation, runs, errors, txs) =
+                        operations::block_metadata::operations::fire(keeper_config, keeper_state)
+                            .await;
+                    keeper_state.set_runs_errors_and_txs_for_epoch((operation, runs, errors, txs));
+
+                    if errors > 0 {
+                        operation_queue.mark_failed(operation);
+                    } else {
+                        operation_queue.mark_completed(operation);
+                    }
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+            }
+
+            KeeperOperations::EmitMetrics => {
+                let (operation, runs, errors, txs) = operations::metrics_emit::fire(
+                    keeper_config,
+                    keeper_state,
+                    keeper_config.cluster_name.as_str(),
+                )
+                .await;
+                keeper_state.set_runs_errors_and_txs_for_epoch((operation, runs, errors, txs));
+
+                keeper_state.emit();
+
+                KeeperOperations::emit(
+                    &keeper_state.runs_for_epoch,
+                    &keeper_state.errors_for_epoch,
+                    &keeper_state.txs_for_epoch,
+                    keeper_config.cluster_name.as_str(),
+                );
+
+                KeeperCreates::emit(
+                    &keeper_state.created_accounts_for_epoch,
+                    &keeper_state.cluster_name,
+                );
+
+                if errors > 0 {
+                    operation_queue.mark_failed(operation);
+                } else {
+                    operation_queue.mark_completed(operation);
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// To reduce transaction collisions, we sleep a random amount after transaction-heavy operations
+async fn random_cooldown(range: u8) {
+    let mut rng = rand::thread_rng();
+    let sleep_duration = rng.gen_range(0..=60 * (range as u64 + 1));
+
+    info!("\n\n⏰ Cooldown for {} seconds\n", sleep_duration);
+    sleep(Duration::from_secs(sleep_duration)).await;
 }
diff --git a/keepers/stakenet-keeper/src/state/keeper_config.rs b/keepers/stakenet-keeper/src/state/keeper_config.rs
index acfad0ab..6dd242fe 100644
--- a/keepers/stakenet-keeper/src/state/keeper_config.rs
+++ b/keepers/stakenet-keeper/src/state/keeper_config.rs
@@ -237,10 +237,6 @@ pub struct Args {
     #[arg(long, env, default_value = "false")]
     pub run_priority_fee_commission: bool,
 
-    /// Run Directed Staking Operation
-    #[arg(long, env, default_value = "false")]
-    pub run_directed_staking: bool,
-
     /// Number of epochs to look back for block metadata
     #[arg(long, env, default_value = "3")]
     pub lookback_epochs: u64,
@@ -299,7 +295,6 @@ impl fmt::Display for Args {
             Redundant RPC URLs: {:?}\n\
             Run Priority Fee Commission: {:?}\n\
             Validator History Min Stake: {:?} lamports\n\
-            Run Directed Staking Operation: {:?}\n\
             -------------------------------",
             self.json_rpc_url,
             self.gossip_entrypoints,
@@ -340,7 +335,6 @@ impl fmt::Display for Args {
             self.redundant_rpc_urls,
             self.run_priority_fee_commission,
             self.validator_history_min_stake,
-            self.run_directed_staking,
         )
     }
 }
diff --git a/keepers/stakenet-keeper/src/state/keeper_state.rs b/keepers/stakenet-keeper/src/state/keeper_state.rs
index 0085306e..24536ccd 100644
--- a/keepers/stakenet-keeper/src/state/keeper_state.rs
+++ b/keepers/stakenet-keeper/src/state/keeper_state.rs
@@ -16,7 +16,9 @@ use stakenet_sdk::{
         aggregate_accounts::{AllStewardAccounts, AllValidatorAccounts},
         errors::JitoTransactionError,
     },
-    utils::accounts::{get_directed_stake_meta, get_validator_history_address},
+    utils::accounts::{
+        get_directed_stake_meta, get_stake_pool_account, get_validator_history_address,
+    },
 };
 use validator_history::{ClusterHistory, ValidatorHistory};
 
@@ -127,7 +129,11 @@ pub struct KeeperState {
     pub all_active_validator_accounts: Option<Box<AllValidatorAccounts>>,
     pub steward_progress_flags: StewardProgressFlags,
     pub cluster_name: String,
+
+    /// The epoch when the stake pool was last updated
+    last_update_epoch_stake_pool: Option<u64>,
 }
+
 impl KeeperState {
     pub fn update_identity_to_vote_map(&mut self) {
         self.identity_to_vote_map = self
@@ -349,6 +355,35 @@ impl KeeperState {
 
         Ok(false)
     }
+
+    /// Check the `last_update_epoch` field on the stake pool
+    ///
+    /// # Process
+    ///
+    /// 1. Read the `last_update_epoch` field in the StakePool account
+    /// 2. If `last_update_epoch` has advanced, trigger the steward operation
+    /// 3. Otherwise, continue to the next operation
+    pub async fn check_last_update_epoch(&mut self, client: Arc<RpcClient>) -> bool {
+        if let Some(all_steward_accounts) = self.all_steward_accounts.as_ref() {
+            if let Ok(stake_pool) =
+                get_stake_pool_account(&client, &all_steward_accounts.stake_pool_address).await
+            {
+                match self.last_update_epoch_stake_pool.as_mut() {
+                    Some(last_update_epoch) => {
+                        let has_updated = stake_pool.last_update_epoch > *last_update_epoch;
+                        *last_update_epoch = stake_pool.last_update_epoch;
+                        return has_updated;
+                    }
+                    None => {
+                        self.last_update_epoch_stake_pool = Some(stake_pool.last_update_epoch);
+                        return false;
+                    }
+                }
+            }
+        }
+
+        false
+    }
 }
 
 impl Default for KeeperState {
@@ -383,6 +418,7 @@ impl Default for KeeperState {
             all_active_validator_accounts: None,
             steward_progress_flags: StewardProgressFlags { flags: 0 },
             cluster_name: String::new(),
+            last_update_epoch_stake_pool: None,
         }
     }
 }
diff --git a/keepers/stakenet-keeper/src/state/mod.rs b/keepers/stakenet-keeper/src/state/mod.rs
index c85a7a73..2caee95b 100644
--- a/keepers/stakenet-keeper/src/state/mod.rs
+++ b/keepers/stakenet-keeper/src/state/mod.rs
@@ -1,3 +1,4 @@
 pub mod keeper_config;
 pub mod keeper_state;
+pub mod operation;
 pub mod update_state;
diff --git a/keepers/stakenet-keeper/src/state/operation.rs b/keepers/stakenet-keeper/src/state/operation.rs
new file mode 100644
index 00000000..397cb5aa
--- /dev/null
+++ b/keepers/stakenet-keeper/src/state/operation.rs
@@ -0,0 +1,314 @@
+//! Operation Queue Management
+//!
+//! This module provides a queue-based system for managing and executing keeper operations
+//! in a controlled, sequential manner with support for interval-based scheduling and
+//! epoch transition detection.
+//!
+//! # Architecture
+//!
+//! The queue system operates on a tick-based cycle (1 second per tick):
+//! 1. **Mark Phase**: `mark_should_fire()` determines which operations should run based on their intervals
+//! 2. **Execute Phase**: Operations are executed one-by-one via `get_next_pending()`
+//! 3. **State Update**: After execution, operations are marked as Completed/Failed
+//! 4. **Reset Phase**: Queue resets for the next cycle
+//!
+//! # Operation States
+//!
+//! - `Pending`: Operation should run this cycle and hasn't been executed yet
+//! - `Completed`: Operation executed successfully this cycle
+//! - `Failed`: Operation failed during execution
+//! - `Skipped`: Operation should not run this cycle (based on interval)
+
+use crate::operations::keeper_operations::KeeperOperations;
+
+/// Represents the current execution state of an operation task.
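+///
+/// Each `mark_should_fire` pass resets every task to `Pending` or `Skipped`;
+/// execution then moves `Pending` tasks to `Completed` or `Failed`.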
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum OperationState {
+    /// Operation should execute this cycle and hasn't been executed yet
+    Pending,
+
+    /// Operation executed successfully in this cycle
+    Completed,
+
+    /// Operation failed during execution in this cycle
+    Failed,
+
+    /// Operation should not execute this cycle based on interval check
+    Skipped,
+}
+
+/// Defines the interval category for an operation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum IntervalType {
+    /// Operations that fire at `validator_history_interval`
+    /// (e.g., ClusterHistory, VoteAccount, MEV operations)
+    ValidatorHistory,
+
+    /// Operations that fire at `steward_interval`
+    /// (e.g., Steward cranking operations)
+    Steward,
+
+    /// Operations that fire at `block_metadata_interval`
+    /// (e.g., Priority fee block metadata)
+    BlockMetadata,
+
+    /// Operations that fire at `metrics_interval`
+    /// (e.g., Metrics emission)
+    Metrics,
+}
+
+/// Represents a single operation task in the execution queue.
+#[derive(Debug, Clone)]
+pub struct OperationTask {
+    /// The keeper operation to execute
+    pub operation: KeeperOperations,
+
+    /// Current state of this task in the execution cycle
+    pub state: OperationState,
+
+    /// Interval category that determines when this operation fires
+    pub interval_type: IntervalType,
+}
+
+/// Queue for managing keeper operations execution order and state.
+pub struct OperationQueue {
+    /// List of all operation tasks in execution order
+    pub tasks: Vec<OperationTask>,
+
+    /// Index of the next task to check in the current cycle.
+    current_index: usize,
+
+    /// Interval in seconds for validator history operations
+    validator_history_interval: u64,
+
+    /// Interval in seconds for steward operations
+    steward_interval: u64,
+
+    /// Interval in seconds for block metadata operations
+    block_metadata_interval: u64,
+
+    /// Interval in seconds for metrics operations
+    metrics_interval: u64,
+}
+
+impl OperationQueue {
+    /// Creates a new operation queue based on run flags.
+    pub fn new(
+        validator_history_interval: u64,
+        steward_interval: u64,
+        block_metadata_interval: u64,
+        metrics_interval: u64,
+        run_flags: u32,
+    ) -> Self {
+        let mut tasks = Vec::new();
+
+        // Build tasks in execution order based on run_flags
+
+        // Fetch operations
+        if run_flags & (1 << KeeperOperations::PreCreateUpdate as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::PreCreateUpdate,
+                state: OperationState::Pending,
+                interval_type: IntervalType::ValidatorHistory,
+            });
+        }
+
+        if run_flags & (1 << KeeperOperations::CreateMissingAccounts as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::CreateMissingAccounts,
+                state: OperationState::Pending,
+                interval_type: IntervalType::ValidatorHistory,
+            });
+        }
+
+        if run_flags & (1 << KeeperOperations::PostCreateUpdate as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::PostCreateUpdate,
+                state: OperationState::Pending,
+                interval_type: IntervalType::ValidatorHistory,
+            });
+        }
+
+        // Validator history operations
+        if run_flags & (1 << KeeperOperations::ClusterHistory as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::ClusterHistory,
+                state: OperationState::Pending,
+                interval_type: IntervalType::ValidatorHistory,
+            });
+        }
+
+        if run_flags & (1 << KeeperOperations::VoteAccount as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::VoteAccount,
+                state: OperationState::Pending,
+                interval_type: IntervalType::ValidatorHistory,
+            });
+        }
+
+        if run_flags & (1 << KeeperOperations::MevCommission as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::MevCommission,
+                state: OperationState::Pending,
+                interval_type: IntervalType::ValidatorHistory,
+            });
+        }
+
+        if run_flags & (1 << KeeperOperations::MevEarned as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::MevEarned,
+                state: OperationState::Pending,
+                interval_type: IntervalType::ValidatorHistory,
+            });
+        }
+
+        if run_flags & (1 << KeeperOperations::StakeUpload as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::StakeUpload,
+                state: OperationState::Pending,
+                interval_type: IntervalType::ValidatorHistory,
+            });
+        }
+
+        if run_flags & (1 << KeeperOperations::GossipUpload as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::GossipUpload,
+                state: OperationState::Pending,
+                interval_type: IntervalType::ValidatorHistory,
+            });
+        }
+
+        if run_flags & (1 << KeeperOperations::PriorityFeeCommission as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::PriorityFeeCommission,
+                state: OperationState::Pending,
+                interval_type: IntervalType::ValidatorHistory,
+            });
+        }
+
+        // Steward operation
+        if run_flags & (1 << KeeperOperations::Steward as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::Steward,
+                state: OperationState::Pending,
+                interval_type: IntervalType::Steward,
+            });
+        }
+
+        // Block metadata operation
+        if run_flags & (1 << KeeperOperations::BlockMetadataKeeper as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::BlockMetadataKeeper,
+                state: OperationState::Pending,
+                interval_type: IntervalType::BlockMetadata,
+            });
+        }
+
+        // Metrics operation
+        if run_flags & (1 << KeeperOperations::EmitMetrics as u32) != 0 {
+            tasks.push(OperationTask {
+                operation: KeeperOperations::EmitMetrics,
+                state: OperationState::Pending,
+                interval_type: IntervalType::Metrics,
+            });
+        }
+
+        Self {
+            tasks,
+            current_index: 0,
+            validator_history_interval,
+            steward_interval,
+            block_metadata_interval,
+            metrics_interval,
+        }
+    }
+
+    /// Returns all configured intervals
+    fn get_all_intervals(&self) -> Vec<u64> {
+        vec![
+            self.validator_history_interval,
+            self.metrics_interval,
+            self.steward_interval,
+            self.block_metadata_interval,
+        ]
+    }
+
+    /// Checks if any interval matches for update operations.
+    fn should_update(&self, tick: u64) -> bool {
+        self.get_all_intervals()
+            .iter()
+            .any(|interval| tick % interval == 0)
+    }
+
+    /// Checks if any interval matches for emit operations.
+    fn should_emit(&self, tick: u64) -> bool {
+        self.get_all_intervals()
+            .iter()
+            .any(|interval| tick % (interval + 1) == 0)
+    }
+
+    /// Marks which operations should fire this cycle based on current tick and their intervals.
+    pub fn mark_should_fire(&mut self, tick: u64) {
+        let should_update = self.should_update(tick);
+        let should_emit = self.should_emit(tick);
+
+        for task in self.tasks.iter_mut() {
+            let should_fire = match task.operation {
+                // Fetch operations use should_update logic
+                KeeperOperations::PreCreateUpdate
+                | KeeperOperations::CreateMissingAccounts
+                | KeeperOperations::PostCreateUpdate => should_update,
+
+                // Metrics uses should_emit logic
+                KeeperOperations::EmitMetrics => should_emit,
+
+                // All other operations use standard interval check
+                _ => {
+                    let interval = match task.interval_type {
+                        IntervalType::ValidatorHistory => self.validator_history_interval,
+                        IntervalType::Steward => self.steward_interval,
+                        IntervalType::BlockMetadata => self.block_metadata_interval,
+                        IntervalType::Metrics => self.metrics_interval,
+                    };
+                    tick % interval == 0
+                }
+            };
+
+            task.state = if should_fire {
+                OperationState::Pending
+            } else {
+                OperationState::Skipped
+            };
+        }
+    }
+
+    /// Returns the next pending task in the queue.
+    pub fn get_next_pending(&mut self) -> Option<&mut OperationTask> {
+        for i in self.current_index..self.tasks.len() {
+            if self.tasks[i].state == OperationState::Pending {
+                self.current_index = i + 1;
+                return Some(&mut self.tasks[i]);
+            }
+        }
+        None
+    }
+
+    /// Marks an operation as successfully completed.
+    pub fn mark_completed(&mut self, operation: KeeperOperations) {
+        if let Some(task) = self.tasks.iter_mut().find(|t| t.operation == operation) {
+            task.state = OperationState::Completed;
+        }
+    }
+
+    /// Marks an operation as failed.
+    pub fn mark_failed(&mut self, operation: KeeperOperations) {
+        if let Some(task) = self.tasks.iter_mut().find(|t| t.operation == operation) {
+            task.state = OperationState::Failed;
+        }
+    }
+
+    /// Resets the queue for the next execution cycle.
+    pub fn reset_for_next_cycle(&mut self) {
+        self.current_index = 0;
+    }
+}