Skip to content

Commit ca22b77

Browse files
committed
Add onion mailbox for async receivers
This introduces an in-memory mailbox to hold onion messages until the receiver comes online. This is required for async payment `held_htlc_available` messages. The mailbox is bounded by a maximum number of peers and a maximum number of messages per peer.
1 parent c36d944 commit ca22b77

File tree

6 files changed

+174
-18
lines changed

6 files changed

+174
-18
lines changed

src/builder.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::liquidity::{
2727
};
2828
use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
2929
use crate::message_handler::NodeCustomMessageHandler;
30+
use crate::om_mailbox::OnionMessageMailbox;
3031
use crate::peer_store::PeerStore;
3132
use crate::runtime::Runtime;
3233
use crate::tx_broadcaster::TransactionBroadcaster;
@@ -1452,17 +1453,31 @@ fn build_with_store_internal(
14521453
}
14531454

14541455
// Initialize the PeerManager
1455-
let onion_messenger: Arc<OnionMessenger> = Arc::new(OnionMessenger::new(
1456-
Arc::clone(&keys_manager),
1457-
Arc::clone(&keys_manager),
1458-
Arc::clone(&logger),
1459-
Arc::clone(&channel_manager),
1460-
message_router,
1461-
Arc::clone(&channel_manager),
1462-
Arc::clone(&channel_manager),
1463-
IgnoringMessageHandler {},
1464-
IgnoringMessageHandler {},
1465-
));
1456+
let onion_messenger: Arc<OnionMessenger> = if config.async_payment_services_enabled {
1457+
Arc::new(OnionMessenger::new_with_offline_peer_interception(
1458+
Arc::clone(&keys_manager),
1459+
Arc::clone(&keys_manager),
1460+
Arc::clone(&logger),
1461+
Arc::clone(&channel_manager),
1462+
message_router,
1463+
Arc::clone(&channel_manager),
1464+
Arc::clone(&channel_manager),
1465+
IgnoringMessageHandler {},
1466+
IgnoringMessageHandler {},
1467+
))
1468+
} else {
1469+
Arc::new(OnionMessenger::new(
1470+
Arc::clone(&keys_manager),
1471+
Arc::clone(&keys_manager),
1472+
Arc::clone(&logger),
1473+
Arc::clone(&channel_manager),
1474+
message_router,
1475+
Arc::clone(&channel_manager),
1476+
Arc::clone(&channel_manager),
1477+
IgnoringMessageHandler {},
1478+
IgnoringMessageHandler {},
1479+
))
1480+
};
14661481
let ephemeral_bytes: [u8; 32] = keys_manager.get_secure_random_bytes();
14671482

14681483
// Initialize the GossipSource
@@ -1649,6 +1664,8 @@ fn build_with_store_internal(
16491664
},
16501665
};
16511666

1667+
let om_mailbox = Arc::new(OnionMessageMailbox::new());
1668+
16521669
let (stop_sender, _) = tokio::sync::watch::channel(());
16531670
let (background_processor_stop_sender, _) = tokio::sync::watch::channel(());
16541671
let is_running = Arc::new(RwLock::new(false));
@@ -1681,6 +1698,7 @@ fn build_with_store_internal(
16811698
is_running,
16821699
is_listening,
16831700
node_metrics,
1701+
om_mailbox,
16841702
})
16851703
}
16861704

src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,8 @@ pub(crate) fn default_user_config(config: &Config) -> UserConfig {
330330
user_config.channel_handshake_limits.force_announced_channel_preference = true;
331331
}
332332

333+
user_config.enable_htlc_hold = true;
334+
333335
user_config
334336
}
335337

src/event.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet};
8+
use crate::om_mailbox::OnionMessageMailbox;
9+
use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet};
910
use crate::{
1011
hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
1112
UserChannelId,
@@ -459,6 +460,8 @@ where
459460
logger: L,
460461
config: Arc<Config>,
461462
static_invoice_store: Option<StaticInvoiceStore>,
463+
onion_messenger: Arc<OnionMessenger>,
464+
om_mailbox: Arc<OnionMessageMailbox>,
462465
}
463466

464467
impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
@@ -472,7 +475,8 @@ where
472475
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
473476
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
474477
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
475-
static_invoice_store: Option<StaticInvoiceStore>, runtime: Arc<Runtime>, logger: L,
478+
static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
479+
om_mailbox: Arc<OnionMessageMailbox>, runtime: Arc<Runtime>, logger: L,
476480
config: Arc<Config>,
477481
) -> Self {
478482
Self {
@@ -490,6 +494,8 @@ where
490494
runtime,
491495
config,
492496
static_invoice_store,
497+
onion_messenger,
498+
om_mailbox,
493499
}
494500
}
495501

@@ -1491,11 +1497,15 @@ where
14911497

14921498
self.bump_tx_event_handler.handle_event(&bte).await;
14931499
},
1494-
LdkEvent::OnionMessageIntercepted { .. } => {
1495-
debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1500+
LdkEvent::OnionMessageIntercepted { peer_node_id, message } => {
1501+
self.om_mailbox.onion_message_intercepted(peer_node_id, message);
14961502
},
1497-
LdkEvent::OnionMessagePeerConnected { .. } => {
1498-
debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1503+
LdkEvent::OnionMessagePeerConnected { peer_node_id } => {
1504+
let messages = self.om_mailbox.onion_message_peer_connected(peer_node_id);
1505+
1506+
for message in messages {
1507+
let _ = self.onion_messenger.forward_onion_message(message, &peer_node_id);
1508+
}
14991509
},
15001510

15011511
LdkEvent::PersistStaticInvoice {

src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub mod io;
9292
pub mod liquidity;
9393
pub mod logger;
9494
mod message_handler;
95+
mod om_mailbox;
9596
pub mod payment;
9697
mod peer_store;
9798
mod runtime;
@@ -136,6 +137,7 @@ use gossip::GossipSource;
136137
use graph::NetworkGraph;
137138
use io::utils::write_node_metrics;
138139
use liquidity::{LSPS1Liquidity, LiquiditySource};
140+
use om_mailbox::OnionMessageMailbox;
139141
use payment::asynchronous::static_invoice_store::StaticInvoiceStore;
140142
use payment::{
141143
Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment,
@@ -205,6 +207,7 @@ pub struct Node {
205207
is_running: Arc<RwLock<bool>>,
206208
is_listening: Arc<AtomicBool>,
207209
node_metrics: Arc<RwLock<NodeMetrics>>,
210+
om_mailbox: Arc<OnionMessageMailbox>,
208211
}
209212

210213
impl Node {
@@ -517,6 +520,8 @@ impl Node {
517520
Arc::clone(&self.payment_store),
518521
Arc::clone(&self.peer_store),
519522
static_invoice_store,
523+
Arc::clone(&self.onion_messenger),
524+
Arc::clone(&self.om_mailbox),
520525
Arc::clone(&self.runtime),
521526
Arc::clone(&self.logger),
522527
Arc::clone(&self.config),
@@ -1491,6 +1496,11 @@ impl Node {
14911496
Error::PersistenceFailed
14921497
})
14931498
}
1499+
1500+
#[allow(missing_docs)]
1501+
pub fn om_mailbox_is_empty(&self) -> bool {
1502+
self.om_mailbox.is_empty()
1503+
}
14941504
}
14951505

14961506
impl Drop for Node {

src/om_mailbox.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use std::{
2+
collections::{HashMap, VecDeque},
3+
sync::Mutex,
4+
};
5+
6+
use lightning::ln::msgs::OnionMessage;
7+
8+
pub(crate) struct OnionMessageMailbox {
9+
map: Mutex<HashMap<bitcoin::secp256k1::PublicKey, VecDeque<lightning::ln::msgs::OnionMessage>>>,
10+
}
11+
12+
impl OnionMessageMailbox {
13+
const MAX_MESSAGES_PER_PEER: usize = 100;
14+
const MAX_PEERS: usize = 100;
15+
16+
pub fn new() -> Self {
17+
Self { map: Mutex::new(HashMap::new()) }
18+
}
19+
20+
pub(crate) fn onion_message_intercepted(
21+
&self, peer_node_id: bitcoin::secp256k1::PublicKey,
22+
message: lightning::ln::msgs::OnionMessage,
23+
) {
24+
let mut map = self.map.lock().unwrap();
25+
26+
let queue = map.entry(peer_node_id).or_insert_with(VecDeque::new);
27+
if queue.len() >= Self::MAX_MESSAGES_PER_PEER {
28+
queue.pop_front();
29+
}
30+
queue.push_back(message);
31+
32+
// Enforce a peers limit. If exceeded, evict the peer with the longest queue.
33+
if map.len() > Self::MAX_PEERS {
34+
let peer_to_remove = map
35+
.iter()
36+
.max_by_key(|(_, queue)| queue.len())
37+
.map(|(peer, _)| peer.clone())
38+
.unwrap();
39+
40+
map.remove(&peer_to_remove);
41+
}
42+
}
43+
44+
pub(crate) fn onion_message_peer_connected(
45+
&self, peer_node_id: bitcoin::secp256k1::PublicKey,
46+
) -> Vec<OnionMessage> {
47+
let mut map = self.map.lock().unwrap();
48+
49+
if let Some(queue) = map.remove(&peer_node_id) {
50+
queue.into()
51+
} else {
52+
Vec::new()
53+
}
54+
}
55+
56+
pub(crate) fn is_empty(&self) -> bool {
57+
let map = self.map.lock().unwrap();
58+
map.is_empty()
59+
}
60+
}
61+
62+
#[cfg(test)]
63+
mod tests {
64+
use bitcoin::{
65+
key::Secp256k1,
66+
secp256k1::{PublicKey, SecretKey},
67+
};
68+
use lightning::onion_message;
69+
70+
use crate::om_mailbox::OnionMessageMailbox;
71+
72+
#[test]
73+
fn onion_message_mailbox() {
74+
let mailbox = OnionMessageMailbox::new();
75+
76+
let secp = Secp256k1::new();
77+
let sk_bytes = [12; 32];
78+
let sk = SecretKey::from_slice(&sk_bytes).unwrap();
79+
let peer_node_id = PublicKey::from_secret_key(&secp, &sk);
80+
81+
let blinding_sk = SecretKey::from_slice(&[13; 32]).unwrap();
82+
let blinding_point = PublicKey::from_secret_key(&secp, &blinding_sk);
83+
84+
let message_sk = SecretKey::from_slice(&[13; 32]).unwrap();
85+
let message_point = PublicKey::from_secret_key(&secp, &message_sk);
86+
87+
let message = lightning::ln::msgs::OnionMessage {
88+
blinding_point,
89+
onion_routing_packet: onion_message::packet::Packet {
90+
version: 0,
91+
public_key: message_point,
92+
hop_data: vec![1, 2, 3],
93+
hmac: [0; 32],
94+
},
95+
};
96+
mailbox.onion_message_intercepted(peer_node_id, message.clone());
97+
98+
let messages = mailbox.onion_message_peer_connected(peer_node_id);
99+
assert_eq!(messages.len(), 1);
100+
assert_eq!(messages[0], message);
101+
102+
assert!(mailbox.is_empty());
103+
104+
let messages = mailbox.onion_message_peer_connected(peer_node_id);
105+
assert_eq!(messages.len(), 0);
106+
}
107+
}

tests/integration_tests_rust.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1132,7 +1132,7 @@ fn simple_bolt12_send_receive() {
11321132
}
11331133

11341134
#[test]
1135-
fn static_invoice_server() {
1135+
fn async_payment() {
11361136
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
11371137
let chain_source = TestChainSource::Esplora(&electrsd);
11381138

@@ -1241,9 +1241,18 @@ fn static_invoice_server() {
12411241
std::thread::sleep(std::time::Duration::from_millis(100));
12421242
};
12431243

1244+
node_receiver.stop().unwrap();
1245+
12441246
let payment_id =
12451247
node_sender.bolt12_payment().send_using_amount(&offer, 5_000, None, None).unwrap();
12461248

1249+
// Sleep to allow the payment reach a state where the htlc is held and waiting for the receiver to come online.
1250+
while node_receiver_lsp.om_mailbox_is_empty() {
1251+
std::thread::sleep(std::time::Duration::from_millis(100));
1252+
}
1253+
1254+
node_receiver.start().unwrap();
1255+
12471256
expect_payment_successful_event!(node_sender, Some(payment_id), None);
12481257
}
12491258

0 commit comments

Comments
 (0)