Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE: POC Feat/time event validation on ADM #568

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6,951 changes: 5,907 additions & 1,044 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
resolver = "2"
members = [
"anchor-hoku",
"anchor-remote",
"anchor-service",
"api",
Expand Down Expand Up @@ -59,6 +60,7 @@ bs58 = "0.4"
bytecheck = "0.6.7"
bytes = "1.1"
bytesize = "1.1"
ceramic-anchor-hoku = { path = "./anchor-hoku" }
ceramic-anchor-service = { path = "./anchor-service" }
ceramic-anchor-remote = { path = "./anchor-remote" }
ceramic-api = { path = "./api" }
Expand Down Expand Up @@ -96,7 +98,7 @@ datafusion-flight-sql-server = { git = "https://github.com/datafusion-contrib/da
datafusion-flight-sql-table-provider = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" }
deadqueue = "0.2.3"
derivative = "2.2"
derive_more = "0.99.17"
derive_more = "1.0.0"
dirs-next = "2"
ed25519-dalek = "2.1"
expect-test = "1.4.1"
Expand Down Expand Up @@ -206,15 +208,15 @@ tikv-jemallocator = { version = "0.5", features = [
tikv-jemalloc-ctl = "0.5"
time = "0.3.9"
tmpdir = "1.0.0"
tokio = { version = "1", default-features = false, features = [
tokio = { version = "1.40.0", default-features = false, features = [
"rt",
"macros",
"sync",
] }
tokio-context = "0.1.3"
tokio-stream = "0.1.11"
tokio-test = "0.4.2"
tokio-util = { version = "0.7.10", features = ["compat", "rt"] }
tokio-util = { version = "0.7.12", features = ["compat", "rt"] }
toml = "0.5.9"
tonic = { version = "0.12", features = ["tls"] }
tower = "0.4"
Expand All @@ -237,7 +239,8 @@ wasm-timer = "0.2.5"
which = "4.3.0"
xtaskops = "0.3"
zeroize = "1.4"

hoku_sdk = {path = "../../../hoku/rust-hoku/sdk"}
hoku_provider = {path = "../../../hoku/rust-hoku/provider"}

[workspace.package]
version = "0.37.0"
Expand Down
47 changes: 47 additions & 0 deletions anchor-hoku/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[package]
name = "ceramic-anchor-hoku"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
publish = false

[dependencies]
anyhow.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
ceramic-anchor-service.workspace = true
ceramic-car.workspace = true
ceramic-core.workspace = true
ceramic-event.workspace = true
ceramic-sql.workspace = true
chrono.workspace = true
expect-test.workspace = true
#fendermint_crypto = { git = "https://github.com/hokunet/ipc.git", branch = "develop" }
#fendermint_vm_message = { git = "https://github.com/hokunet/ipc.git", branch = "develop" }
#fendermint_actor_accumulator = { git = "https://github.com/hokunet/ipc.git", branch = "develop" }
fvm_shared = "4.1.0"
hex.workspace = true
#hoku_provider = { git = "https://github.com/hokunet/rust-hoku.git", branch = "main" }
#hoku_sdk = { git = "https://github.com/hokunet/rust-hoku.git", branch = "main" }
#hoku_signer = { git = "https://github.com/hokunet/rust-hoku.git", branch = "main" }
hoku_sdk.workspace = true
hoku_provider.workspace = true
hoku_signer = { path = "../../../../hoku/rust-hoku/signer" }
multihash-codetable.workspace = true
reqwest.workspace = true
ring.workspace = true
serde.workspace = true
serde_ipld_dagcbor.workspace = true
serde_json.workspace = true
tendermint-rpc = { version = "0.39.1", features = [
"http-client",
] }
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true

[features]
test-network = []
220 changes: 220 additions & 0 deletions anchor-hoku/src/hoku_rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
use std::time::Duration;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use base64::{engine::general_purpose::STANDARD_NO_PAD as b64_standard, Engine as _};
use bytes::Bytes;
use fvm_shared::address::{set_current_network, Address, Network};
use hoku_provider::json_rpc::JsonRpcProvider;
use hoku_provider::tx::TxReceipt;
use hoku_sdk::machine::accumulator::{Accumulator, PushReturn};
use hoku_sdk::machine::Machine;
use hoku_signer::key::SecretKey;
use hoku_signer::{key::parse_secret_key, AccountKind, SubnetID, Wallet};

use ceramic_anchor_service::{
DetachedTimeEvent, MerkleNode, MerkleNodes, RootTimeEvent, TransactionManager,
};
use ceramic_core::{Cid, SerializeExt};
use ceramic_event::unvalidated::Proof;

/// Hoku RPC
pub struct HokuRpc {
secret_key: SecretKey,
time_hub: Accumulator,
provider: JsonRpcProvider,
subnet_id: SubnetID,
poll_interval: Duration,
poll_retry_count: u64,
}

#[async_trait]
impl TransactionManager for HokuRpc {
async fn anchor_root(&self, root_cid: Cid) -> Result<RootTimeEvent> {
let tx_receipt = self.create_anchor_request(root_cid).await?;
let proof = Proof::new(
"hoku:mainnet".to_string(),
root_cid,
root_cid,
format!(
"hoku({}:{})",
self.time_hub.address(),
tx_receipt.data.unwrap().index
),
);
let proof_cid = proof.to_cid()?;
println!("Proof CID: {:?}", proof_cid);
return Ok(RootTimeEvent {
proof,
detached_time_event: DetachedTimeEvent {
path: "".to_string(),
proof: proof_cid,
},
remote_merkle_nodes: Default::default(),
});
}
}

impl HokuRpc {
/// Create a new Hoku RPC instance
pub async fn new(
private_key: &str,
hoku_rpc_url: &str,
hoku_timehub_address: &str,
hoku_subnet: &str,
anchor_poll_interval: Duration,
anchor_poll_retry_count: u64,
) -> Result<Self> {
// let secret_key = parse_ed25519_private_key(private_key)?;
let secret_key =
parse_secret_key("1c323d494d1d069fe4c891350a1ec691c4216c17418a0cb3c7533b143bd2b812")?;
let provider = JsonRpcProvider::new_http(hoku_rpc_url.parse()?, None, None)?;
let subnet_id = hoku_subnet.parse()?;
println!("Hoku timehub address: {:?}", hoku_timehub_address);
set_current_network(Network::Testnet);
Ok(Self {
secret_key,
time_hub: Accumulator::attach(dbg!(hoku_timehub_address.parse())?).await?,
provider,
subnet_id,
poll_interval: anchor_poll_interval,
poll_retry_count: anchor_poll_retry_count,
})
}

/// Create an anchor request on the remote CAS
pub async fn create_anchor_request(&self, root_cid: Cid) -> Result<TxReceipt<PushReturn>> {
let mut signer = Wallet::new_secp256k1(
self.secret_key.clone(),
AccountKind::Ethereum,
self.subnet_id.clone(),
)?;
signer.init_sequence(&self.provider).await?;

self.time_hub
.push(
&self.provider,
&mut signer,
Bytes::from(root_cid.to_bytes()),
Default::default(),
)
.await
}
}

fn parse_ed25519_private_key(key: &str) -> Result<String> {
// Remove header and footer
let key = key
.lines()
.filter(|line| !line.contains("BEGIN") && !line.contains("END"))
.collect::<String>();

// Decode base64
let decoded = b64_standard.decode(key.as_bytes())?;

// The secret key starts at index 47 and is 32 bytes long
if decoded.len() < 79 {
return Err(anyhow!("Invalid key length"));
}

// Return the hex-encoded secret key
Ok(hex::encode(&decoded[47..79]))
}

// Tests to call the CAS request
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use expect_test::expect_file;
use multihash_codetable::{Code, MultihashDigest};
use ring::signature::Ed25519KeyPair;

use ceramic_anchor_service::{
AnchorService, MockAnchorEventService, Store, TransactionManager,
};
use ceramic_core::Cid;
use ceramic_sql::sqlite::SqlitePool;

// #[tokio::test]
// #[ignore]
// async fn test_anchor_batch_with_cas() {
// let anchor_client = Arc::new(MockAnchorEventService::new(10));
// let anchor_requests = anchor_client
// .events_since_high_water_mark(NodeId::random().0, 0, 1_000_000)
// .await
// .unwrap();
// let (node_id, keypair) = node_id_and_private_key();
// let remote_cas = Arc::new(HokuRpc::new(
// node_id,
// keypair,
// "https://cas-dev.3boxlabs.com".to_owned(),
// Duration::from_secs(1),
// 1,
// ));
// let anchor_service = AnchorService::new(
// remote_cas,
// anchor_client,
// SqlitePool::connect_in_memory().await.unwrap(),
// NodeId::random().0,
// Duration::from_secs(1),
// 10,
// );
// let all_blocks = anchor_service
// .anchor_batch(anchor_requests.as_slice())
// .await
// .unwrap();
// expect_file!["./test-data/test_anchor_batch_with_cas.test.txt"]
// .assert_debug_eq(&all_blocks);
// }
//
// #[tokio::test]
// #[ignore]
// async fn test_create_anchor_request_with_cas() {
// let mock_root_cid =
// Cid::from_str("bafyreia776z4jdg5zgycivcpr3q6lcu6llfowkrljkmq3bex2k5hkzat54").unwrap();
// let (node_id, keypair) = node_id_and_private_key();
//
// let remote_cas = HokuRpc::new(
// node_id,
// keypair,
// "https://cas-dev.3boxlabs.com".to_owned(),
// Duration::from_secs(1),
// 1,
// );
// let receipt = remote_cas.anchor_root(mock_root_cid).await;
// expect_file!["./test-data/create_anchor_request_on_cas.test.txt"].assert_debug_eq(&receipt);
// }
//
// #[tokio::test]
// async fn test_anchor_response() {
// let anchor_response = include_str!("test-data/anchor_response.json").to_string();
// let CasResponseParseResult::Anchored(receipt) =
// parse_anchor_response(anchor_response).await.unwrap()
// else {
// panic!("expected anchored receipt");
// };
// expect_file!["./test-data/anchor_response.test.txt"].assert_debug_eq(&receipt);
// }
//
// #[tokio::test]
// async fn test_jwt() {
// let mock_data = serde_ipld_dagcbor::to_vec(b"mock root").unwrap();
// let mock_hash = MultihashDigest::digest(&Code::Sha2_256, &mock_data);
// let (node_id, keypair) = node_id_and_private_key();
// let remote_cas = Arc::new(HokuRpc::new(
// node_id,
// keypair,
// "https://cas-dev.3boxlabs.com".to_owned(),
// Duration::from_secs(1),
// 1,
// ));
// remote_cas
// .auth_jwt(hex::encode(mock_hash.digest()))
// .await
// .unwrap();
// }
}
5 changes: 5 additions & 0 deletions anchor-hoku/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! This crate provides a interface for interacting with a remote anchoring system (whether legacy CAS or a blockchain)
#![warn(missing_docs)]
mod hoku_rpc;

pub use hoku_rpc::HokuRpc;
8 changes: 4 additions & 4 deletions anchor-remote/src/cas_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl RemoteCas {
pub fn new(
node_id: NodeId,
keypair: Ed25519KeyPair,
remote_anchor_service_url: String,
remote_anchor_service_url: &str,
anchor_poll_interval: Duration,
anchor_poll_retry_count: u64,
) -> Self {
Expand Down Expand Up @@ -270,7 +270,7 @@ mod tests {
let remote_cas = Arc::new(RemoteCas::new(
node_id,
keypair,
"https://cas-dev.3boxlabs.com".to_owned(),
"https://cas-dev.3boxlabs.com",
Duration::from_secs(1),
1,
));
Expand Down Expand Up @@ -300,7 +300,7 @@ mod tests {
let remote_cas = RemoteCas::new(
node_id,
keypair,
"https://cas-dev.3boxlabs.com".to_owned(),
"https://cas-dev.3boxlabs.com",
Duration::from_secs(1),
1,
);
Expand All @@ -327,7 +327,7 @@ mod tests {
let remote_cas = Arc::new(RemoteCas::new(
node_id,
keypair,
"https://cas-dev.3boxlabs.com".to_owned(),
"https://cas-dev.3boxlabs.com",
Duration::from_secs(1),
1,
));
Expand Down
9 changes: 9 additions & 0 deletions anchor-service/src/anchor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ impl TimeEventBatch {
time_event.prev(),
time_event.proof(),
))?;
println!(
"build_time_event_insertable: Time event CID: {:?}",
time_event_cid
);
let blocks_in_path: Vec<ProofEdge> = Self::find_tree_blocks_along_path(
time_event.path(),
&anchor_request.prev,
Expand Down Expand Up @@ -192,6 +196,11 @@ impl TimeEventBatch {
let mut blocks = Vec::new();
let mut current_node_cid = *root;
for part in path.split('/') {
// todo_{self-anchoring-project:??} : write a test which checks this code path
// todo_{self-anchoring-project:??} : time events created by self anchoring need to be validated before inserting in db
if part.is_empty() {
break;
}
let merkle_node = merkle_nodes
.get(&current_node_cid)
.ok_or_else(|| anyhow!("missing merkle node for CID: {}", current_node_cid))?;
Expand Down
Loading