Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/main/agents/scraper/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod m20230309_000004_create_table_delivered_message;
mod m20230309_000004_create_table_gas_payment;
mod m20230309_000005_create_table_message;
mod m20250224_000006_create_table_raw_message_dispatch;
mod m20260224_000007_add_cursor_type;

pub struct Migrator;

Expand All @@ -32,6 +33,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230309_000004_create_table_delivered_message::Migration),
Box::new(m20230309_000005_create_table_message::Migration),
Box::new(m20250224_000006_create_table_raw_message_dispatch::Migration),
Box::new(m20260224_000007_add_cursor_type::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Cursor::Table)
.add_column(
ColumnDef::new(Cursor::Stage)
.text()
.not_null()
.default("finalized"),
)
.to_owned(),
)
.await?;

manager
.create_index(
Index::create()
.table(Cursor::Table)
.name("cursor_domain_type_height_idx")
.col(Cursor::Domain)
.col(Cursor::Stage)
.col(Cursor::Height)
.index_type(IndexType::BTree)
.to_owned(),
)
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(
Index::drop()
.table(Cursor::Table)
.name("cursor_domain_type_height_idx")
.to_owned(),
)
.await?;

manager
.alter_table(
Table::alter()
.table(Cursor::Table)
.drop_column(Cursor::Stage)
.to_owned(),
)
.await?;

Ok(())
}
}

#[derive(Iden)]
enum Cursor {
Table,
Domain,
Height,
#[iden = "cursor_type"]
Stage,
}
137 changes: 128 additions & 9 deletions rust/main/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,29 @@ use async_trait::async_trait;
use derive_more::AsRef;
use futures::future::try_join_all;
use hyperlane_core::{
rpc_clients::RPC_RETRY_SLEEP_DURATION, Delivery, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, H512,
rpc_clients::RPC_RETRY_SLEEP_DURATION, Delivery, HyperlaneDomain, HyperlaneMessage, Indexer,
InterchainGasPayment, ReorgPeriod, H512,
};
use tokio::{sync::mpsc::Receiver as MpscReceiver, task::JoinHandle, time::sleep};
use tracing::{info, info_span, instrument, trace, Instrument};

use hyperlane_base::{
broadcast::BroadcastMpscSender, metrics::AgentMetrics, settings::IndexSettings, AgentMetadata,
BaseAgent, ChainMetrics, ChainSpecificMetricsUpdater, ContractSyncMetrics, ContractSyncer,
CoreMetrics, HyperlaneAgentCore, RuntimeMetrics, SyncOptions,
broadcast::BroadcastMpscSender,
metrics::AgentMetrics,
settings::{ChainConf, IndexSettings, SequenceIndexer, TryFromWithMetrics},
AgentMetadata, BaseAgent, ChainMetrics, ChainSpecificMetricsUpdater, ContractSync,
ContractSyncMetrics, ContractSyncer, CoreMetrics, HyperlaneAgentCore, RuntimeMetrics,
SyncOptions, WatermarkContractSync, WatermarkLogStore,
};

use crate::{db::ScraperDb, settings::ScraperSettings, store::HyperlaneDbStore};
use crate::{
db::ScraperDb,
settings::ScraperSettings,
store::{HyperlaneDbStore, TipMessageStore},
};

const CURSOR_INSTANTIATION_ATTEMPTS: usize = 10;
const MESSAGE_DISPATCH_TIP_LABEL: &str = "message_dispatch_tip";

/// A message explorer scraper agent
#[derive(Debug, AsRef)]
Expand All @@ -40,6 +48,7 @@ struct ChainScraper {
index_settings: IndexSettings,
store: HyperlaneDbStore,
domain: HyperlaneDomain,
tip_chain_setup: Option<ChainConf>,
}

#[async_trait]
Expand Down Expand Up @@ -189,7 +198,7 @@ impl Scraper {
let index_settings = scraper.index_settings.clone();
let domain = scraper.domain.clone();

let mut tasks = Vec::with_capacity(2);
let mut tasks = Vec::with_capacity(4);
let (message_indexer, maybe_broadcaster) = self
.build_message_indexer(
domain.clone(),
Expand All @@ -214,16 +223,30 @@ impl Scraper {

let gas_payment_indexer = self
.build_interchain_gas_payment_indexer(
domain,
domain.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
store,
store.clone(),
index_settings.clone(),
BroadcastMpscSender::<H512>::map_get_receiver(maybe_broadcaster.as_ref()).await,
)
.await?;
tasks.push(gas_payment_indexer);

if let Some(tip_chain_setup) = scraper.tip_chain_setup.as_ref() {
let tip_message_indexer = self
.build_tip_message_indexer(
domain,
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
store,
index_settings,
tip_chain_setup,
)
.await?;
tasks.push(tip_message_indexer);
}

Ok(tokio::spawn(
async move {
try_join_all(tasks)
Expand All @@ -244,6 +267,17 @@ impl Scraper {
) -> eyre::Result<ChainScraper> {
info!(domain = domain.name(), "create chain scraper for domain");
let chain_setup = settings.chain_setup(domain)?;
let tip_chain_setup = if chain_setup.reorg_period.is_none() {
info!(
domain = domain.name(),
"No reorg period configured, skipping tip message indexer"
);
None
} else {
let mut tip_chain_setup = chain_setup.clone();
tip_chain_setup.reorg_period = ReorgPeriod::None;
Some(tip_chain_setup)
};
info!(domain = domain.name(), "create HyperlaneProvider");
let provider = chain_setup.build_provider(&metrics).await?.into();
info!(domain = domain.name(), "create HyperlaneDbStore");
Expand All @@ -261,6 +295,7 @@ impl Scraper {
domain: domain.clone(),
store,
index_settings: chain_setup.index.clone(),
tip_chain_setup,
})
}

Expand Down Expand Up @@ -342,6 +377,90 @@ impl Scraper {
Ok((task, maybe_broadcaser))
}

async fn build_tip_message_indexer(
&self,
domain: HyperlaneDomain,
metrics: Arc<CoreMetrics>,
contract_sync_metrics: Arc<ContractSyncMetrics>,
finalized_store: HyperlaneDbStore,
index_settings: IndexSettings,
tip_chain_setup: &ChainConf,
) -> eyre::Result<JoinHandle<()>> {
let label = MESSAGE_DISPATCH_TIP_LABEL;
let indexer = SequenceIndexer::<HyperlaneMessage>::try_from_with_metrics(
tip_chain_setup,
&metrics,
true,
)
.await
.map_err(|err| {
tracing::error!(
?err,
domain = domain.name(),
label,
"Error building tip message indexer"
);
err
})?;

// With `reorg_period = None` on `tip_chain_setup`, this returns the latest tip height.
let tip_height = indexer.get_finalized_block_number().await.map_err(|err| {
tracing::error!(
?err,
domain = domain.name(),
label,
"Error getting tip block height"
);
err
})?;

let tip_store = Arc::new(
TipMessageStore::new(
finalized_store.db.clone(),
domain.clone(),
finalized_store.mailbox_address,
// Seed tip cursor near-head on first run for responsiveness.
// Use tip-1 so the first sync pass still includes current tip block.
// Finalized pipeline remains authoritative for historical backfill.
tip_height.saturating_sub(1) as u64,
Some(contract_sync_metrics.stored_events.clone()),
)
.await
.map_err(|err| {
tracing::error!(
?err,
domain = domain.name(),
label,
"Error creating tip message store"
);
err
})?,
);

let sync: Arc<WatermarkContractSync<HyperlaneMessage>> = Arc::new(ContractSync::new(
domain.clone(),
tip_store as WatermarkLogStore<_>,
indexer,
contract_sync_metrics.as_ref().clone(),
false,
));

let cursor = sync.cursor(index_settings).await.map_err(|err| {
tracing::error!(
?err,
domain = domain.name(),
label,
"Error getting tip cursor"
);
err
})?;

Ok(tokio::spawn(
async move { sync.sync(label, cursor.into()).await }
.instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label)),
))
}

async fn build_delivery_indexer(
&self,
domain: HyperlaneDomain,
Expand Down
39 changes: 36 additions & 3 deletions rust/main/agents/scraper/src/db/block_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,24 @@ use super::generated::cursor;

const MAX_WRITE_BACK_FREQUENCY: Duration = Duration::from_secs(10);

/// Distinguishes independent cursor streams for the same domain.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum CursorKind {
/// Cursor used by finalized/enriched scraping.
Finalized,
/// Cursor used by near-tip/raw scraping.
Tip,
}

impl CursorKind {
fn as_str(self) -> &'static str {
match self {
Self::Finalized => "finalized",
Self::Tip => "tip",
}
}
}

#[derive(Debug)]
struct BlockCursorInner {
/// Block height
Expand All @@ -27,18 +45,26 @@ pub struct BlockCursor {
db: DbConn,
/// The hyperlane domain this block cursor is for.
domain: u32,
/// Which cursor stream this record belongs to.
cursor_kind: CursorKind,
inner: RwLock<BlockCursorInner>,
}

impl BlockCursor {
async fn new(db: DbConn, domain: u32, default_height: u64) -> Result<Self> {
async fn new(
db: DbConn,
domain: u32,
default_height: u64,
cursor_kind: CursorKind,
) -> Result<Self> {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
Height,
}

let height = (cursor::Entity::find())
.filter(cursor::Column::Domain.eq(domain))
.filter(cursor::Column::CursorType.eq(cursor_kind.as_str()))
.order_by(cursor::Column::Height, Order::Desc)
.select_only()
.column_as(cursor::Column::Height, QueryAs::Height)
Expand All @@ -59,6 +85,7 @@ impl BlockCursor {
Ok(Self {
db,
domain,
cursor_kind,
inner: RwLock::new(BlockCursorInner {
height,
last_saved_at: Instant::now(),
Expand Down Expand Up @@ -86,6 +113,7 @@ impl BlockCursor {
let model = cursor::ActiveModel {
id: ActiveValue::NotSet,
domain: ActiveValue::Set(self.domain as i32),
cursor_type: ActiveValue::Set(self.cursor_kind.as_str().to_owned()),
time_created: ActiveValue::NotSet,
height: ActiveValue::Set(height as i64),
};
Expand All @@ -100,7 +128,12 @@ impl BlockCursor {
}

impl ScraperDb {
pub async fn block_cursor(&self, domain: u32, default_height: u64) -> Result<BlockCursor> {
BlockCursor::new(self.clone_connection(), domain, default_height).await
pub async fn block_cursor(
&self,
domain: u32,
default_height: u64,
cursor_kind: CursorKind,
) -> Result<BlockCursor> {
BlockCursor::new(self.clone_connection(), domain, default_height, cursor_kind).await
}
}
3 changes: 3 additions & 0 deletions rust/main/agents/scraper/src/db/generated/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ impl EntityName for Entity {
pub struct Model {
pub id: i64,
pub domain: i32,
pub cursor_type: String,
pub time_created: TimeDateTime,
pub height: i64,
}
Expand All @@ -23,6 +24,7 @@ pub struct Model {
pub enum Column {
Id,
Domain,
CursorType,
TimeCreated,
Height,
}
Expand Down Expand Up @@ -50,6 +52,7 @@ impl ColumnTrait for Column {
match self {
Self::Id => ColumnType::BigInteger.def(),
Self::Domain => ColumnType::Integer.def(),
Self::CursorType => ColumnType::Text.def(),
Self::TimeCreated => ColumnType::DateTime.def(),
Self::Height => ColumnType::BigInteger.def(),
}
Expand Down
Loading
Loading