Skip to content

Commit d93409a

Browse files
committed
Read persisted LSPS2 service state in LiquidityManager::new
We read any previously-persisted state upon construction of `LiquidityManager`.
1 parent b8272ed commit d93409a

File tree

5 files changed

+153
-36
lines changed

5 files changed

+153
-36
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2242,16 +2242,19 @@ mod tests {
22422242
Arc::clone(&logger),
22432243
Arc::clone(&keys_manager),
22442244
));
2245-
let liquidity_manager = Arc::new(LiquidityManagerSync::new(
2246-
Arc::clone(&keys_manager),
2247-
Arc::clone(&keys_manager),
2248-
Arc::clone(&manager),
2249-
None,
2250-
None,
2251-
Arc::clone(&kv_store) as Arc<dyn KVStoreSync + Sync + Send>,
2252-
None,
2253-
None,
2254-
));
2245+
let liquidity_manager = Arc::new(
2246+
LiquidityManagerSync::new(
2247+
Arc::clone(&keys_manager),
2248+
Arc::clone(&keys_manager),
2249+
Arc::clone(&manager),
2250+
None,
2251+
None,
2252+
Arc::clone(&kv_store) as Arc<dyn KVStoreSync + Sync + Send>,
2253+
None,
2254+
None,
2255+
)
2256+
.unwrap(),
2257+
);
22552258
let node = Node {
22562259
node: manager,
22572260
p2p_gossip_sync,

lightning-liquidity/src/lsps2/service.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ impl OutboundJITChannel {
468468
}
469469
}
470470

471-
struct PeerState {
471+
pub(crate) struct PeerState {
472472
outbound_channels_by_intercept_scid: HashMap<u64, OutboundJITChannel>,
473473
intercept_scid_by_user_channel_id: HashMap<u128, u64>,
474474
intercept_scid_by_channel_id: HashMap<ChannelId, u64>,
@@ -593,15 +593,35 @@ where
593593
{
594594
/// Constructs a `LSPS2ServiceHandler`.
595595
pub(crate) fn new(
596-
pending_messages: Arc<MessageQueue>, pending_events: Arc<EventQueue>, channel_manager: CM,
596+
peer_states: Vec<(PublicKey, PeerState)>, pending_messages: Arc<MessageQueue>,
597+
pending_events: Arc<EventQueue>, channel_manager: CM,
597598
kv_store: Arc<dyn KVStore + Send + Sync>, config: LSPS2ServiceConfig,
598599
) -> Self {
600+
let mut peer_by_intercept_scid = new_hash_map();
601+
let mut peer_by_channel_id = new_hash_map();
602+
for (node_id, peer_state) in peer_states.iter() {
603+
for (intercept_scid, _) in peer_state.outbound_channels_by_intercept_scid.iter() {
604+
let res = peer_by_intercept_scid.insert(*intercept_scid, *node_id);
605+
debug_assert!(res.is_none(), "Intercept SCIDs should never collide");
606+
}
607+
608+
for (channel_id, _) in peer_state.intercept_scid_by_channel_id.iter() {
609+
let res = peer_by_channel_id.insert(*channel_id, *node_id);
610+
debug_assert!(res.is_none(), "Channel IDs should never collide");
611+
}
612+
}
613+
614+
let per_peer_state = peer_states
615+
.into_iter()
616+
.map(|(k, v)| (k, Mutex::new(v)))
617+
.collect::<HashMap<PublicKey, Mutex<PeerState>>>();
618+
599619
Self {
600620
pending_messages,
601621
pending_events,
602-
per_peer_state: RwLock::new(new_hash_map()),
603-
peer_by_intercept_scid: RwLock::new(new_hash_map()),
604-
peer_by_channel_id: RwLock::new(new_hash_map()),
622+
per_peer_state: RwLock::new(per_peer_state),
623+
peer_by_intercept_scid: RwLock::new(peer_by_intercept_scid),
624+
peer_by_channel_id: RwLock::new(peer_by_channel_id),
605625
total_pending_requests: AtomicUsize::new(0),
606626
channel_manager,
607627
kv_store,

lightning-liquidity/src/manager.rs

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::lsps5::client::{LSPS5ClientConfig, LSPS5ClientHandler};
2424
use crate::lsps5::msgs::LSPS5Message;
2525
use crate::lsps5::service::{LSPS5ServiceConfig, LSPS5ServiceHandler};
2626
use crate::message_queue::MessageQueue;
27+
use crate::persist::read_lsps2_service_peer_states;
2728

2829
use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
2930
use crate::lsps1::msgs::LSPS1Message;
@@ -35,6 +36,7 @@ use crate::lsps2::msgs::LSPS2Message;
3536
use crate::lsps2::service::{LSPS2ServiceConfig, LSPS2ServiceHandler};
3637
use crate::prelude::{new_hash_map, new_hash_set, HashMap, HashSet};
3738
use crate::sync::{Arc, Mutex, RwLock};
39+
use crate::utils::async_poll::dummy_waker;
3840
#[cfg(feature = "time")]
3941
use crate::utils::time::DefaultTimeProvider;
4042
use crate::utils::time::TimeProvider;
@@ -57,6 +59,7 @@ use bitcoin::secp256k1::PublicKey;
5759
use core::future::Future as StdFuture;
5860
use core::ops::Deref;
5961
use core::pin::Pin;
62+
use core::task;
6063

6164
const LSPS_FEATURE_BIT: usize = 729;
6265

@@ -281,12 +284,14 @@ where
281284
C::Target: Filter,
282285
{
283286
/// Constructor for the [`LiquidityManager`] using the default system clock
284-
pub fn new(
287+
///
288+
/// Will read persisted service states from the given [`KVStore`].
289+
pub async fn new(
285290
entropy_source: ES, node_signer: NS, channel_manager: CM, chain_source: Option<C>,
286291
chain_params: Option<ChainParameters>, kv_store: Arc<dyn KVStore + Send + Sync>,
287292
service_config: Option<LiquidityServiceConfig>,
288293
client_config: Option<LiquidityClientConfig>,
289-
) -> Self {
294+
) -> Result<Self, lightning::io::Error> {
290295
let time_provider = Arc::new(DefaultTimeProvider);
291296
Self::new_with_custom_time_provider(
292297
entropy_source,
@@ -299,6 +304,7 @@ where
299304
client_config,
300305
time_provider,
301306
)
307+
.await
302308
}
303309
}
304310

@@ -318,16 +324,18 @@ where
318324
{
319325
/// Constructor for the [`LiquidityManager`] with a custom time provider.
320326
///
327+
/// Will read persisted service states from the given [`KVStore`].
328+
///
321329
/// This should be used on non-std platforms where access to the system time is not
322330
/// available.
323331
/// Sets up the required protocol message handlers based on the given
324332
/// [`LiquidityClientConfig`] and [`LiquidityServiceConfig`].
325-
pub fn new_with_custom_time_provider(
333+
pub async fn new_with_custom_time_provider(
326334
entropy_source: ES, node_signer: NS, channel_manager: CM, chain_source: Option<C>,
327335
chain_params: Option<ChainParameters>, kv_store: Arc<dyn KVStore + Send + Sync>,
328336
service_config: Option<LiquidityServiceConfig>,
329337
client_config: Option<LiquidityClientConfig>, time_provider: TP,
330-
) -> Self {
338+
) -> Result<Self, lightning::io::Error> {
331339
let pending_messages = Arc::new(MessageQueue::new());
332340
let pending_events = Arc::new(EventQueue::new(Arc::clone(&kv_store)));
333341
let ignored_peers = RwLock::new(new_hash_set());
@@ -344,22 +352,30 @@ where
344352
)
345353
})
346354
});
347-
let lsps2_service_handler = service_config.as_ref().and_then(|config| {
348-
config.lsps2_service_config.as_ref().map(|config| {
355+
356+
let lsps2_service_handler = if let Some(service_config) = service_config.as_ref() {
357+
if let Some(lsps2_service_config) = service_config.lsps2_service_config.as_ref() {
349358
if let Some(number) =
350359
<LSPS2ServiceHandler<CM> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
351360
{
352361
supported_protocols.push(number);
353362
}
354-
LSPS2ServiceHandler::new(
363+
364+
let peer_states = read_lsps2_service_peer_states(kv_store.clone()).await?;
365+
Some(LSPS2ServiceHandler::new(
366+
peer_states,
355367
Arc::clone(&pending_messages),
356368
Arc::clone(&pending_events),
357369
channel_manager.clone(),
358370
Arc::clone(&kv_store),
359-
config.clone(),
360-
)
361-
})
362-
});
371+
lsps2_service_config.clone(),
372+
))
373+
} else {
374+
None
375+
}
376+
} else {
377+
None
378+
};
363379

364380
let lsps5_client_handler = client_config.as_ref().and_then(|config| {
365381
config.lsps5_client_config.as_ref().map(|config| {
@@ -434,7 +450,7 @@ where
434450
None
435451
};
436452

437-
Self {
453+
Ok(Self {
438454
pending_messages,
439455
pending_events,
440456
request_id_to_method_map: Mutex::new(new_hash_map()),
@@ -452,7 +468,7 @@ where
452468
_client_config: client_config,
453469
best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
454470
_chain_source: chain_source,
455-
}
471+
})
456472
}
457473

458474
/// Returns a reference to the LSPS0 client-side handler.
@@ -990,9 +1006,10 @@ where
9901006
chain_params: Option<ChainParameters>, kv_store_sync: Arc<dyn KVStoreSync + Send + Sync>,
9911007
service_config: Option<LiquidityServiceConfig>,
9921008
client_config: Option<LiquidityClientConfig>,
993-
) -> Self {
1009+
) -> Result<Self, lightning::io::Error> {
9941010
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
995-
let inner = Arc::new(LiquidityManager::new(
1011+
1012+
let mut fut = Box::pin(LiquidityManager::new(
9961013
entropy_source,
9971014
node_signer,
9981015
channel_manager,
@@ -1002,7 +1019,17 @@ where
10021019
service_config,
10031020
client_config,
10041021
));
1005-
Self { inner }
1022+
1023+
let mut waker = dummy_waker();
1024+
let mut ctx = task::Context::from_waker(&mut waker);
1025+
let inner = match fut.as_mut().poll(&mut ctx) {
1026+
task::Poll::Ready(result) => result,
1027+
task::Poll::Pending => {
1028+
// In a sync context, we can't wait for the future to complete.
1029+
unreachable!("LiquidityManager::new should not be pending in a sync context");
1030+
},
1031+
}?;
1032+
Ok(Self { inner: Arc::new(inner) })
10061033
}
10071034
}
10081035

@@ -1028,9 +1055,9 @@ where
10281055
chain_params: Option<ChainParameters>, kv_store_sync: Arc<dyn KVStoreSync + Send + Sync>,
10291056
service_config: Option<LiquidityServiceConfig>,
10301057
client_config: Option<LiquidityClientConfig>, time_provider: TP,
1031-
) -> Self {
1058+
) -> Result<Self, lightning::io::Error> {
10321059
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
1033-
let inner = Arc::new(LiquidityManager::new_with_custom_time_provider(
1060+
let mut fut = Box::pin(LiquidityManager::new_with_custom_time_provider(
10341061
entropy_source,
10351062
node_signer,
10361063
channel_manager,
@@ -1041,7 +1068,17 @@ where
10411068
client_config,
10421069
time_provider,
10431070
));
1044-
Self { inner }
1071+
1072+
let mut waker = dummy_waker();
1073+
let mut ctx = task::Context::from_waker(&mut waker);
1074+
let inner = match fut.as_mut().poll(&mut ctx) {
1075+
task::Poll::Ready(result) => result,
1076+
task::Poll::Pending => {
1077+
// In a sync context, we can't wait for the future to complete.
1078+
unreachable!("LiquidityManager::new should not be pending in a sync context");
1079+
},
1080+
}?;
1081+
Ok(Self { inner: Arc::new(inner) })
10451082
}
10461083

10471084
/// Returns a reference to the LSPS0 client-side handler.

lightning-liquidity/src/persist.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,17 @@
99

1010
//! Types and utils for persistence.
1111
12+
use crate::lsps2::service::PeerState as LSPS2ServicePeerState;
13+
14+
use lightning::io::Cursor;
15+
use lightning::util::persist::KVStore;
16+
use lightning::util::ser::Readable;
17+
18+
use bitcoin::secp256k1::PublicKey;
19+
20+
use core::ops::Deref;
21+
use core::str::FromStr;
22+
1223
/// The primary namespace under which the [`LiquidityManager`] will be persisted.
1324
///
1425
/// [`LiquidityManager`]: crate::LiquidityManager
@@ -33,3 +44,47 @@ pub const LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps2_service";
3344
///
3445
/// [`LSPS5ServiceHandler`]: crate::lsps5::service::LSPS5ServiceHandler
3546
pub const LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps5_service";
47+
48+
pub(crate) async fn read_lsps2_service_peer_states<K: Deref>(
49+
kv_store: K,
50+
) -> Result<Vec<(PublicKey, LSPS2ServicePeerState)>, lightning::io::Error>
51+
where
52+
K::Target: KVStore,
53+
{
54+
let mut res = Vec::new();
55+
56+
for stored_key in kv_store
57+
.list(
58+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
59+
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
60+
)
61+
.await?
62+
{
63+
let mut reader = Cursor::new(
64+
kv_store
65+
.read(
66+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
67+
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
68+
&stored_key,
69+
)
70+
.await?,
71+
);
72+
73+
let peer_state = LSPS2ServicePeerState::read(&mut reader).map_err(|_| {
74+
lightning::io::Error::new(
75+
lightning::io::ErrorKind::InvalidData,
76+
"Failed to deserialize LSPS2 peer state",
77+
)
78+
})?;
79+
80+
let key = PublicKey::from_str(&stored_key).map_err(|_| {
81+
lightning::io::Error::new(
82+
lightning::io::ErrorKind::InvalidData,
83+
"Failed to deserialize stored key entry",
84+
)
85+
})?;
86+
87+
res.push((key, peer_state));
88+
}
89+
Ok(res)
90+
}

lightning-liquidity/tests/common/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ pub(crate) fn create_service_and_client_nodes<'a, 'b, 'c>(
3838
Some(service_config),
3939
None,
4040
Arc::clone(&time_provider),
41-
);
41+
)
42+
.unwrap();
4243

4344
let client_kv_store = Arc::new(TestStore::new(false));
4445
let client_lm = LiquidityManagerSync::new_with_custom_time_provider(
@@ -51,7 +52,8 @@ pub(crate) fn create_service_and_client_nodes<'a, 'b, 'c>(
5152
None,
5253
Some(client_config),
5354
time_provider,
54-
);
55+
)
56+
.unwrap();
5557

5658
let mut iter = nodes.into_iter();
5759
let service_node = LiquidityNode::new(iter.next().unwrap(), service_lm);

0 commit comments

Comments
 (0)