diff --git a/rust/main/agents/scraper/migration/src/lib.rs b/rust/main/agents/scraper/migration/src/lib.rs index 2814b8cd676..478c6643e46 100644 --- a/rust/main/agents/scraper/migration/src/lib.rs +++ b/rust/main/agents/scraper/migration/src/lib.rs @@ -14,6 +14,7 @@ mod m20230309_000004_create_table_delivered_message; mod m20230309_000004_create_table_gas_payment; mod m20230309_000005_create_table_message; mod m20250224_000006_create_table_raw_message_dispatch; +mod m20260224_000007_add_cursor_type; pub struct Migrator; @@ -32,6 +33,7 @@ impl MigratorTrait for Migrator { Box::new(m20230309_000004_create_table_delivered_message::Migration), Box::new(m20230309_000005_create_table_message::Migration), Box::new(m20250224_000006_create_table_raw_message_dispatch::Migration), + Box::new(m20260224_000007_add_cursor_type::Migration), ] } } diff --git a/rust/main/agents/scraper/migration/src/m20260224_000007_add_cursor_type.rs b/rust/main/agents/scraper/migration/src/m20260224_000007_add_cursor_type.rs new file mode 100644 index 00000000000..8ffffd3d359 --- /dev/null +++ b/rust/main/agents/scraper/migration/src/m20260224_000007_add_cursor_type.rs @@ -0,0 +1,66 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let table = Cursor::Table.to_string(); + let cursor_type = Cursor::Stage.to_string(); + let domain = Cursor::Domain.to_string(); + let height = Cursor::Height.to_string(); + let index_name = "cursor_domain_type_height_idx"; + + manager + .get_connection() + .execute_unprepared(&format!( + r#" + ALTER TABLE "{table}" + ADD COLUMN IF NOT EXISTS "{cursor_type}" TEXT NOT NULL DEFAULT 'finalized' + "# + )) + .await?; + + manager + .get_connection() + .execute_unprepared(&format!( + r#" + CREATE INDEX IF NOT EXISTS "{index_name}" + ON "{table}" ("{domain}", "{cursor_type}", "{height}") + "# + )) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let table = Cursor::Table.to_string(); + let cursor_type = Cursor::Stage.to_string(); + let index_name = "cursor_domain_type_height_idx"; + + manager + .get_connection() + .execute_unprepared(&format!(r#"DROP INDEX IF EXISTS "{index_name}""#)) + .await?; + + manager + .get_connection() + .execute_unprepared(&format!( + r#"ALTER TABLE "{table}" DROP COLUMN IF EXISTS "{cursor_type}""# + )) + .await?; + + Ok(()) + } +} + +#[derive(Iden)] +enum Cursor { + Table, + Domain, + Height, + #[iden = "cursor_type"] + Stage, +} diff --git a/rust/main/agents/scraper/src/agent.rs b/rust/main/agents/scraper/src/agent.rs index 74c2b59bed3..b083ea5665d 100644 --- a/rust/main/agents/scraper/src/agent.rs +++ b/rust/main/agents/scraper/src/agent.rs @@ -4,21 +4,29 @@ use async_trait::async_trait; use derive_more::AsRef; use futures::future::try_join_all; use hyperlane_core::{ - rpc_clients::RPC_RETRY_SLEEP_DURATION, Delivery, HyperlaneDomain, HyperlaneMessage, - InterchainGasPayment, H512, + rpc_clients::RPC_RETRY_SLEEP_DURATION, Delivery, HyperlaneDomain, HyperlaneMessage, Indexer, + InterchainGasPayment, ReorgPeriod, H512, }; use tokio::{sync::mpsc::Receiver as MpscReceiver, task::JoinHandle, time::sleep}; use tracing::{info, info_span, instrument, trace, Instrument}; use hyperlane_base::{ - broadcast::BroadcastMpscSender, metrics::AgentMetrics, settings::IndexSettings, AgentMetadata, - BaseAgent, ChainMetrics, ChainSpecificMetricsUpdater, ContractSyncMetrics, ContractSyncer, - CoreMetrics, HyperlaneAgentCore, RuntimeMetrics, SyncOptions, + broadcast::BroadcastMpscSender, + metrics::AgentMetrics, + settings::{ChainConf, IndexSettings, SequenceIndexer, TryFromWithMetrics}, + AgentMetadata, BaseAgent, ChainMetrics, ChainSpecificMetricsUpdater, ContractSync, + ContractSyncMetrics, ContractSyncer, CoreMetrics, HyperlaneAgentCore, RuntimeMetrics, + SyncOptions, WatermarkContractSync, WatermarkLogStore, }; -use crate::{db::ScraperDb, settings::ScraperSettings, store::HyperlaneDbStore}; +use crate::{ + db::ScraperDb, + settings::ScraperSettings, + store::{HyperlaneDbStore, TipMessageStore}, +}; const CURSOR_INSTANTIATION_ATTEMPTS: usize = 10; +const MESSAGE_DISPATCH_TIP_LABEL: &str = "message_dispatch_tip"; /// A message explorer scraper agent #[derive(Debug, AsRef)] @@ -40,6 +48,7 @@ struct ChainScraper { index_settings: IndexSettings, store: HyperlaneDbStore, domain: HyperlaneDomain, + tip_chain_setup: Option, } #[async_trait] @@ -189,7 +198,7 @@ impl Scraper { let index_settings = scraper.index_settings.clone(); let domain = scraper.domain.clone(); - let mut tasks = Vec::with_capacity(2); + let mut tasks = Vec::with_capacity(4); let (message_indexer, maybe_broadcaster) = self .build_message_indexer( domain.clone(), @@ -214,16 +223,30 @@ impl Scraper { let gas_payment_indexer = self .build_interchain_gas_payment_indexer( - domain, + domain.clone(), self.core_metrics.clone(), self.contract_sync_metrics.clone(), - store, + store.clone(), index_settings.clone(), BroadcastMpscSender::::map_get_receiver(maybe_broadcaster.as_ref()).await, ) .await?; tasks.push(gas_payment_indexer); + if let Some(tip_chain_setup) = scraper.tip_chain_setup.as_ref() { + let tip_message_indexer = self + .build_tip_message_indexer( + domain, + self.core_metrics.clone(), + self.contract_sync_metrics.clone(), + store, + index_settings, + tip_chain_setup, + ) + .await?; + tasks.push(tip_message_indexer); + } + Ok(tokio::spawn( async move { try_join_all(tasks) @@ -244,6 +267,17 @@ impl Scraper { ) -> eyre::Result { info!(domain = domain.name(), "create chain scraper for domain"); let chain_setup = settings.chain_setup(domain)?; + let tip_chain_setup = if chain_setup.reorg_period.is_none() { + info!( + domain = domain.name(), + "No reorg period configured, skipping tip message indexer" + ); + None + } else { + let mut tip_chain_setup = chain_setup.clone(); + tip_chain_setup.reorg_period = ReorgPeriod::None; + Some(tip_chain_setup) + }; info!(domain = domain.name(), "create HyperlaneProvider"); let provider = chain_setup.build_provider(&metrics).await?.into(); info!(domain = domain.name(), "create HyperlaneDbStore"); @@ -261,6 +295,7 @@ impl Scraper { domain: domain.clone(), store, index_settings: chain_setup.index.clone(), + tip_chain_setup, }) } @@ -342,6 +377,90 @@ impl Scraper { Ok((task, maybe_broadcaser)) } + async fn build_tip_message_indexer( + &self, + domain: HyperlaneDomain, + metrics: Arc, + contract_sync_metrics: Arc, + finalized_store: HyperlaneDbStore, + index_settings: IndexSettings, + tip_chain_setup: &ChainConf, + ) -> eyre::Result> { + let label = MESSAGE_DISPATCH_TIP_LABEL; + let indexer = SequenceIndexer::::try_from_with_metrics( + tip_chain_setup, + &metrics, + true, + ) + .await + .map_err(|err| { + tracing::error!( + ?err, + domain = domain.name(), + label, + "Error building tip message indexer" + ); + err + })?; + + // With `reorg_period = None` on `tip_chain_setup`, this returns the latest tip height. + let tip_height = indexer.get_finalized_block_number().await.map_err(|err| { + tracing::error!( + ?err, + domain = domain.name(), + label, + "Error getting tip block height" + ); + err + })?; + + let tip_store = Arc::new( + TipMessageStore::new( + finalized_store.db.clone(), + domain.clone(), + finalized_store.mailbox_address, + // Seed tip cursor near-head on first run for responsiveness. + // Use tip-1 so the first sync pass still includes current tip block. + // Finalized pipeline remains authoritative for historical backfill. + tip_height.saturating_sub(1) as u64, + Some(contract_sync_metrics.stored_events.clone()), + ) + .await + .map_err(|err| { + tracing::error!( + ?err, + domain = domain.name(), + label, + "Error creating tip message store" + ); + err + })?, + ); + + let sync: Arc> = Arc::new(ContractSync::new( + domain.clone(), + tip_store as WatermarkLogStore<_>, + indexer, + contract_sync_metrics.as_ref().clone(), + false, + )); + + let cursor = sync.cursor(index_settings).await.map_err(|err| { + tracing::error!( + ?err, + domain = domain.name(), + label, + "Error getting tip cursor" + ); + err + })?; + + Ok(tokio::spawn( + async move { sync.sync(label, cursor.into()).await } + .instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label)), + )) + } + async fn build_delivery_indexer( &self, domain: HyperlaneDomain, diff --git a/rust/main/agents/scraper/src/db/block_cursor.rs b/rust/main/agents/scraper/src/db/block_cursor.rs index 7d8fd7d5cfe..b93b3a95c8a 100644 --- a/rust/main/agents/scraper/src/db/block_cursor.rs +++ b/rust/main/agents/scraper/src/db/block_cursor.rs @@ -1,7 +1,7 @@ use std::time::{Duration, Instant}; use eyre::Result; -use sea_orm::{prelude::*, ActiveValue, Insert, Order, QueryOrder, QuerySelect}; +use sea_orm::{prelude::*, ActiveValue, ConnectionTrait, Insert, Order, QueryOrder, QuerySelect}; use tokio::sync::RwLock; use tracing::{debug, info, instrument, warn}; @@ -11,6 +11,24 @@ use super::generated::cursor; const MAX_WRITE_BACK_FREQUENCY: Duration = Duration::from_secs(10); +/// Distinguishes independent cursor streams for the same domain. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum CursorKind { + /// Cursor used by finalized/enriched scraping. + Finalized, + /// Cursor used by near-tip/raw scraping. + Tip, +} + +impl CursorKind { + fn as_str(self) -> &'static str { + match self { + Self::Finalized => "finalized", + Self::Tip => "tip", + } + } +} + #[derive(Debug)] struct BlockCursorInner { /// Block height @@ -27,26 +45,64 @@ pub struct BlockCursor { db: DbConn, /// The hyperlane domain this block cursor is for. domain: u32, + /// Which cursor stream this record belongs to. + cursor_kind: CursorKind, inner: RwLock, } impl BlockCursor { - async fn new(db: DbConn, domain: u32, default_height: u64) -> Result { + async fn new( + db: DbConn, + domain: u32, + default_height: u64, + cursor_kind: CursorKind, + ) -> Result { #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] enum QueryAs { Height, } - let height = (cursor::Entity::find()) + let height_with_cursor_type = (cursor::Entity::find()) .filter(cursor::Column::Domain.eq(domain)) + .filter(cursor::Column::CursorType.eq(cursor_kind.as_str())) .order_by(cursor::Column::Height, Order::Desc) .select_only() .column_as(cursor::Column::Height, QueryAs::Height) .into_values::() .one(&db) - .await? - .map(|h| h as u64) - .unwrap_or(default_height); + .await; + + let height = match height_with_cursor_type { + Ok(height) => height.map(|h| h as u64).unwrap_or(default_height), + Err(err) if should_fallback_to_legacy_cursor_query(&err) => { + warn!( + domain, + cursor_kind = ?cursor_kind, + error = ?err, + "cursor_type column missing, falling back to legacy cursor query" + ); + match cursor_kind { + CursorKind::Finalized => (cursor::Entity::find()) + .filter(cursor::Column::Domain.eq(domain)) + .order_by(cursor::Column::Height, Order::Desc) + .select_only() + .column_as(cursor::Column::Height, QueryAs::Height) + .into_values::() + .one(&db) + .await? + .map(|h| h as u64) + .unwrap_or(default_height), + CursorKind::Tip => { + warn!( + domain, + "Tip cursor persistence requires cursor_type migration; using default tip height" + ); + default_height + } + } + } + Err(err) => return Err(err.into()), + }; if height < default_height { warn!( height, @@ -59,6 +115,7 @@ impl BlockCursor { Ok(Self { db, domain, + cursor_kind, inner: RwLock::new(BlockCursorInner { height, last_saved_at: Instant::now(), @@ -86,21 +143,74 @@ impl BlockCursor { let model = cursor::ActiveModel { id: ActiveValue::NotSet, domain: ActiveValue::Set(self.domain as i32), + cursor_type: ActiveValue::Set(self.cursor_kind.as_str().to_owned()), time_created: ActiveValue::NotSet, height: ActiveValue::Set(height as i64), }; debug!(?model, "Inserting cursor"); - if let Err(e) = Insert::one(model).exec(&self.db).await { - warn!(error = ?e, "Failed to update database with new cursor. When you just started this, ensure that the migrations included this domain.") - } else { - debug!(cursor = ?*inner, "Updated cursor") + match Insert::one(model).exec(&self.db).await { + Ok(_) => debug!(cursor = ?*inner, "Updated cursor"), + Err(e) if should_fallback_to_legacy_cursor_query(&e) => match self.cursor_kind { + CursorKind::Finalized => { + warn!( + error = ?e, + domain = self.domain, + "cursor_type column missing, falling back to legacy finalized cursor insert" + ); + if let Err(legacy_err) = + insert_legacy_cursor_row(&self.db, self.domain, height).await + { + warn!( + error = ?legacy_err, + domain = self.domain, + "Failed to update database with legacy finalized cursor" + ); + } else { + debug!(cursor = ?*inner, "Updated cursor via legacy insert") + } + } + CursorKind::Tip => warn!( + error = ?e, + domain = self.domain, + "Tip cursor persistence requires cursor_type migration; skipping tip cursor write" + ), + }, + Err(e) => warn!( + error = ?e, + "Failed to update database with new cursor. When you just started this, ensure that the migrations included this domain." + ), } } } } impl ScraperDb { - pub async fn block_cursor(&self, domain: u32, default_height: u64) -> Result { - BlockCursor::new(self.clone_connection(), domain, default_height).await + pub async fn block_cursor( + &self, + domain: u32, + default_height: u64, + cursor_kind: CursorKind, + ) -> Result { + BlockCursor::new(self.clone_connection(), domain, default_height, cursor_kind).await } } + +fn should_fallback_to_legacy_cursor_query(err: &DbErr) -> bool { + let msg = err.to_string().to_lowercase(); + msg.contains("no such column: cursor_type") + || msg.contains("column cursor_type does not exist") + || msg.contains("column \"cursor_type\" does not exist") + || msg.contains("unknown column 'cursor_type'") +} + +async fn insert_legacy_cursor_row( + db: &DbConn, + domain: u32, + height: u64, +) -> std::result::Result<(), DbErr> { + db.execute_unprepared(&format!( + r#"INSERT INTO "cursor" ("domain", "height") VALUES ({domain}, {height})"# + )) + .await + .map(|_| ()) +} diff --git a/rust/main/agents/scraper/src/db/generated/cursor.rs b/rust/main/agents/scraper/src/db/generated/cursor.rs index 0db99f422ff..68ad1884abf 100644 --- a/rust/main/agents/scraper/src/db/generated/cursor.rs +++ b/rust/main/agents/scraper/src/db/generated/cursor.rs @@ -15,6 +15,7 @@ impl EntityName for Entity { pub struct Model { pub id: i64, pub domain: i32, + pub cursor_type: String, pub time_created: TimeDateTime, pub height: i64, } @@ -23,6 +24,7 @@ pub struct Model { pub enum Column { Id, Domain, + CursorType, TimeCreated, Height, } @@ -50,6 +52,7 @@ impl ColumnTrait for Column { match self { Self::Id => ColumnType::BigInteger.def(), Self::Domain => ColumnType::Integer.def(), + Self::CursorType => ColumnType::Text.def(), Self::TimeCreated => ColumnType::DateTime.def(), Self::Height => ColumnType::BigInteger.def(), } diff --git a/rust/main/agents/scraper/src/db/mod.rs b/rust/main/agents/scraper/src/db/mod.rs index ae848f1c2ab..4922068285b 100644 --- a/rust/main/agents/scraper/src/db/mod.rs +++ b/rust/main/agents/scraper/src/db/mod.rs @@ -1,5 +1,5 @@ pub use block::*; -pub use block_cursor::BlockCursor; +pub use block_cursor::{BlockCursor, CursorKind}; use eyre::Result; pub use message::*; pub use payment::*; diff --git a/rust/main/agents/scraper/src/store.rs b/rust/main/agents/scraper/src/store.rs index 58c55a24413..00dc3cc725c 100644 --- a/rust/main/agents/scraper/src/store.rs +++ b/rust/main/agents/scraper/src/store.rs @@ -1,6 +1,8 @@ pub use storage::HyperlaneDbStore; +pub use tip_dispatches::TipMessageStore; mod deliveries; mod dispatches; mod payments; mod storage; +mod tip_dispatches; diff --git a/rust/main/agents/scraper/src/store/storage.rs b/rust/main/agents/scraper/src/store/storage.rs index 8318f8ce5eb..1a5c11a6211 100644 --- a/rust/main/agents/scraper/src/store/storage.rs +++ b/rust/main/agents/scraper/src/store/storage.rs @@ -17,7 +17,7 @@ use hyperlane_core::{ HyperlaneWatermarkedLogStore, LogMeta, H256, H512, }; -use crate::db::{BasicBlock, BlockCursor, ScraperDb, StorableTxn}; +use crate::db::{BasicBlock, BlockCursor, CursorKind, ScraperDb, StorableTxn}; /// Maximum number of records to query at a time. This came about because when a /// lot of messages are sent in a short period of time we were ending up with a @@ -51,8 +51,12 @@ impl HyperlaneDbStore { stored_events_metric: Option, ) -> Result { let cursor = Arc::new( - db.block_cursor(domain.id(), index_settings.from as u64) - .await?, + db.block_cursor( + domain.id(), + index_settings.from as u64, + CursorKind::Finalized, + ) + .await?, ); Ok(Self { db, diff --git a/rust/main/agents/scraper/src/store/tip_dispatches.rs b/rust/main/agents/scraper/src/store/tip_dispatches.rs new file mode 100644 index 00000000000..ab326516a25 --- /dev/null +++ b/rust/main/agents/scraper/src/store/tip_dispatches.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use eyre::Result; +use hyperlane_core::{ + HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, HyperlaneWatermarkedLogStore, Indexed, + LogMeta, H256, +}; +use prometheus::IntCounterVec; + +use crate::db::{BlockCursor, CursorKind, ScraperDb, StorableRawMessageDispatch}; + +/// Label for tip-stage raw dispatch metrics. +const RAW_MESSAGE_DISPATCH_TIP_LABEL: &str = "raw_message_dispatch_tip"; + +/// Message store for tip-stage scraping. +/// +/// This intentionally only stores raw dispatch rows because enriched tables are +/// finalized-state oriented and not reorg-resilient. +/// +/// Reorg behavior: +/// - if a message reappears at a different block/tx, upsert updates the existing row +/// - if a message is dropped by reorg and never re-included, a stale raw row can remain +/// +/// Finalized/enriched tables remain the authoritative source of truth. +#[derive(Clone, Debug)] +pub struct TipMessageStore { + db: ScraperDb, + domain: HyperlaneDomain, + mailbox_address: H256, + cursor: Arc, + stored_events_metric: Option, +} + +impl TipMessageStore { + pub async fn new( + db: ScraperDb, + domain: HyperlaneDomain, + mailbox_address: H256, + default_height: u64, + stored_events_metric: Option, + ) -> Result { + let cursor = Arc::new( + db.block_cursor(domain.id(), default_height, CursorKind::Tip) + .await?, + ); + Ok(Self { + db, + domain, + mailbox_address, + cursor, + stored_events_metric, + }) + } +} + +#[async_trait] +impl HyperlaneLogStore for TipMessageStore { + async fn store_logs(&self, messages: &[(Indexed, LogMeta)]) -> Result { + if messages.is_empty() { + return Ok(0); + } + + let raw_messages = messages + .iter() + .map(|(message, meta)| StorableRawMessageDispatch { + msg: message.inner(), + meta, + }); + let stored = self + .db + .store_raw_message_dispatches(self.domain.id(), &self.mailbox_address, raw_messages) + .await?; + + if let Some(metric) = self.stored_events_metric.as_ref() { + metric + .with_label_values(&[RAW_MESSAGE_DISPATCH_TIP_LABEL, self.domain.name()]) + .inc_by(stored); + } + + Ok(stored as u32) + } +} + +#[async_trait] +impl HyperlaneWatermarkedLogStore for TipMessageStore { + async fn retrieve_high_watermark(&self) -> Result> { + Ok(Some(self.cursor.height().await.try_into()?)) + } + + async fn store_high_watermark(&self, block_number: u32) -> Result<()> { + self.cursor.update(block_number.into()).await; + Ok(()) + } +} diff --git a/rust/main/utils/run-locally/src/cosmos/termination_invariants.rs b/rust/main/utils/run-locally/src/cosmos/termination_invariants.rs index f8036454cb9..c87f237d45a 100644 --- a/rust/main/utils/run-locally/src/cosmos/termination_invariants.rs +++ b/rust/main/utils/run-locally/src/cosmos/termination_invariants.rs @@ -2,10 +2,10 @@ use maplit::hashmap; -use crate::fetch_metric; use crate::invariants::provider_metrics_invariant_met; use crate::logging::log; use crate::metrics::agent_balance_sum; +use crate::{fetch_metric, fetch_metric_exact}; pub fn termination_invariants_met( relayer_metrics_port: u32, @@ -14,10 +14,10 @@ pub fn termination_invariants_met( starting_relayer_balance: f64, ) -> eyre::Result { let expected_gas_payments = messages_expected; - let gas_payments_event_count = fetch_metric( + let gas_payments_event_count = fetch_metric_exact( &relayer_metrics_port.to_string(), "hyperlane_contract_sync_stored_events", - &hashmap! {"data_type" => "gas_payment"}, + &hashmap! {"data_type" => "gas_payments"}, )? .iter() .sum::(); @@ -63,7 +63,7 @@ pub fn termination_invariants_met( return Ok(false); } - let dispatched_messages_scraped = fetch_metric( + let dispatched_messages_scraped = fetch_metric_exact( &scraper_metrics_port.to_string(), "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "message_dispatch"}, @@ -79,7 +79,7 @@ pub fn termination_invariants_met( return Ok(false); } - let gas_payments_scraped = fetch_metric( + let gas_payments_scraped = fetch_metric_exact( &scraper_metrics_port.to_string(), "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "gas_payment"}, @@ -95,7 +95,7 @@ pub fn termination_invariants_met( return Ok(false); } - let delivered_messages_scraped = fetch_metric( + let delivered_messages_scraped = fetch_metric_exact( &scraper_metrics_port.to_string(), "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "message_delivery"}, diff --git a/rust/main/utils/run-locally/src/cosmosnative/mod.rs b/rust/main/utils/run-locally/src/cosmosnative/mod.rs index 41d56ee731a..b7f2f49e206 100644 --- a/rust/main/utils/run-locally/src/cosmosnative/mod.rs +++ b/rust/main/utils/run-locally/src/cosmosnative/mod.rs @@ -16,7 +16,7 @@ use tempfile::tempdir; use types::{AgentConfig, AgentConfigOut, Deployment}; use crate::{ - fetch_metric, log, + fetch_metric, fetch_metric_exact, log, metrics::agent_balance_sum, program::Program, utils::{as_task, concat_path, stop_child, AgentHandles, TaskHandle}, @@ -378,10 +378,10 @@ fn termination_invariants_met( starting_relayer_balance: f64, ) -> eyre::Result { let expected_gas_payments = messages_expected; - let gas_payments_event_count = fetch_metric( + let gas_payments_event_count = fetch_metric_exact( &relayer_metrics_port.to_string(), "hyperlane_contract_sync_stored_events", - &hashmap! {"data_type" => "gas_payment"}, + &hashmap! {"data_type" => "gas_payments"}, )? .iter() .sum::(); @@ -426,7 +426,7 @@ fn termination_invariants_met( return Ok(false); } - let dispatched_messages_scraped = fetch_metric( + let dispatched_messages_scraped = fetch_metric_exact( &scraper_metrics_port.to_string(), "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "message_dispatch"}, @@ -442,7 +442,7 @@ fn termination_invariants_met( return Ok(false); } - let gas_payments_scraped = fetch_metric( + let gas_payments_scraped = fetch_metric_exact( &scraper_metrics_port.to_string(), "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "gas_payment"}, @@ -458,7 +458,7 @@ fn termination_invariants_met( return Ok(false); } - let delivered_messages_scraped = fetch_metric( + let delivered_messages_scraped = fetch_metric_exact( &scraper_metrics_port.to_string(), "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "message_delivery"}, diff --git a/rust/main/utils/run-locally/src/invariants/termination_invariants.rs b/rust/main/utils/run-locally/src/invariants/termination_invariants.rs index a01a840483f..67bb0d4c7b3 100644 --- a/rust/main/utils/run-locally/src/invariants/termination_invariants.rs +++ b/rust/main/utils/run-locally/src/invariants/termination_invariants.rs @@ -10,7 +10,9 @@ use crate::config::Config; use crate::logging::log; use crate::metrics::agent_balance_sum; use crate::utils::get_matching_lines; -use crate::{fetch_metric, AGENT_LOGGING_DIR, RELAYER_METRICS_PORT, SCRAPER_METRICS_PORT}; +use crate::{ + fetch_metric, fetch_metric_exact, AGENT_LOGGING_DIR, RELAYER_METRICS_PORT, SCRAPER_METRICS_PORT, +}; #[derive(Clone)] pub struct RelayerTerminationInvariantParams<'a> { @@ -238,7 +240,7 @@ pub fn scraper_termination_invariants_met( log!("Checking scraper termination invariants"); - let dispatched_messages_scraped = fetch_metric( + let dispatched_messages_scraped = fetch_metric_exact( SCRAPER_METRICS_PORT, "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "message_dispatch"}, @@ -254,24 +256,36 @@ pub fn scraper_termination_invariants_met( return Ok(false); } - // Check raw message dispatches (stored without RPC dependencies for CCTP availability) - let raw_dispatches_scraped = fetch_metric( + // Check raw message dispatches (stored without RPC dependencies for CCTP availability). + // Tip and finalized stages both write to the same raw table; whichever stage inserts first + // gets the counter increment, the other stage may only upsert existing rows. + let raw_dispatches_scraped_finalized = fetch_metric_exact( SCRAPER_METRICS_PORT, "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "raw_message_dispatch"}, )? .iter() .sum::(); + let raw_dispatches_scraped_tip = fetch_metric_exact( + SCRAPER_METRICS_PORT, + "hyperlane_contract_sync_stored_events", + &hashmap! {"data_type" => "raw_message_dispatch_tip"}, + )? + .iter() + .sum::(); + let raw_dispatches_scraped = raw_dispatches_scraped_finalized + raw_dispatches_scraped_tip; if raw_dispatches_scraped != total_messages_dispatched { log!( - "Scraper has scraped {} raw message dispatches, expected {}", + "Scraper has scraped {} raw message dispatches (finalized={}, tip={}), expected {}", raw_dispatches_scraped, + raw_dispatches_scraped_finalized, + raw_dispatches_scraped_tip, total_messages_dispatched, ); return Ok(false); } - let gas_payments_scraped = fetch_metric( + let gas_payments_scraped = fetch_metric_exact( SCRAPER_METRICS_PORT, "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "gas_payment"}, @@ -287,7 +301,7 @@ pub fn scraper_termination_invariants_met( return Ok(false); } - let delivered_messages_scraped = fetch_metric( + let delivered_messages_scraped = fetch_metric_exact( SCRAPER_METRICS_PORT, "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "message_delivery"}, diff --git a/rust/main/utils/run-locally/src/main.rs b/rust/main/utils/run-locally/src/main.rs index 312040f6b29..96b7f291fe7 100644 --- a/rust/main/utils/run-locally/src/main.rs +++ b/rust/main/utils/run-locally/src/main.rs @@ -31,7 +31,7 @@ use std::{ use ethers_contract::MULTICALL_ADDRESS; use hyperlane_core::{PendingOperationStatus, ReorgEvent, ReprepareReason, SubmitterType}; use logging::log; -pub use metrics::fetch_metric; +pub use metrics::{fetch_metric, fetch_metric_exact}; use once_cell::sync::Lazy; use program::Program; use relayer::msg::pending_message::{INVALIDATE_CACHE_METADATA_LOG, RETRIEVED_MESSAGE_LOG}; diff --git a/rust/main/utils/run-locally/src/metrics.rs b/rust/main/utils/run-locally/src/metrics.rs index 5f479bf0287..a15cd542cf8 100644 --- a/rust/main/utils/run-locally/src/metrics.rs +++ b/rust/main/utils/run-locally/src/metrics.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, error::Error as StdError, str::FromStr}; use eyre::{eyre, ErrReport, Result}; use maplit::hashmap; -/// Fetch a prometheus format metric, filtering by labels. +/// Fetch a prometheus format metric, filtering by label-value prefixes. pub fn fetch_metric(port: &str, metric: &str, labels: &HashMap<&str, &str>) -> Result> where T: FromStr, @@ -28,6 +28,33 @@ where .collect() } +/// Fetch a prometheus format metric, filtering by exact label-value matches. +pub fn fetch_metric_exact( + port: &str, + metric: &str, + labels: &HashMap<&str, &str>, +) -> Result> +where + T: FromStr, + E: Into + StdError + Send + Sync + 'static, +{ + let resp = ureq::get(&format!("http://127.0.0.1:{port}/metrics")); + resp.call()? + .into_string()? + .lines() + .filter(|l| l.starts_with(metric)) + .filter(|l| { + labels + .iter() + .all(|(k, v)| l.contains(&format!("{k}=\"{v}\""))) + }) + .map(|l| { + let value = l.rsplit_once(' ').ok_or(eyre!("Unknown metric format"))?.1; + Ok(value.parse::()?) + }) + .collect() +} + pub fn agent_balance_sum(metrics_port: u32) -> eyre::Result { let balance = fetch_metric( &metrics_port.to_string(), diff --git a/rust/main/utils/run-locally/src/starknet/mod.rs b/rust/main/utils/run-locally/src/starknet/mod.rs index 29d911b6372..16dfb7aafe1 100644 --- a/rust/main/utils/run-locally/src/starknet/mod.rs +++ b/rust/main/utils/run-locally/src/starknet/mod.rs @@ -15,7 +15,7 @@ use crate::program::Program; use crate::starknet::types::{AgentConfigOut, ValidatorConfig}; use crate::starknet::utils::{STARKNET_ACCOUNT, STARKNET_KEY}; use crate::utils::{as_task, concat_path, stop_child, AgentHandles, TaskHandle}; -use crate::{fetch_metric, AGENT_BIN_PATH}; +use crate::{fetch_metric, fetch_metric_exact, AGENT_BIN_PATH}; use self::cli::StarknetCLI; use self::source::{CodeSource, StarknetCLISource}; @@ -567,7 +567,7 @@ fn termination_invariants_met( let ending_relayer_balance: f64 = agent_balance_sum(relayer_metrics_port).unwrap(); - let dispatched_messages_scraped = fetch_metric( + let dispatched_messages_scraped = fetch_metric_exact( &scraper_metrics_port.to_string(), "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "message_dispatch"}, @@ -583,7 +583,7 @@ fn termination_invariants_met( return Ok(false); } - let delivered_messages_scraped = fetch_metric( + let delivered_messages_scraped = fetch_metric_exact( &scraper_metrics_port.to_string(), "hyperlane_contract_sync_stored_events", &hashmap! {"data_type" => "message_delivery"},