This framework lets you build custom processors for indexing and processing Alephium blockchain data. It provides a flexible architecture for handling different types of blockchain events and storing them in a PostgreSQL database using Diesel ORM.
The Lending Marketplace Processor demonstrates how to create a custom processor that tracks lending contract events. It processes two types of events:
- Loan Actions (Create, Cancel, Pay, Accept, Liquidate)
- Loan Details (lending and collateral information)
- **Data Models**: Database tables represented as Rust structs using Diesel ORM
  - `LoanActionModel`: Tracks loan lifecycle events
  - `LoanDetailModel`: Stores loan terms and conditions
- **Custom Output Type**: Defines how the processor's output is handled

  ```rust
  #[derive(Debug, Clone)]
  pub struct LendingContractOutput {
      pub loan_actions: Vec<LoanActionModel>,
      pub loan_details: Vec<LoanDetailModel>,
  }

  impl CustomProcessorOutput for LendingContractOutput {
      fn as_any(&self) -> &dyn std::any::Any {
          self
      }

      fn clone_box(&self) -> Box<dyn CustomProcessorOutput> {
          Box::new(self.clone())
      }
  }
  ```
- **Event Types**: Enumeration of supported contract events

  ```rust
  #[derive(Debug, Clone, Copy, PartialEq, Eq, FromSqlRow, DbEnum, Serialize, AsExpression)]
  #[diesel(sql_type = SmallInt)]
  pub enum LoanActionType {
      LoanCreated,
      LoanCancelled,
      LoanPaid,
      LoanAccepted,
      LoanLiquidated,
  }
  ```
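On-chain events typically encode the action as a small integer, so a fallible conversion into `LoanActionType` is useful when decoding event fields. The sketch below is a hypothetical mapping (the contract's actual event codes may differ) and omits the Diesel/serde derives for brevity:

```rust
// Simplified copy of LoanActionType (Diesel/serde derives omitted).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoanActionType {
    LoanCreated,
    LoanCancelled,
    LoanPaid,
    LoanAccepted,
    LoanLiquidated,
}

impl TryFrom<i16> for LoanActionType {
    type Error = String;

    // Map a raw event code to an action type. The 0..=4 mapping here is
    // an assumed encoding, not necessarily the contract's actual layout.
    fn try_from(value: i16) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(LoanActionType::LoanCreated),
            1 => Ok(LoanActionType::LoanCancelled),
            2 => Ok(LoanActionType::LoanPaid),
            3 => Ok(LoanActionType::LoanAccepted),
            4 => Ok(LoanActionType::LoanLiquidated),
            other => Err(format!("unknown loan action code: {other}")),
        }
    }
}
```

Returning a `Result` instead of panicking lets the processor skip or log malformed events rather than crash mid-sync.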
- Define Your Data Models

```rust
#[derive(Queryable, Selectable, Insertable, Debug, Clone, Serialize, AsChangeset)]
#[diesel(table_name = schema::loan_actions)]
pub struct LoanActionModel {
    loan_subcontract_id: String,
    loan_id: Option<BigDecimal>,
    by: String,
    timestamp: NaiveDateTime,
    action_type: LoanActionType,
}

#[derive(Queryable, Selectable, Insertable, Debug, Clone, Serialize, AsChangeset)]
#[diesel(table_name = schema::loan_details)]
pub struct LoanDetailModel {
    loan_subcontract_id: String,
    lending_token_id: String,
    collateral_token_id: String,
    lending_amount: BigDecimal,
    collateral_amount: BigDecimal,
    interest_rate: BigDecimal,
    duration: BigDecimal,
    lender: String,
}
```

- Create Your Processor
```rust
pub struct LendingContractProcessor {
    connection_pool: Arc<DbPool>,
    contract_address: String,
}

impl LendingContractProcessor {
    pub fn new(connection_pool: Arc<DbPool>, contract_address: String) -> Self {
        Self { connection_pool, contract_address }
    }
}

#[async_trait]
impl ProcessorTrait for LendingContractProcessor {
    fn name(&self) -> &'static str {
        "lending_contract_processor"
    }

    fn connection_pool(&self) -> &Arc<DbPool> {
        &self.connection_pool
    }

    async fn process_blocks(
        &self,
        _from: i64,
        _to: i64,
        blocks: Vec<BlockAndEvents>,
    ) -> Result<ProcessorOutput> {
        // Process blocks and convert to models
        let (loan_actions, loan_details) = convert_to_model(blocks, &self.contract_address);
        tracing::info!(
            "Processed {} loan actions and {} loan details",
            loan_actions.len(),
            loan_details.len()
        );
        // Return custom output
        Ok(ProcessorOutput::Custom(Arc::new(LendingContractOutput {
            loan_actions,
            loan_details,
        })))
    }

    // Override storage method to handle custom output
    async fn store_output(&self, output: ProcessorOutput) -> Result<()> {
        if let ProcessorOutput::Custom(custom) = output {
            if let Some(lending_output) = custom.as_any().downcast_ref::<LendingContractOutput>() {
                // Store loan actions
                if !lending_output.loan_actions.is_empty() {
                    insert_loan_actions_to_db(
                        self.connection_pool.clone(),
                        lending_output.loan_actions.clone(),
                    )
                    .await?;
                }
                // Store loan details
                if !lending_output.loan_details.is_empty() {
                    insert_loan_details_to_db(
                        self.connection_pool.clone(),
                        lending_output.loan_details.clone(),
                    )
                    .await?;
                }
            }
        }
        Ok(())
    }
}
```

- Create Factory Function and Register Processor
The factory function creates and configures your processor; the worker uses it to instantiate the processor with the correct configuration:
```rust
// Factory function that creates your processor instance
fn register_lending_contract(
    pool: Arc<DbPool>,
    args: Option<serde_json::Value>,
) -> Box<dyn ProcessorTrait> {
    // Extract the contract address from the JSON args, failing fast
    // with a clear message if it is missing or not a string.
    let contract_address = args
        .as_ref()
        .and_then(|v| v.get("contract_address"))
        .and_then(|v| v.as_str())
        .expect("missing or invalid `contract_address` argument")
        .to_string();

    // Create and return the processor
    Box::new(LendingContractProcessor::new(pool, contract_address))
}

// Register the processor with the worker
let processor_config = ProcessorConfig::Custom {
    name: "lending processor".to_string(),
    factory: register_lending_contract, // Pass the factory function
    args: Some(serde_json::json!({
        "contract_address": "yuF1Sum4ricLFBc86h3RdjFsebR7ZXKBHm2S5sZmVsiF"
    })),
};

// Create worker with the processor
let worker = Worker::new(
    vec![processor_config], // Can register multiple processors
    database_url,
    Network::Testnet,
    None,
    Some(SyncOptions {
        start_ts: Some(1716560632750),
        step: Some(1800000 * 10), // Process blocks in 5-hour chunks
    }),
    Some(FetchStrategy::Parallel { num_workers: 10 }),
).await?;
```

The factory function pattern allows for:
- Dynamic processor creation based on configuration
- Dependency injection (database pool)
- Configuration validation at startup
- Multiple processor instances with different configurations
- Clean separation between processor creation and usage
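The `process_blocks` implementation above calls a `convert_to_model` helper that is not shown. The sketch below illustrates its general shape using simplified stand-in types: `BlockAndEvents`, `ContractEvent`, and the event field layout here are assumptions made so the example is self-contained, not the framework's actual structs.

```rust
// Stand-in types: the real framework's BlockAndEvents and event payloads
// have different fields; these exist only so the sketch compiles.
pub struct ContractEvent {
    pub contract_address: String,
    pub fields: Vec<String>,
}

pub struct BlockAndEvents {
    pub events: Vec<ContractEvent>,
}

pub struct LoanActionModel {
    pub loan_subcontract_id: String,
    pub by: String,
}

// Filter events down to the tracked contract and convert each one into a
// database model, skipping events that fail validation.
pub fn convert_to_model(
    blocks: Vec<BlockAndEvents>,
    contract_address: &str,
) -> Vec<LoanActionModel> {
    let mut actions = Vec::new();
    for block in blocks {
        for event in block.events {
            // Only process events emitted by the contract we track.
            if event.contract_address != contract_address {
                continue;
            }
            // Validate the field count before indexing into `fields`.
            if event.fields.len() < 2 {
                eprintln!("skipping event with too few fields"); // real code: tracing::warn!
                continue;
            }
            actions.push(LoanActionModel {
                loan_subcontract_id: event.fields[0].clone(),
                by: event.fields[1].clone(),
            });
        }
    }
    actions
}
```

The key points carried over from the real processor are the contract-address filter and the field-count check before any field is read.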
```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    dotenvy::dotenv().ok();
    tracing_subscriber::fmt().init();

    let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");

    let processor_config = ProcessorConfig::Custom {
        name: "lending processor".to_string(),
        factory: register_lending_contract,
        args: Some(serde_json::json!({
            "contract_address": "yuF1Sum4ricLFBc86h3RdjFsebR7ZXKBHm2S5sZmVsiF"
        })),
    };

    let worker = Worker::new(
        vec![processor_config],
        database_url,
        Network::Testnet,
        None,
        Some(SyncOptions {
            start_ts: Some(1716560632750),
            step: Some(1800000 * 10), // Process blocks in 5-hour chunks
        }),
        Some(FetchStrategy::Parallel { num_workers: 10 }),
    ).await?;

    worker.run().await?;
    Ok(())
}
```

- **Event Processing**
  - Validate event field count before processing
  - Use proper type conversion with error handling
  - Filter events by contract address
  - Handle different event types appropriately
- **Custom Output Handling**
  - Implement the `CustomProcessorOutput` trait for your output type
  - Use `ProcessorOutput::Custom` to wrap your output
  - Override `store_output` to handle custom data storage
  - Use proper type downcasting with error handling
- **Error Handling**
  - Use custom error types for specific failures
  - Implement comprehensive logging
  - Handle all potential error cases
  - Validate input data thoroughly
- **Configuration**
  - Use environment variables for database configuration
  - Pass the contract address through processor args
  - Configure appropriate sync options
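For the error-handling practices above, a dedicated error type keeps distinct failure modes distinguishable and loggable. This is a hypothetical sketch using only the standard library; names like `EventError` and `check_field_count` are illustrative, not part of the framework:

```rust
use std::fmt;

// A hypothetical error type for event conversion failures.
#[derive(Debug)]
pub enum EventError {
    // An event carried fewer fields than the contract event requires.
    MissingFields { expected: usize, got: usize },
    // A field could not be parsed into the expected type.
    InvalidField(String),
}

impl fmt::Display for EventError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            EventError::MissingFields { expected, got } => {
                write!(f, "expected {expected} event fields, got {got}")
            }
            EventError::InvalidField(msg) => write!(f, "invalid field: {msg}"),
        }
    }
}

impl std::error::Error for EventError {}

// Validate the field count before touching individual fields.
pub fn check_field_count(fields: &[String], expected: usize) -> Result<(), EventError> {
    if fields.len() < expected {
        Err(EventError::MissingFields { expected, got: fields.len() })
    } else {
        Ok(())
    }
}
```

Because `EventError` implements `std::error::Error`, it composes with `?` and `anyhow`-style result types commonly used in processor code.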
Contributions are welcome! Please feel free to submit a Pull Request.