From ba0ab3f54ef9bda8625135ba75195c735ad17bb6 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 9 Jul 2025 12:05:39 +0300 Subject: [PATCH 1/3] Add version of tokio::sync::Watcher that only works locally and copy the relevant API --- Cargo.toml | 2 +- src/store/fs.rs | 17 ++++++++---- src/store/fs/util.rs | 1 + src/store/fs/util/watcher.rs | 51 ++++++++++++++++++++++++++++++++++++ 4 files changed, 65 insertions(+), 6 deletions(-) create mode 100644 src/store/fs/util/watcher.rs diff --git a/Cargo.toml b/Cargo.toml index 46aca852..1f851632 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ iroh-base = "0.90" reflink-copy = "0.1.24" irpc = { version = "0.5.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream", "derive"], default-features = false } iroh-metrics = { version = "0.35" } +atomic_refcell = "0.1.13" [dev-dependencies] clap = { version = "4.5.31", features = ["derive"] } @@ -58,7 +59,6 @@ testresult = "0.4.1" tracing-subscriber = { version = "0.3.19", features = ["fmt"] } tracing-test = "0.2.5" walkdir = "2.5.0" -atomic_refcell = "0.1.13" [features] hide-proto-docs = [] diff --git a/src/store/fs.rs b/src/store/fs.rs index 930b0a50..18cb1862 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -106,7 +106,7 @@ use crate::{ ApiClient, }, store::{ - fs::util::entity_manager::{self, ActiveEntityState}, + fs::util::entity_manager::{self, ActiveEntityState, ShutdownCause}, util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned}, Hash, }, @@ -217,10 +217,17 @@ impl entity_manager::Params for EmParams { type EntityState = Slot; - async fn on_shutdown( - _state: entity_manager::ActiveEntityState, - _cause: entity_manager::ShutdownCause, - ) { + async fn on_shutdown(state: HashContext, cause: ShutdownCause) { + // this isn't strictly necessary. Drop will run anyway as soon as the + // state is reset to it's default value. Doing it here means that we + // have exact control over where it happens. + if let Some(handle) = state.state.0.lock().await.take() { + trace!( + "shutting down entity manager for hash: {}, cause: {cause:?}", + state.id + ); + drop(handle); + } } } diff --git a/src/store/fs/util.rs b/src/store/fs/util.rs index 1cbd01bc..a3d424ce 100644 --- a/src/store/fs/util.rs +++ b/src/store/fs/util.rs @@ -2,6 +2,7 @@ use std::future::Future; use tokio::{select, sync::mpsc}; pub(crate) mod entity_manager; +pub(crate) mod watcher; /// A wrapper for a tokio mpsc receiver that allows peeking at the next message. #[derive(Debug)] diff --git a/src/store/fs/util/watcher.rs b/src/store/fs/util/watcher.rs new file mode 100644 index 00000000..4c324d1e --- /dev/null +++ b/src/store/fs/util/watcher.rs @@ -0,0 +1,51 @@ +use std::{ops::Deref, sync::Arc}; + +use atomic_refcell::AtomicRefCell; + +struct Shared { + value: AtomicRefCell, + notify: tokio::sync::Notify, +} + +pub struct Sender(Arc>); + +pub struct Receiver(Arc>); + +impl Sender { + pub fn new(value: T) -> Self { + Self(Arc::new(Shared { + value: AtomicRefCell::new(value), + notify: tokio::sync::Notify::new(), + })) + } + + pub fn send_if_modified(&self, modify: F) -> bool + where + F: FnOnce(&mut T) -> bool, + { + let mut value = self.0.value.borrow_mut(); + let modified = modify(&mut value); + if modified { + self.0.notify.notify_waiters(); + } + modified + } + + pub fn borrow(&self) -> impl Deref + '_ { + self.0.value.borrow() + } + + pub fn subscribe(&self) -> Receiver { + Receiver(self.0.clone()) + } +} + +impl Receiver { + pub async fn changed(&self) { + self.0.notify.notified().await; + } + + pub fn borrow(&self) -> impl Deref + '_ { + self.0.value.borrow() + } +} From 71f0766befb243b68a7160a7900c69de1199d0d1 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 9 Jul 2025 12:23:53 +0300 Subject: [PATCH 2/3] Use single threaded version of tokio::sync::watch Since we are guaranteed to be single threaded within the handler fns for a single hash, we can use a much more lightweight watcher impl. Also, tokio::sync::watch is optimized for the case where there are a lot of parallel watchers, using a giant BigNotify internally. We want to optimize for the case where there are just 0 or a few watchers, so we are not that much concerned with the thundering herd problem. --- src/store/fs/bao_file.rs | 2 +- src/store/fs/util.rs | 2 +- src/store/fs/util/watch.rs | 87 ++++++++++++++++++++++++++++++++++++ src/store/fs/util/watcher.rs | 51 --------------------- 4 files changed, 89 insertions(+), 53 deletions(-) create mode 100644 src/store/fs/util/watch.rs delete mode 100644 src/store/fs/util/watcher.rs diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index 6a86a889..58c5dfeb 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -20,7 +20,7 @@ use bao_tree::{ use bytes::{Bytes, BytesMut}; use derive_more::Debug; use irpc::channel::mpsc; -use tokio::sync::watch; +use super::util::watch; use tracing::{debug, error, info, trace}; use super::{ diff --git a/src/store/fs/util.rs b/src/store/fs/util.rs index a3d424ce..b739394a 100644 --- a/src/store/fs/util.rs +++ b/src/store/fs/util.rs @@ -2,7 +2,7 @@ use std::future::Future; use tokio::{select, sync::mpsc}; pub(crate) mod entity_manager; -pub(crate) mod watcher; +pub(crate) mod watch; /// A wrapper for a tokio mpsc receiver that allows peeking at the next message. #[derive(Debug)] diff --git a/src/store/fs/util/watch.rs b/src/store/fs/util/watch.rs new file mode 100644 index 00000000..316321ea --- /dev/null +++ b/src/store/fs/util/watch.rs @@ -0,0 +1,87 @@ +use std::{ops::Deref, sync::Arc}; + +use atomic_refcell::{AtomicRef, AtomicRefCell}; + +struct State { + value: T, + dropped: bool, +} + +struct Shared { + value: AtomicRefCell>, + notify: tokio::sync::Notify, +} + +pub struct Sender(Arc>); + +pub struct Receiver(Arc>); + +impl Sender { + pub fn new(value: T) -> Self { + Self(Arc::new(Shared { + value: AtomicRefCell::new(State { + value, + dropped: false, + }), + notify: tokio::sync::Notify::new(), + })) + } + + pub fn send_if_modified(&self, modify: F) -> bool + where + F: FnOnce(&mut T) -> bool, + { + let mut state = self.0.value.borrow_mut(); + let modified = modify(&mut state.value); + if modified { + self.0.notify.notify_waiters(); + } + modified + } + + pub fn borrow(&self) -> impl Deref + '_ { + AtomicRef::map(self.0.value.borrow(), |state| &state.value) + } + + pub fn subscribe(&self) -> Receiver { + Receiver(self.0.clone()) + } +} + +impl Drop for Sender { + fn drop(&mut self) { + self.0.value.borrow_mut().dropped = true; + self.0.notify.notify_waiters(); + } +} + +impl Receiver { + pub async fn changed(&self) -> Result<(), error::RecvError> { + self.0.notify.notified().await; + if self.0.value.borrow().dropped { + Err(error::RecvError(())) + } else { + Ok(()) + } + } + + pub fn borrow(&self) -> impl Deref + '_ { + AtomicRef::map(self.0.value.borrow(), |state| &state.value) + } +} + +pub mod error { + use std::{error::Error, fmt}; + + /// Error produced when receiving a change notification. + #[derive(Debug, Clone)] + pub struct RecvError(pub(super) ()); + + impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } + } + + impl Error for RecvError {} +} \ No newline at end of file diff --git a/src/store/fs/util/watcher.rs b/src/store/fs/util/watcher.rs deleted file mode 100644 index 4c324d1e..00000000 --- a/src/store/fs/util/watcher.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::{ops::Deref, sync::Arc}; - -use atomic_refcell::AtomicRefCell; - -struct Shared { - value: AtomicRefCell, - notify: tokio::sync::Notify, -} - -pub struct Sender(Arc>); - -pub struct Receiver(Arc>); - -impl Sender { - pub fn new(value: T) -> Self { - Self(Arc::new(Shared { - value: AtomicRefCell::new(value), - notify: tokio::sync::Notify::new(), - })) - } - - pub fn send_if_modified(&self, modify: F) -> bool - where - F: FnOnce(&mut T) -> bool, - { - let mut value = self.0.value.borrow_mut(); - let modified = modify(&mut value); - if modified { - self.0.notify.notify_waiters(); - } - modified - } - - pub fn borrow(&self) -> impl Deref + '_ { - self.0.value.borrow() - } - - pub fn subscribe(&self) -> Receiver { - Receiver(self.0.clone()) - } -} - -impl Receiver { - pub async fn changed(&self) { - self.0.notify.notified().await; - } - - pub fn borrow(&self) -> impl Deref + '_ { - self.0.value.borrow() - } -} From 6dfdd6a7ca110ff7d482ddc57029febb6499c3f1 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 9 Jul 2025 12:33:14 +0300 Subject: [PATCH 3/3] fmt --- src/store/fs/bao_file.rs | 2 +- src/store/fs/util/watch.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index 58c5dfeb..0b2bd4f0 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -20,12 +20,12 @@ use bao_tree::{ use bytes::{Bytes, BytesMut}; use derive_more::Debug; use irpc::channel::mpsc; -use super::util::watch; use tracing::{debug, error, info, trace}; use super::{ entry_state::{DataLocation, EntryState, OutboardLocation}, options::{Options, PathOptions}, + util::watch, BaoFilePart, }; use crate::{ diff --git a/src/store/fs/util/watch.rs b/src/store/fs/util/watch.rs index 316321ea..58a56a0a 100644 --- a/src/store/fs/util/watch.rs +++ b/src/store/fs/util/watch.rs @@ -53,7 +53,7 @@ impl Drop for Sender { self.0.value.borrow_mut().dropped = true; self.0.notify.notify_waiters(); } -} +} impl Receiver { pub async fn changed(&self) -> Result<(), error::RecvError> { @@ -69,7 +69,7 @@ impl Receiver { AtomicRef::map(self.0.value.borrow(), |state| &state.value) } } - + pub mod error { use std::{error::Error, fmt}; @@ -84,4 +84,4 @@ pub mod error { } impl Error for RecvError {} -} \ No newline at end of file +}