refactor: Change concurrency approach #100
base: main
Conversation
will probably inline this once it is proven to work well
futures-buffered is reexported by n0-future, as I learned yesterday
...by reverting a change I made because of an older version of clippy
Documentation for this PR has been generated and is available at: https://n0-computer.github.io/iroh-blobs/pr/100/docs/iroh_blobs/ Last updated: 2025-07-11T11:57:10Z
Here is where I am going with this. Now that we have the guarantee that each hash runs single threaded (not the same thread, thanks tokio, but at least 1 thread at a time), we can make concurrency much more lightweight: #101
Command::ListBlobs(cmd) => {
    trace!("{cmd:?}");
    let (tx, rx) = tokio::sync::oneshot::channel();
Unrelated change!
}
for hash in to_remove {
    if let Some(slot) = self.handles.remove(&hash) {
        // do a quick check if the handle has become alive in the meantime, and reinsert it
Getting rid of this nasty race condition handling was the whole purpose of the exercise. Also, we don't need a scheduled cleanup task anymore since the entity manager will deal with it. If an actor becomes idle it will be removed from the map after shutdown.
@@ -4,6 +4,7 @@ use iroh_metrics::{Counter, MetricsGroup};

/// Enum of metrics for the module
#[allow(missing_docs)]
#[allow(dead_code)]
why is this dead code?
I need to rethink what metrics to collect; many of them aren't even relevant anymore.
@@ -0,0 +1,1038 @@
#![allow(dead_code)]
maybe remove this?
This thing was a separate crate. I inlined it so I don't have to publish and maintain yet another crate. The dead code is the friendly API of the crate, for when you don't want to embed the entity manager into your own actor.
But I want to embed the entity manager, since I don't want client -> iroh-blobs main actor -> entity manager -> entity. That saves one hop.
}
}

/// A manager for entities identified by an entity id.
so you basically wrote a new actor system? 😅
No, that's irpc, in a way (at least my idea of how async boundaries should be structured). This is very specialized, so it's more like a task manager.
You get concurrency but not parallelism per entity, and parallelism between separate entities. An entity here is the state for a hash.
right
@@ -299,6 +300,7 @@ mod tests {
let outboard_path = options.outboard_path(&bh);
let sizes_path = options.sizes_path(&bh);
let bitfield_path = options.bitfield_path(&bh);
tokio::time::sleep(Duration::from_millis(100)).await; // allow for some time for the file to be written
this is very racy, isn't it?
Yeah. There is some delay between when an entity becomes idle and when the bitfield gets written to disk. The bitfield is ephemeral, but can be expensive to compute. You need to call shutdown before terminating the process to make sure all bitfields are persisted, but here we are looking for the file before shutting down, so there is currently no API to wait for it.
Not sure what to do about this. I could add an await_idle(hash), but that is not really a generic API, so it would have to be just for the FS store.
One thing I am not sure about: the entity manager gives you an async but exclusive on_shutdown fn to perform cleanup, like saving data to disk or to an async io system. Should it also do the same for waking up, i.e. have an async per-entity callback that is called on wakeup? I did not do it because I thought you might want to do this lazily or partially inside your state, so having one callback would be restrictive. But it sure would make the 2 test databases in the tests much more pleasant to write...
Nah, best handled by a tokio::sync::OnceCell or something custom.
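For illustration, here is a minimal sketch of that OnceCell idea, assuming the lazy initialization lives inside the per-entity state rather than in a manager wakeup callback; `EntityState` and `load_bitfield` are made-up names, not part of this PR:

```rust
use tokio::sync::OnceCell;

// Made-up per-entity state for illustration.
struct EntityState {
    // Expensive to compute, so it is loaded or recomputed at most once after
    // the entity wakes up, and only if something actually needs it.
    bitfield: OnceCell<Vec<u8>>,
}

impl EntityState {
    fn new() -> Self {
        Self { bitfield: OnceCell::new() }
    }

    async fn bitfield(&self) -> &[u8] {
        self.bitfield
            .get_or_init(|| async {
                // placeholder: load from disk or recompute
                load_bitfield().await
            })
            .await
    }
}

async fn load_bitfield() -> Vec<u8> {
    Vec::new()
}
```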
Description
Before, the concurrency approach for slots / handles for individual hashes was based on a top level task pool and "slots" that were managed in a map in the main actor. There were some tricky race conditions for the case where a handle would be "revived" when it was already executing its Drop fn - which does crucial things like writing the bitfield file. Also, there was actual parallelism per hash, which is probably not beneficial at all.
Now each hash effectively runs single-threaded, meaning that we can later go from actual mutexes to more lightweight synchronisation primitives like https://crates.io/crates/atomic_refcell. Unfortunately everything must still be Send because we run this whole thing on a multi-threaded executor 🤷, thank you tokio. Otherwise we could just use a RefCell.
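As a hedged illustration of that point (not code from this PR; `PerHashState` is a made-up type), swapping a Mutex for an AtomicRefCell under the one-task-per-hash guarantee could look like this:

```rust
use std::sync::Arc;
use atomic_refcell::AtomicRefCell;

// Made-up per-hash state for illustration.
#[derive(Default)]
struct PerHashState {
    bytes_written: u64,
}

fn main() {
    // Arc<AtomicRefCell<_>> is Send + Sync, which keeps the multi-threaded
    // tokio executor happy even though the state is never accessed in parallel.
    let state = Arc::new(AtomicRefCell::new(PerHashState::default()));

    // Exclusive access without taking a lock. AtomicRefCell only checks for
    // overlapping borrows (and panics if one happens); the one-task-per-hash
    // guarantee rules overlap out, as long as no borrow is held across an
    // await point.
    state.borrow_mut().bytes_written += 1024;
    assert_eq!(state.borrow().bytes_written, 1024);
}
```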
Now the concurrency is based on a task pool that always contains at most a single task per hash. Multiple tasks that operate on the same hash are handled concurrently, but not in parallel, using a FuturesUnordered. The drop case is handled in a cleaner way: when an actor becomes idle, it "gives back" its state to its owner, the manager actor. If a task is spawned while drop runs, it goes into the inbox and the actor gets revived immediately afterwards. The manager also keeps a pool of inactive actors to prevent reallocations.
All this is abstracted away by the entity_manager.
The entire entity_manager module could at some point become a separate crate, but for now I have inlined it so I don't need to publish and maintain another crate.
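For illustration, a minimal sketch of the per-hash pattern described above. This is not the actual entity_manager code; `HashState`, `handle`, and `per_hash_actor` are made-up names, and the sketch uses FuturesUnordered from the futures crate rather than the one reexported via n0-future:

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc;

// Made-up per-hash state, exclusively owned by the task driving this hash.
struct HashState;

// Handle one command against the per-hash state (placeholder body).
async fn handle(_state: &HashState, _cmd: u32) {}

// One task per hash: all commands for this hash interleave at await points
// via FuturesUnordered, so they run concurrently but never in parallel.
async fn per_hash_actor(state: HashState, mut inbox: mpsc::Receiver<u32>) {
    let mut running = FuturesUnordered::new();
    loop {
        tokio::select! {
            maybe_cmd = inbox.recv() => match maybe_cmd {
                // new work for this hash joins the in-flight set
                Some(cmd) => running.push(handle(&state, cmd)),
                // manager dropped the sender: stop accepting work
                None => break,
            },
            Some(()) = running.next(), if !running.is_empty() => {
                // one command for this hash finished
            }
        }
    }
    // drain whatever is still in flight before giving the state back
    while running.next().await.is_some() {}
    // here the state would be handed back to the manager (cf. on_shutdown)
}
```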
Breaking Changes
Notes & open questions
Change checklist