Skip to content

refactor: Lightweight watcher #101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 37 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
90c23be
Add entity-manager dep
rklaehn Jul 8, 2025
90db719
Remove unused dependencies
rklaehn Jul 8, 2025
24c7f2c
Merge branch 'lose-futures-buffered-dep' into entity-manager
rklaehn Jul 8, 2025
126d834
Merge branch 'main' into entity-manager
rklaehn Jul 8, 2025
264ed84
Add import
rklaehn Jul 8, 2025
a859674
Add helpers for set and update
rklaehn Jul 8, 2025
baf4d54
Make bipolar clippy happy again
rklaehn Jul 8, 2025
66f8540
Move message sending into meta
rklaehn Jul 8, 2025
9193b08
Merge branch 'remove-db-public-send' into entity-manager
rklaehn Jul 8, 2025
8cae7cd
WIP
rklaehn Jul 8, 2025
161a70b
Replace slots with EntityHandler
rklaehn Jul 8, 2025
169261a
Get rid of the weak reference
rklaehn Jul 8, 2025
6fe2e07
Inline entity manager and streamline the dispatcher code a bit
rklaehn Jul 9, 2025
fd9faab
revert a few changes to minimize the diff
rklaehn Jul 9, 2025
ba0ab3f
Add version of tokio::sync::Watcher that only works locally and copy …
rklaehn Jul 9, 2025
71f0766
Use single threaded version of tokio::sync::watch
rklaehn Jul 9, 2025
8ac1b78
codespell
rklaehn Jul 9, 2025
c72ebf5
Merge branch 'entity-manager' into lightweight-watcher
rklaehn Jul 9, 2025
6dfdd6a
fmt
rklaehn Jul 9, 2025
011746d
remove debug code
rklaehn Jul 9, 2025
def9abc
more comments
rklaehn Jul 9, 2025
47cab9c
Add property based tests for the entity manager
rklaehn Jul 10, 2025
00c54f3
Add a property based test that tests basic functionality of the entit…
rklaehn Jul 10, 2025
664beb8
clippy
rklaehn Jul 10, 2025
7b1b71b
Merge branch 'main' into entity-manager
rklaehn Jul 10, 2025
e584ad1
fmt
rklaehn Jul 10, 2025
4412be9
Add tests for the busy and dead cases which are hard to trigger witho…
rklaehn Jul 10, 2025
c26e555
clippy again
rklaehn Jul 10, 2025
75864bc
Merge branch 'entity-manager' into lightweight-watcher
rklaehn Jul 11, 2025
8004115
Merge branch 'main' into entity-manager
rklaehn Jul 11, 2025
c57f259
Merge branch 'main' into entity-manager
rklaehn Jul 23, 2025
425ae55
Merge branch 'entity-manager' into lightweight-watcher
rklaehn Jul 23, 2025
42bbbec
Merge branch 'main' into entity-manager
rklaehn Jul 24, 2025
591769f
Merge branch 'entity-manager' into lightweight-watcher
rklaehn Jul 24, 2025
91af79e
Merge branch 'main' into entity-manager
rklaehn Aug 5, 2025
f891633
Merge branch 'entity-manager' into lightweight-watcher
rklaehn Aug 5, 2025
ddd45d9
Merge branch 'main' into lightweight-watcher
rklaehn Aug 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ iroh-base = "0.91"
reflink-copy = "0.1.24"
irpc = { version = "0.7.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
iroh-metrics = { version = "0.35" }
atomic_refcell = "0.1.13"

[dev-dependencies]
clap = { version = "4.5.31", features = ["derive"] }
Expand Down
17 changes: 12 additions & 5 deletions src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ use crate::{
ApiClient,
},
store::{
fs::util::entity_manager::{self, ActiveEntityState},
fs::util::entity_manager::{self, ActiveEntityState, ShutdownCause},
util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
Hash,
},
Expand Down Expand Up @@ -217,10 +217,17 @@ impl entity_manager::Params for EmParams {

type EntityState = Slot;

async fn on_shutdown(
_state: entity_manager::ActiveEntityState<Self>,
_cause: entity_manager::ShutdownCause,
) {
async fn on_shutdown(state: HashContext, cause: ShutdownCause) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated change!

// this isn't strictly necessary. Drop will run anyway as soon as the
// state is reset to it's default value. Doing it here means that we
// have exact control over where it happens.
if let Some(handle) = state.state.0.lock().await.take() {
trace!(
"shutting down entity manager for hash: {}, cause: {cause:?}",
state.id
);
drop(handle);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/store/fs/bao_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use bao_tree::{
use bytes::{Bytes, BytesMut};
use derive_more::Debug;
use irpc::channel::mpsc;
use tokio::sync::watch;
use tracing::{debug, error, info, trace};

use super::{
entry_state::{DataLocation, EntryState, OutboardLocation},
options::{Options, PathOptions},
util::watch,
BaoFilePart,
};
use crate::{
Expand Down
1 change: 1 addition & 0 deletions src/store/fs/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::future::Future;

use tokio::{select, sync::mpsc};
pub(crate) mod entity_manager;
pub(crate) mod watch;

/// A wrapper for a tokio mpsc receiver that allows peeking at the next message.
#[derive(Debug)]
Expand Down
87 changes: 87 additions & 0 deletions src/store/fs/util/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::{ops::Deref, sync::Arc};

use atomic_refcell::{AtomicRef, AtomicRefCell};

struct State<T> {
value: T,
dropped: bool,
}

struct Shared<T> {
value: AtomicRefCell<State<T>>,
notify: tokio::sync::Notify,
}

pub struct Sender<T>(Arc<Shared<T>>);

pub struct Receiver<T>(Arc<Shared<T>>);

impl<T> Sender<T> {
pub fn new(value: T) -> Self {
Self(Arc::new(Shared {
value: AtomicRefCell::new(State {
value,
dropped: false,
}),
notify: tokio::sync::Notify::new(),
}))
}

pub fn send_if_modified<F>(&self, modify: F) -> bool
where
F: FnOnce(&mut T) -> bool,
{
let mut state = self.0.value.borrow_mut();
let modified = modify(&mut state.value);
if modified {
self.0.notify.notify_waiters();
}
modified
}

pub fn borrow(&self) -> impl Deref<Target = T> + '_ {
AtomicRef::map(self.0.value.borrow(), |state| &state.value)
}

pub fn subscribe(&self) -> Receiver<T> {
Receiver(self.0.clone())
}
}

impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.0.value.borrow_mut().dropped = true;
self.0.notify.notify_waiters();
}
}

impl<T> Receiver<T> {
pub async fn changed(&self) -> Result<(), error::RecvError> {
self.0.notify.notified().await;
if self.0.value.borrow().dropped {
Err(error::RecvError(()))
} else {
Ok(())
}
}

pub fn borrow(&self) -> impl Deref<Target = T> + '_ {
AtomicRef::map(self.0.value.borrow(), |state| &state.value)
}
}

pub mod error {
use std::{error::Error, fmt};

/// Error produced when receiving a change notification.
#[derive(Debug, Clone)]
pub struct RecvError(pub(super) ());

impl fmt::Display for RecvError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}

impl Error for RecvError {}
}
Loading