Stop decaying liquidity information during scoring #2656
```diff
@@ -113,7 +113,7 @@ const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
 
 #[cfg(not(test))]
-const SCORER_PERSIST_TIMER: u64 = 60 * 60;
+const SCORER_PERSIST_TIMER: u64 = 60 * 5;
 #[cfg(test)]
 const SCORER_PERSIST_TIMER: u64 = 1;
```
```diff
@@ -244,30 +244,30 @@ fn handle_network_graph_update<L: Deref>(
 /// Updates scorer based on event and returns whether an update occurred so we can decide whether
 /// to persist.
 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
-	scorer: &'a S, event: &Event
+	scorer: &'a S, event: &Event, duration_since_epoch: Duration,
 ) -> bool {
 	match event {
 		Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
 			let mut score = scorer.write_lock();
-			score.payment_path_failed(path, *scid);
+			score.payment_path_failed(path, *scid, duration_since_epoch);
 		},
 		Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
 			// Reached if the destination explicitly failed it back. We treat this as a successful probe
 			// because the payment made it all the way to the destination with sufficient liquidity.
 			let mut score = scorer.write_lock();
-			score.probe_successful(path);
+			score.probe_successful(path, duration_since_epoch);
 		},
 		Event::PaymentPathSuccessful { path, .. } => {
 			let mut score = scorer.write_lock();
-			score.payment_path_successful(path);
+			score.payment_path_successful(path, duration_since_epoch);
 		},
 		Event::ProbeSuccessful { path, .. } => {
 			let mut score = scorer.write_lock();
-			score.probe_successful(path);
+			score.probe_successful(path, duration_since_epoch);
 		},
 		Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
 			let mut score = scorer.write_lock();
-			score.probe_failed(path, *scid);
+			score.probe_failed(path, *scid, duration_since_epoch);
 		},
 		_ => return false,
 	}
```
Review comments on lines 249 to 273:

- Won't this mean channels along recently used paths will have their offsets decayed but other channels will not?
- Rather the opposite - by the end of the patchset, we only decay in the timer method. When updating, we just set the last-update time to the provided `duration_since_epoch`.
- Maybe I'm confused, but it looks like we only decay once per hour in the background processor.
- Plus once on startup. I'm not understanding the issue you're raising - are you saying we should reduce the hour to something less?
- Yeah, I was pointing out that we are left in a state of partial decay. Added a comment elsewhere, but if you modify […]
- Hmm... if one offset is updated frequently, you'll get into a state where the other offset is only ever partially decayed even though it may have been given that value many half-lives ago. So it would really depend on both payment and decay frequency.
- If we're regularly sending some sats over a channel successfully, so we're constantly reducing our upper bound by the amount we're sending, I think it's fine to not decay the lower bound? We'll eventually pick some other channel to send over because we ran out of estimated liquidity, and we'll decay at that point.
- FWIW, that's not the only scenario. Failures at a channel and downstream from it adjust its upper and lower bounds, respectively. So if you fail downstream with increasing amounts, the upper bound may not be properly decayed.
- Right, but presumably repeatedly failing downstream of a channel with higher and higher amounts isn't super likely.
- Not necessarily for the same payment or at the same downstream channel. From the perspective of the scored channel, it's simply the normal case of learning a more accurate lower bound on its liquidity as a consequence of knowing a payment routed through it but failed downstream.
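To make the partial-decay concern above concrete, here is a minimal sketch of timer-driven half-life decay. The `ChannelLiquidity` struct, its field names, and the `main` scenario are hypothetical stand-ins rather than LDK's actual `ProbabilisticScorer` internals; the point is only that when per-payment updates stamp `last_updated` without decaying, a rarely touched offset can sit partially decayed until the next timer tick.

```rust
use std::time::Duration;

/// Hypothetical per-channel liquidity bounds tracked by a scorer. Both offsets
/// share a single `last_updated` stamp, which per-payment updates (not shown)
/// reset without decaying; that is the source of the partial-decay concern.
struct ChannelLiquidity {
	min_liquidity_offset_msat: u64,
	max_liquidity_offset_msat: u64,
	last_updated: Duration, // wall-clock time of the last update or decay
}

impl ChannelLiquidity {
	/// Decay both offsets by the number of whole half-lives elapsed since
	/// `last_updated`, then stamp the new time. In this PR's model this runs
	/// from the background-processor timer rather than on every score update.
	fn time_passed(&mut self, now: Duration, half_life: Duration) {
		let elapsed = now.saturating_sub(self.last_updated);
		let half_lives = (elapsed.as_secs() / half_life.as_secs().max(1)).min(64) as u32;
		if half_lives == 0 { return; }
		// Right-shifting halves the offset once per elapsed half-life; a shift
		// of 64 clears the value entirely.
		self.min_liquidity_offset_msat =
			self.min_liquidity_offset_msat.checked_shr(half_lives).unwrap_or(0);
		self.max_liquidity_offset_msat =
			self.max_liquidity_offset_msat.checked_shr(half_lives).unwrap_or(0);
		self.last_updated = now;
	}
}

fn main() {
	let mut liq = ChannelLiquidity {
		min_liquidity_offset_msat: 1_000_000,
		max_liquidity_offset_msat: 2_000_000,
		last_updated: Duration::from_secs(1_696_300_000),
	};
	// Two six-hour half-lives later, both offsets are quartered.
	let now = Duration::from_secs(1_696_300_000 + 12 * 60 * 60);
	liq.time_passed(now, Duration::from_secs(6 * 60 * 60));
	assert_eq!(liq.min_liquidity_offset_msat, 250_000);
	assert_eq!(liq.max_liquidity_offset_msat, 500_000);
}
```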
```diff
@@ -280,7 +280,7 @@ macro_rules! define_run_body {
 		$channel_manager: ident, $process_channel_manager_events: expr,
 		$peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
 		$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
-		$timer_elapsed: expr, $check_slow_await: expr
+		$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
 	) => { {
 		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
 		$channel_manager.timer_tick_occurred();
```
```diff
@@ -294,6 +294,7 @@ macro_rules! define_run_body {
 		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
 		let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
 		let mut have_pruned = false;
+		let mut have_decayed_scorer = false;
 
 		loop {
 			$process_channel_manager_events;
```
```diff
@@ -383,11 +384,10 @@ macro_rules! define_run_body {
 			if should_prune {
 				// The network graph must not be pruned while rapid sync completion is pending
 				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
-					#[cfg(feature = "std")] {
+					if let Some(duration_since_epoch) = $time_fetch() {
 						log_trace!($logger, "Pruning and persisting network graph.");
-						network_graph.remove_stale_channels_and_tracking();
-					}
-					#[cfg(not(feature = "std"))] {
+						network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
+					} else {
 						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
 						log_trace!($logger, "Persisting network graph.");
 					}
```
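As the warning in this hunk suggests, a caller whose time source is unavailable can still prune the graph manually. Below is a rough sketch under stated assumptions: `latest_known_unix_time()` is a hypothetical helper for however the application learns an approximate current UNIX time without `std` (for example from a recent block header), while `NetworkGraph::remove_stale_channels_and_tracking_with_time` is the real method named in the log message.

```rust
use lightning::routing::gossip::NetworkGraph;
use lightning::util::logger::Logger;

// Hypothetical: an application-specific source of approximate UNIX time.
fn latest_known_unix_time() -> u64 { 1_696_300_000 }

// Prune stale channels and tracking data from the graph using that timestamp
// instead of relying on the background processor's time source.
fn prune_graph_manually<L: core::ops::Deref>(network_graph: &NetworkGraph<L>)
where
	L::Target: Logger,
{
	network_graph.remove_stale_channels_and_tracking_with_time(latest_known_unix_time());
}
```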
```diff
@@ -402,9 +402,24 @@ macro_rules! define_run_body {
 				last_prune_call = $get_timer(prune_timer);
 			}
 
+			if !have_decayed_scorer {
+				if let Some(ref scorer) = $scorer {
+					if let Some(duration_since_epoch) = $time_fetch() {
+						log_trace!($logger, "Calling time_passed on scorer at startup");
+						scorer.write_lock().time_passed(duration_since_epoch);
+					}
+				}
+				have_decayed_scorer = true;
+			}
+
 			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
 				if let Some(ref scorer) = $scorer {
-					log_trace!($logger, "Persisting scorer");
+					if let Some(duration_since_epoch) = $time_fetch() {
+						log_trace!($logger, "Calling time_passed and persisting scorer");
+						scorer.write_lock().time_passed(duration_since_epoch);
+					} else {
+						log_trace!($logger, "Persisting scorer");
+					}
 					if let Err(e) = $persister.persist_scorer(&scorer) {
 						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
 					}
```
```diff
@@ -510,12 +525,16 @@ use core::task;
 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
 /// are hundreds or thousands of simultaneous process calls running.
 ///
+/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
+/// no time is available, some features may be disabled, however the node will still operate fine.
+///
 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
 /// could setup `process_events_async` like this:
 /// ```
 /// # use lightning::io;
 /// # use std::sync::{Arc, RwLock};
 /// # use std::sync::atomic::{AtomicBool, Ordering};
+/// # use std::time::SystemTime;
 /// # use lightning_background_processor::{process_events_async, GossipSync};
 /// # struct MyStore {}
 /// # impl lightning::util::persist::KVStore for MyStore {
```
```diff
@@ -584,6 +603,7 @@ use core::task;
 ///			Some(background_scorer),
 ///			sleeper,
 ///			mobile_interruptable_platform,
+///			|| Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
 ///		)
 ///		.await
 ///		.expect("Failed to process events");
```
```diff
@@ -620,11 +640,12 @@ pub async fn process_events_async<
 	S: 'static + Deref<Target = SC> + Send + Sync,
 	SC: for<'b> WriteableScore<'b>,
 	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
-	Sleeper: Fn(Duration) -> SleepFuture
+	Sleeper: Fn(Duration) -> SleepFuture,
+	FetchTime: Fn() -> Option<Duration>,
 >(
 	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
 	gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
-	sleeper: Sleeper, mobile_interruptable_platform: bool,
+	sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
 ) -> Result<(), lightning::io::Error>
 where
 	UL::Target: 'static + UtxoLookup,
```
```diff
@@ -648,15 +669,18 @@ where
 	let scorer = &scorer;
 	let logger = &logger;
 	let persister = &persister;
+	let fetch_time = &fetch_time;
 	async move {
 		if let Some(network_graph) = network_graph {
 			handle_network_graph_update(network_graph, &event)
 		}
 		if let Some(ref scorer) = scorer {
-			if update_scorer(scorer, &event) {
-				log_trace!(logger, "Persisting scorer after update");
-				if let Err(e) = persister.persist_scorer(&scorer) {
-					log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+			if let Some(duration_since_epoch) = fetch_time() {
+				if update_scorer(scorer, &event, duration_since_epoch) {
+					log_trace!(logger, "Persisting scorer after update");
+					if let Err(e) = persister.persist_scorer(&scorer) {
+						log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+					}
 				}
 			}
 		}
```
```diff
@@ -688,7 +712,7 @@ where
 				task::Poll::Ready(exit) => { should_break = exit; true },
 				task::Poll::Pending => false,
 			}
-		}, mobile_interruptable_platform
+		}, mobile_interruptable_platform, fetch_time,
 	)
 }
```
```diff
@@ -810,7 +834,10 @@ impl BackgroundProcessor {
 					handle_network_graph_update(network_graph, &event)
 				}
 				if let Some(ref scorer) = scorer {
-					if update_scorer(scorer, &event) {
+					use std::time::SystemTime;
+					let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+						.expect("Time should be sometime after 1970");
+					if update_scorer(scorer, &event, duration_since_epoch) {
 						log_trace!(logger, "Persisting scorer after update");
 						if let Err(e) = persister.persist_scorer(&scorer) {
 							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
```

Review comments:

- since this function uses system time now, it should probably be […]
- I think just `feature = "std"` is correct. If two crates depend on LDK, with one setting std and another setting no-std, LDK should build with all features. Otherwise, the crate relying on std features will fail to compile because of an unrelated crate also in the dependency tree.
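A note on the `std` discussion above: the new `fetch_time` argument to `process_events_async` is just a `Fn() -> Option<Duration>`, so callers decide where wall-clock time comes from. The helper below is a hypothetical sketch of what a caller might pass; the function name and the `feature = "std"` gating are illustrative and not part of this PR. Returning `None` makes the background processor skip time-based graph pruning and scorer decay rather than fail.

```rust
use core::time::Duration;

/// Hypothetical helper a node might pass as `fetch_time` when calling
/// `process_events_async`. With `std` it reads the system clock; in a no-std
/// build it returns `None`, which disables time-based pruning and decay but
/// keeps the node running.
#[cfg(feature = "std")]
fn fetch_wall_clock_time() -> Option<Duration> {
	use std::time::SystemTime;
	SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()
}

#[cfg(not(feature = "std"))]
fn fetch_wall_clock_time() -> Option<Duration> {
	// An embedded caller could derive a Duration-since-epoch from an RTC or a
	// trusted block timestamp instead; `None` is always a safe fallback.
	None
}
```

Since the helper already matches `Fn() -> Option<Duration>`, it can be passed directly as the final argument, e.g. `process_events_async(..., sleeper, mobile_interruptable_platform, fetch_wall_clock_time)`.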
```diff
@@ -829,7 +856,12 @@ impl BackgroundProcessor {
 						channel_manager.get_event_or_persistence_needed_future(),
 						chain_monitor.get_update_future()
 					).wait_timeout(Duration::from_millis(100)); },
-				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false
+				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
+				|| {
+					use std::time::SystemTime;
+					Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+						.expect("Time should be sometime after 1970"))
+				},
 			)
 		});
 		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
```
```diff
@@ -1117,7 +1149,7 @@ mod tests {
 	}
 
 	impl ScoreUpdate for TestScorer {
-		fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
+		fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
 			if let Some(expectations) = &mut self.event_expectations {
 				match expectations.pop_front().unwrap() {
 					TestResult::PaymentFailure { path, short_channel_id } => {
```
```diff
@@ -1137,7 +1169,7 @@ mod tests {
 			}
 		}
 
-		fn payment_path_successful(&mut self, actual_path: &Path) {
+		fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
 			if let Some(expectations) = &mut self.event_expectations {
 				match expectations.pop_front().unwrap() {
 					TestResult::PaymentFailure { path, .. } => {
```
```diff
@@ -1156,7 +1188,7 @@ mod tests {
 			}
 		}
 
-		fn probe_failed(&mut self, actual_path: &Path, _: u64) {
+		fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
 			if let Some(expectations) = &mut self.event_expectations {
 				match expectations.pop_front().unwrap() {
 					TestResult::PaymentFailure { path, .. } => {
```
```diff
@@ -1174,7 +1206,7 @@ mod tests {
 				}
 			}
 		}
-		fn probe_successful(&mut self, actual_path: &Path) {
+		fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
 			if let Some(expectations) = &mut self.event_expectations {
 				match expectations.pop_front().unwrap() {
 					TestResult::PaymentFailure { path, .. } => {
```
```diff
@@ -1192,6 +1224,7 @@ mod tests {
 				}
 			}
 		}
+		fn time_passed(&mut self, _: Duration) {}
 	}
 
 	#[cfg(c_bindings)]
```
```diff
@@ -1469,7 +1502,7 @@ mod tests {
 				tokio::time::sleep(dur).await;
 				false // Never exit
 			})
-		}, false,
+		}, false, || Some(Duration::ZERO),
 		);
 		match bp_future.await {
 			Ok(_) => panic!("Expected error persisting manager"),
```
```diff
@@ -1600,7 +1633,7 @@ mod tests {
 
 		loop {
 			let log_entries = nodes[0].logger.lines.lock().unwrap();
-			let expected_log = "Persisting scorer".to_string();
+			let expected_log = "Calling time_passed and persisting scorer".to_string();
 			if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
 				break
 			}
```
```diff
@@ -1699,7 +1732,7 @@ mod tests {
 					_ = exit_receiver.changed() => true,
 				}
 			})
-		}, false,
+		}, false, || Some(Duration::from_secs(1696300000)),
 		);
 
 		let t1 = tokio::spawn(bp_future);
```

Review comments:

- What's behind the choice of this number?
- It's, basically, when I wrote the patch.
- Any reason why it can't be `Duration::ZERO`?
- Not really, it just seemed a bit more realistic.
- Given the value doesn't affect the test, it's just curious to the reader to see something different from all the other places.
- Ah, I tried to switch to ZERO but the test fails - it expects to prune entries from the network graph against a static RGS snapshot that has a timestamp in it.
```diff
@@ -1874,7 +1907,7 @@ mod tests {
 					_ = exit_receiver.changed() => true,
 				}
 			})
-		}, false,
+		}, false, || Some(Duration::ZERO),
 		);
 		let t1 = tokio::spawn(bp_future);
 		let t2 = tokio::spawn(async move {
```
Review comments:

- Not sure if we should use a constant here. It should be no more than the user-defined half-life, ideally such that the half-life is divisible by it.
- Hmm, I guess? If a user sets an aggressive half-life, I'm not entirely convinced we want to spin their CPU trying to decay liquidity bounds. Doing it a bit too often when they set a super high decay also seems fine-ish? I agree it'd be a bit nicer to switch to some function of the configured half-life, but I'm not sure it's worth adding some accessor to `ScoreUpdate`.
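For illustration of the alternative floated above (deriving the interval from the configured half-life instead of using a constant), here is a hypothetical sketch. It is not what the PR implements, and `half_life` stands in for whatever half-life the user configured in their scoring decay parameters.

```rust
use core::time::Duration;

/// Hypothetical: clamp the scorer decay/persist interval to the configured
/// liquidity-offset half-life so decay never lags by more than one half-life,
/// while never ticking faster than once per second.
fn scorer_decay_timer_secs(half_life: Duration) -> u64 {
	const DEFAULT_TIMER_SECS: u64 = 60 * 5; // the constant the PR uses
	DEFAULT_TIMER_SECS.min(half_life.as_secs()).max(1)
}

fn main() {
	// A six-hour half-life keeps the five-minute default.
	assert_eq!(scorer_decay_timer_secs(Duration::from_secs(6 * 60 * 60)), 300);
	// An aggressive one-minute half-life tightens the timer to match it.
	assert_eq!(scorer_decay_timer_secs(Duration::from_secs(60)), 60);
}
```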