Skip to content

Commit 6e8c391

Browse files
committed
clnrest: add support for dynamic paths
Changelog-Added: clnrest: add support for dynamic paths
1 parent 217e94c commit 6e8c391

File tree

6 files changed

+637
-126
lines changed

6 files changed

+637
-126
lines changed

plugins/rest-plugin/src/handlers.rs

Lines changed: 139 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use std::{collections::HashMap, process};
1+
use std::{collections::hash_map::Entry, process};
22

33
use anyhow::anyhow;
44
use axum::{
5-
body::{to_bytes, Body},
5+
body::Body,
66
extract::{Extension, Json, Path, State},
77
http::{Request, StatusCode},
88
middleware::Next,
@@ -17,8 +17,11 @@ use serde_json::json;
1717
use socketioxide::extract::{Data, SocketRef};
1818

1919
use crate::{
20-
shared::{call_rpc, filter_json, verify_rune},
21-
PluginState, SWAGGER_FALLBACK,
20+
shared::{
21+
call_rpc, filter_json, generate_response, get_clnrest_manifests, get_content_type,
22+
get_plugin_methods, handle_custom_paths, merge_params, parse_request_body, verify_rune,
23+
},
24+
ClnrestMap, PluginState, SWAGGER_FALLBACK,
2225
};
2326

2427
#[derive(Debug)]
@@ -55,7 +58,13 @@ impl IntoResponse for AppError {
5558
pub async fn list_methods(
5659
Extension(plugin): Extension<Plugin<PluginState>>,
5760
) -> Result<Html<String>, AppError> {
58-
match call_rpc(plugin, "help", json!(HelpRequest { command: None })).await {
61+
match call_rpc(
62+
&plugin.configuration().rpc_file,
63+
"help",
64+
Some(json!(HelpRequest { command: None })),
65+
)
66+
.await
67+
{
5968
Ok(help_response) => {
6069
let html_content = process_help_response(help_response);
6170
Ok(Html(html_content))
@@ -76,7 +85,14 @@ fn process_help_response(help_response: serde_json::Value) -> String {
7685
let mut processed_html_res = String::new();
7786

7887
for row in processed_res.help {
79-
processed_html_res.push_str(&format!("Command: {}\n", row.command));
88+
processed_html_res.push_str(&format!("Command: {}<br>", row.command));
89+
if let Some(clnrest) = row.clnrest {
90+
processed_html_res.push_str(&format!("Clnrest path:: {}\n", clnrest.path));
91+
processed_html_res.push_str(&format!("Clnrest method: {}\n", clnrest.method));
92+
processed_html_res
93+
.push_str(&format!("Clnrest content-type: {}\n", clnrest.content_type));
94+
processed_html_res.push_str(&format!("Clnrest rune: {}\n", clnrest.rune));
95+
}
8096
processed_html_res.push_str(line);
8197
}
8298

@@ -86,9 +102,9 @@ fn process_help_response(help_response: serde_json::Value) -> String {
86102
/* Handler for calling RPC methods */
87103
#[utoipa::path(
88104
post,
89-
path = "/v1/{rpc_method}",
105+
path = "/v1/{rpc_method_or_path}",
90106
responses(
91-
(status = 201, description = "Call rpc method", body = serde_json::Value),
107+
(status = 201, description = "Call rpc method by name or custom path", body = serde_json::Value),
92108
(status = 401, description = "Unauthorized", body = serde_json::Value),
93109
(status = 403, description = "Forbidden", body = serde_json::Value),
94110
(status = 404, description = "Not Found", body = serde_json::Value),
@@ -98,67 +114,93 @@ fn process_help_response(help_response: serde_json::Value) -> String {
98114
example = json!({}) ),
99115
security(("api_key" = []))
100116
)]
101-
pub async fn call_rpc_method(
102-
Path(rpc_method): Path<String>,
117+
pub async fn post_rpc_method(
118+
Path(path): Path<String>,
103119
headers: axum::http::HeaderMap,
104120
Extension(plugin): Extension<Plugin<PluginState>>,
105121
body: Request<Body>,
106122
) -> Result<Response, AppError> {
107-
let rune = headers
108-
.get("rune")
109-
.and_then(|v| v.to_str().ok())
110-
.map(String::from);
123+
let (rpc_method, path_params, rest_map) = handle_custom_paths(&plugin, &path, "POST").await?;
111124

112-
let bytes = match to_bytes(body.into_body(), usize::MAX).await {
113-
Ok(o) => o,
114-
Err(e) => {
115-
return Err(AppError::InternalServerError(RpcError {
116-
code: None,
117-
data: None,
118-
message: format!("Could not read request body: {}", e),
119-
}))
120-
}
121-
};
122-
123-
let mut rpc_params = match serde_json::from_slice(&bytes) {
124-
Ok(o) => o,
125-
Err(e1) => {
126-
/* it's not json but a form instead */
127-
let form_str = String::from_utf8(bytes.to_vec()).unwrap();
128-
let mut form_data = HashMap::new();
129-
for pair in form_str.split('&') {
130-
let mut kv = pair.split('=');
131-
if let (Some(key), Some(value)) = (kv.next(), kv.next()) {
132-
form_data.insert(key.to_string(), value.to_string());
133-
}
134-
}
135-
match serde_json::to_value(form_data) {
136-
Ok(o) => o,
137-
Err(e2) => {
138-
return Err(AppError::InternalServerError(RpcError {
139-
code: None,
140-
data: None,
141-
message: format!(
142-
"Could not parse json from form data: {}\
143-
Original serde_json error: {}",
144-
e2, e1
145-
),
146-
}))
125+
let mut rpc_params = parse_request_body(body).await?;
126+
127+
filter_json(&mut rpc_params);
128+
129+
merge_params(&mut rpc_params, path_params)?;
130+
131+
if rest_map.as_ref().map_or_else(|| true, |map| map.rune) {
132+
let rune = headers
133+
.get("rune")
134+
.and_then(|v| v.to_str().ok())
135+
.map(String::from);
136+
verify_rune(
137+
&plugin.configuration().rpc_file,
138+
rune,
139+
&rpc_method,
140+
Some(rpc_params.clone()),
141+
)
142+
.await?;
143+
}
144+
145+
let content_type = get_content_type(rest_map)?;
146+
147+
match call_rpc(
148+
&plugin.configuration().rpc_file,
149+
&rpc_method,
150+
Some(rpc_params),
151+
)
152+
.await
153+
{
154+
Ok(result) => Ok(generate_response(result, content_type)),
155+
Err(err) => {
156+
if let Some(code) = err.code {
157+
if code == -32601 {
158+
return Err(AppError::NotFound(err));
147159
}
148160
}
161+
Err(AppError::InternalServerError(err))
149162
}
150-
};
163+
}
164+
}
151165

152-
filter_json(&mut rpc_params);
166+
// Handler for calling RPC methods
167+
#[utoipa::path(
168+
get,
169+
path = "/v1/{rpc_method_or_path}",
170+
responses(
171+
(status = 201, description = "Call rpc method by name or custom path", body = serde_json::Value),
172+
(status = 401, description = "Unauthorized", body = serde_json::Value),
173+
(status = 403, description = "Forbidden", body = serde_json::Value),
174+
(status = 404, description = "Not Found", body = serde_json::Value),
175+
(status = 500, description = "Server Error", body = serde_json::Value)
176+
),
177+
security(("api_key" = []))
178+
)]
179+
pub async fn get_rpc_method(
180+
Path(path): Path<String>,
181+
headers: axum::http::HeaderMap,
182+
Extension(plugin): Extension<Plugin<PluginState>>,
183+
) -> Result<Response, AppError> {
184+
let (rpc_method, path_params, rest_map) = handle_custom_paths(&plugin, &path, "GET").await?;
153185

154-
verify_rune(plugin.clone(), rune, &rpc_method, &rpc_params).await?;
186+
if rest_map.as_ref().map_or_else(|| true, |map| map.rune) {
187+
let rune = headers
188+
.get("rune")
189+
.and_then(|v| v.to_str().ok())
190+
.map(String::from);
191+
verify_rune(
192+
&plugin.configuration().rpc_file,
193+
rune,
194+
&rpc_method,
195+
path_params.clone(),
196+
)
197+
.await?;
198+
}
155199

156-
match call_rpc(plugin, &rpc_method, rpc_params).await {
157-
Ok(result) => {
158-
let response_body = Json(result);
159-
let response = (StatusCode::CREATED, response_body).into_response();
160-
Ok(response)
161-
}
200+
let content_type = get_content_type(rest_map)?;
201+
202+
match call_rpc(&plugin.configuration().rpc_file, &rpc_method, path_params).await {
203+
Ok(result) => Ok(generate_response(result, content_type)),
162204
Err(err) => {
163205
if let Some(code) = err.code {
164206
if code == -32601 {
@@ -178,11 +220,42 @@ pub async fn handle_notification(
178220
plugin: Plugin<PluginState>,
179221
value: serde_json::Value,
180222
) -> Result<(), anyhow::Error> {
223+
log::debug!("notification: {}", value.to_string());
181224
if let Some(sht) = value.get("shutdown") {
182225
log::info!("Got shutdown notification: {}", sht);
183226
/* This seems to error when subscribing to "*" notifications */
184227
_ = plugin.shutdown();
185228
process::exit(0);
229+
} else if let Some(p_started) = value.get("plugin_started") {
230+
let rpc_methods = get_plugin_methods(p_started);
231+
232+
let manifests = get_clnrest_manifests(&plugin.configuration().rpc_file).await?;
233+
let mut rest_paths = plugin.state().rest_paths.lock().unwrap();
234+
for rpc_method in rpc_methods.into_iter() {
235+
let clnrest_data = match manifests.get(&rpc_method) {
236+
Some(c) => c.clone(),
237+
None => continue,
238+
};
239+
if let Entry::Vacant(entry) = rest_paths.entry(clnrest_data.path.clone()) {
240+
log::info!(
241+
"Registered custom path `{}` for `{}` via `{}`",
242+
clnrest_data.path,
243+
rpc_method,
244+
clnrest_data.method
245+
);
246+
entry.insert(ClnrestMap {
247+
content_type: clnrest_data.content_type,
248+
http_method: clnrest_data.method,
249+
rpc_method,
250+
rune: clnrest_data.rune,
251+
});
252+
}
253+
}
254+
} else if let Some(p_stopped) = value.get("plugin_stopped") {
255+
let rpc_methods = get_plugin_methods(p_stopped);
256+
257+
let mut rest_paths = plugin.state().rest_paths.lock().unwrap();
258+
rest_paths.retain(|_, v| !rpc_methods.contains(&v.rpc_method))
186259
}
187260
match plugin.state().notification_sender.send(value).await {
188261
Ok(()) => Ok(()),
@@ -213,7 +286,14 @@ pub async fn header_inspection_middleware(
213286
.map(String::from);
214287

215288
if upgrade.is_some() {
216-
match verify_rune(plugin, rune, "listclnrest-notifications", &json!({})).await {
289+
match verify_rune(
290+
&plugin.configuration().rpc_file,
291+
rune,
292+
"listclnrest-notifications",
293+
None,
294+
)
295+
.await
296+
{
217297
Ok(()) => Ok(next.run(req).await),
218298
Err(e) => Err(e),
219299
}

plugins/rest-plugin/src/main.rs

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,26 @@
1-
use std::{net::SocketAddr, str::FromStr, time::Duration};
1+
use std::{
2+
collections::{hash_map::Entry, HashMap},
3+
net::SocketAddr,
4+
str::FromStr,
5+
sync::{Arc, Mutex},
6+
time::Duration,
7+
};
28

39
use axum::{
410
http::{HeaderName, HeaderValue},
511
middleware,
6-
routing::{get, post},
12+
routing::get,
713
Extension, Router,
814
};
915
use axum_server::tls_rustls::RustlsConfig;
1016
use certs::{do_certificates_exist, generate_certificates};
1117
use cln_plugin::{Builder, Plugin};
1218
use handlers::{
13-
call_rpc_method, handle_notification, header_inspection_middleware, list_methods,
14-
socketio_on_connect,
19+
get_rpc_method, handle_notification, header_inspection_middleware, list_methods,
20+
post_rpc_method, socketio_on_connect,
1521
};
1622
use options::*;
23+
use shared::get_clnrest_manifests;
1724
use socketioxide::SocketIo;
1825
use tokio::{
1926
sync::mpsc::{self, Receiver, Sender},
@@ -38,13 +45,23 @@ mod shared;
3845
#[derive(Clone, Debug)]
3946
struct PluginState {
4047
notification_sender: Sender<serde_json::Value>,
48+
rest_paths: Arc<Mutex<HashMap<String, ClnrestMap>>>,
49+
}
50+
51+
#[derive(Debug, Clone)]
52+
pub struct ClnrestMap {
53+
pub content_type: String,
54+
pub http_method: String,
55+
pub rpc_method: String,
56+
pub rune: bool,
4157
}
4258

4359
#[derive(OpenApi)]
4460
#[openapi(
4561
paths(
4662
handlers::list_methods,
47-
handlers::call_rpc_method,
63+
handlers::post_rpc_method,
64+
handlers::get_rpc_method
4865
),
4966
modifiers(&SecurityAddon),
5067
)]
@@ -95,8 +112,14 @@ async fn main() -> Result<(), anyhow::Error> {
95112

96113
let (notify_tx, notify_rx) = mpsc::channel(100);
97114

115+
let rest_paths = match rest_manifests_init(&plugin.configuration().rpc_file).await {
116+
Ok(rest) => rest,
117+
Err(e) => return plugin.disable(&e.to_string()).await,
118+
};
119+
98120
let state = PluginState {
99121
notification_sender: notify_tx,
122+
rest_paths: Arc::new(Mutex::new(rest_paths)),
100123
};
101124

102125
let plugin = plugin.start(state.clone()).await?;
@@ -150,7 +173,7 @@ async fn run_rest_server(
150173
"/v1",
151174
Router::new()
152175
.route("/list-methods", get(list_methods))
153-
.route("/{rpc_method}", post(call_rpc_method))
176+
.route("/{*route}", get(get_rpc_method).post(post_rpc_method))
154177
.layer(clnrest_options.cors)
155178
.layer(Extension(plugin.clone()))
156179
.layer(
@@ -225,3 +248,27 @@ fn log_error(error: String) {
225248
"params": {"level":"warn", "message":error}})
226249
);
227250
}
251+
async fn rest_manifests_init(
252+
rpc_file: &String,
253+
) -> Result<HashMap<String, ClnrestMap>, anyhow::Error> {
254+
let manifests = get_clnrest_manifests(rpc_file).await?;
255+
let mut rest_paths: HashMap<String, ClnrestMap> = HashMap::new();
256+
for (rpc_method, clnrest_data) in manifests.into_iter() {
257+
if let Entry::Vacant(entry) = rest_paths.entry(clnrest_data.path.clone()) {
258+
log::info!(
259+
"Registered custom path `{}` for `{}` via `{}`",
260+
clnrest_data.path,
261+
rpc_method,
262+
clnrest_data.method
263+
);
264+
entry.insert(ClnrestMap {
265+
content_type: clnrest_data.content_type,
266+
http_method: clnrest_data.method,
267+
rpc_method,
268+
rune: clnrest_data.rune,
269+
});
270+
}
271+
}
272+
273+
Ok(rest_paths)
274+
}

0 commit comments

Comments
 (0)