Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ RUN touch contra-escrow-program/program/src/lib.rs contra-escrow-program/tests/i
test_utils/src/lib.rs scripts/devnet/src/lib.rs \
contra-escrow-program/clients/rust/src/lib.rs contra-withdraw-program/clients/rust/src/lib.rs \
core/src/lib.rs metrics/src/lib.rs auth/src/lib.rs && \
printf 'fn main() {}\n' > bench-tps/src/main.rs
printf 'fn main() {}\n' > bench-tps/src/main.rs && \
printf 'fn main() {}\n' > auth/src/main.rs

# Build the project with the dummy files. We can cache this layer.
RUN cargo build --release
Expand Down
4 changes: 3 additions & 1 deletion indexer/config/local/operator-contra.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ type = "postgres"
max_connections = 10

[operator]
poll_interval_secs = 5
poll_interval_secs = 1
batch_size = 10
retry_max_attempts = 3
retry_base_delay_secs = 1
channel_buffer_size = 100
# Solana block time is ~400 ms
confirmation_poll_interval_ms = 400
9 changes: 6 additions & 3 deletions indexer/config/local/operator-solana.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ type = "postgres"
max_connections = 10

[operator]
poll_interval_secs = 5
batch_size = 10
# Faster poll/batch settings suited to higher-throughput
poll_interval_secs = 1
batch_size = 100
retry_max_attempts = 3
retry_base_delay_secs = 1
channel_buffer_size = 100
channel_buffer_size = 1000
# Contra block time is 100 ms so poll quickly
confirmation_poll_interval_ms = 100
4 changes: 3 additions & 1 deletion indexer/config/railway/operator-contra.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ type = "postgres"
max_connections = 10

[operator]
poll_interval_secs = 5
poll_interval_secs = 1
batch_size = 10
retry_max_attempts = 3
retry_base_delay_secs = 1
channel_buffer_size = 100
# Solana mainnet block time is ~400 ms
confirmation_poll_interval_ms = 400
8 changes: 5 additions & 3 deletions indexer/config/railway/operator-solana.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ type = "postgres"
max_connections = 10

[operator]
poll_interval_secs = 5
batch_size = 10
poll_interval_secs = 1
batch_size = 100
retry_max_attempts = 3
retry_base_delay_secs = 1
channel_buffer_size = 100
channel_buffer_size = 1000
# Contra block time is ~100 ms; poll faster than the 400 ms default
confirmation_poll_interval_ms = 100
8 changes: 8 additions & 0 deletions indexer/src/bin/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use clap::{Parser, Subcommand};
use contra_indexer::config::DEFAULT_CONFIRMATION_POLL_INTERVAL_MS;
use contra_indexer::{
BackfillConfig, ContraIndexerConfig, DatasourceType, IndexerConfig, OperatorConfig,
PostgresConfig, ProgramType, ReconciliationConfig, RpcPollingConfig, StorageType,
Expand Down Expand Up @@ -96,6 +97,8 @@ struct OperatorSection {
reconciliation_webhook_url: Option<String>,
#[serde(default = "default_feepayer_monitor_interval_secs")]
feepayer_monitor_interval_secs: u64,
#[serde(default = "default_confirmation_poll_interval_ms")]
confirmation_poll_interval_ms: u64,
}

fn default_reconciliation_interval_secs() -> u64 {
Expand All @@ -110,6 +113,10 @@ fn default_feepayer_monitor_interval_secs() -> u64 {
60
}

fn default_confirmation_poll_interval_ms() -> u64 {
DEFAULT_CONFIRMATION_POLL_INTERVAL_MS
}

#[derive(Parser, Debug)]
#[command(name = "contra-indexer", about = "Index data from Contra programs")]
struct Args {
Expand Down Expand Up @@ -401,6 +408,7 @@ async fn run_operator(figment: Figment, verbose: bool) -> Result<(), Box<dyn std
reconciliation_tolerance_bps: operator.reconciliation_tolerance_bps,
reconciliation_webhook_url: operator.reconciliation_webhook_url,
feepayer_monitor_interval: Duration::from_secs(operator.feepayer_monitor_interval_secs),
confirmation_poll_interval_ms: operator.confirmation_poll_interval_ms,
};

// Validate signer configuration early (from environment variables)
Expand Down
13 changes: 13 additions & 0 deletions indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,17 @@ pub struct OperatorConfig {
/// How often to check the feepayer SOL balance (escrow operators only)
#[serde(default = "default_feepayer_monitor_interval")]
pub feepayer_monitor_interval: std::time::Duration,
/// Milliseconds between `getSignatureStatuses` polls when confirming a sent transaction.
/// Lower values reduce per-tx latency on Contra (~100 ms); higher values suit Solana
/// (~400 ms block time). Defaults to `DEFAULT_CONFIRMATION_POLL_INTERVAL_MS`.
#[serde(default = "default_confirmation_poll_interval_ms")]
pub confirmation_poll_interval_ms: u64,
}

/// Default poll interval for `confirmation_poll_interval_ms`, matching Solana's ~400 ms block time.
/// operator-solana overrides this to 100 ms since Contra confirms faster.
pub const DEFAULT_CONFIRMATION_POLL_INTERVAL_MS: u64 = 400;

fn default_reconciliation_interval() -> std::time::Duration {
std::time::Duration::from_secs(5 * 60) // 5 minutes
}
Expand All @@ -284,6 +293,10 @@ fn default_feepayer_monitor_interval() -> std::time::Duration {
std::time::Duration::from_secs(60)
}

fn default_confirmation_poll_interval_ms() -> u64 {
DEFAULT_CONFIRMATION_POLL_INTERVAL_MS
}

impl OperatorConfig {
/// Validate that required signers are configured
///
Expand Down
1 change: 1 addition & 0 deletions indexer/src/operator/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ mod tests {
reconciliation_tolerance_bps: 10,
reconciliation_webhook_url: None,
feepayer_monitor_interval: Duration::from_secs(60),
confirmation_poll_interval_ms: 400,
}
}

Expand Down
1 change: 1 addition & 0 deletions indexer/src/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub async fn run(
sender_token,
sender_storage,
config.retry_max_attempts,
config.confirmation_poll_interval_ms,
sender_source_rpc,
)
.await
Expand Down
1 change: 1 addition & 0 deletions indexer/src/operator/reconciliation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ mod tests {
reconciliation_tolerance_bps: 10,
reconciliation_webhook_url: None,
feepayer_monitor_interval: std::time::Duration::from_secs(60),
confirmation_poll_interval_ms: 400,
}
}

Expand Down
1 change: 1 addition & 0 deletions indexer/src/operator/sender/mint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub(super) async fn try_jit_mint_initialization(
&sig,
CommitmentConfig::confirmed(),
&init_tx_builder.extra_error_checks_policy(),
state.confirmation_poll_interval_ms,
)
.await
{
Expand Down
5 changes: 5 additions & 0 deletions indexer/src/operator/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub async fn run_sender(
cancellation_token: tokio_util::sync::CancellationToken,
storage: Arc<Storage>,
retry_max_attempts: u32,
confirmation_poll_interval_ms: u64,
source_rpc_client: Option<Arc<RpcClientWithRetry>>,
) -> Result<(), OperatorError> {
info!("Starting sender");
Expand All @@ -53,6 +54,7 @@ pub async fn run_sender(
instance_pda,
storage,
retry_max_attempts,
confirmation_poll_interval_ms,
source_rpc_client,
)?;

Expand Down Expand Up @@ -123,6 +125,7 @@ pub async fn run_sender(
#[cfg(test)]
mod tests {
use super::*;
use crate::config::DEFAULT_CONFIRMATION_POLL_INTERVAL_MS;
use crate::config::{PostgresConfig, ProgramType, StorageType};
use crate::storage::common::storage::mock::MockStorage;
use crate::ContraIndexerConfig;
Expand Down Expand Up @@ -168,6 +171,7 @@ mod tests {
cancellation_token,
storage,
3,
DEFAULT_CONFIRMATION_POLL_INTERVAL_MS,
None,
)
.await;
Expand Down Expand Up @@ -200,6 +204,7 @@ mod tests {
cancellation_token,
storage,
3,
DEFAULT_CONFIRMATION_POLL_INTERVAL_MS,
None,
)
.await;
Expand Down
1 change: 1 addition & 0 deletions indexer/src/operator/sender/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ mod tests {
mint_builders: HashMap::new(),
mint_cache: MintCache::new(storage),
retry_max_attempts: 3,
confirmation_poll_interval_ms: 400,
rotation_retry_queue: Vec::new(),
pending_rotation: None,
program_type: crate::config::ProgramType::Escrow,
Expand Down
3 changes: 3 additions & 0 deletions indexer/src/operator/sender/remint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ async fn attempt_remint(
&signature,
CommitmentConfig::confirmed(),
&ExtraErrorCheckPolicy::None,
state.confirmation_poll_interval_ms,
)
.await
.map_err(|e| format!("Failed to confirm remint transaction: {}", e))?;
Expand Down Expand Up @@ -342,6 +343,7 @@ mod tests {
mint_builders: HashMap::new(),
mint_cache: MintCache::new(storage),
retry_max_attempts: 3,
confirmation_poll_interval_ms: 400,
rotation_retry_queue: Vec::new(),
pending_rotation: None,
program_type: crate::config::ProgramType::Escrow,
Expand Down Expand Up @@ -384,6 +386,7 @@ mod tests {
mint_builders: HashMap::new(),
mint_cache: MintCache::new(storage),
retry_max_attempts: 3,
confirmation_poll_interval_ms: 400,
rotation_retry_queue: Vec::new(),
pending_rotation: None,
program_type: crate::config::ProgramType::Escrow,
Expand Down
15 changes: 14 additions & 1 deletion indexer/src/operator/sender/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl SenderState {
instance_pda: Option<Pubkey>,
storage: Arc<Storage>,
retry_max_attempts: u32,
confirmation_poll_interval_ms: u64,
source_rpc_client: Option<Arc<RpcClientWithRetry>>,
) -> Result<Self, OperatorError> {
// Initialize global RPC client with retry
Expand All @@ -52,6 +53,7 @@ impl SenderState {
mint_cache,
mint_builders: HashMap::new(),
retry_max_attempts,
confirmation_poll_interval_ms,
rotation_retry_queue: Vec::new(),
pending_rotation: None,
program_type: config.program_type,
Expand Down Expand Up @@ -349,6 +351,7 @@ mod tests {
mint_builders: HashMap::new(),
mint_cache: MintCache::new(storage),
retry_max_attempts: 3,
confirmation_poll_interval_ms: 400,
rotation_retry_queue: Vec::new(),
pending_rotation: None,
program_type: crate::config::ProgramType::Escrow,
Expand Down Expand Up @@ -801,6 +804,7 @@ mod tests {
mint_builders: HashMap::new(),
mint_cache: MintCache::new(storage),
retry_max_attempts: 3,
confirmation_poll_interval_ms: 400,
rotation_retry_queue: Vec::new(),
pending_rotation: None,
program_type: ProgramType::Escrow,
Expand Down Expand Up @@ -849,7 +853,15 @@ mod tests {
let storage = Arc::new(Storage::Mock(mock));
let config = make_config();

let result = SenderState::new(&config, CommitmentLevel::Confirmed, None, storage, 3, None);
let result = SenderState::new(
&config,
CommitmentLevel::Confirmed,
None,
storage,
3,
400,
None,
);

assert!(result.is_ok());
let state = result.unwrap();
Expand All @@ -874,6 +886,7 @@ mod tests {
Some(instance_pda),
storage,
5,
400,
None,
);

Expand Down
2 changes: 2 additions & 0 deletions indexer/src/operator/sender/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ pub(super) async fn send_and_confirm(
&signature,
commitment_config,
extra_error_checks_policy,
state.confirmation_poll_interval_ms,
)
.await;

Expand Down Expand Up @@ -764,6 +765,7 @@ mod tests {
mint_builders: HashMap::new(),
mint_cache: MintCache::new(storage),
retry_max_attempts: 3,
confirmation_poll_interval_ms: 400,
rotation_retry_queue: Vec::new(),
pending_rotation: None,
program_type: ProgramType::Escrow,
Expand Down
2 changes: 2 additions & 0 deletions indexer/src/operator/sender/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub struct SenderState {
pub mint_builders: HashMap<i64, MintToBuilder>,
pub mint_cache: MintCache,
pub retry_max_attempts: u32,
/// Milliseconds between `getSignatureStatuses` polls. Populated from `OperatorConfig`.
pub confirmation_poll_interval_ms: u64,
pub rotation_retry_queue: Vec<(TransactionContext, ReleaseFundsBuilder)>,
/// Pending ResetSmtRoot transaction waiting for in-flight txs to settle
pub pending_rotation: Option<Box<ResetSmtRootBuilder>>,
Expand Down
23 changes: 20 additions & 3 deletions indexer/src/operator/utils/rpc_util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
use solana_rpc_client_api::client_error;
use solana_rpc_client_api::client_error::ErrorKind;
use solana_rpc_client_api::config::RpcTransactionConfig;
use solana_rpc_client_api::request::RpcError;
use solana_sdk::account::Account;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::hash::Hash;
Expand Down Expand Up @@ -39,6 +41,18 @@ impl Default for RetryConfig {
}
}

/// Returns `true` for errors that will never succeed on retry.
///
/// `-32601` (Method not found) is a permanent protocol-level rejection; retrying
/// wastes the full backoff budget without any chance of recovery.
// TODO: remove once the RPC endpoint implements all required methods.
fn is_permanent_rpc_error(e: &client_error::Error) -> bool {
matches!(
e.kind(),
ErrorKind::RpcError(RpcError::RpcResponseError { code: -32601, .. })
)
}

pub struct RpcClientWithRetry {
pub rpc_client: Arc<RpcClient>,
pub retry_config: RetryConfig,
Expand Down Expand Up @@ -91,12 +105,15 @@ impl RpcClientWithRetry {
match f().await {
Ok(result) => return Ok(result),
Err(e) => {
if attempts >= self.retry_config.max_attempts {
let err: Box<client_error::Error> = e.into();
if attempts >= self.retry_config.max_attempts
|| is_permanent_rpc_error(&err)
{
warn!(
"{} failed after {} attempts: {}",
operation_name, attempts, e
operation_name, attempts, err
);
return Err(e.into());
return Err(err);
}

let delay = self.retry_config.base_delay * 2_u32.pow(attempts - 1);
Expand Down
Loading
Loading