Async FilesystemStore #3931
base: main
Conversation
👋 Thanks for assigning @TheBlueMatt as a reviewer!
Force-pushed from 29b8bcf to 81ad668
let this = Arc::clone(&self.inner);

Box::pin(async move {
	tokio::task::spawn_blocking(move || {
Mhh, so I'm not sure if spawning blocking tasks for every IO call is the way to go (see for example https://docs.rs/tokio/latest/tokio/fs/index.html#tuning-your-file-io: "To get good performance with file IO on Tokio, it is recommended to batch your operations into as few spawn_blocking calls as possible."). Maybe there are other designs that we should at least consider before moving forward with this approach. For example, we could create a dedicated pool of longer-lived worker task(s) that process a queue?
If we use spawn_blocking, can we give the user control over exactly which runtime this will be spawned on? Also, rather than just wrapping, should we be using tokio::fs?
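For illustration, here is a rough sketch of that dedicated worker-queue alternative: a single long-lived worker thread owns all blocking file IO and drains a channel of write requests, so the amount of blocking-task churn does not grow with the number of operations. The request shape and names are invented for this sketch and are not part of the PR.

use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot};

// One queued filesystem write; `done` lets the caller await completion of its request.
struct WriteRequest {
	path: PathBuf,
	data: Vec<u8>,
	done: oneshot::Sender<std::io::Result<()>>,
}

// Spawn a single long-lived worker thread that owns all file IO.
fn spawn_io_worker() -> mpsc::Sender<WriteRequest> {
	let (tx, mut rx) = mpsc::channel::<WriteRequest>(64);
	std::thread::spawn(move || {
		// Requests are processed in the order they were queued; no per-write
		// spawn_blocking call is needed.
		while let Some(req) = rx.blocking_recv() {
			let res = std::fs::write(&req.path, &req.data);
			let _ = req.done.send(res);
		}
	});
	tx
}

async fn write_via_worker(
	tx: &mpsc::Sender<WriteRequest>, path: PathBuf, data: Vec<u8>,
) -> std::io::Result<()> {
	let (done, done_rx) = oneshot::channel();
	tx.send(WriteRequest { path, data, done }).await.expect("IO worker exited");
	done_rx.await.expect("IO worker dropped the request")
}

Whether this actually beats per-call spawn_blocking would need measuring; it mainly avoids growing the blocking pool under heavy write load.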
Mhh, so I'm not sure if spawning blocking tasks for every IO call is the way to go (see for example https://docs.rs/tokio/latest/tokio/fs/index.html#tuning-your-file-io: "To get good performance with file IO on Tokio, it is recommended to batch your operations into as few spawn_blocking calls as possible.").
If we should batch operations, I think the current approach is better than using tokio::fs, because it already batches the various operations inside KVStoreSync::write.
Further batching probably needs to happen at a higher level in LDK, and might be a bigger change. Not sure if that is worth it just for FilesystemStore, especially when that store is not the preferred store for real-world usage?
For example, we could create a dedicated pool of longer-lived worker task(s) that process a queue?
Isn't Tokio doing that already when a task is spawned?
If we use spawn_blocking, can we give the user control over exactly which runtime this will be spawned on? Also, rather than just wrapping, should we be using tokio::fs?
With tokio::fs, the current runtime is used. I'd think that that's then also sufficient if we spawn ourselves, without a need to specify exactly which runtime?
More generally, I think the main purpose of this PR is to show how an async KVStore could be implemented, and potentially to have something for testing. Additionally, if there are users that really want to use this type of store in production, they could. But I don't think it is something to spend too much time on. A remote database is probably the more important target to design for.
With tokio::fs, the current runtime is used. I'd think that that's then also sufficient if we spawn ourselves, without a need to specify exactly which runtime?
Hmm, I'm not entirely sure; especially for users that have multiple runtime contexts floating around, it might be important to make sure the store uses a particular one (cc @domZippilli ?). I'll also have to think through this for LDK Node when we make the switch to async KVStore there, but happy to leave as-is for now.
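If explicit control does turn out to matter, one possible shape (purely a sketch with made-up type names, not the PR's API) is to let the store take a tokio runtime Handle at construction time and spawn its blocking work there instead of on the ambient runtime:

use std::sync::Arc;
use tokio::runtime::Handle;

// Stand-in for the existing synchronous store internals.
struct SyncInner;
impl SyncInner {
	fn write_sync(&self, _key: &str, _buf: Vec<u8>) -> std::io::Result<()> {
		Ok(())
	}
}

struct AsyncStoreWithRuntime {
	inner: Arc<SyncInner>,
	// The runtime whose blocking pool will run the file IO, chosen by the user.
	runtime: Handle,
}

impl AsyncStoreWithRuntime {
	fn new(inner: Arc<SyncInner>, runtime: Handle) -> Self {
		Self { inner, runtime }
	}

	async fn write(&self, key: String, buf: Vec<u8>) -> std::io::Result<()> {
		let this = Arc::clone(&self.inner);
		// spawn_blocking on the explicitly supplied handle, so the store is not tied
		// to whichever runtime happens to be current at the call site.
		self.runtime
			.spawn_blocking(move || this.write_sync(&key, buf))
			.await
			.expect("blocking task panicked")
	}
}

Callers that don't care could still construct it with Handle::current(), while users with multiple runtimes could pass a handle to a dedicated IO runtime.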
lightning/src/util/persist.rs
Outdated
}

/// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`]
/// data migration.
-pub trait MigratableKVStore: KVStore {
+pub trait MigratableKVStore: KVStoreSync {
How will we solve this for a KVStore?
I think this comment belongs in #3905?
We might not need to solve it now, as long as we still require a sync implementation alongside an async one? If we support async-only KVStores, then we can create an async version of this trait?
Force-pushed from 81ad668 to e462bce
Removed the garbage collector, because we need to keep the last written version.
Force-pushed from 97d6b3f to 02dce94
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3931 +/- ##
==========================================
- Coverage 88.77% 88.60% -0.18%
==========================================
Files 175 174 -1
Lines 127760 127825 +65
Branches 127760 127825 +65
==========================================
- Hits 113425 113255 -170
- Misses 11780 12076 +296
+ Partials 2555 2494 -61
Flags with carried forward coverage won't be shown.
Force-pushed from c061fcd to 2492508
Force-pushed from 9938dfe to 7d98528
Force-pushed from 38ab949 to dd9e1b5
Updated code to not use an async wrapper, but conditionally expose the async …
I didn't yet update the …
🔔 1st Reminder: Hey @TheBlueMatt! This PR has been waiting for your review.
We definitely can't leak memory on each write: https://github.com/lightningdevkit/rust-lightning/pull/3931/files#r2251747384
Force-pushed from f4e8d62 to bc90cdb
lightning-persister/src/fs_store.rs
Outdated
	Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
};
let mut last_written_version = inner_lock_ref.write().unwrap();
inner_lock_ref = Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default());
I really, really tried to extract this block (which is duplicated for remove) into a function, but I couldn't get it to work. Returning the guard that references inner_lock_ref was the problem all the time. Returning both as a tuple or struct didn't fix it either.
We should be able to drop this map entry lookup entirely by just passing the async state in - we already had a reference to the Arc in get_new_version_number, so we can just pipe that through.
Nice elimination of the outer lock usage. Pushed a fixup commit.
Force-pushed from bc90cdb to e8cadf6
Discussed various approaches offline and settled on adding an inflight counter and obtaining the lock twice. For both sync and async, the garbage collector is now gone. I am assuming the gain from batching the cleanup was negligible.
lightning-persister/src/fs_store.rs
Outdated
// If there are no inflight writes and no arcs in use elsewhere, we can remove the map entry to prevent
// leaking memory.
if async_state.inflight_writes == 0 && Arc::strong_count(&inner_lock_ref) == 2 {
	outer_lock.remove(&dest_file_path);
This has to go after the write actually completes. I believe this inversion is currently possible:
start writing A, it inserts a state and spawns the future.
the future runs to here, removes the state (since its the only pending write)
then we start writing B, it inserts a fresh state and spawns the future
the write future for B gets scheduled first, writes B
then the A future resumes here and we write A, overwriting the later state B.
Sadly this would mean blocking the write method (before it goes async) on an actual disk write (in an async task) in some cases, which isn't really great. It should be pretty easy to fix by moving to atomics in AsyncState rather than a RwLock wrapping AtomicState.
the write future for B gets scheduled first, writes B
Is this possible? Because we were already running future A and had obtained the file lock. So B needs to wait?
They have different locks because B inserted a fresh state which is a different RwLock.
Concurrent programming is so tricky. The cleanup now happens after the write is done.
lightning-persister/src/fs_store.rs
Outdated
if state.inflight_writes == 0 && Arc::strong_count(&inner_lock_ref) == 2 {
	self.locks.lock().unwrap().remove(&dest_file_path);
There's a potential deadlock risk in this code. The function is removing an entry from self.locks while simultaneously holding a write lock on an entry from that same map. If another thread attempts to acquire the outer lock (via self.locks.lock().unwrap()) while this code is executing, it could create a deadlock situation.
Consider refactoring this cleanup logic to occur after releasing the inner lock, perhaps in a separate function that's called once all locks have been released. This would maintain the map's integrity while avoiding the potential for deadlock conditions.
-if state.inflight_writes == 0 && Arc::strong_count(&inner_lock_ref) == 2 {
-	self.locks.lock().unwrap().remove(&dest_file_path);
+// Decide whether cleanup is needed while still holding the inner lock.
+let cleanup_needed = state.inflight_writes == 0 && Arc::strong_count(&inner_lock_ref) == 2;
+// Release the inner lock by dropping its guard.
+drop(state);
+// Now it is safe to clean up the lock entry if needed.
+if cleanup_needed {
+	self.locks.lock().unwrap().remove(&dest_file_path);
+}
Spotted by Diamond
I am wondering if this is real...
Force-pushed from 6fff800 to 33b8095
Force-pushed from a3fbef3 to 5ccd9f5
I have to admit that this PR is much trickier than I anticipated. Pushed a new version with the following changes:
Still not sure what to do with the lazy remove.
Force-pushed from 5ccd9f5 to 6175bd4
Fuzz sanity caught something. Interesting.
Did another pass.
Still not sure what to do with the lazy remove.
Hmm, honestly, I'm starting to question whether it gains us that much. In the FilesystemStore it only saves us one fsync or so. For any SQL/most databases it won't be implemented. IIRC there would also be only a few cloud-based storage systems for which archive/lazy delete would make any difference. So if we're not sure how to deal with it in the async world and/or it further complicates things, we could consider dropping the parameter, maybe?
impl FilesystemStoreInner {
	fn get_inner_lock_ref(&self, path: PathBuf) -> Arc<RwLock<AsyncState>> {
		let mut outer_lock = self.locks.lock().unwrap();
		Arc::clone(&outer_lock.entry(path).or_default())
IMO, having too many of these tiny helpers really gets confusing, as you lose context on what's actually happening. It also somewhat robs us of the opportunity to properly deal with the acquired guards at the callsite.
Relatedly, can we inline get_new_version into get_new_version_and_state and replace the one callsite of the former with the latter?
Helpers or not, it's a trade-off between needing to jump to the call vs. risking that future changes aren't applied to all duplicates.
the opportunity to properly deal with the acquired guards at the callsite
I don't think there is a need to do this anywhere currently?
replace the one callsite of the former with the latter?
I don't think this is possible, because get_new_version_and_state would also need to return the guard. Something I've tried in many different ways, but it seems difficult.
lightning-persister/src/fs_store.rs
Outdated
// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
// consistency.
let stale = version <= async_state.last_written_version;
nit:
-let stale = version <= async_state.last_written_version;
+let is_stale_version = version <= async_state.last_written_version;
(or maybe inverting the value could even be cleaner and would match the comment)
I thought the is_... naming wasn't a Rust convention. Maybe only for methods? Renamed.
I also wanted to invert it originally, but couldn't come up with a good name. Stale seemed to carry a clearer meaning. How would you name it?
@@ -549,6 +741,66 @@ mod tests {
		do_read_write_remove_list_persist(&fs_store);
	}

	#[cfg(feature = "tokio")]
	#[tokio::test]
	async fn read_write_remove_list_persist_async() {
Given the complexity, IMO it would make sense to extend test coverage here. In particular, it would be good if we'd find a way to simulate a number of concurrent write actions and always assert everything resolves as expected. This could be solved through adequate proptests, or by introducing some fuzzer.
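As a starting point, such a test could fire off overlapping writes to the same key and assert the store ends up in a consistent state. This is only a sketch: it assumes the async write/read methods mirror the sync signatures shown elsewhere in this PR, and it omits imports, so names and error handling may not match the final code.

#[cfg(feature = "tokio")]
#[tokio::test]
async fn concurrent_writes_resolve_consistently() {
	let mut temp_path = std::env::temp_dir();
	temp_path.push("fs_store_concurrent_writes");
	let store = std::sync::Arc::new(FilesystemStore::new(temp_path));

	// Kick off a number of overlapping writes to the same key.
	let mut handles = Vec::new();
	for i in 0u8..16 {
		let store = std::sync::Arc::clone(&store);
		handles.push(tokio::spawn(async move { store.write("ns", "", "key", vec![i]).await }));
	}
	for handle in handles {
		handle.await.unwrap().unwrap();
	}

	// Whatever ordering the runtime chose, the stored value must be a single byte that
	// was actually written, and repeated reads must keep returning the same value.
	let first_read = store.read("ns", "", "key").await.unwrap();
	assert_eq!(first_read.len(), 1);
	assert!(first_read[0] < 16);
	let second_read = store.read("ns", "", "key").await.unwrap();
	assert_eq!(first_read, second_read);
}

A proptest or fuzz target could then randomize the number of writers, keys, and interleavings on top of the same skeleton.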
	let _guard = inner_lock_ref.read().unwrap();

	let mut f = fs::File::open(dest_file_path)?;
	f.read_to_end(&mut buf)?;
}

self.garbage_collect_locks();
Given that we'll happily insert new entries on read/list/etc., shouldn't we keep the garbage collection tasks in here, too, not just on write?
Good point. List doesn't lock at all, that one is apparently a bit loose. But read, yes, I think it should clean up if possible.
Added cleanup code to read.
// If there are no more writes pending and no arcs in use elsewhere, we can remove the map entry to prevent
// leaking memory. The two arcs are the one in the map and the one held here in inner_lock_ref.
if !more_writes_pending && Arc::strong_count(&inner_lock_ref) == 2 {
	self.locks.lock().unwrap().remove(&dest_file_path);
Pretty weird pattern that we hand in the Arc'd inner value, but here we retake the outer lock without dropping the guard. It seems like it could invite deadlocks (at least in the future). It may be preferable to re-instantiate the scopes we had before?
Actually, Graphite mentioned this too, and I still had that comment parked.
I think it needs to happen within the guard, because otherwise another thread may initiate a write, and that state would then be removed immediately.
}

fn get_new_version(async_state: &mut AsyncState) -> u64 {
	async_state.latest_version += 1;
Should we debug_assert here that this is always >= last_written_version?
Added. It can even be >, I think.
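For reference, the assertion discussed here could look roughly like this; the field names come from this thread, and the surrounding code is simplified:

#[derive(Default)]
struct AsyncState {
	latest_version: u64,
	last_written_version: u64,
}

fn get_new_version(async_state: &mut AsyncState) -> u64 {
	async_state.latest_version += 1;
	// A freshly handed-out version must be strictly newer than anything already written
	// for this key, otherwise the staleness check would wrongly discard the new write.
	debug_assert!(async_state.latest_version > async_state.last_written_version);
	async_state.latest_version
}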
	&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> lightning::io::Result<()> {
	check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
fn execute_locked<F: FnOnce() -> Result<(), lightning::io::Error>>(
nit: Given the name it would fit better if this took an RwLockWriteGuard or a &mut AsyncState rather than the RwLock?
If the name isn't clear, I'd rather change the name instead of the logic. I tried to encapsulate the locking code, and execute the callback with the lock acquired. Isn't execute_locked conveying that?
Force-pushed from 528b414 to 744fdc8
Rebased to see if the fuzz error disappears.
let inner_lock_ref: Arc<RwLock<AsyncState>> = self.get_inner_lock_ref(dest_file_path);

let new_version = {
	let mut async_state = inner_lock_ref.write().unwrap();
Bleh, this means that if there's a write happening for a key and another write starts for the same key, the task spawning the second write async will end up blocking until the first write completes. This should be easy to remedy by moving the lock onto just the latest_written_version field and making the latest_version field an atomic.
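A sketch of what that restructuring could look like; the field names come from this thread, but the exact shape is an assumption rather than the PR's final code:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

#[derive(Default)]
struct AsyncState {
	// Bumped by every write call, without waiting on in-flight writes for the same key.
	latest_version: AtomicU64,
	// Only the bookkeeping of what actually reached disk still needs a lock.
	last_written_version: Mutex<u64>,
}

impl AsyncState {
	// Hand out a new, strictly increasing version without blocking.
	fn get_new_version(&self) -> u64 {
		self.latest_version.fetch_add(1, Ordering::SeqCst) + 1
	}

	// Perform the disk write under the per-key lock, skipping it if a newer version was
	// already written (the eventual-consistency check from this PR).
	fn write_if_newer(
		&self, version: u64, do_write: impl FnOnce() -> std::io::Result<()>,
	) -> std::io::Result<()> {
		let mut last_written = self.last_written_version.lock().unwrap();
		if version <= *last_written {
			// Stale: a newer write already landed, so this one is dropped.
			return Ok(());
		}
		do_write()?;
		*last_written = version;
		Ok(())
	}
}

With this shape, spawning a second write for the same key only contends on the Mutex around the actual disk write, not on handing out the version number.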
// If there are no more writes pending and no arcs in use elsewhere, we can remove the map entry to prevent
// leaking memory. The two arcs are the one in the map and the one held here in inner_lock_ref.
if !more_writes_pending && Arc::strong_count(&inner_lock_ref) == 2 {
Hmmmmm, I think we can accidentally remove too quickly here:
(a) a thread starts the write process, does the async write, and gets to this line, finds that there are no more writes pending and there are no other references to the arc
(b) another thread starts the write process, clones the arc after looking it up in the lock
(c) the first thread resumes and removes the entry.
I believe the fix is easy, though: just move the locks lock above the Arc count check.
Async filesystem store with eventually consistent writes. It is just using tokio's spawn_blocking, because that is what tokio::fs would otherwise do as well. Using tokio::fs would make it complicated to reuse the sync code.
ldk-node try-out: lightningdevkit/ldk-node@main...joostjager:ldk-node:async-fsstore