Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/filters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ pub mod noop;
pub mod parse_cbor;
pub mod select;
pub mod split_block;
pub mod split_tx;

#[cfg(feature = "wasm")]
pub mod wasm_plugin;

pub enum Bootstrapper {
Noop(noop::Stage),
SplitBlock(split_block::Stage),
SplitTx(split_tx::Stage),
IntoJson(into_json::Stage),
LegacyV1(legacy_v1::Stage),
ParseCbor(parse_cbor::Stage),
Expand All @@ -30,6 +32,7 @@ impl Bootstrapper {
match self {
Bootstrapper::Noop(p) => &mut p.input,
Bootstrapper::SplitBlock(p) => &mut p.input,
Bootstrapper::SplitTx(p) => &mut p.input,
Bootstrapper::IntoJson(p) => &mut p.input,
Bootstrapper::LegacyV1(p) => &mut p.input,
Bootstrapper::ParseCbor(p) => &mut p.input,
Expand All @@ -44,6 +47,7 @@ impl Bootstrapper {
match self {
Bootstrapper::Noop(p) => &mut p.output,
Bootstrapper::SplitBlock(p) => &mut p.output,
Bootstrapper::SplitTx(p) => &mut p.output,
Bootstrapper::IntoJson(p) => &mut p.output,
Bootstrapper::LegacyV1(p) => &mut p.output,
Bootstrapper::ParseCbor(p) => &mut p.output,
Expand All @@ -58,6 +62,7 @@ impl Bootstrapper {
match self {
Bootstrapper::Noop(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::SplitBlock(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::SplitTx(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::IntoJson(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::LegacyV1(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::ParseCbor(x) => gasket::runtime::spawn_stage(x, policy),
Expand All @@ -74,6 +79,7 @@ impl Bootstrapper {
pub enum Config {
Noop(noop::Config),
SplitBlock(split_block::Config),
SplitTx(split_tx::Config),
IntoJson(into_json::Config),
LegacyV1(legacy_v1::Config),
ParseCbor(parse_cbor::Config),
Expand All @@ -88,6 +94,7 @@ impl Config {
match self {
Config::Noop(c) => Ok(Bootstrapper::Noop(c.bootstrapper(ctx)?)),
Config::SplitBlock(c) => Ok(Bootstrapper::SplitBlock(c.bootstrapper(ctx)?)),
Config::SplitTx(c) => Ok(Bootstrapper::SplitTx(c.bootstrapper(ctx)?)),
Config::IntoJson(c) => Ok(Bootstrapper::IntoJson(c.bootstrapper(ctx)?)),
Config::LegacyV1(c) => Ok(Bootstrapper::LegacyV1(c.bootstrapper(ctx)?)),
Config::ParseCbor(c) => Ok(Bootstrapper::ParseCbor(c.bootstrapper(ctx)?)),
Expand Down
2 changes: 1 addition & 1 deletion src/filters/parse_cbor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl From<&Stage> for Worker {

gasket::impl_mapper!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let output = unit.clone().try_map_record(|r| match r {
Record::CborTx(cbor) => {
Record::CborTx(_, cbor) => {
let tx = trv::MultiEraTx::decode(&cbor).or_panic()?;
let tx = stage.mapper.map_tx(&tx);
Ok(Record::ParsedTx(tx))
Expand Down
8 changes: 4 additions & 4 deletions src/filters/split_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ use gasket::framework::*;
use serde::Deserialize;
use std::borrow::Cow;

use pallas::crypto::hash::Hash;
use pallas::ledger::traverse as trv;

use crate::framework::*;

type CborBlock<'a> = Cow<'a, [u8]>;
type CborTx<'a> = Cow<'a, [u8]>;

fn map_block_to_tx(cbor: CborBlock) -> Result<Vec<CborTx>, WorkerError> {
fn map_block_to_tx(cbor: CborBlock) -> Result<Vec<(Hash<32>, CborTx)>, WorkerError> {
let block = trv::MultiEraBlock::decode(cbor.as_ref()).or_panic()?;

let txs: Vec<_> = block
.txs()
.iter()
.map(|tx| tx.encode())
.map(Cow::Owned)
.map(|tx| (tx.hash(), Cow::Owned(tx.encode())))
.collect();

Ok(txs)
Expand Down Expand Up @@ -48,7 +48,7 @@ gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
Record::CborBlock(cbor) => {
let out = map_block_to_tx(Cow::Borrowed(&cbor))?
.into_iter()
.map(|tx| Record::CborTx(tx.into()))
.map(|(hash, cbor)| Record::CborTx(hash, cbor.into()))
.collect();

Ok(out)
Expand Down
76 changes: 76 additions & 0 deletions src/filters/split_tx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//! A noop filter used as example and placeholder for other filters

use gasket::framework::*;
use serde::Deserialize;
use std::borrow::Cow;

use pallas::ledger::traverse as trv;

use crate::framework::*;

type CborTx<'a> = Cow<'a, [u8]>;
type CborUtxo<'a> = Cow<'a, [u8]>;

fn map_tx_to_utxo(cbor: CborTx) -> Result<Vec<(TxoRef, Option<CborUtxo>, Spent)>, WorkerError> {
let tx = trv::MultiEraTx::decode(cbor.as_ref()).or_panic()?;

let utxos: Vec<_> = tx
.produces()
.iter()
.map(|(idx, utxo)| {
(
(tx.hash(), *idx as u32),
Some(Cow::Owned(utxo.encode())),
false,
)
})
.collect();

Ok(utxos)
}

#[derive(Default, Stage)]
#[stage(name = "filter-split-tx", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
pub input: FilterInputPort,
pub output: FilterOutputPort,

#[metric]
ops_count: gasket::metrics::Counter,
}

#[derive(Default)]
pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Self
}
}

gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let output = unit.clone().try_map_record_to_many(|r| match r {
Record::CborTx(_, cbor) => {
let out = map_tx_to_utxo(Cow::Borrowed(&cbor))?
.into_iter()
.map(|(txo, cbor, spent)| Record::CborUtxo(txo, cbor.map(|cbor| cbor.into()), spent))
.collect();

Ok(out)
}
x => Ok(vec![x]),
})?;

stage.ops_count.inc(1);

output
});

#[derive(Default, Deserialize)]
pub struct Config {}

impl Config {
pub fn bootstrapper(self, _ctx: &Context) -> Result<Stage, Error> {
Ok(Stage::default())
}
}
3 changes: 2 additions & 1 deletion src/filters/wasm_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ impl Stage {
fn map_record(&mut self, r: Record) -> Result<Vec<Record>, Error> {
let extism::convert::Json::<serde_json::Value>(output) = match r {
Record::CborBlock(x) => self.plugin.call("map_cbor_block", x).unwrap(),
Record::CborTx(x) => self.plugin.call("map_cbor_tx", x).unwrap(),
Record::CborTx(_, x) => self.plugin.call("map_cbor_tx", x).unwrap(),
Record::CborUtxo(_, x, spent) => self.plugin.call("map_cbor_utxo", x).unwrap(),
Record::ParsedTx(x) => self
.plugin
.call("map_u5c_tx", extism::convert::Json(x))
Expand Down
27 changes: 22 additions & 5 deletions src/framework/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Internal pipeline framework

use pallas::crypto::hash::Hash;
use pallas::network::miniprotocols::Point;
use serde::Deserialize;
use serde_json::{json, Value as JsonValue};
Expand Down Expand Up @@ -101,21 +102,37 @@ pub struct Context {
pub breadcrumbs: Breadcrumbs,
}

pub type Cbor = Vec<u8>;
pub type TxRef = Hash<32>;
pub type TxoIdx = u32;
pub type TxoRef = (TxRef, TxoIdx);
pub type Spent = bool;

#[derive(Debug, Clone)]
pub enum Record {
CborBlock(Vec<u8>),
CborTx(Vec<u8>),
CborBlock(Cbor),
CborTx(TxRef, Cbor),
CborUtxo(TxoRef, Option<Cbor>, Spent),
ParsedBlock(ParsedBlock),
ParsedTx(ParsedTx),
GenericJson(JsonValue),
OuraV1Event(legacy_v1::Event),
ParsedTx(ParsedTx),
ParsedBlock(ParsedBlock),
}

impl From<Record> for JsonValue {
fn from(value: Record) -> Self {
match value {
Record::CborBlock(x) => json!({ "hex": hex::encode(x) }),
Record::CborTx(x) => json!({ "hex": hex::encode(x) }),
Record::CborTx(hash, cbor) => {
json!({ "hash": hash.to_string(), "hex": hex::encode(cbor) })
}
Record::CborUtxo((hash, idx), cbor, spent) => {
json!({
"txo": format!("{hash}#{idx}"),
"hex": cbor.map(|x| hex::encode(x)),
"spent": spent,
})
}
Record::ParsedBlock(x) => json!(x),
Record::ParsedTx(x) => json!(x),
Record::OuraV1Event(x) => json!(x),
Expand Down