From efe0231900180dba17b29bf52dc5976f79451b51 Mon Sep 17 00:00:00 2001 From: unbound Date: Fri, 5 Dec 2025 16:03:08 +0100 Subject: [PATCH] refactors instruction decoding and account arranging --- Cargo.lock | 22 +++- crates/core/src/collection.rs | 24 ++-- crates/core/src/deserialize.rs | 6 +- crates/core/src/instruction.rs | 49 ++----- crates/core/src/pipeline.rs | 2 +- crates/core/src/postgres/processors.rs | 19 +-- crates/core/src/postgres/rows.rs | 26 ++-- crates/core/src/schema.rs | 31 ++--- crates/core/src/transaction.rs | 5 +- crates/core/src/transformers.rs | 11 +- crates/macros/src/try_decode_ixs.rs | 81 ++++++------ crates/proc-macros/src/lib.rs | 28 ++-- examples/pipeline-bench/src/main.rs | 52 ++++++-- examples/pumpswap-alerts/Cargo.toml | 2 +- examples/pumpswap-alerts/src/main.rs | 124 ++++-------------- .../templates/graphqlQueryPageGeneric.njk | 14 +- .../renderer/templates/instructionsMod.njk | 44 +++---- .../templates/instructionsPostgresMod.njk | 20 +-- 18 files changed, 239 insertions(+), 321 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ae739c044..150d7fd8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2048,6 +2048,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "carbon-pump-amm-decoder" +version = "0.1.0" +dependencies = [ + "async-trait", + "base64 0.22.1", + "borsh", + "carbon-core", + "carbon-test-utils", + "juniper", + "serde", + "serde-big-array", + "serde_json", + "solana-account", + "solana-instruction", + "solana-pubkey 3.0.0", + "sqlx", + "sqlx_migrator", +] + [[package]] name = "carbon-pump-fees-decoder" version = "0.12.0" @@ -6676,7 +6696,7 @@ dependencies = [ "carbon-core", "carbon-helius-laserstream-datasource", "carbon-log-metrics", - "carbon-pump-swap-decoder", + "carbon-pump-amm-decoder", "dotenv", "env_logger", "log", diff --git a/crates/core/src/collection.rs b/crates/core/src/collection.rs index a0a5cbb36..2321cdd23 100644 --- a/crates/core/src/collection.rs +++ b/crates/core/src/collection.rs @@ -39,7 +39,7 @@ //! instruction handling within your application, benefiting from simplified //! parsing and type management capabilities. -use {crate::instruction::DecodedInstruction, serde::Serialize}; +use serde::Serialize; /// A trait for defining collections of Solana instructions, enabling parsing /// and type-based management. @@ -62,16 +62,15 @@ use {crate::instruction::DecodedInstruction, serde::Serialize}; /// ### `parse_instruction` /// /// This method is responsible for converting a Solana `Instruction` object into -/// a `DecodedInstruction` containing the custom type defined by the -/// implementor. The parsed instruction can then be processed within the -/// pipeline according to application-specific logic. +/// the custom instruction enum type defined by the implementor. The parsed +/// instruction can then be processed within the pipeline according to +/// application-specific logic. /// /// - **Parameters**: /// - `instruction`: A reference to a `solana_instruction::Instruction`, /// representing the raw instruction to be decoded. -/// - **Returns**: An `Option>` containing the decoded -/// instruction if successful, or `None` if parsing fails or the instruction -/// is unsupported. +/// - **Returns**: An `Option` containing the decoded instruction if +/// successful, or `None` if parsing fails or the instruction is unsupported. /// /// ### `get_type` /// @@ -92,12 +91,19 @@ use {crate::instruction::DecodedInstruction, serde::Serialize}; /// determines how raw instruction data is transformed into a decoded form, /// impacting subsequent processing within the pipeline. pub trait InstructionDecoderCollection: - Clone + std::fmt::Debug + Send + Sync + Eq + std::hash::Hash + Serialize + 'static + Clone + + std::fmt::Debug + + Send + + Sync + + Eq + + std::hash::Hash + + Serialize + + 'static { type InstructionType: Clone + std::fmt::Debug + PartialEq + Eq + Send + Sync + 'static; fn parse_instruction( instruction: &solana_instruction::Instruction, - ) -> Option>; + ) -> Option; fn get_type(&self) -> Self::InstructionType; } diff --git a/crates/core/src/deserialize.rs b/crates/core/src/deserialize.rs index e73da37e0..43db1abb5 100644 --- a/crates/core/src/deserialize.rs +++ b/crates/core/src/deserialize.rs @@ -99,7 +99,11 @@ pub fn extract_discriminator(length: usize, data: &[u8]) -> Option<(&[u8], &[u8] /// - `ArrangedAccounts`: The output type representing the custom arrangement of /// accounts. pub trait ArrangeAccounts { - type ArrangedAccounts; + type ArrangedAccounts: Clone + + Send + + Sync + + std::fmt::Debug + + Unpin; fn arrange_accounts( accounts: &[solana_instruction::AccountMeta], diff --git a/crates/core/src/instruction.rs b/crates/core/src/instruction.rs index 2266118f3..4b3aa0348 100644 --- a/crates/core/src/instruction.rs +++ b/crates/core/src/instruction.rs @@ -25,9 +25,6 @@ use { metrics::MetricsCollection, processor::Processor, transaction::TransactionMetadata, }, async_trait::async_trait, - serde::{Deserialize, Serialize}, - solana_instruction::AccountMeta, - solana_pubkey::Pubkey, std::{ ops::{Deref, DerefMut}, sync::Arc, @@ -177,30 +174,6 @@ impl InstructionMetadata { pub type InstructionsWithMetadata = Vec<(InstructionMetadata, solana_instruction::Instruction)>; -/// A decoded instruction containing program ID, data, and associated accounts. -/// -/// The `DecodedInstruction` struct represents the outcome of decoding a raw -/// instruction, encapsulating its program ID, parsed data, and the accounts -/// involved. -/// -/// # Type Parameters -/// -/// - `T`: The type representing the decoded data for the instruction. -/// -/// # Fields -/// -/// - `program_id`: The program ID that owns the instruction. -/// - `data`: The decoded data payload for the instruction, of type `T`. -/// - `accounts`: A vector of `AccountMeta`, representing the accounts involved -/// in the instruction. - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct DecodedInstruction { - pub program_id: Pubkey, - pub data: T, - pub accounts: Vec, -} - /// A trait for decoding Solana instructions into a structured type. /// /// Implement the `InstructionDecoder` trait for types that can decode raw @@ -214,15 +187,15 @@ pub struct DecodedInstruction { /// /// # Required Methods /// -/// - `decode_instruction`: Decodes a raw Solana `Instruction` into a -/// `DecodedInstruction`. +/// - `decode_instruction`: Decodes a raw Solana `Instruction` into the +/// instruction enum directly. pub trait InstructionDecoder<'a> { type InstructionType; fn decode_instruction( &self, instruction: &'a solana_instruction::Instruction, - ) -> Option>; + ) -> Option; } /// The input type for the instruction processor. @@ -233,7 +206,7 @@ pub trait InstructionDecoder<'a> { #[derive(Debug)] pub struct InstructionProcessorInputType<'a, T> { pub metadata: &'a InstructionMetadata, - pub decoded_instruction: &'a DecodedInstruction, + pub instruction: &'a T, pub nested_instructions: &'a NestedInstructions, pub raw_instruction: &'a solana_instruction::Instruction, } @@ -302,7 +275,7 @@ where log::trace!("InstructionPipe::run(nested_instruction: {nested_instruction:?}, metrics)",); let decode_start = std::time::Instant::now(); - let decoded_instruction = self + let instruction = self .decoder .decode_instruction(&nested_instruction.instruction); let decode_time_ns = decode_start.elapsed().as_nanos(); @@ -310,13 +283,13 @@ where .record_histogram("instruction_decode_time_nanoseconds", decode_time_ns as f64) .await?; - if let Some(decoded_instruction) = decoded_instruction { + if let Some(instruction) = instruction { let process_start = std::time::Instant::now(); let process_metrics = metrics.clone(); let data = InstructionProcessorInputType { metadata: &nested_instruction.metadata, - decoded_instruction: &decoded_instruction, + instruction: &instruction, nested_instructions: &nested_instruction.inner_instructions, raw_instruction: &nested_instruction.instruction, }; @@ -505,7 +478,8 @@ impl UnsafeNestedBuilder { mod tests { use { - super::*, solana_instruction::Instruction, solana_transaction_status::TransactionStatusMeta, + super::*, solana_instruction::Instruction, solana_pubkey::Pubkey, + solana_transaction_status::TransactionStatusMeta, }; fn create_instruction_with_metadata( @@ -527,7 +501,10 @@ mod tests { }; let instruction = Instruction { program_id: Pubkey::new_unique(), - accounts: vec![AccountMeta::new(Pubkey::new_unique(), false)], + accounts: vec![solana_instruction::AccountMeta::new( + Pubkey::new_unique(), + false, + )], data: vec![], }; (metadata, instruction) diff --git a/crates/core/src/pipeline.rs b/crates/core/src/pipeline.rs index a028c63e0..cb7b69269 100644 --- a/crates/core/src/pipeline.rs +++ b/crates/core/src/pipeline.rs @@ -1348,7 +1348,7 @@ impl PipelineBuilder { filters: Vec>, ) -> Self where - T: Send + Sync + 'static, + T: Send + Sync + crate::deserialize::ArrangeAccounts + 'static, P: for<'a> Processor> + Send + Sync + 'static, { log::trace!( diff --git a/crates/core/src/postgres/processors.rs b/crates/core/src/postgres/processors.rs index 1280c7540..76fefc7c1 100644 --- a/crates/core/src/postgres/processors.rs +++ b/crates/core/src/postgres/processors.rs @@ -1,4 +1,3 @@ -use solana_instruction::AccountMeta; use std::sync::Arc; use crate::{ @@ -140,7 +139,7 @@ impl crate::processor::Processor> for PostgresInstructionProcessor where T: Clone + Send + Sync + 'static, - W: From<(T, InstructionMetadata, Vec)> + Upsert + Send + 'static, + W: From<(T, InstructionMetadata)> + Upsert + Send + 'static, { async fn process( &mut self, @@ -148,15 +147,11 @@ where metrics: Arc, ) -> CarbonResult<()> { let metadata = input.metadata; - let decoded_instruction = input.decoded_instruction; + let instruction = input.instruction; let start = std::time::Instant::now(); - let wrapper = W::from(( - decoded_instruction.data.clone(), - metadata.clone(), - decoded_instruction.accounts.clone(), - )); + let wrapper = W::from((instruction.clone(), metadata.clone())); match wrapper.upsert(&self.pool).await { Ok(()) => { @@ -206,13 +201,9 @@ where metrics: Arc, ) -> CarbonResult<()> { let metadata = input.metadata; - let decoded_instruction = input.decoded_instruction; + let instruction = input.instruction; - let instruction_row = InstructionRow::from_parts( - decoded_instruction.data.clone(), - metadata.clone(), - decoded_instruction.accounts.clone(), - ); + let instruction_row = InstructionRow::from_parts(instruction.clone(), metadata.clone()); let start = std::time::Instant::now(); diff --git a/crates/core/src/postgres/rows.rs b/crates/core/src/postgres/rows.rs index d09672e62..bac165e80 100644 --- a/crates/core/src/postgres/rows.rs +++ b/crates/core/src/postgres/rows.rs @@ -1,4 +1,3 @@ -use solana_instruction::AccountMeta; use crate::{ account::AccountMetadata, @@ -158,23 +157,17 @@ pub struct InstructionRow< > { #[sqlx(flatten)] pub metadata: InstructionRowMetadata, - pub data: sqlx::types::Json, - pub accounts: sqlx::types::Json>, + pub instruction: sqlx::types::Json, } impl< T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static, > InstructionRow { - pub fn from_parts( - source: T, - metadata: InstructionMetadata, - accounts: Vec, - ) -> Self { + pub fn from_parts(instruction: T, metadata: InstructionMetadata) -> Self { Self { metadata: metadata.clone().into(), - data: sqlx::types::Json(source), - accounts: sqlx::types::Json(accounts), + instruction: sqlx::types::Json(instruction), } } } @@ -185,13 +178,12 @@ impl< > Insert for InstructionRow { async fn insert(&self, pool: &sqlx::PgPool) -> CarbonResult<()> { - sqlx::query(r#"INSERT INTO instructions (__signature, __instruction_index, __stack_height, __slot, data, accounts) VALUES ($1, $2, $3, $4, $5, $6)"#) + sqlx::query(r#"INSERT INTO instructions (__signature, __instruction_index, __stack_height, __slot, instruction) VALUES ($1, $2, $3, $4, $5)"#) .bind(self.metadata.signature.clone()) .bind(self.metadata.instruction_index) .bind(self.metadata.stack_height) .bind(self.metadata.slot.clone()) - .bind(self.data.clone()) - .bind(self.accounts.clone()) + .bind(self.instruction.clone()) .execute(pool) .await .map_err(|e| crate::error::Error::Custom(e.to_string()))?; @@ -205,13 +197,12 @@ impl< > Upsert for InstructionRow { async fn upsert(&self, pool: &sqlx::PgPool) -> CarbonResult<()> { - sqlx::query(r#"INSERT INTO instructions (__signature, __instruction_index, __stack_height, __slot, data, accounts) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (__signature, __instruction_index, __stack_height) DO UPDATE SET __slot = $4, data = $5, accounts = $6"#) + sqlx::query(r#"INSERT INTO instructions (__signature, __instruction_index, __stack_height, __slot, instruction) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (__signature, __instruction_index, __stack_height) DO UPDATE SET __slot = $4, instruction = $5"#) .bind(self.metadata.signature.clone()) .bind(self.metadata.instruction_index) .bind(self.metadata.stack_height) .bind(self.metadata.slot.clone()) - .bind(self.data.clone()) - .bind(self.accounts.clone()) + .bind(self.instruction.clone()) .execute(pool) .await .map_err(|e| crate::error::Error::Custom(e.to_string()))?; @@ -272,8 +263,7 @@ impl sqlx_migrator::Operation for InstructionRowMigrationOperati __instruction_index BIGINT NOT NULL, __stack_height BIGINT NOT NULL, __slot NUMERIC, - data JSONB NOT NULL, - accounts JSONB NOT NULL, + instruction JSONB NOT NULL, PRIMARY KEY (__signature, __instruction_index, __stack_height) )"#, ) diff --git a/crates/core/src/schema.rs b/crates/core/src/schema.rs index 243a1a5a7..35e569d71 100644 --- a/crates/core/src/schema.rs +++ b/crates/core/src/schema.rs @@ -39,10 +39,8 @@ //! type implements `DeserializeOwned`. use { - crate::{collection::InstructionDecoderCollection, instruction::DecodedInstruction}, + crate::collection::InstructionDecoderCollection, serde::de::DeserializeOwned, - solana_instruction::AccountMeta, - solana_pubkey::Pubkey, std::collections::HashMap, }; @@ -68,14 +66,12 @@ pub struct InstructionSchemaNode { pub inner_instructions: Vec>, } -/// Represents a parsed instruction, containing its program ID, decoded -/// instruction data, and any nested instructions within the transaction. +/// Represents a parsed instruction, containing decoded instruction data +/// and any nested instructions within the transaction. #[derive(Debug)] pub struct ParsedInstruction { - /// The program ID associated with this instruction. - pub program_id: Pubkey, /// The decoded instruction data. - pub instruction: DecodedInstruction, + pub instruction: T, /// A vector of parsed nested instructions. pub inner_instructions: Vec>, } @@ -145,9 +141,9 @@ impl TransactionSchema { pub fn match_nodes( &self, instructions: &[ParsedInstruction], - ) -> Option)>> { + ) -> Option> { log::trace!("Schema::match_nodes(self: {self:?}, instructions: {instructions:?})"); - let mut output = HashMap::)>::new(); + let mut output = HashMap::::new(); let mut node_index = 0; let mut instruction_index = 0; @@ -175,7 +171,7 @@ impl TransactionSchema { return None; }; - if current_instruction.instruction.data.get_type() != instruction_node.ix_type + if current_instruction.instruction.get_type() != instruction_node.ix_type && !any { log::trace!( @@ -184,7 +180,7 @@ impl TransactionSchema { return None; } - if current_instruction.instruction.data.get_type() != instruction_node.ix_type + if current_instruction.instruction.get_type() != instruction_node.ix_type && any { log::trace!( @@ -196,10 +192,7 @@ impl TransactionSchema { output.insert( instruction_node.name.clone(), - ( - current_instruction.instruction.data.clone(), - current_instruction.instruction.accounts.clone(), - ), + current_instruction.instruction.clone(), ); if !instruction_node.inner_instructions.is_empty() { @@ -243,9 +236,9 @@ impl TransactionSchema { /// A new `HashMap` containing all elements from `a` and `b`. In the case of /// duplicate keys, values from `b` will overwrite those from `a`. pub fn merge_hashmaps( - a: HashMap)>, - b: HashMap)>, -) -> HashMap)> + a: HashMap, + b: HashMap, +) -> HashMap where K: std::cmp::Eq + std::hash::Hash, { diff --git a/crates/core/src/transaction.rs b/crates/core/src/transaction.rs index d997da144..f388a3301 100644 --- a/crates/core/src/transaction.rs +++ b/crates/core/src/transaction.rs @@ -36,7 +36,7 @@ use { crate::{ collection::InstructionDecoderCollection, error::CarbonResult, - instruction::{DecodedInstruction, InstructionMetadata, NestedInstruction}, + instruction::{InstructionMetadata, NestedInstruction}, metrics::MetricsCollection, processor::Processor, schema::{ParsedInstruction, TransactionSchema}, @@ -132,7 +132,7 @@ impl TryFrom for TransactionMetadata { /// implementing `DeserializeOwned`. pub type TransactionProcessorInputType = ( Arc, - Vec<(InstructionMetadata, DecodedInstruction)>, + Vec<(InstructionMetadata, T)>, Option, ); @@ -260,7 +260,6 @@ pub fn parse_instructions( for nested_ix in nested_ixs { if let Some(instruction) = T::parse_instruction(&nested_ix.instruction) { parsed_instructions.push(ParsedInstruction { - program_id: nested_ix.instruction.program_id, instruction, inner_instructions: parse_instructions(&nested_ix.inner_instructions), }); diff --git a/crates/core/src/transformers.rs b/crates/core/src/transformers.rs index 80b1d8152..09c427869 100644 --- a/crates/core/src/transformers.rs +++ b/crates/core/src/transformers.rs @@ -25,7 +25,7 @@ use { collection::InstructionDecoderCollection, datasource::TransactionUpdate, error::{CarbonResult, Error}, - instruction::{DecodedInstruction, InstructionMetadata, MAX_INSTRUCTION_STACK_DEPTH}, + instruction::{InstructionMetadata, MAX_INSTRUCTION_STACK_DEPTH}, schema::ParsedInstruction, transaction::TransactionMetadata, }, @@ -262,11 +262,10 @@ pub fn extract_account_metas( Ok(accounts) } -/// Unnests parsed instructions, producing an array of `(InstructionMetadata, -/// DecodedInstruction)` tuple +/// Unnests parsed instructions, producing an array of `(InstructionMetadata, T)` tuples /// /// This function takes a vector of `ParsedInstruction` and unnests them into a -/// vector of `(InstructionMetadata, DecodedInstruction)` tuples. +/// vector of `(InstructionMetadata, T)` tuples. /// It recursively processes nested instructions, increasing the stack height /// for each level of nesting. /// @@ -279,13 +278,13 @@ pub fn extract_account_metas( /// /// # Returns /// -/// A vector of `(InstructionMetadata, DecodedInstruction)` tuples +/// A vector of `(InstructionMetadata, T)` tuples /// representing the unnested instructions. pub fn unnest_parsed_instructions( transaction_metadata: Arc, instructions: Vec>, stack_height: u32, -) -> Vec<(InstructionMetadata, DecodedInstruction)> { +) -> Vec<(InstructionMetadata, T)> { log::trace!("unnest_parsed_instructions(instructions: {instructions:?})"); let mut result = Vec::new(); diff --git a/crates/macros/src/try_decode_ixs.rs b/crates/macros/src/try_decode_ixs.rs index 52fa7abee..e6d949ce1 100644 --- a/crates/macros/src/try_decode_ixs.rs +++ b/crates/macros/src/try_decode_ixs.rs @@ -11,13 +11,13 @@ //! decode the instruction into each type sequentially, returning the first //! successful match. If no match is found, `None` is returned. -/// Attempts to decode an instruction into a specific variant type. +/// Attempts to decode an instruction into a unified enum variant. /// /// The `try_decode_instructions!` macro takes an instruction and tries to -/// decode it into one of the provided variant types. If decoding is successful, -/// it returns a `DecodedInstruction` object with the decoded data wrapped in -/// the specified variant. If none of the variant types match, it returns -/// `None`. +/// decode it into one of the provided instruction types. If decoding is +/// successful, it returns the instruction enum with both the decoded data +/// and arranged accounts in a single struct variant. If no variant type +/// matches, it returns `None`. /// /// This macro is useful for handling multiple potential instruction types /// dynamically, enabling streamlined processing of instructions without @@ -26,22 +26,29 @@ /// # Syntax /// /// ```ignore -/// try_decode_instructions!(instruction, VariantA => TypeA, VariantB => TypeB, ...); +/// try_decode_instructions!( +/// instruction, +/// PROGRAM_ID, +/// MyEnum::VariantA => TypeA, +/// MyEnum::VariantB => TypeB, +/// ); /// ``` /// /// - `$instruction`: The instruction to decode. -/// - `$variant`: The enum variant to wrap the decoded instruction data. -/// - `$ty`: The type to which the instruction data should be deserialized. +/// - `$program_id`: The program ID for this instruction set. +/// - `$enum_name::$variant`: The full path to the enum variant. +/// - `$ty`: The instruction data type that implements `CarbonDeserialize` and `ArrangeAccounts`. /// /// # Example /// /// ```ignore -///use carbon_macros::try_decode_instructions; +/// use carbon_macros::try_decode_instructions; /// /// let instruction = Instruction { /* initialize with program_id, accounts, and data */ }; /// /// let decoded = try_decode_instructions!( /// instruction, +/// PROGRAM_ID, /// MyEnum::VariantOne => TypeOne, /// MyEnum::VariantTwo => TypeTwo, /// ); @@ -50,44 +57,42 @@ /// # Parameters /// /// - `$instruction`: The instruction being decoded, which must include -/// `program_id`, `accounts`, and `data` fields. The `data` field should be a -/// byte slice compatible with the deserialization process. -/// - `$variant`: Enum variants that wrap the deserialized data. These variants -/// should correspond to valid instruction types within the context. -/// - `$ty`: The type for each variant, which must implement a `deserialize` -/// method to convert the instruction data into the appropriate form. +/// `program_id`, `accounts`, and `data` fields. +/// - `$program_id`: The program ID to be stored in the resulting enum variant. +/// - `$enum_name::$variant`: The enum name and variant name separated by `::`. +/// The enum must have struct variants with `program_id`, `data`, and `accounts` fields. +/// - `$ty`: The instruction data type, which must implement a `decode(&[u8]) -> Option` +/// method for data deserialization and `ArrangeAccounts` for account arrangement. /// /// # Returns /// -/// Returns an `Option` that contains the decoded -/// instruction wrapped in the specified variant type if decoding is successful. -/// If no variant type matches, it returns `None`. +/// Returns an `Option` containing the instruction enum variant with +/// both decoded data and arranged accounts if successful. Returns `None` if +/// deserialization or account arrangement fails. /// /// # Notes /// -/// - Ensure that each `$ty` type implements a `deserialize` method, as this is -/// necessary for the macro to attempt decoding. The deserialization method -/// should handle byte slices. -/// - The macro iterates over each variant type sequentially, returning the -/// first successful match. If no types match, `None` is returned. -/// - This macro is especially useful for processing complex transactions where -/// multiple instruction types are possible, improving flexibility and -/// reducing boilerplate code. +/// - Each instruction type must implement a `decode` method and the `ArrangeAccounts` trait. +/// - The macro automatically arranges accounts during decoding, eliminating the need +/// for manual arrangement in processors. +/// - The resulting enum variant contains three fields: `program_id`, `data`, and `accounts`. +/// - The macro iterates through each variant sequentially, returning the first successful match. #[macro_export] macro_rules! try_decode_instructions { - ($instruction:expr, $($variant:path => $ty:ty),* $(,)?) => {{ - use carbon_core::deserialize::CarbonDeserialize; + ($instruction:expr, $program_id:expr, $($enum_name:ident :: $variant:ident => $ty:ty),* $(,)?) => {{ + use carbon_core::deserialize::ArrangeAccounts; $( - if let Some(decoded_instruction) = <$ty>::deserialize($instruction.data.as_slice()) { - Some(carbon_core::instruction::DecodedInstruction { - program_id: $instruction.program_id, - accounts: $instruction.accounts.clone(), - data: $variant(decoded_instruction), - }) - } else + if let Some(data) = <$ty>::decode($instruction.data.as_slice()) { + if let Some(accounts) = <$ty>::arrange_accounts(&$instruction.accounts) { + let result = $enum_name::$variant { + program_id: $program_id, + data: data, + accounts: accounts, + }; + return Some(result); + } + } )* - { - None - } + None }}; } diff --git a/crates/proc-macros/src/lib.rs b/crates/proc-macros/src/lib.rs index e833f7e5d..dae83a377 100644 --- a/crates/proc-macros/src/lib.rs +++ b/crates/proc-macros/src/lib.rs @@ -717,12 +717,8 @@ pub fn instruction_decoder_collection(input: TokenStream) -> TokenStream { }); parse_instruction_arms.push(quote! { - if let Some(decoded_instruction) = #decoder_expr.decode_instruction(&instruction) { - return Some(carbon_core::instruction::DecodedInstruction { - program_id: instruction.program_id, - accounts: instruction.accounts.clone(), - data: #instructions_enum_name::#program_variant(decoded_instruction.data), - }); + if let Some(instruction_enum) = #decoder_expr.decode_instruction(&instruction) { + return Some(#instructions_enum_name::#program_variant(instruction_enum)); } }); @@ -754,7 +750,7 @@ pub fn instruction_decoder_collection(input: TokenStream) -> TokenStream { fn parse_instruction( instruction: &solana_instruction::Instruction - ) -> Option> { + ) -> Option { #(#parse_instruction_arms)* None } @@ -834,12 +830,8 @@ pub fn instruction_decoder_collection_fast(input: TokenStream) -> TokenStream { if let Some(program_id_path) = explicit_program_id_path { parse_instruction_match_arms.push(quote! { #program_id_path => { - if let Some(decoded_instruction) = #decoder_expr.decode_instruction(&instruction) { - Some(carbon_core::instruction::DecodedInstruction { - program_id: instruction.program_id, - accounts: instruction.accounts.clone(), - data: #instructions_enum_name::#program_variant(decoded_instruction.data), - }) + if let Some(instruction_enum) = #decoder_expr.decode_instruction(&instruction) { + Some(#instructions_enum_name::#program_variant(instruction_enum)) } else { None } @@ -848,12 +840,8 @@ pub fn instruction_decoder_collection_fast(input: TokenStream) -> TokenStream { } else { // No program id path: include in slow-path fallback. fallback_decode_blocks.push(quote! { - if let Some(decoded_instruction) = #decoder_expr.decode_instruction(&instruction) { - return Some(carbon_core::instruction::DecodedInstruction { - program_id: instruction.program_id, - accounts: instruction.accounts.clone(), - data: #instructions_enum_name::#program_variant(decoded_instruction.data), - }); + if let Some(instruction_enum) = #decoder_expr.decode_instruction(&instruction) { + return Some(#instructions_enum_name::#program_variant(instruction_enum)); } }); } @@ -886,7 +874,7 @@ pub fn instruction_decoder_collection_fast(input: TokenStream) -> TokenStream { fn parse_instruction( instruction: &solana_instruction::Instruction - ) -> Option> { + ) -> Option { match instruction.program_id { #(#parse_instruction_match_arms),* _ => { diff --git a/examples/pipeline-bench/src/main.rs b/examples/pipeline-bench/src/main.rs index 6c5091fa4..9c02b8562 100644 --- a/examples/pipeline-bench/src/main.rs +++ b/examples/pipeline-bench/src/main.rs @@ -4,8 +4,9 @@ use async_trait::async_trait; use carbon_core::{ account::{AccountDecoder, AccountProcessorInputType, DecodedAccount}, datasource::{AccountUpdate, TransactionUpdate, Update, UpdateType}, + deserialize::ArrangeAccounts, error::CarbonResult, - instruction::{DecodedInstruction, InstructionDecoder, InstructionProcessorInputType}, + instruction::{InstructionDecoder, InstructionProcessorInputType}, metrics::MetricsCollection, pipeline::{Pipeline, ShutdownStrategy}, processor::Processor, @@ -375,19 +376,52 @@ impl carbon_core::datasource::Datasource for SyntheticDatasource { } } +#[derive(Debug, Clone)] +enum SyntheticInstruction { + #[allow(dead_code)] + NoOp { + program_id: Pubkey, + data: SyntheticInstructionData, + accounts: SyntheticInstructionAccounts, + }, +} + +#[derive(Debug, Clone)] +struct SyntheticInstructionData; + +impl SyntheticInstructionData { + #[allow(dead_code)] + fn decode(_data: &[u8]) -> Option { + Some(SyntheticInstructionData) + } +} + +#[derive(Debug, Clone)] +struct SyntheticInstructionAccounts; + +impl ArrangeAccounts for SyntheticInstructionData { + type ArrangedAccounts = SyntheticInstructionAccounts; + + fn arrange_accounts( + _accounts: &[solana_instruction::AccountMeta], + ) -> Option { + Some(SyntheticInstructionAccounts) + } +} + struct SyntheticInstructionDecoder; impl<'a> InstructionDecoder<'a> for SyntheticInstructionDecoder { - type InstructionType = (); + type InstructionType = SyntheticInstruction; fn decode_instruction( &self, _instruction: &'a solana_instruction::Instruction, - ) -> Option> { - Some(DecodedInstruction { + ) -> Option { + Some(SyntheticInstruction::NoOp { program_id: Pubkey::new_unique(), - data: (), - accounts: vec![], + data: SyntheticInstructionData, + accounts: SyntheticInstructionAccounts, }) } } @@ -395,10 +429,12 @@ impl<'a> InstructionDecoder<'a> for SyntheticInstructionDecoder { #[derive(Default)] struct SyntheticInstructionProcessor; -impl Processor> for SyntheticInstructionProcessor { +impl Processor> + for SyntheticInstructionProcessor +{ async fn process( &mut self, - _data: &InstructionProcessorInputType<'_, ()>, + _data: &InstructionProcessorInputType<'_, SyntheticInstruction>, _metrics: Arc, ) -> CarbonResult<()> { Ok(()) diff --git a/examples/pumpswap-alerts/Cargo.toml b/examples/pumpswap-alerts/Cargo.toml index fb9d0c809..548513f9a 100644 --- a/examples/pumpswap-alerts/Cargo.toml +++ b/examples/pumpswap-alerts/Cargo.toml @@ -7,7 +7,7 @@ edition = { workspace = true } carbon-core = { workspace = true } carbon-helius-laserstream-datasource = { workspace = true } carbon-log-metrics = { workspace = true } -carbon-pump-swap-decoder = { workspace = true } +carbon-pump-amm-decoder = { path = "../../decoders/generated" } dotenv = { workspace = true } env_logger = { workspace = true } diff --git a/examples/pumpswap-alerts/src/main.rs b/examples/pumpswap-alerts/src/main.rs index dc588a8aa..ab0e45776 100644 --- a/examples/pumpswap-alerts/src/main.rs +++ b/examples/pumpswap-alerts/src/main.rs @@ -5,10 +5,9 @@ use { }, carbon_helius_laserstream_datasource::{LaserStreamClientConfig, LaserStreamGeyserClient}, carbon_log_metrics::LogMetrics, - carbon_pump_swap_decoder::{ - instructions::PumpSwapInstruction, PumpSwapDecoder, PROGRAM_ID as PUMPSWAP_PROGRAM_ID, + carbon_pump_amm_decoder::{ + instructions::PumpAmmInstruction, PumpAmmDecoder, PROGRAM_ID as PUMPSWAP_PROGRAM_ID, }, - solana_native_token::LAMPORTS_PER_SOL, std::{ collections::{HashMap, HashSet}, env, @@ -84,7 +83,7 @@ pub async fn main() -> CarbonResult<()> { .datasource(laserstream_datasource) .metrics(Arc::new(LogMetrics::new())) .metrics_flush_interval(3) - .instruction(PumpSwapDecoder, PumpSwapInstructionProcessor) + .instruction(PumpAmmDecoder, PumpSwapInstructionProcessor) .shutdown_strategy(carbon_core::pipeline::ShutdownStrategy::Immediate) .build()? .run() @@ -95,112 +94,33 @@ pub async fn main() -> CarbonResult<()> { pub struct PumpSwapInstructionProcessor; -impl Processor> +impl Processor> for PumpSwapInstructionProcessor { async fn process( &mut self, - input: &InstructionProcessorInputType<'_, PumpSwapInstruction>, + input: &InstructionProcessorInputType<'_, PumpAmmInstruction>, _metrics: Arc, ) -> CarbonResult<()> { let signature = input.metadata.transaction_metadata.signature; - let _accounts = input.decoded_instruction.accounts.clone(); - let pumpswap_instruction = input.decoded_instruction.data.clone(); - match pumpswap_instruction { - PumpSwapInstruction::Buy(buy) => { - log::info!("Buy: signature: {signature}, buy: {buy:?}"); - } - PumpSwapInstruction::Sell(sell) => { - log::info!("Sell: signature: {signature}, sell: {sell:?}"); - } - PumpSwapInstruction::CreatePool(create_pool) => { - log::info!("CreatePool: signature: {signature}, create_pool: {create_pool:?}"); - } - PumpSwapInstruction::Deposit(deposit) => { - log::info!("Deposit: signature: {signature}, deposit: {deposit:?}"); - } - PumpSwapInstruction::Withdraw(withdraw) => { - log::info!("Withdraw: signature: {signature}, withdraw: {withdraw:?}"); - } - PumpSwapInstruction::CreateConfig(create_config) => { - log::info!( - "CreateConfig: signature: {signature}, create_config: {create_config:?}" - ); - } - PumpSwapInstruction::UpdateFeeConfig(update_fee_config) => { - log::info!("UpdateFeeConfig: signature: {signature}, update_fee_config: {update_fee_config:?}"); - } - PumpSwapInstruction::UpdateAdmin(update_admin) => { - log::info!("UpdateAdmin: signature: {signature}, update_admin: {update_admin:?}"); - } - PumpSwapInstruction::CollectCoinCreatorFee(collect_fee) => { - log::info!( - "CollectCoinCreatorFee: signature: {signature}, collect_fee: {collect_fee:?}" - ); - } - PumpSwapInstruction::BuyEvent(buy_event) => { - let sol_amount = buy_event.quote_amount_in as f64 / LAMPORTS_PER_SOL as f64; - log::info!( - "BuyEvent: signature: {signature}, SOL: {:.4}, pool: {}, user: {}", - sol_amount, - buy_event.pool, - buy_event.user, - ); - } - PumpSwapInstruction::SellEvent(sell_event) => { - let sol_amount = sell_event.quote_amount_out as f64 / LAMPORTS_PER_SOL as f64; - log::info!( - "SellEvent: signature: {signature}, SOL: {:.4}, pool: {}, user: {}", - sol_amount, - sell_event.pool, - sell_event.user - ); - } - PumpSwapInstruction::CreatePoolEvent(pool_event) => { - log::info!("CreatePoolEvent: signature: {signature}, pool_event: {pool_event:?}"); - } - PumpSwapInstruction::DepositEvent(deposit_event) => { - log::info!( - "DepositEvent: signature: {signature}, deposit_event: {deposit_event:?}" - ); - } - PumpSwapInstruction::WithdrawEvent(withdraw_event) => { - log::info!( - "WithdrawEvent: signature: {signature}, withdraw_event: {withdraw_event:?}" - ); - } - PumpSwapInstruction::CreateConfigEvent(config_event) => { - log::info!( - "CreateConfigEvent: signature: {signature}, config_event: {config_event:?}" - ); - } - PumpSwapInstruction::UpdateFeeConfigEvent(fee_config_event) => { - log::info!("UpdateFeeConfigEvent: signature: {signature}, fee_config_event: {fee_config_event:?}"); - } - PumpSwapInstruction::UpdateAdminEvent(admin_event) => { - log::info!( - "UpdateAdminEvent: signature: {signature}, admin_event: {admin_event:?}" - ); - } - PumpSwapInstruction::CollectCoinCreatorFeeEvent(fee_event) => { - let fee_amount = fee_event.coin_creator_fee as f64 / LAMPORTS_PER_SOL as f64; - log::info!( - "CollectCoinCreatorFeeEvent: signature: {signature}, fee: {:.6} SOL, creator: {}", - fee_amount, - fee_event.coin_creator - ); - } - PumpSwapInstruction::ClaimTokenIncentivesEvent(incentive_event) => { - log::info!("ClaimTokenIncentivesEvent: signature: {signature}, incentive_event: {incentive_event:?}"); - } - PumpSwapInstruction::InitUserVolumeAccumulatorEvent(volume_event) => { - log::info!("InitUserVolumeAccumulatorEvent: signature: {signature}, volume_event: {volume_event:?}"); - } - _ => { - log::debug!("Other PumpSwap instruction: signature: {signature}, data: {pumpswap_instruction:?}"); - } - } + match input.instruction { + PumpAmmInstruction::AdminSetCoinCreator { + program_id, + data, + accounts, + } => { + log::info!("PumpSwap (Program ID: {program_id}) instruction: signature: {signature}, data: {data:?}, accounts: {accounts:?}"); + } + PumpAmmInstruction::Buy { + program_id, + data, + accounts, + } => { + log::info!("PumpSwap (Program ID: {program_id}) instruction: signature: {signature}, data: {data:?}, accounts: {accounts:?}"); + } + _ => {} + }; Ok(()) } diff --git a/packages/renderer/templates/graphqlQueryPageGeneric.njk b/packages/renderer/templates/graphqlQueryPageGeneric.njk index d9ebf9e9d..d31ca849c 100644 --- a/packages/renderer/templates/graphqlQueryPageGeneric.njk +++ b/packages/renderer/templates/graphqlQueryPageGeneric.njk @@ -153,7 +153,7 @@ impl QueryRoot { Ok(rows.into_iter().filter_map(|r| { {% if instructionsToExport.length == 1 %} - let crate::instructions::{{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }}(instruction) = r.data.0; + let crate::instructions::{{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }} { data: instruction, .. } = r.data.0; // Create metadata from database values let metadata = crate::instructions::graphql::InstructionMetadataGraphQL { signature: r.metadata.signature.clone(), @@ -175,7 +175,7 @@ impl QueryRoot { None } {% else %} - if let crate::instructions::{{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }}(instruction) = r.data.0 { + if let crate::instructions::{{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }} { data: instruction, .. } = r.data.0 { // Create metadata from database values let metadata = crate::instructions::graphql::InstructionMetadataGraphQL { signature: r.metadata.signature.clone(), @@ -222,7 +222,7 @@ impl QueryRoot { Ok(rows.into_iter().filter_map(|r| { {% if instructionsToExport.length == 1 %} - let crate::instructions::{{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }}(instruction) = r.data.0; + let crate::instructions::{{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }} { data: instruction, .. } = r.data.0; // Create metadata from database values let metadata = crate::instructions::graphql::InstructionMetadataGraphQL { signature: r.metadata.signature.clone(), @@ -244,7 +244,7 @@ impl QueryRoot { None } {% else %} - if let crate::instructions::{{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }}(instruction) = r.data.0 { + if let crate::instructions::{{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }} { data: instruction, .. } = r.data.0 { // Create metadata from database values let metadata = crate::instructions::graphql::InstructionMetadataGraphQL { signature: r.metadata.signature.clone(), @@ -295,8 +295,7 @@ impl QueryRoot { .map_err(|e| juniper::FieldError::new(e.to_string(), juniper::Value::null()))?; Ok(rows.into_iter().filter_map(|r| { - if let crate::instructions::{{ program.name | pascalCase }}Instruction::CpiEvent(instruction) = r.data.0 { - let instruction = *instruction; + if let crate::instructions::{{ program.name | pascalCase }}Instruction::CpiEvent { data: instruction, .. } = r.data.0 { // Create metadata from database values let metadata = crate::instructions::graphql::InstructionMetadataGraphQL { signature: r.metadata.signature.clone(), @@ -340,8 +339,7 @@ impl QueryRoot { .map_err(|e| juniper::FieldError::new(e.to_string(), juniper::Value::null()))?; Ok(rows.into_iter().filter_map(|r| { - if let crate::instructions::{{ program.name | pascalCase }}Instruction::CpiEvent(instruction) = r.data.0 { - let instruction = *instruction; + if let crate::instructions::{{ program.name | pascalCase }}Instruction::CpiEvent { data: instruction, .. } = r.data.0 { // Create metadata from database values let metadata = crate::instructions::graphql::InstructionMetadataGraphQL { signature: r.metadata.signature.clone(), diff --git a/packages/renderer/templates/instructionsMod.njk b/packages/renderer/templates/instructionsMod.njk index 45e15281b..e5ed0492d 100644 --- a/packages/renderer/templates/instructionsMod.njk +++ b/packages/renderer/templates/instructionsMod.njk @@ -30,11 +30,19 @@ pub use self::cpi_event::*; {{ macros.enumWrapperDerives() }} pub enum {{ program.name | pascalCase }}Instruction { {% for instruction in instructionsToExport | sort(false, false, 'name') %} - {{ instruction.name | pascalCase }}({{ instruction.name | pascalCase }}), + {{ instruction.name | pascalCase }} { + program_id: solana_pubkey::Pubkey, + data: {{ instruction.name | pascalCase }}, + accounts: {{ instruction.name | pascalCase }}InstructionAccounts, + }, {% endfor %} {% if hasAnchorEvents %} // Anchor CPI Event Instruction - CpiEvent(Box) + CpiEvent { + program_id: solana_pubkey::Pubkey, + data: CpiEvent, + accounts: CpiEventInstructionAccounts, + }, {% endif %} } @@ -44,37 +52,21 @@ impl carbon_core::instruction::InstructionDecoder<'_> for {{ program.name | pasc fn decode_instruction( &self, instruction: &solana_instruction::Instruction, - ) -> Option> { + ) -> Option { if instruction.program_id != PROGRAM_ID { return None; } - let data = instruction.data.as_slice(); - -{% for instruction in instructionsToExport %} - { - if let Some(decoded) = {{ macros.escapeRustKeyword(instruction.name | snakeCase) }}::{{ instruction.name | pascalCase }}::decode(data) { - return Some(carbon_core::instruction::DecodedInstruction { - program_id: instruction.program_id, - data: {{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }}(decoded), - accounts: instruction.accounts.clone(), - }); - } - } + carbon_core::try_decode_instructions!( + instruction, + PROGRAM_ID, +{% for instruction in instructionsToExport | sort(false, false, 'name') %} + {{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }} => {{ instruction.name | pascalCase }}, {% endfor %} {% if hasAnchorEvents %} - { - if let Some(decoded) = cpi_event::CpiEvent::decode(data) { - return Some(carbon_core::instruction::DecodedInstruction { - program_id: instruction.program_id, - data: {{ program.name | pascalCase }}Instruction::CpiEvent(Box::new(decoded)), - accounts: instruction.accounts.clone(), - }); - } - } + {{ program.name | pascalCase }}Instruction::CpiEvent => CpiEvent, {% endif %} - - None + ) } } {% endblock %} \ No newline at end of file diff --git a/packages/renderer/templates/instructionsPostgresMod.njk b/packages/renderer/templates/instructionsPostgresMod.njk index 9e67144c2..ac2ecfdd4 100644 --- a/packages/renderer/templates/instructionsPostgresMod.njk +++ b/packages/renderer/templates/instructionsPostgresMod.njk @@ -56,18 +56,18 @@ impl From<({{ program.name | pascalCase }}Instruction, carbon_core::instruction: #[async_trait::async_trait] impl carbon_core::postgres::operations::Insert for {{ program.name | pascalCase }}InstructionWithMetadata { async fn insert(&self, pool: &sqlx::PgPool) -> carbon_core::error::CarbonResult<()> { - let {{ program.name | pascalCase }}InstructionWithMetadata(instruction, metadata, accounts) = self; + let {{ program.name | pascalCase }}InstructionWithMetadata(instruction, metadata, raw_accounts) = self; match instruction { {% for instruction in instructionsToExport %} - {{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }}(instruction) => { - let row = {{ macros.escapeRustKeyword(instruction.name | snakeCase) }}_row::{{ instruction.name | pascalCase }}Row::from_parts(instruction.clone(), metadata.clone(), accounts.clone()); + {{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }} { data, .. } => { + let row = {{ macros.escapeRustKeyword(instruction.name | snakeCase) }}_row::{{ instruction.name | pascalCase }}Row::from_parts(data.clone(), metadata.clone(), raw_accounts.clone()); row.insert(pool).await?; Ok(()) } {% endfor %} {% if hasAnchorEvents %} - {{ program.name | pascalCase }}Instruction::CpiEvent(instruction) => { - let row = cpi_event_row::CpiEventRow::from_parts((**instruction).clone(), metadata.clone(), accounts.clone()); + {{ program.name | pascalCase }}Instruction::CpiEvent { data, .. } => { + let row = cpi_event_row::CpiEventRow::from_parts(data.clone(), metadata.clone(), raw_accounts.clone()); row.insert(pool).await?; Ok(()) } @@ -79,18 +79,18 @@ impl carbon_core::postgres::operations::Insert for {{ program.name | pascalCase #[async_trait::async_trait] impl carbon_core::postgres::operations::Upsert for {{ program.name | pascalCase }}InstructionWithMetadata { async fn upsert(&self, pool: &sqlx::PgPool) -> carbon_core::error::CarbonResult<()> { - let {{ program.name | pascalCase }}InstructionWithMetadata(instruction, metadata, accounts) = self; + let {{ program.name | pascalCase }}InstructionWithMetadata(instruction, metadata, raw_accounts) = self; match instruction { {% for instruction in instructionsToExport %} - {{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }}(instruction) => { - let row = {{ macros.escapeRustKeyword(instruction.name | snakeCase) }}_row::{{ instruction.name | pascalCase }}Row::from_parts(instruction.clone(), metadata.clone(), accounts.clone()); + {{ program.name | pascalCase }}Instruction::{{ instruction.name | pascalCase }} { data, .. } => { + let row = {{ macros.escapeRustKeyword(instruction.name | snakeCase) }}_row::{{ instruction.name | pascalCase }}Row::from_parts(data.clone(), metadata.clone(), raw_accounts.clone()); row.upsert(pool).await?; Ok(()) } {% endfor %} {% if hasAnchorEvents %} - {{ program.name | pascalCase }}Instruction::CpiEvent(instruction) => { - let row = cpi_event_row::CpiEventRow::from_parts((**instruction).clone(), metadata.clone(), accounts.clone()); + {{ program.name | pascalCase }}Instruction::CpiEvent { data, .. } => { + let row = cpi_event_row::CpiEventRow::from_parts(data.clone(), metadata.clone(), raw_accounts.clone()); row.upsert(pool).await?; Ok(()) }