Skip to content

Commit 99094cd

Browse files
committed
Add onion mailbox for async receivers
This introduces an in-memory mailbox to hold onion messages until the receiver comes online. This is required for async payment `held_htlc_available` messages. The mailbox is bounded by a maximum number of peers and a maximum number of messages per peer.
1 parent 65945f8 commit 99094cd

File tree

8 files changed

+198
-30
lines changed

8 files changed

+198
-30
lines changed

Cargo.toml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,17 @@ default = []
5252
#lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
5353
#lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
5454

55-
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6", features = ["std"] }
56-
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6" }
57-
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6", features = ["std"] }
58-
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6" }
59-
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6" }
60-
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6" }
61-
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6" }
62-
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6", features = ["rest-client", "rpc-client", "tokio"] }
63-
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6", features = ["esplora-async-https", "electrum-rustls-ring", "time"] }
64-
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6" }
65-
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6" }
55+
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880", features = ["std"] }
56+
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880" }
57+
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880", features = ["std"] }
58+
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880" }
59+
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880" }
60+
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880" }
61+
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880" }
62+
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880", features = ["rest-client", "rpc-client", "tokio"] }
63+
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880", features = ["esplora-async-https", "electrum-rustls-ring", "time"] }
64+
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880" }
65+
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880" }
6666

6767
#lightning = { path = "../rust-lightning/lightning", features = ["std"] }
6868
#lightning-types = { path = "../rust-lightning/lightning-types" }
@@ -109,7 +109,7 @@ winapi = { version = "0.3", features = ["winbase"] }
109109
[dev-dependencies]
110110
#lightning = { version = "0.1.0", features = ["std", "_test_utils"] }
111111
#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std", "_test_utils"] }
112-
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "50391d3a3efa7a8f32d119d126a633e4b1981ee6", features = ["std", "_test_utils"] }
112+
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "341e8e436968dd37b82594a693dbf5c535662880", features = ["std", "_test_utils"] }
113113
#lightning = { path = "../rust-lightning/lightning", features = ["std", "_test_utils"] }
114114
proptest = "1.0.0"
115115
regex = "1.5.6"

src/builder.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::liquidity::{
2727
};
2828
use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
2929
use crate::message_handler::NodeCustomMessageHandler;
30+
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
3031
use crate::peer_store::PeerStore;
3132
use crate::runtime::Runtime;
3233
use crate::tx_broadcaster::TransactionBroadcaster;
@@ -1452,17 +1453,31 @@ fn build_with_store_internal(
14521453
}
14531454

14541455
// Initialize the PeerManager
1455-
let onion_messenger: Arc<OnionMessenger> = Arc::new(OnionMessenger::new(
1456-
Arc::clone(&keys_manager),
1457-
Arc::clone(&keys_manager),
1458-
Arc::clone(&logger),
1459-
Arc::clone(&channel_manager),
1460-
message_router,
1461-
Arc::clone(&channel_manager),
1462-
Arc::clone(&channel_manager),
1463-
IgnoringMessageHandler {},
1464-
IgnoringMessageHandler {},
1465-
));
1456+
let onion_messenger: Arc<OnionMessenger> = if config.async_payment_services_enabled {
1457+
Arc::new(OnionMessenger::new_with_offline_peer_interception(
1458+
Arc::clone(&keys_manager),
1459+
Arc::clone(&keys_manager),
1460+
Arc::clone(&logger),
1461+
Arc::clone(&channel_manager),
1462+
message_router,
1463+
Arc::clone(&channel_manager),
1464+
Arc::clone(&channel_manager),
1465+
IgnoringMessageHandler {},
1466+
IgnoringMessageHandler {},
1467+
))
1468+
} else {
1469+
Arc::new(OnionMessenger::new(
1470+
Arc::clone(&keys_manager),
1471+
Arc::clone(&keys_manager),
1472+
Arc::clone(&logger),
1473+
Arc::clone(&channel_manager),
1474+
message_router,
1475+
Arc::clone(&channel_manager),
1476+
Arc::clone(&channel_manager),
1477+
IgnoringMessageHandler {},
1478+
IgnoringMessageHandler {},
1479+
))
1480+
};
14661481
let ephemeral_bytes: [u8; 32] = keys_manager.get_secure_random_bytes();
14671482

14681483
// Initialize the GossipSource
@@ -1649,6 +1664,12 @@ fn build_with_store_internal(
16491664
},
16501665
};
16511666

1667+
let om_mailbox = if config.async_payment_services_enabled {
1668+
Some(Arc::new(OnionMessageMailbox::new()))
1669+
} else {
1670+
None
1671+
};
1672+
16521673
let (stop_sender, _) = tokio::sync::watch::channel(());
16531674
let (background_processor_stop_sender, _) = tokio::sync::watch::channel(());
16541675
let is_running = Arc::new(RwLock::new(false));
@@ -1681,6 +1702,7 @@ fn build_with_store_internal(
16811702
is_running,
16821703
is_listening,
16831704
node_metrics,
1705+
om_mailbox,
16841706
})
16851707
}
16861708

src/config.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,13 @@ pub struct Config {
181181
pub route_parameters: Option<RouteParametersConfig>,
182182
/// Whether to enable the static invoice service to support async payment reception for clients.
183183
pub async_payment_services_enabled: bool,
184+
/// If this is set to true, then if we as an often-offline payer receive a [`StaticInvoice`] to
185+
/// pay, we will attempt to hold the corresponding outbound HTLCs with our next-hop channel
186+
/// counterparty(s) that support the `htlc_hold` feature. This allows our node to go offline once
187+
/// the HTLCs are locked in even though the recipient may not yet be online to receive them.
188+
///
189+
/// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice
190+
pub hold_outbound_htlcs_at_next_hop: bool,
184191
}
185192

186193
impl Default for Config {
@@ -196,6 +203,7 @@ impl Default for Config {
196203
route_parameters: None,
197204
node_alias: None,
198205
async_payment_services_enabled: false,
206+
hold_outbound_htlcs_at_next_hop: false,
199207
}
200208
}
201209
}
@@ -330,6 +338,13 @@ pub(crate) fn default_user_config(config: &Config) -> UserConfig {
330338
user_config.channel_handshake_limits.force_announced_channel_preference = true;
331339
}
332340

341+
if config.async_payment_services_enabled {
342+
user_config.enable_htlc_hold = true;
343+
}
344+
if config.hold_outbound_htlcs_at_next_hop {
345+
user_config.hold_outbound_htlcs_at_next_hop = true;
346+
}
347+
333348
user_config
334349
}
335350

src/event.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet};
8+
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
9+
use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet};
910
use crate::{
1011
hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
1112
UserChannelId,
@@ -459,6 +460,8 @@ where
459460
logger: L,
460461
config: Arc<Config>,
461462
static_invoice_store: Option<StaticInvoiceStore>,
463+
onion_messenger: Arc<OnionMessenger>,
464+
om_mailbox: Option<Arc<OnionMessageMailbox>>,
462465
}
463466

464467
impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
@@ -472,7 +475,8 @@ where
472475
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
473476
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
474477
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
475-
static_invoice_store: Option<StaticInvoiceStore>, runtime: Arc<Runtime>, logger: L,
478+
static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
479+
om_mailbox: Option<Arc<OnionMessageMailbox>>, runtime: Arc<Runtime>, logger: L,
476480
config: Arc<Config>,
477481
) -> Self {
478482
Self {
@@ -490,6 +494,8 @@ where
490494
runtime,
491495
config,
492496
static_invoice_store,
497+
onion_messenger,
498+
om_mailbox,
493499
}
494500
}
495501

@@ -1491,11 +1497,24 @@ where
14911497

14921498
self.bump_tx_event_handler.handle_event(&bte).await;
14931499
},
1494-
LdkEvent::OnionMessageIntercepted { .. } => {
1495-
debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1500+
LdkEvent::OnionMessageIntercepted { peer_node_id, message } => {
1501+
if let Some(om_mailbox) = self.om_mailbox.as_ref() {
1502+
om_mailbox.onion_message_intercepted(peer_node_id, message);
1503+
} else {
1504+
log_trace!(
1505+
self.logger,
1506+
"Onion message intercepted, but no onion message mailbox available"
1507+
);
1508+
}
14961509
},
1497-
LdkEvent::OnionMessagePeerConnected { .. } => {
1498-
debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1510+
LdkEvent::OnionMessagePeerConnected { peer_node_id } => {
1511+
if let Some(om_mailbox) = self.om_mailbox.as_ref() {
1512+
let messages = om_mailbox.onion_message_peer_connected(peer_node_id);
1513+
1514+
for message in messages {
1515+
let _ = self.onion_messenger.forward_onion_message(message, &peer_node_id);
1516+
}
1517+
}
14991518
},
15001519

15011520
LdkEvent::PersistStaticInvoice {

src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ use gossip::GossipSource;
136136
use graph::NetworkGraph;
137137
use io::utils::write_node_metrics;
138138
use liquidity::{LSPS1Liquidity, LiquiditySource};
139+
use payment::asynchronous::om_mailbox::OnionMessageMailbox;
139140
use payment::asynchronous::static_invoice_store::StaticInvoiceStore;
140141
use payment::{
141142
Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment,
@@ -205,6 +206,7 @@ pub struct Node {
205206
is_running: Arc<RwLock<bool>>,
206207
is_listening: Arc<AtomicBool>,
207208
node_metrics: Arc<RwLock<NodeMetrics>>,
209+
om_mailbox: Option<Arc<OnionMessageMailbox>>,
208210
}
209211

210212
impl Node {
@@ -517,6 +519,8 @@ impl Node {
517519
Arc::clone(&self.payment_store),
518520
Arc::clone(&self.peer_store),
519521
static_invoice_store,
522+
Arc::clone(&self.onion_messenger),
523+
self.om_mailbox.clone(),
520524
Arc::clone(&self.runtime),
521525
Arc::clone(&self.logger),
522526
Arc::clone(&self.config),

src/payment/asynchronous/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8+
pub(crate) mod om_mailbox;
89
mod rate_limiter;
910
pub(crate) mod static_invoice_store;
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use std::collections::{HashMap, VecDeque};
2+
use std::sync::Mutex;
3+
4+
use bitcoin::secp256k1::PublicKey;
5+
use lightning::ln::msgs::OnionMessage;
6+
7+
pub(crate) struct OnionMessageMailbox {
8+
map: Mutex<HashMap<bitcoin::secp256k1::PublicKey, VecDeque<lightning::ln::msgs::OnionMessage>>>,
9+
}
10+
11+
impl OnionMessageMailbox {
12+
const MAX_MESSAGES_PER_PEER: usize = 30;
13+
const MAX_PEERS: usize = 300;
14+
15+
pub fn new() -> Self {
16+
Self { map: Mutex::new(HashMap::with_capacity(Self::MAX_PEERS)) }
17+
}
18+
19+
pub(crate) fn onion_message_intercepted(&self, peer_node_id: PublicKey, message: OnionMessage) {
20+
let mut map = self.map.lock().unwrap();
21+
22+
let queue = map.entry(peer_node_id).or_insert_with(VecDeque::new);
23+
if queue.len() >= Self::MAX_MESSAGES_PER_PEER {
24+
queue.pop_front();
25+
}
26+
queue.push_back(message);
27+
28+
// Enforce a peers limit. If exceeded, evict the peer with the longest queue.
29+
if map.len() > Self::MAX_PEERS {
30+
let peer_to_remove =
31+
map.iter().max_by_key(|(_, queue)| queue.len()).map(|(peer, _)| *peer).unwrap();
32+
33+
map.remove(&peer_to_remove);
34+
}
35+
}
36+
37+
pub(crate) fn onion_message_peer_connected(
38+
&self, peer_node_id: bitcoin::secp256k1::PublicKey,
39+
) -> Vec<OnionMessage> {
40+
let mut map = self.map.lock().unwrap();
41+
42+
if let Some(queue) = map.remove(&peer_node_id) {
43+
queue.into()
44+
} else {
45+
Vec::new()
46+
}
47+
}
48+
49+
#[cfg(test)]
50+
pub(crate) fn is_empty(&self) -> bool {
51+
let map = self.map.lock().unwrap();
52+
map.is_empty()
53+
}
54+
}
55+
56+
#[cfg(test)]
57+
mod tests {
58+
use bitcoin::key::Secp256k1;
59+
use bitcoin::secp256k1::{PublicKey, SecretKey};
60+
use lightning::onion_message;
61+
62+
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
63+
64+
#[test]
65+
fn onion_message_mailbox() {
66+
let mailbox = OnionMessageMailbox::new();
67+
68+
let secp = Secp256k1::new();
69+
let sk_bytes = [12; 32];
70+
let sk = SecretKey::from_slice(&sk_bytes).unwrap();
71+
let peer_node_id = PublicKey::from_secret_key(&secp, &sk);
72+
73+
let blinding_sk = SecretKey::from_slice(&[13; 32]).unwrap();
74+
let blinding_point = PublicKey::from_secret_key(&secp, &blinding_sk);
75+
76+
let message_sk = SecretKey::from_slice(&[13; 32]).unwrap();
77+
let message_point = PublicKey::from_secret_key(&secp, &message_sk);
78+
79+
let message = lightning::ln::msgs::OnionMessage {
80+
blinding_point,
81+
onion_routing_packet: onion_message::packet::Packet {
82+
version: 0,
83+
public_key: message_point,
84+
hop_data: vec![1, 2, 3],
85+
hmac: [0; 32],
86+
},
87+
};
88+
mailbox.onion_message_intercepted(peer_node_id, message.clone());
89+
90+
let messages = mailbox.onion_message_peer_connected(peer_node_id);
91+
assert_eq!(messages.len(), 1);
92+
assert_eq!(messages[0], message);
93+
94+
assert!(mailbox.is_empty());
95+
96+
let messages = mailbox.onion_message_peer_connected(peer_node_id);
97+
assert_eq!(messages.len(), 0);
98+
}
99+
}

tests/integration_tests_rust.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1132,13 +1132,14 @@ fn simple_bolt12_send_receive() {
11321132
}
11331133

11341134
#[test]
1135-
fn static_invoice_server() {
1135+
fn async_payment() {
11361136
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
11371137
let chain_source = TestChainSource::Esplora(&electrsd);
11381138

11391139
let mut config_sender = random_config(true);
11401140
config_sender.node_config.listening_addresses = None;
11411141
config_sender.node_config.node_alias = None;
1142+
config_sender.node_config.hold_outbound_htlcs_at_next_hop = true;
11421143
config_sender.log_writer =
11431144
TestLogWriter::Custom(Arc::new(MultiNodeLogger::new("sender ".to_string())));
11441145
let node_sender = setup_node(&chain_source, config_sender, None);
@@ -1241,9 +1242,16 @@ fn static_invoice_server() {
12411242
std::thread::sleep(std::time::Duration::from_millis(100));
12421243
};
12431244

1245+
node_receiver.stop().unwrap();
1246+
12441247
let payment_id =
12451248
node_sender.bolt12_payment().send_using_amount(&offer, 5_000, None, None).unwrap();
12461249

1250+
// Sleep to allow the payment reach a state where the htlc is held and waiting for the receiver to come online.
1251+
std::thread::sleep(std::time::Duration::from_millis(3000));
1252+
1253+
node_receiver.start().unwrap();
1254+
12471255
expect_payment_successful_event!(node_sender, Some(payment_id), None);
12481256
}
12491257

0 commit comments

Comments
 (0)