Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 15 additions & 9 deletions crates/core/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
//! instruction handling within your application, benefiting from simplified
//! parsing and type management capabilities.

use {crate::instruction::DecodedInstruction, serde::Serialize};
use serde::Serialize;

/// A trait for defining collections of Solana instructions, enabling parsing
/// and type-based management.
Expand All @@ -62,16 +62,15 @@ use {crate::instruction::DecodedInstruction, serde::Serialize};
/// ### `parse_instruction`
///
/// This method is responsible for converting a Solana `Instruction` object into
/// a `DecodedInstruction` containing the custom type defined by the
/// implementor. The parsed instruction can then be processed within the
/// pipeline according to application-specific logic.
/// the custom instruction enum type defined by the implementor. The parsed
/// instruction can then be processed within the pipeline according to
/// application-specific logic.
///
/// - **Parameters**:
/// - `instruction`: A reference to a `solana_instruction::Instruction`,
/// representing the raw instruction to be decoded.
/// - **Returns**: An `Option<DecodedInstruction<Self>>` containing the decoded
/// instruction if successful, or `None` if parsing fails or the instruction
/// is unsupported.
/// - **Returns**: An `Option<Self>` containing the decoded instruction if
/// successful, or `None` if parsing fails or the instruction is unsupported.
///
/// ### `get_type`
///
Expand All @@ -92,12 +91,19 @@ use {crate::instruction::DecodedInstruction, serde::Serialize};
/// determines how raw instruction data is transformed into a decoded form,
/// impacting subsequent processing within the pipeline.
pub trait InstructionDecoderCollection:
Clone + std::fmt::Debug + Send + Sync + Eq + std::hash::Hash + Serialize + 'static
Clone
+ std::fmt::Debug
+ Send
+ Sync
+ Eq
+ std::hash::Hash
+ Serialize
+ 'static
{
type InstructionType: Clone + std::fmt::Debug + PartialEq + Eq + Send + Sync + 'static;

fn parse_instruction(
instruction: &solana_instruction::Instruction,
) -> Option<DecodedInstruction<Self>>;
) -> Option<Self>;
fn get_type(&self) -> Self::InstructionType;
}
6 changes: 5 additions & 1 deletion crates/core/src/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ pub fn extract_discriminator(length: usize, data: &[u8]) -> Option<(&[u8], &[u8]
/// - `ArrangedAccounts`: The output type representing the custom arrangement of
/// accounts.
pub trait ArrangeAccounts {
type ArrangedAccounts;
type ArrangedAccounts: Clone
+ Send
+ Sync
+ std::fmt::Debug
+ Unpin;

fn arrange_accounts(
accounts: &[solana_instruction::AccountMeta],
Expand Down
49 changes: 13 additions & 36 deletions crates/core/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ use {
metrics::MetricsCollection, processor::Processor, transaction::TransactionMetadata,
},
async_trait::async_trait,
serde::{Deserialize, Serialize},
solana_instruction::AccountMeta,
solana_pubkey::Pubkey,
std::{
ops::{Deref, DerefMut},
sync::Arc,
Expand Down Expand Up @@ -177,30 +174,6 @@ impl InstructionMetadata {

pub type InstructionsWithMetadata = Vec<(InstructionMetadata, solana_instruction::Instruction)>;

/// A decoded instruction containing program ID, data, and associated accounts.
///
/// The `DecodedInstruction` struct represents the outcome of decoding a raw
/// instruction, encapsulating its program ID, parsed data, and the accounts
/// involved.
///
/// # Type Parameters
///
/// - `T`: The type representing the decoded data for the instruction.
///
/// # Fields
///
/// - `program_id`: The program ID that owns the instruction.
/// - `data`: The decoded data payload for the instruction, of type `T`.
/// - `accounts`: A vector of `AccountMeta`, representing the accounts involved
/// in the instruction.

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DecodedInstruction<T> {
pub program_id: Pubkey,
pub data: T,
pub accounts: Vec<AccountMeta>,
}

/// A trait for decoding Solana instructions into a structured type.
///
/// Implement the `InstructionDecoder` trait for types that can decode raw
Expand All @@ -214,15 +187,15 @@ pub struct DecodedInstruction<T> {
///
/// # Required Methods
///
/// - `decode_instruction`: Decodes a raw Solana `Instruction` into a
/// `DecodedInstruction`.
/// - `decode_instruction`: Decodes a raw Solana `Instruction` into the
/// instruction enum directly.
pub trait InstructionDecoder<'a> {
type InstructionType;

fn decode_instruction(
&self,
instruction: &'a solana_instruction::Instruction,
) -> Option<DecodedInstruction<Self::InstructionType>>;
) -> Option<Self::InstructionType>;
}

/// The input type for the instruction processor.
Expand All @@ -233,7 +206,7 @@ pub trait InstructionDecoder<'a> {
#[derive(Debug)]
pub struct InstructionProcessorInputType<'a, T> {
pub metadata: &'a InstructionMetadata,
pub decoded_instruction: &'a DecodedInstruction<T>,
pub instruction: &'a T,
pub nested_instructions: &'a NestedInstructions,
pub raw_instruction: &'a solana_instruction::Instruction,
}
Expand Down Expand Up @@ -302,21 +275,21 @@ where
log::trace!("InstructionPipe::run(nested_instruction: {nested_instruction:?}, metrics)",);

let decode_start = std::time::Instant::now();
let decoded_instruction = self
let instruction = self
.decoder
.decode_instruction(&nested_instruction.instruction);
let decode_time_ns = decode_start.elapsed().as_nanos();
metrics
.record_histogram("instruction_decode_time_nanoseconds", decode_time_ns as f64)
.await?;

if let Some(decoded_instruction) = decoded_instruction {
if let Some(instruction) = instruction {
let process_start = std::time::Instant::now();
let process_metrics = metrics.clone();

let data = InstructionProcessorInputType {
metadata: &nested_instruction.metadata,
decoded_instruction: &decoded_instruction,
instruction: &instruction,
nested_instructions: &nested_instruction.inner_instructions,
raw_instruction: &nested_instruction.instruction,
};
Expand Down Expand Up @@ -505,7 +478,8 @@ impl UnsafeNestedBuilder {
mod tests {

use {
super::*, solana_instruction::Instruction, solana_transaction_status::TransactionStatusMeta,
super::*, solana_instruction::Instruction, solana_pubkey::Pubkey,
solana_transaction_status::TransactionStatusMeta,
};

fn create_instruction_with_metadata(
Expand All @@ -527,7 +501,10 @@ mod tests {
};
let instruction = Instruction {
program_id: Pubkey::new_unique(),
accounts: vec![AccountMeta::new(Pubkey::new_unique(), false)],
accounts: vec![solana_instruction::AccountMeta::new(
Pubkey::new_unique(),
false,
)],
data: vec![],
};
(metadata, instruction)
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,7 @@ impl PipelineBuilder {
filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
) -> Self
where
T: Send + Sync + 'static,
T: Send + Sync + crate::deserialize::ArrangeAccounts + 'static,
P: for<'a> Processor<InstructionProcessorInputType<'a, T>> + Send + Sync + 'static,
{
log::trace!(
Expand Down
19 changes: 5 additions & 14 deletions crates/core/src/postgres/processors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use solana_instruction::AccountMeta;
use std::sync::Arc;

use crate::{
Expand Down Expand Up @@ -140,23 +139,19 @@ impl<T, W> crate::processor::Processor<InstructionProcessorInputType<'_, T>>
for PostgresInstructionProcessor<T, W>
where
T: Clone + Send + Sync + 'static,
W: From<(T, InstructionMetadata, Vec<AccountMeta>)> + Upsert + Send + 'static,
W: From<(T, InstructionMetadata)> + Upsert + Send + 'static,
{
async fn process(
&mut self,
input: &InstructionProcessorInputType<'_, T>,
metrics: Arc<MetricsCollection>,
) -> CarbonResult<()> {
let metadata = input.metadata;
let decoded_instruction = input.decoded_instruction;
let instruction = input.instruction;

let start = std::time::Instant::now();

let wrapper = W::from((
decoded_instruction.data.clone(),
metadata.clone(),
decoded_instruction.accounts.clone(),
));
let wrapper = W::from((instruction.clone(), metadata.clone()));

match wrapper.upsert(&self.pool).await {
Ok(()) => {
Expand Down Expand Up @@ -206,13 +201,9 @@ where
metrics: Arc<MetricsCollection>,
) -> CarbonResult<()> {
let metadata = input.metadata;
let decoded_instruction = input.decoded_instruction;
let instruction = input.instruction;

let instruction_row = InstructionRow::from_parts(
decoded_instruction.data.clone(),
metadata.clone(),
decoded_instruction.accounts.clone(),
);
let instruction_row = InstructionRow::from_parts(instruction.clone(), metadata.clone());

let start = std::time::Instant::now();

Expand Down
26 changes: 8 additions & 18 deletions crates/core/src/postgres/rows.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use solana_instruction::AccountMeta;

use crate::{
account::AccountMetadata,
Expand Down Expand Up @@ -158,23 +157,17 @@ pub struct InstructionRow<
> {
#[sqlx(flatten)]
pub metadata: InstructionRowMetadata,
pub data: sqlx::types::Json<T>,
pub accounts: sqlx::types::Json<Vec<AccountMeta>>,
pub instruction: sqlx::types::Json<T>,
}

impl<
T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
> InstructionRow<T>
{
pub fn from_parts(
source: T,
metadata: InstructionMetadata,
accounts: Vec<AccountMeta>,
) -> Self {
pub fn from_parts(instruction: T, metadata: InstructionMetadata) -> Self {
Self {
metadata: metadata.clone().into(),
data: sqlx::types::Json(source),
accounts: sqlx::types::Json(accounts),
instruction: sqlx::types::Json(instruction),
}
}
}
Expand All @@ -185,13 +178,12 @@ impl<
> Insert for InstructionRow<T>
{
async fn insert(&self, pool: &sqlx::PgPool) -> CarbonResult<()> {
sqlx::query(r#"INSERT INTO instructions (__signature, __instruction_index, __stack_height, __slot, data, accounts) VALUES ($1, $2, $3, $4, $5, $6)"#)
sqlx::query(r#"INSERT INTO instructions (__signature, __instruction_index, __stack_height, __slot, instruction) VALUES ($1, $2, $3, $4, $5)"#)
.bind(self.metadata.signature.clone())
.bind(self.metadata.instruction_index)
.bind(self.metadata.stack_height)
.bind(self.metadata.slot.clone())
.bind(self.data.clone())
.bind(self.accounts.clone())
.bind(self.instruction.clone())
.execute(pool)
.await
.map_err(|e| crate::error::Error::Custom(e.to_string()))?;
Expand All @@ -205,13 +197,12 @@ impl<
> Upsert for InstructionRow<T>
{
async fn upsert(&self, pool: &sqlx::PgPool) -> CarbonResult<()> {
sqlx::query(r#"INSERT INTO instructions (__signature, __instruction_index, __stack_height, __slot, data, accounts) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (__signature, __instruction_index, __stack_height) DO UPDATE SET __slot = $4, data = $5, accounts = $6"#)
sqlx::query(r#"INSERT INTO instructions (__signature, __instruction_index, __stack_height, __slot, instruction) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (__signature, __instruction_index, __stack_height) DO UPDATE SET __slot = $4, instruction = $5"#)
.bind(self.metadata.signature.clone())
.bind(self.metadata.instruction_index)
.bind(self.metadata.stack_height)
.bind(self.metadata.slot.clone())
.bind(self.data.clone())
.bind(self.accounts.clone())
.bind(self.instruction.clone())
.execute(pool)
.await
.map_err(|e| crate::error::Error::Custom(e.to_string()))?;
Expand Down Expand Up @@ -272,8 +263,7 @@ impl sqlx_migrator::Operation<sqlx::Postgres> for InstructionRowMigrationOperati
__instruction_index BIGINT NOT NULL,
__stack_height BIGINT NOT NULL,
__slot NUMERIC,
data JSONB NOT NULL,
accounts JSONB NOT NULL,
instruction JSONB NOT NULL,
PRIMARY KEY (__signature, __instruction_index, __stack_height)
)"#,
)
Expand Down
Loading
Loading