Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/main/agents/scraper/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod m20230309_000004_create_table_delivered_message;
mod m20230309_000004_create_table_gas_payment;
mod m20230309_000005_create_table_message;
mod m20250224_000006_create_table_raw_message_dispatch;
mod m20260224_000007_add_cursor_type;

pub struct Migrator;

Expand All @@ -32,6 +33,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230309_000004_create_table_delivered_message::Migration),
Box::new(m20230309_000005_create_table_message::Migration),
Box::new(m20250224_000006_create_table_raw_message_dispatch::Migration),
Box::new(m20260224_000007_add_cursor_type::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let table = Cursor::Table.to_string();
let cursor_type = Cursor::Stage.to_string();
let domain = Cursor::Domain.to_string();
let height = Cursor::Height.to_string();
let index_name = "cursor_domain_type_height_idx";

manager
.get_connection()
.execute_unprepared(&format!(
r#"
ALTER TABLE "{table}"
ADD COLUMN IF NOT EXISTS "{cursor_type}" TEXT NOT NULL DEFAULT 'finalized'
"#
))
.await?;

manager
.get_connection()
.execute_unprepared(&format!(
r#"
CREATE INDEX IF NOT EXISTS "{index_name}"
ON "{table}" ("{domain}", "{cursor_type}", "{height}")
"#
))
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let table = Cursor::Table.to_string();
let cursor_type = Cursor::Stage.to_string();
let index_name = "cursor_domain_type_height_idx";

manager
.get_connection()
.execute_unprepared(&format!(r#"DROP INDEX IF EXISTS "{index_name}""#))
.await?;

manager
.get_connection()
.execute_unprepared(&format!(
r#"ALTER TABLE "{table}" DROP COLUMN IF EXISTS "{cursor_type}""#
))
.await?;

Ok(())
}
}

#[derive(Iden)]
enum Cursor {
Table,
Domain,
Height,
#[iden = "cursor_type"]
Stage,
}
137 changes: 128 additions & 9 deletions rust/main/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,29 @@ use async_trait::async_trait;
use derive_more::AsRef;
use futures::future::try_join_all;
use hyperlane_core::{
rpc_clients::RPC_RETRY_SLEEP_DURATION, Delivery, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, H512,
rpc_clients::RPC_RETRY_SLEEP_DURATION, Delivery, HyperlaneDomain, HyperlaneMessage, Indexer,
InterchainGasPayment, ReorgPeriod, H512,
};
use tokio::{sync::mpsc::Receiver as MpscReceiver, task::JoinHandle, time::sleep};
use tracing::{info, info_span, instrument, trace, Instrument};

use hyperlane_base::{
broadcast::BroadcastMpscSender, metrics::AgentMetrics, settings::IndexSettings, AgentMetadata,
BaseAgent, ChainMetrics, ChainSpecificMetricsUpdater, ContractSyncMetrics, ContractSyncer,
CoreMetrics, HyperlaneAgentCore, RuntimeMetrics, SyncOptions,
broadcast::BroadcastMpscSender,
metrics::AgentMetrics,
settings::{ChainConf, IndexSettings, SequenceIndexer, TryFromWithMetrics},
AgentMetadata, BaseAgent, ChainMetrics, ChainSpecificMetricsUpdater, ContractSync,
ContractSyncMetrics, ContractSyncer, CoreMetrics, HyperlaneAgentCore, RuntimeMetrics,
SyncOptions, WatermarkContractSync, WatermarkLogStore,
};

use crate::{db::ScraperDb, settings::ScraperSettings, store::HyperlaneDbStore};
use crate::{
db::ScraperDb,
settings::ScraperSettings,
store::{HyperlaneDbStore, TipMessageStore},
};

const CURSOR_INSTANTIATION_ATTEMPTS: usize = 10;
const MESSAGE_DISPATCH_TIP_LABEL: &str = "message_dispatch_tip";

/// A message explorer scraper agent
#[derive(Debug, AsRef)]
Expand All @@ -40,6 +48,7 @@ struct ChainScraper {
index_settings: IndexSettings,
store: HyperlaneDbStore,
domain: HyperlaneDomain,
tip_chain_setup: Option<ChainConf>,
}

#[async_trait]
Expand Down Expand Up @@ -189,7 +198,7 @@ impl Scraper {
let index_settings = scraper.index_settings.clone();
let domain = scraper.domain.clone();

let mut tasks = Vec::with_capacity(2);
let mut tasks = Vec::with_capacity(4);
let (message_indexer, maybe_broadcaster) = self
.build_message_indexer(
domain.clone(),
Expand All @@ -214,16 +223,30 @@ impl Scraper {

let gas_payment_indexer = self
.build_interchain_gas_payment_indexer(
domain,
domain.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
store,
store.clone(),
index_settings.clone(),
BroadcastMpscSender::<H512>::map_get_receiver(maybe_broadcaster.as_ref()).await,
)
.await?;
tasks.push(gas_payment_indexer);

if let Some(tip_chain_setup) = scraper.tip_chain_setup.as_ref() {
let tip_message_indexer = self
.build_tip_message_indexer(
domain,
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
store,
index_settings,
tip_chain_setup,
)
.await?;
tasks.push(tip_message_indexer);
}

Ok(tokio::spawn(
async move {
try_join_all(tasks)
Expand All @@ -244,6 +267,17 @@ impl Scraper {
) -> eyre::Result<ChainScraper> {
info!(domain = domain.name(), "create chain scraper for domain");
let chain_setup = settings.chain_setup(domain)?;
let tip_chain_setup = if chain_setup.reorg_period.is_none() {
info!(
domain = domain.name(),
"No reorg period configured, skipping tip message indexer"
);
None
} else {
let mut tip_chain_setup = chain_setup.clone();
tip_chain_setup.reorg_period = ReorgPeriod::None;
Some(tip_chain_setup)
};
info!(domain = domain.name(), "create HyperlaneProvider");
let provider = chain_setup.build_provider(&metrics).await?.into();
info!(domain = domain.name(), "create HyperlaneDbStore");
Expand All @@ -261,6 +295,7 @@ impl Scraper {
domain: domain.clone(),
store,
index_settings: chain_setup.index.clone(),
tip_chain_setup,
})
}

Expand Down Expand Up @@ -342,6 +377,90 @@ impl Scraper {
Ok((task, maybe_broadcaser))
}

async fn build_tip_message_indexer(
&self,
domain: HyperlaneDomain,
metrics: Arc<CoreMetrics>,
contract_sync_metrics: Arc<ContractSyncMetrics>,
finalized_store: HyperlaneDbStore,
index_settings: IndexSettings,
tip_chain_setup: &ChainConf,
) -> eyre::Result<JoinHandle<()>> {
let label = MESSAGE_DISPATCH_TIP_LABEL;
let indexer = SequenceIndexer::<HyperlaneMessage>::try_from_with_metrics(
tip_chain_setup,
&metrics,
true,
)
.await
.map_err(|err| {
tracing::error!(
?err,
domain = domain.name(),
label,
"Error building tip message indexer"
);
err
})?;

// With `reorg_period = None` on `tip_chain_setup`, this returns the latest tip height.
let tip_height = indexer.get_finalized_block_number().await.map_err(|err| {
tracing::error!(
?err,
domain = domain.name(),
label,
"Error getting tip block height"
);
err
})?;

let tip_store = Arc::new(
TipMessageStore::new(
finalized_store.db.clone(),
domain.clone(),
finalized_store.mailbox_address,
// Seed tip cursor near-head on first run for responsiveness.
// Use tip-1 so the first sync pass still includes current tip block.
// Finalized pipeline remains authoritative for historical backfill.
tip_height.saturating_sub(1) as u64,
Some(contract_sync_metrics.stored_events.clone()),
)
.await
.map_err(|err| {
tracing::error!(
?err,
domain = domain.name(),
label,
"Error creating tip message store"
);
err
})?,
);

let sync: Arc<WatermarkContractSync<HyperlaneMessage>> = Arc::new(ContractSync::new(
domain.clone(),
tip_store as WatermarkLogStore<_>,
indexer,
contract_sync_metrics.as_ref().clone(),
false,
));

let cursor = sync.cursor(index_settings).await.map_err(|err| {
tracing::error!(
?err,
domain = domain.name(),
label,
"Error getting tip cursor"
);
err
})?;

Ok(tokio::spawn(
async move { sync.sync(label, cursor.into()).await }
.instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label)),
))
}

async fn build_delivery_indexer(
&self,
domain: HyperlaneDomain,
Expand Down
Loading
Loading