Skip to content

Commit 065b89f

Browse files
authored
refactor: use macros to register the subscription APIs (#5782)
1 parent d569eb4 commit 065b89f

7 files changed

Lines changed: 134 additions & 171 deletions

File tree

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ integer-encoding = "4.0"
108108
ipld-core = { version = "0.4", features = ["serde", "arb"] }
109109
is-terminal = "0.4"
110110
itertools = "0.14"
111-
jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client"] }
111+
jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client", "macros"] }
112112
jsonwebtoken = "9"
113113
keccak-hash = "0.11"
114114
kubert-prometheus-process = "0.2"

src/rpc/methods/eth.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub(crate) mod errors;
55
mod eth_tx;
66
pub mod filter;
77
pub mod pubsub;
8+
pub(crate) mod pubsub_trait;
89
mod trace;
910
pub mod types;
1011
mod utils;

src/rpc/methods/eth/pubsub.rs

Lines changed: 67 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -59,172 +59,88 @@
5959
//! ```
6060
//!
6161
62-
use std::fmt;
63-
62+
use crate::rpc::eth::pubsub_trait::{
63+
EthPubSubApiServer, LogFilter, SubscriptionKind, SubscriptionParams,
64+
};
65+
use crate::rpc::{RPCState, chain};
6466
use fvm_ipld_blockstore::Blockstore;
65-
use serde::de::{self, Deserializer, SeqAccess, Visitor};
66-
use serde::{Deserialize, Serialize};
67+
use jsonrpsee::PendingSubscriptionSink;
68+
use jsonrpsee::core::{SubscriptionError, SubscriptionResult};
69+
use std::sync::Arc;
6770
use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError};
6871

69-
use crate::rpc::Ctx;
70-
use crate::rpc::eth::types::EthAddressList;
71-
use crate::rpc::eth::{EthFilterSpec, EthTopicSpec};
72-
73-
pub const ETH_SUBSCRIPTION: &str = "eth_subscription";
74-
75-
const NEW_HEADS: &str = "newHeads";
76-
const PENDING_TRANSACTIONS: &str = "pendingTransactions";
77-
const LOGS: &str = "logs";
78-
79-
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
80-
#[serde(rename_all = "camelCase")]
81-
pub struct LogFilter {
82-
pub address: EthAddressList,
83-
pub topics: Option<EthTopicSpec>,
72+
pub struct EthPubSub<DB> {
73+
ctx: Arc<RPCState<DB>>,
8474
}
8575

86-
#[derive(Debug)]
87-
enum Subscription {
88-
NewHeads,
89-
PendingTransactions,
90-
Logs(Option<LogFilter>),
76+
impl<DB> EthPubSub<DB> {
77+
pub fn new(ctx: Arc<RPCState<DB>>) -> Self {
78+
Self { ctx }
79+
}
9180
}
9281

93-
impl<'de> Deserialize<'de> for Subscription {
94-
fn deserialize<D>(deserializer: D) -> Result<Subscription, D::Error>
95-
where
96-
D: Deserializer<'de>,
97-
{
98-
struct SubscriptionVisitor;
99-
100-
impl<'de> Visitor<'de> for SubscriptionVisitor {
101-
type Value = Subscription;
102-
103-
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
104-
formatter.write_str(r#"a JSON array like ["logs", {...}] or ["newHeads"]"#)
82+
#[async_trait::async_trait]
83+
impl<DB> EthPubSubApiServer for EthPubSub<DB>
84+
where
85+
DB: Blockstore + Send + Sync + 'static,
86+
{
87+
async fn subscribe(
88+
&self,
89+
pending: PendingSubscriptionSink,
90+
kind: SubscriptionKind,
91+
params: Option<SubscriptionParams>,
92+
) -> SubscriptionResult {
93+
let sink = pending.accept().await?;
94+
let ctx = self.ctx.clone();
95+
96+
match kind {
97+
SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await,
98+
SubscriptionKind::PendingTransactions => {
99+
return Err(SubscriptionError::from(
100+
jsonrpsee::types::ErrorObjectOwned::owned(
101+
jsonrpsee::types::error::METHOD_NOT_FOUND_CODE,
102+
"pendingTransactions subscription not yet implemented",
103+
None::<()>,
104+
),
105+
));
105106
}
106-
107-
fn visit_seq<V>(self, mut seq: V) -> Result<Subscription, V::Error>
108-
where
109-
V: SeqAccess<'de>,
110-
{
111-
let event_type: String = seq
112-
.next_element()?
113-
.ok_or_else(|| de::Error::invalid_length(0, &self))?;
114-
115-
match event_type.as_str() {
116-
NEW_HEADS => {
117-
if seq.next_element::<serde::de::IgnoredAny>()?.is_some() {
118-
return Err(de::Error::custom("unsupported event type"));
119-
}
120-
Ok(Subscription::NewHeads)
121-
}
122-
PENDING_TRANSACTIONS => {
123-
if seq.next_element::<serde::de::IgnoredAny>()?.is_some() {
124-
return Err(de::Error::custom("unsupported event type"));
125-
}
126-
Ok(Subscription::PendingTransactions)
127-
}
128-
LOGS => Ok(Subscription::Logs(seq.next_element()?)),
129-
_ => Err(de::Error::unknown_variant(
130-
&event_type,
131-
&[NEW_HEADS, PENDING_TRANSACTIONS, LOGS],
132-
)),
133-
}
107+
SubscriptionKind::Logs => {
108+
let filter = params.and_then(|p| p.filter);
109+
self.handle_logs_subscription(sink, ctx, filter).await
134110
}
135111
}
136112

137-
deserializer.deserialize_seq(SubscriptionVisitor)
113+
Ok(())
138114
}
139115
}
140116

141-
pub async fn eth_subscribe<DB: Blockstore + Sync + Send + 'static>(
142-
params: jsonrpsee::types::Params<'static>,
143-
pending: jsonrpsee::core::server::PendingSubscriptionSink,
144-
ctx: Ctx<DB>,
145-
_ext: http::Extensions,
146-
) -> impl jsonrpsee::IntoSubscriptionCloseResponse {
147-
let subscription: Subscription = match params.parse() {
148-
Ok(sub) => sub,
149-
Err(e) => {
150-
pending
151-
.reject(jsonrpsee::types::ErrorObjectOwned::from(e))
152-
.await;
153-
// If the subscription has not been "accepted" then
154-
// the return value will be "ignored" as it's not
155-
// allowed to send out any further notifications on
156-
// on the subscription.
157-
return Ok(());
158-
}
159-
};
160-
161-
tracing::trace!("Subscribing to event: {:?}", subscription);
162-
163-
match subscription {
164-
Subscription::NewHeads => {
165-
// Mark the subscription is accepted after the params has been parsed successful.
166-
// This is actually responds the underlying RPC method call and may fail if the
167-
// connection is closed.
168-
let sink = match pending.accept().await {
169-
Ok(sink) => sink,
170-
Err(e) => {
171-
tracing::error!("Failed to accept subscription: {:?}", e);
172-
return Ok(());
173-
}
174-
};
175-
176-
// Spawn newHeads task
177-
let (new_heads, handle) = crate::rpc::new_heads(&ctx);
178-
179-
tokio::spawn(async move {
180-
tracing::trace!(
181-
"Subscription task started (id: {:?})",
182-
sink.subscription_id()
183-
);
184-
185-
handle_subscription(new_heads, sink, handle).await;
186-
});
187-
}
188-
Subscription::Logs(filter) => {
189-
// Mark the subscription is accepted after the params has been parsed successful.
190-
// This is actually responds the underlying RPC method call and may fail if the
191-
// connection is closed.
192-
let sink = match pending.accept().await {
193-
Ok(sink) => sink,
194-
Err(e) => {
195-
tracing::error!("Failed to accept subscription: {:?}", e);
196-
return Ok(());
197-
}
198-
};
199-
200-
let filter_spec: Option<EthFilterSpec> = filter.map(Into::into);
201-
202-
// Spawn logs task
203-
let (logs, handle) = crate::rpc::chain::logs(&ctx, filter_spec);
204-
205-
tokio::spawn(async move {
206-
tracing::trace!(
207-
"Logs subscription task started (id: {:?})",
208-
sink.subscription_id()
209-
);
210-
211-
handle_subscription(logs, sink, handle).await;
212-
});
213-
}
214-
Subscription::PendingTransactions => {
215-
// TODO(akaladarshi): https://github.com/ChainSafe/forest/pull/5782
216-
pending
217-
.reject(jsonrpsee::types::ErrorObjectOwned::owned(
218-
jsonrpsee::types::error::METHOD_NOT_FOUND_CODE,
219-
"pendingTransactions subscription not yet implemented",
220-
None::<()>,
221-
))
222-
.await;
223-
return Ok(());
224-
}
117+
impl<DB> EthPubSub<DB>
118+
where
119+
DB: Blockstore + Send + Sync + 'static,
120+
{
121+
async fn handle_new_heads_subscription(
122+
&self,
123+
accepted_sink: jsonrpsee::SubscriptionSink,
124+
ctx: Arc<RPCState<DB>>,
125+
) {
126+
let (subscriber, handle) = chain::new_heads(&ctx);
127+
tokio::spawn(async move {
128+
handle_subscription(subscriber, accepted_sink, handle).await;
129+
});
225130
}
226131

227-
Ok(())
132+
async fn handle_logs_subscription(
133+
&self,
134+
accepted_sink: jsonrpsee::SubscriptionSink,
135+
ctx: Arc<RPCState<DB>>,
136+
filter_spec: Option<LogFilter>,
137+
) {
138+
let filter_spec = filter_spec.map(Into::into);
139+
let (logs, handle) = chain::logs(&ctx, filter_spec);
140+
tokio::spawn(async move {
141+
handle_subscription(logs, accepted_sink, handle).await;
142+
});
143+
}
228144
}
229145

230146
async fn handle_subscription<T>(
@@ -266,5 +182,5 @@ async fn handle_subscription<T>(
266182
}
267183
handle.abort();
268184

269-
tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id());
185+
tracing::info!("Subscription task ended (id: {:?})", sink.subscription_id());
270186
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2019-2025 ChainSafe Systems
2+
// SPDX-License-Identifier: Apache-2.0, MIT
3+
4+
use crate::rpc::eth::types::{EthAddressList, EthTopicSpec};
5+
use jsonrpsee::proc_macros::rpc;
6+
use serde::{Deserialize, Serialize};
7+
8+
#[rpc(server, namespace = "eth")]
9+
pub trait EthPubSubApi {
10+
/// Subscribe to Ethereum events
11+
#[subscription(
12+
name = "subscribe" => "subscription",
13+
aliases = ["Filecoin.EthSubscribe"],
14+
unsubscribe = "unsubscribe",
15+
unsubscribe_aliases = ["Filecoin.EthUnsubscribe"],
16+
item = serde_json::Value
17+
)]
18+
async fn subscribe(
19+
&self,
20+
kind: SubscriptionKind,
21+
params: Option<SubscriptionParams>,
22+
) -> jsonrpsee::core::SubscriptionResult;
23+
}
24+
25+
#[derive(Debug, Clone, Serialize, Deserialize)]
26+
#[serde(rename_all = "camelCase")]
27+
pub enum SubscriptionKind {
28+
NewHeads,
29+
PendingTransactions,
30+
Logs,
31+
}
32+
33+
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
34+
#[serde(rename_all = "camelCase")]
35+
pub struct LogFilter {
36+
pub address: EthAddressList,
37+
pub topics: Option<EthTopicSpec>,
38+
}
39+
40+
#[derive(Debug, Clone, Serialize, Deserialize)]
41+
pub struct SubscriptionParams {
42+
#[serde(flatten)]
43+
pub filter: Option<LogFilter>,
44+
}

src/rpc/methods/eth/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use super::*;
55
use crate::blocks::CachingBlockHeader;
6-
use crate::rpc::eth::pubsub::LogFilter;
6+
use crate::rpc::eth::pubsub_trait::LogFilter;
77
use anyhow::ensure;
88
use ipld_core::serde::SerdeError;
99
use jsonrpsee::core::traits::IdProvider;

0 commit comments

Comments
 (0)