Skip to content

Commit

Permalink
sol msg verifier: Add concurrency for message processing with streams
Browse files Browse the repository at this point in the history
  • Loading branch information
eloylp committed Mar 4, 2024
1 parent 7c70659 commit dce45b0
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions ampd/src/handlers/solana_verify_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use async_trait::async_trait;
use connection_router::state::ChainName;
use cosmrs::cosmwasm::MsgExecuteContract;
use error_stack::ResultExt;
use futures::stream::FuturesOrdered;
use futures::{StreamExt, TryStreamExt};
use serde::Deserialize;
use solana_sdk::signature::Signature;
use solana_transaction_status::UiTransactionEncoding;
Expand Down Expand Up @@ -183,9 +185,14 @@ where
}

let mut votes: Vec<Vote> = Vec::new();
let mut ord_fut = FuturesOrdered::new();

for msg in messages {
votes.push(self.process_message(&msg, &source_gateway_address).await?);
messages
.iter()
.for_each(|msg| ord_fut.push_back(self.process_message(&msg, &source_gateway_address)));

while let Some(vote_result) = ord_fut.next().await {
votes.push(vote_result?) // If there is a failure, its due to a network error, so we abort this handler operation and all messages need to be processed again.
}

self.broadcast_votes(poll_id, votes).await
Expand Down

0 comments on commit dce45b0

Please sign in to comment.