From 77fbb05b88619fb993b68415ff5f09abbc446eb6 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 9 Jul 2025 18:36:10 +0300 Subject: [PATCH 01/11] First step for removing the slot mutex --- src/store/fs/bao_file.rs | 150 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 139 insertions(+), 11 deletions(-) diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index 1fe91404..9f526f07 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -286,8 +286,15 @@ fn read_size(size_file: &File) -> io::Result { } /// The storage for a bao file. This can be either in memory or on disk. -#[derive(derive_more::From)] +#[derive(derive_more::From, Default)] pub(crate) enum BaoFileStorage { + /// Initial state, we don't know anything yet. + #[default] + Initial, + /// Currently loading the entry from the metadata. + Loading, + /// There is no info about this hash in the metadata db. + NonExisting, /// The entry is incomplete and in memory. /// /// Since it is incomplete, it must be writeable. @@ -322,16 +329,13 @@ impl fmt::Debug for BaoFileStorage { BaoFileStorage::Partial(x) => x.fmt(f), BaoFileStorage::Complete(x) => x.fmt(f), BaoFileStorage::Poisoned => f.debug_struct("Poisoned").finish(), + BaoFileStorage::Initial => f.debug_struct("Initial").finish(), + BaoFileStorage::Loading => f.debug_struct("Loading").finish(), + BaoFileStorage::NonExisting => f.debug_struct("NonExisting").finish(), } } } -impl Default for BaoFileStorage { - fn default() -> Self { - BaoFileStorage::Complete(Default::default()) - } -} - impl PartialMemStorage { /// Converts this storage into a complete storage, using the given hash for /// path names and the given options for decisions about inlining. @@ -387,9 +391,16 @@ impl PartialMemStorage { impl BaoFileStorage { pub fn bitfield(&self) -> Bitfield { match self { - BaoFileStorage::Complete(x) => Bitfield::complete(x.data.size()), + BaoFileStorage::Initial => { + panic!("initial storage should not be used") + } + BaoFileStorage::Loading => { + panic!("loading storage should not be used") + } + BaoFileStorage::NonExisting => Bitfield::empty(), BaoFileStorage::PartialMem(x) => x.bitfield.clone(), BaoFileStorage::Partial(x) => x.bitfield.clone(), + BaoFileStorage::Complete(x) => Bitfield::complete(x.data.size()), BaoFileStorage::Poisoned => { panic!("poisoned storage should not be used") } @@ -465,7 +476,7 @@ impl BaoFileStorage { // unless there is a bug, this would just write the exact same data (self, None) } - BaoFileStorage::Poisoned => { + _ => { // we are poisoned, so just ignore the write (self, None) } @@ -483,13 +494,14 @@ impl BaoFileStorage { match self { Self::Complete(_) => Ok(()), Self::PartialMem(_) => Ok(()), + Self::NonExisting => Ok(()), Self::Partial(file) => { file.data.sync_all()?; file.outboard.sync_all()?; file.sizes.sync_all()?; Ok(()) } - Self::Poisoned => { + Self::Poisoned | Self::Initial | Self::Loading => { // we are poisoned, so just ignore the sync Ok(()) } @@ -506,6 +518,45 @@ impl BaoFileStorage { pub(crate) struct BaoFileHandle(Arc>); impl BaoFileHandle { + pub(super) async fn load(&self, ctx: &HashContext) { + enum Action { + Load, + Wait, + None, + } + let mut action = Action::None; + self.send_if_modified(|guard| match guard.deref() { + BaoFileStorage::Initial => { + *guard = BaoFileStorage::Loading; + action = Action::Load; + true + } + BaoFileStorage::Loading => { + action = Action::Wait; + false + } + _ => false, + }); + match action { + Action::Load => { + let state = match ctx.global.db.get(ctx.id).await { + Ok(state) => match BaoFileStorage::open(state, ctx).await { + Ok(handle) => handle, + Err(_) => BaoFileStorage::Poisoned, + }, + Err(_) => BaoFileStorage::Poisoned, + }; + self.send_replace(state); + } + Action::Wait => { + while let BaoFileStorage::Loading = self.borrow().deref() { + self.0.subscribe().changed().await.ok(); + } + } + Action::None => {} + } + } + pub(super) fn persist(&mut self, ctx: &HashContext) { self.send_if_modified(|guard| { let hash = &ctx.id; @@ -542,6 +593,9 @@ impl ReadBytesAt for DataReader { BaoFileStorage::Partial(x) => x.data.read_bytes_at(offset, size), BaoFileStorage::Complete(x) => x.data.read_bytes_at(offset, size), BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), } } } @@ -558,10 +612,78 @@ impl ReadAt for OutboardReader { BaoFileStorage::PartialMem(x) => x.outboard.read_at(offset, buf), BaoFileStorage::Partial(x) => x.outboard.read_at(offset, buf), BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), } } } +impl BaoFileStorage { + pub async fn open(state: Option>, ctx: &HashContext) -> io::Result { + let hash = &ctx.id; + let options = &ctx.global.options; + Ok(match state { + Some(EntryState::Complete { + data_location, + outboard_location, + }) => { + let data = match data_location { + DataLocation::Inline(data) => MemOrFile::Mem(data), + DataLocation::Owned(size) => { + let path = options.path.data_path(hash); + let file = std::fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, size)) + } + DataLocation::External(paths, size) => { + let Some(path) = paths.into_iter().next() else { + return Err(io::Error::other("no external data path")); + }; + let file = std::fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, size)) + } + }; + let outboard = match outboard_location { + OutboardLocation::NotNeeded => MemOrFile::empty(), + OutboardLocation::Inline(data) => MemOrFile::Mem(data), + OutboardLocation::Owned => { + let path = options.path.outboard_path(hash); + let file = std::fs::File::open(&path)?; + MemOrFile::File(file) + } + }; + Self::new_complete(data, outboard) + } + Some(EntryState::Partial { .. }) => Self::new_partial_file(ctx).await?, + None => Self::NonExisting, + }) + } + + /// Create a new bao file handle with a partial file. + pub(super) async fn new_partial_file(ctx: &HashContext) -> io::Result { + let hash = &ctx.id; + let options = ctx.global.options.clone(); + let storage = PartialFileStorage::load(hash, &options.path)?; + Ok(if storage.bitfield.is_complete() { + let size = storage.bitfield.size; + let (storage, entry_state) = storage.into_complete(size, &options)?; + debug!("File was reconstructed as complete"); + ctx.global.db.set(*hash, entry_state).await?; + storage.into() + } else { + storage.into() + }) + } + + /// Create a new complete bao file handle. + pub fn new_complete( + data: MemOrFile>, + outboard: MemOrFile, + ) -> Self { + CompleteStorage { data, outboard }.into() + } +} + impl BaoFileHandle { #[allow(dead_code)] pub fn id(&self) -> usize { @@ -613,7 +735,7 @@ impl BaoFileHandle { BaoFileStorage::Complete(_) => None, BaoFileStorage::PartialMem(entry) => Some(&mut entry.bitfield), BaoFileStorage::Partial(entry) => Some(&mut entry.bitfield), - BaoFileStorage::Poisoned => None, + _ => None, }; if let Some(bitfield) = res { bitfield.update(&Bitfield::complete(data.size())); @@ -658,6 +780,9 @@ impl BaoFileHandle { BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()), BaoFileStorage::Partial(file) => file.current_size(), BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), } } @@ -668,6 +793,9 @@ impl BaoFileHandle { BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()), BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()), BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), } } From ae95f2878ccb7faff007326234d9a92a7731cdc0 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 9 Jul 2025 18:47:12 +0300 Subject: [PATCH 02/11] prepare BaoFileHandle to be the full state --- src/store/fs/bao_file.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index 9f526f07..70f71f60 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -31,7 +31,7 @@ use super::{ use crate::{ api::blobs::Bitfield, store::{ - fs::{meta::raw_outboard_size, HashContext}, + fs::{meta::raw_outboard_size, util::entity_manager, HashContext}, util::{ read_checksummed_and_truncate, write_checksummed, FixedSize, MemOrFile, PartialMemStorage, DD, @@ -514,9 +514,15 @@ impl BaoFileStorage { } /// A cheaply cloneable handle to a bao file, including the hash and the configuration. -#[derive(Debug, Clone, derive_more::Deref)] +#[derive(Debug, Clone, Default, derive_more::Deref)] pub(crate) struct BaoFileHandle(Arc>); +impl entity_manager::Reset for BaoFileHandle { + fn reset(&mut self) { + self.send_replace(BaoFileStorage::Initial); + } +} + impl BaoFileHandle { pub(super) async fn load(&self, ctx: &HashContext) { enum Action { From c786a045ec204289d20f5dbd7833b63a3079e3e9 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 10 Jul 2025 11:17:33 +0300 Subject: [PATCH 03/11] Flatten the state we keep per active blob --- src/api/remote.rs | 41 +++- src/store/fs.rs | 320 +++++++++++----------------- src/store/fs/bao_file.rs | 149 ++++--------- src/store/fs/gc.rs | 2 + src/store/fs/util/entity_manager.rs | 17 ++ 5 files changed, 219 insertions(+), 310 deletions(-) diff --git a/src/api/remote.rs b/src/api/remote.rs index 7a4055fb..5de12d5f 100644 --- a/src/api/remote.rs +++ b/src/api/remote.rs @@ -1064,8 +1064,15 @@ mod tests { use testresult::TestResult; use crate::{ + api::blobs::Blobs, protocol::{ChunkRangesSeq, GetRequest}, - store::fs::{tests::INTERESTING_SIZES, FsStore}, + store::{ + fs::{ + tests::{create_n0_bao, test_data, INTERESTING_SIZES}, + FsStore, + }, + mem::MemStore, + }, tests::{add_test_hash_seq, add_test_hash_seq_incomplete}, util::ChunkRangesExt, }; @@ -1117,6 +1124,38 @@ mod tests { Ok(()) } + async fn test_observe_partial(blobs: &Blobs) -> TestResult<()> { + let sizes = INTERESTING_SIZES; + for size in sizes { + let data = test_data(size); + let ranges = ChunkRanges::chunk(0); + let (hash, bao) = create_n0_bao(&data, &ranges)?; + blobs.import_bao_bytes(hash, ranges.clone(), bao).await?; + let bitfield = blobs.observe(hash).await?; + if size > 1024 { + assert_eq!(bitfield.ranges, ranges); + } else { + assert_eq!(bitfield.ranges, ChunkRanges::all()); + } + } + Ok(()) + } + + #[tokio::test] + async fn test_observe_partial_mem() -> TestResult<()> { + let store = MemStore::new(); + test_observe_partial(store.blobs()).await?; + Ok(()) + } + + #[tokio::test] + async fn test_observe_partial_fs() -> TestResult<()> { + let td = tempfile::tempdir()?; + let store = FsStore::load(td.path()).await?; + test_observe_partial(store.blobs()).await?; + Ok(()) + } + #[tokio::test] async fn test_local_info_hash_seq() -> TestResult<()> { let sizes = INTERESTING_SIZES; diff --git a/src/store/fs.rs b/src/store/fs.rs index b323571b..90bd4bc2 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -106,7 +106,10 @@ use crate::{ ApiClient, }, store::{ - fs::util::entity_manager::{self, ActiveEntityState}, + fs::{ + bao_file::{BaoFileStorage, CompleteStorage}, + util::entity_manager::{self, ActiveEntityState}, + }, util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned}, Hash, }, @@ -186,8 +189,6 @@ struct TaskContext { pub db: meta::Db, // Handle to send internal commands pub internal_cmd_tx: tokio::sync::mpsc::Sender, - /// The file handle for the empty hash. - pub empty: BaoFileHandle, /// Handle to protect files from deletion. pub protect: ProtectHandle, } @@ -201,12 +202,6 @@ impl TaskContext { } } -impl entity_manager::Reset for Slot { - fn reset(&mut self) { - self.0 = Arc::new(tokio::sync::Mutex::new(None)); - } -} - #[derive(Debug)] struct EmParams; @@ -215,19 +210,15 @@ impl entity_manager::Params for EmParams { type GlobalState = Arc; - type EntityState = Slot; + type EntityState = BaoFileHandle; async fn on_shutdown( state: entity_manager::ActiveEntityState, - cause: entity_manager::ShutdownCause, + _cause: entity_manager::ShutdownCause, ) { - // this isn't strictly necessary. Drop will run anyway as soon as the - // state is reset to it's default value. Doing it here means that we - // have exact control over where it happens. - if let Some(mut handle) = state.state.0.lock().await.take() { - trace!("shutting down hash: {}, cause: {cause:?}", state.id); - handle.persist(&state); - } + // println!("persisting entry {:?} {:?}", state.id, state.state.borrow().bitfield()); + state.state.clone().persist(&state); + // println!("persisting entry done {:?}", state.id); } } @@ -252,6 +243,61 @@ struct Actor { type HashContext = ActiveEntityState; impl HashContext { + /// Load the state from the database. + /// + /// If the state is Initial, this will start the load. + /// If it is Loading, it will wait until loading is done. + /// If it is any other state, it will be a noop. + pub async fn load(&self) { + enum Action { + Load, + Wait, + None, + } + let mut action = Action::None; + self.state.send_if_modified(|guard| match guard.deref() { + BaoFileStorage::Initial => { + *guard = BaoFileStorage::Loading; + action = Action::Load; + true + } + BaoFileStorage::Loading => { + action = Action::Wait; + false + } + _ => false, + }); + match action { + Action::Load => { + let state = if self.id == Hash::EMPTY { + BaoFileStorage::Complete(CompleteStorage { + data: MemOrFile::Mem(Bytes::new()), + outboard: MemOrFile::empty(), + }) + } else { + // we must assign a new state even in the error case, otherwise + // tasks waiting for loading would stall! + match self.global.db.get(self.id).await { + Ok(state) => match BaoFileStorage::open(state, self).await { + Ok(handle) => handle, + Err(_) => BaoFileStorage::Poisoned, + }, + Err(_) => BaoFileStorage::Poisoned, + } + }; + self.state.send_replace(state); + } + Action::Wait => { + // we are in state loading already, so we just need to wait for the + // other task to complete loading. + while matches!(self.state.borrow().deref(), BaoFileStorage::Loading) { + self.state.0.subscribe().changed().await.ok(); + } + } + Action::None => {} + } + } + pub fn db(&self) -> &meta::Db { &self.global.db } @@ -260,10 +306,6 @@ impl HashContext { &self.global.options } - pub async fn lock(&self) -> tokio::sync::MutexGuard<'_, Option> { - self.state.0.lock().await - } - pub fn protect(&self, hash: Hash, parts: impl IntoIterator) { self.global.protect.protect(hash, parts); } @@ -288,113 +330,6 @@ impl HashContext { pub async fn set(&self, hash: Hash, state: EntryState) -> io::Result<()> { self.db().set(hash, state).await } - - pub async fn get(&self, hash: Hash) -> api::Result { - if hash == Hash::EMPTY { - return Ok(self.global.empty.clone()); - } - let res = self - .state - .get_or_create(|| async { - let res = self.db().get(hash).await.map_err(io::Error::other)?; - let res = match res { - Some(state) => open_bao_file(state, self).await, - None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")), - }; - Ok((res?, ())) - }) - .await - .map_err(api::Error::from); - let (res, _) = res?; - Ok(res) - } - - pub async fn get_or_create(&self, hash: Hash) -> api::Result { - if hash == Hash::EMPTY { - return Ok(self.global.empty.clone()); - } - let res = self - .state - .get_or_create(|| async { - let res = self.db().get(hash).await.map_err(io::Error::other)?; - let res = match res { - Some(state) => open_bao_file(state, self).await, - None => Ok(BaoFileHandle::new_partial_mem()), - }; - Ok((res?, ())) - }) - .await - .map_err(api::Error::from); - trace!("{res:?}"); - let (res, _) = res?; - Ok(res) - } -} - -async fn open_bao_file(state: EntryState, ctx: &HashContext) -> io::Result { - let hash = &ctx.id; - let options = &ctx.global.options; - Ok(match state { - EntryState::Complete { - data_location, - outboard_location, - } => { - let data = match data_location { - DataLocation::Inline(data) => MemOrFile::Mem(data), - DataLocation::Owned(size) => { - let path = options.path.data_path(hash); - let file = fs::File::open(&path)?; - MemOrFile::File(FixedSize::new(file, size)) - } - DataLocation::External(paths, size) => { - let Some(path) = paths.into_iter().next() else { - return Err(io::Error::other("no external data path")); - }; - let file = fs::File::open(&path)?; - MemOrFile::File(FixedSize::new(file, size)) - } - }; - let outboard = match outboard_location { - OutboardLocation::NotNeeded => MemOrFile::empty(), - OutboardLocation::Inline(data) => MemOrFile::Mem(data), - OutboardLocation::Owned => { - let path = options.path.outboard_path(hash); - let file = fs::File::open(&path)?; - MemOrFile::File(file) - } - }; - BaoFileHandle::new_complete(data, outboard) - } - EntryState::Partial { .. } => BaoFileHandle::new_partial_file(ctx).await?, - }) -} - -/// An entry for each hash, containing a weak reference to a BaoFileHandle -/// wrapped in a tokio mutex so handle creation is sequential. -#[derive(Debug, Clone, Default)] -pub(crate) struct Slot(Arc>>); - -impl Slot { - /// Get the handle if it exists and is still alive, otherwise load it from the database. - /// If there is nothing in the database, create a new in-memory handle. - /// - /// `make` will be called if the a live handle does not exist. - pub async fn get_or_create(&self, make: F) -> io::Result<(BaoFileHandle, T)> - where - F: FnOnce() -> Fut, - Fut: std::future::Future>, - T: Default, - { - let mut slot = self.0.lock().await; - if let Some(handle) = &*slot { - return Ok((handle.clone(), Default::default())); - } - let handle = make().await; - if let Ok((handle, _)) = &handle { - *slot = Some(handle.clone()); - } - handle - } } impl Actor { @@ -619,7 +554,6 @@ impl Actor { options: options.clone(), db: meta::Db::new(db_send), internal_cmd_tx: fs_commands_tx, - empty: BaoFileHandle::new_complete(MemOrFile::empty(), MemOrFile::empty()), protect, }); rt.spawn(db_actor.run()); @@ -772,6 +706,7 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) { + trace!("{cmd:?}"); let res = match finish_import_impl(cmd.inner, ctx).await { Ok(()) => { // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag @@ -788,6 +723,9 @@ async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) { } async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::Result<()> { + if ctx.id == Hash::EMPTY { + return Ok(()); // nothing to do for the empty hash + } let ImportEntry { source, hash, @@ -806,8 +744,8 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R debug_assert!(!options.is_inlined_data(*size)); } } - let guard = ctx.lock().await; - let handle = guard.as_ref().map(|x| x.clone()); + ctx.load().await; + let handle = &ctx.state; // if I do have an existing handle, I have to possibly deal with observers. // if I don't have an existing handle, there are 2 cases: // the entry exists in the db, but we don't have a handle @@ -857,33 +795,31 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R OutboardLocation::Owned } }; - if let Some(handle) = handle { - let data = match &data_location { - DataLocation::Inline(data) => MemOrFile::Mem(data.clone()), - DataLocation::Owned(size) => { - let path = ctx.options().path.data_path(&hash); - let file = fs::File::open(&path)?; - MemOrFile::File(FixedSize::new(file, *size)) - } - DataLocation::External(paths, size) => { - let Some(path) = paths.iter().next() else { - return Err(io::Error::other("no external data path")); - }; - let file = fs::File::open(path)?; - MemOrFile::File(FixedSize::new(file, *size)) - } - }; - let outboard = match &outboard_location { - OutboardLocation::NotNeeded => MemOrFile::empty(), - OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()), - OutboardLocation::Owned => { - let path = ctx.options().path.outboard_path(&hash); - let file = fs::File::open(&path)?; - MemOrFile::File(file) - } - }; - handle.complete(data, outboard); - } + let data = match &data_location { + DataLocation::Inline(data) => MemOrFile::Mem(data.clone()), + DataLocation::Owned(size) => { + let path = ctx.options().path.data_path(&hash); + let file = fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, *size)) + } + DataLocation::External(paths, size) => { + let Some(path) = paths.iter().next() else { + return Err(io::Error::other("no external data path")); + }; + let file = fs::File::open(path)?; + MemOrFile::File(FixedSize::new(file, *size)) + } + }; + let outboard = match &outboard_location { + OutboardLocation::NotNeeded => MemOrFile::empty(), + OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()), + OutboardLocation::Owned => { + let path = ctx.options().path.outboard_path(&hash); + let file = fs::File::open(&path)?; + MemOrFile::File(file) + } + }; + handle.complete(data, outboard); let state = EntryState::Complete { data_location, outboard_location, @@ -895,16 +831,15 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) { trace!("{cmd:?}"); + let hash = ctx.id; let ImportBaoMsg { - inner: ImportBaoRequest { size, hash }, + inner: ImportBaoRequest { size, .. }, rx, tx, .. } = cmd; - let res = match ctx.get_or_create(hash).await { - Ok(handle) => import_bao_impl(size, rx, handle, ctx).await, - Err(cause) => Err(cause), - }; + ctx.load().await; + let res = import_bao_impl(size, rx, ctx.state.clone(), ctx).await; trace!("{res:?}"); tx.send(res).await.ok(); } @@ -951,26 +886,21 @@ async fn import_bao_impl( #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn observe(cmd: ObserveMsg, ctx: HashContext) { - let Ok(handle) = ctx.get_or_create(cmd.hash).await else { - return; - }; + trace!("{cmd:?}"); + ctx.load().await; + let handle = ctx.state.clone(); handle.subscribe().forward(cmd.tx).await.ok(); } #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) { - match ctx.get(cmd.hash).await { - Ok(handle) => { - if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await { - cmd.tx - .send(ExportRangesItem::Error(cause.into())) - .await - .ok(); - } - } - Err(cause) => { - cmd.tx.send(ExportRangesItem::Error(cause)).await.ok(); - } + trace!("{cmd:?}"); + ctx.load().await; + if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, ctx.state.clone()).await { + cmd.tx + .send(ExportRangesItem::Error(cause.into())) + .await + .ok(); } } @@ -1019,22 +949,14 @@ async fn export_ranges_impl( #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) { - match ctx.get(cmd.hash).await { - Ok(handle) => { - if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await { - cmd.tx - .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into()) - .await - .ok(); - } - } - Err(cause) => { - let crate::api::Error::Io(cause) = cause; - cmd.tx - .send(bao_tree::io::EncodeError::Io(cause).into()) - .await - .ok(); - } + ctx.load().await; + if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, ctx.state.clone()).await { + // if the entry is in state NonExisting, this will be an io error with + // kind NotFound. So we must not wrap this somehow but pass it on directly. + cmd.tx + .send(bao_tree::io::EncodeError::Io(cause).into()) + .await + .ok(); } } @@ -1042,11 +964,11 @@ async fn export_bao_impl( cmd: ExportBaoRequest, tx: &mut mpsc::Sender, handle: BaoFileHandle, -) -> anyhow::Result<()> { +) -> io::Result<()> { let ExportBaoRequest { ranges, hash, .. } = cmd; let outboard = handle.outboard(&hash)?; let size = outboard.tree.size(); - if size == 0 && hash != Hash::EMPTY { + if size == 0 && cmd.hash != Hash::EMPTY { // we have no data whatsoever, so we stop here return Ok(()); } @@ -1080,7 +1002,6 @@ async fn export_path_impl( if let Some(parent) = target.parent() { fs::create_dir_all(parent)?; } - let _guard = ctx.lock().await; let state = ctx.get_entry_state(cmd.hash).await?; let (data_location, outboard_location) = match state { Some(EntryState::Complete { @@ -1412,7 +1333,7 @@ pub mod tests { // import data via import_bytes, check that we can observe it and that it is complete #[tokio::test] - async fn test_import_bytes() -> TestResult<()> { + async fn test_import_bytes_simple() -> TestResult<()> { tracing_subscriber::fmt::try_init().ok(); let testdir = tempfile::tempdir()?; let db_dir = testdir.path().join("db"); @@ -1943,7 +1864,6 @@ pub mod tests { if path.is_file() { if let Some(file_ext) = path.extension() { if file_ext.to_string_lossy().to_lowercase() == ext { - println!("Deleting: {}", path.display()); fs::remove_file(path)?; } } diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index 70f71f60..f2f8a538 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -4,7 +4,6 @@ use std::{ io, ops::Deref, path::Path, - sync::Arc, }; use bao_tree::{ @@ -414,6 +413,9 @@ impl BaoFileStorage { ctx: &HashContext, ) -> io::Result<(Self, Option>)> { Ok(match self { + BaoFileStorage::NonExisting => { + Self::new_partial_mem().write_batch(batch, bitfield, ctx)? + } BaoFileStorage::PartialMem(mut ms) => { // check if we need to switch to file mode, otherwise write to memory if max_offset(batch) <= ctx.global.options.inline.max_data_inlined { @@ -484,7 +486,7 @@ impl BaoFileStorage { } /// Create a new mutable mem storage. - pub fn partial_mem() -> Self { + pub fn new_partial_mem() -> Self { Self::PartialMem(Default::default()) } @@ -515,75 +517,15 @@ impl BaoFileStorage { /// A cheaply cloneable handle to a bao file, including the hash and the configuration. #[derive(Debug, Clone, Default, derive_more::Deref)] -pub(crate) struct BaoFileHandle(Arc>); +pub(crate) struct BaoFileHandle(pub(super) watch::Sender); impl entity_manager::Reset for BaoFileHandle { fn reset(&mut self) { self.send_replace(BaoFileStorage::Initial); } -} -impl BaoFileHandle { - pub(super) async fn load(&self, ctx: &HashContext) { - enum Action { - Load, - Wait, - None, - } - let mut action = Action::None; - self.send_if_modified(|guard| match guard.deref() { - BaoFileStorage::Initial => { - *guard = BaoFileStorage::Loading; - action = Action::Load; - true - } - BaoFileStorage::Loading => { - action = Action::Wait; - false - } - _ => false, - }); - match action { - Action::Load => { - let state = match ctx.global.db.get(ctx.id).await { - Ok(state) => match BaoFileStorage::open(state, ctx).await { - Ok(handle) => handle, - Err(_) => BaoFileStorage::Poisoned, - }, - Err(_) => BaoFileStorage::Poisoned, - }; - self.send_replace(state); - } - Action::Wait => { - while let BaoFileStorage::Loading = self.borrow().deref() { - self.0.subscribe().changed().await.ok(); - } - } - Action::None => {} - } - } - - pub(super) fn persist(&mut self, ctx: &HashContext) { - self.send_if_modified(|guard| { - let hash = &ctx.id; - if Arc::strong_count(&self.0) > 1 { - return false; - } - let BaoFileStorage::Partial(fs) = guard.take() else { - return false; - }; - let path = ctx.global.options.path.bitfield_path(hash); - trace!("writing bitfield for hash {} to {}", hash, path.display()); - if let Err(cause) = fs.sync_all(&path) { - error!( - "failed to write bitfield for {} at {}: {:?}", - hash, - path.display(), - cause - ); - } - false - }); + fn ref_count(&self) -> usize { + self.0.receiver_count() + self.0.sender_count() } } @@ -691,43 +633,29 @@ impl BaoFileStorage { } impl BaoFileHandle { - #[allow(dead_code)] - pub fn id(&self) -> usize { - Arc::as_ptr(&self.0) as usize - } - - /// Create a new bao file handle. - /// - /// This will create a new file handle with an empty memory storage. - pub fn new_partial_mem() -> Self { - let storage = BaoFileStorage::partial_mem(); - Self(Arc::new(watch::Sender::new(storage))) - } - - /// Create a new bao file handle with a partial file. - pub(super) async fn new_partial_file(ctx: &HashContext) -> io::Result { - let hash = &ctx.id; - let options = ctx.global.options.clone(); - let storage = PartialFileStorage::load(hash, &options.path)?; - let storage = if storage.bitfield.is_complete() { - let size = storage.bitfield.size; - let (storage, entry_state) = storage.into_complete(size, &options)?; - debug!("File was reconstructed as complete"); - ctx.global.db.set(*hash, entry_state).await?; - storage.into() - } else { - storage.into() - }; - Ok(Self(Arc::new(watch::Sender::new(storage)))) - } - - /// Create a new complete bao file handle. - pub fn new_complete( - data: MemOrFile>, - outboard: MemOrFile, - ) -> Self { - let storage = CompleteStorage { data, outboard }.into(); - Self(Arc::new(watch::Sender::new(storage))) + pub(super) fn persist(&mut self, ctx: &HashContext) { + self.send_if_modified(|guard| { + let hash = &ctx.id; + // if Arc::strong_count(&self.0) > 1 { + // // no one is listening, so we don't need to persist + // println!("abort persist - not unique {} {} {}", Arc::strong_count(&self.0), self.0.receiver_count(), self.0.sender_count()); + // return false; + // } + let BaoFileStorage::Partial(fs) = guard.take() else { + return false; + }; + let path = ctx.global.options.path.bitfield_path(hash); + trace!("writing bitfield for hash {} to {}", hash, path.display()); + if let Err(cause) = fs.sync_all(&path) { + error!( + "failed to write bitfield for {} at {}: {:?}", + hash, + path.display(), + cause + ); + } + false + }); } /// Complete the handle @@ -737,14 +665,14 @@ impl BaoFileHandle { outboard: MemOrFile, ) { self.send_if_modified(|guard| { - let res = match guard { - BaoFileStorage::Complete(_) => None, - BaoFileStorage::PartialMem(entry) => Some(&mut entry.bitfield), - BaoFileStorage::Partial(entry) => Some(&mut entry.bitfield), - _ => None, + let needs_complete = match guard { + BaoFileStorage::NonExisting => true, + BaoFileStorage::Complete(_) => false, + BaoFileStorage::PartialMem(_) => true, + BaoFileStorage::Partial(_) => true, + _ => false, }; - if let Some(bitfield) = res { - bitfield.update(&Bitfield::complete(data.size())); + if needs_complete { *guard = BaoFileStorage::Complete(CompleteStorage { data, outboard }); true } else { @@ -826,15 +754,18 @@ impl BaoFileHandle { trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); let mut res = Ok(None); self.send_if_modified(|state| { + println!("write_batch {:?}", bitfield); let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx) else { res = Err(io::Error::other("write batch failed")); return false; }; res = Ok(update); + println!("{:?}", state1); *state = state1; true }); if let Some(update) = res? { + println!("update {:?}", update); ctx.global.db.update(ctx.id, update).await?; } Ok(()) diff --git a/src/store/fs/gc.rs b/src/store/fs/gc.rs index 70333f3e..e9275545 100644 --- a/src/store/fs/gc.rs +++ b/src/store/fs/gc.rs @@ -360,6 +360,8 @@ mod tests { // check that `get_bytes` returns an error. let res = store.get_bytes(hash).await; assert!(res.is_err()); + + println!("{:?}", res); assert!(matches!( res, Err(ExportBaoError::ExportBaoInner { diff --git a/src/store/fs/util/entity_manager.rs b/src/store/fs/util/entity_manager.rs index dd1673a3..34b9919c 100644 --- a/src/store/fs/util/entity_manager.rs +++ b/src/store/fs/util/entity_manager.rs @@ -12,6 +12,12 @@ use tokio::sync::{mpsc, oneshot}; pub trait Reset: Default { /// Reset the state to its default value. fn reset(&mut self); + + /// A ref count to ensure that the state is unique when shutting down. + /// + /// You are not allowed to clone the state out of a task, even though that + /// is possible. + fn ref_count(&self) -> usize; } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -213,6 +219,8 @@ mod entity_actor { ) .await .ok(); + println!("Calling on_shutdown {}", self.state.state.ref_count()); + assert_eq!(self.state.state.ref_count(), 1); P::on_shutdown(self.state.clone(), ShutdownCause::Idle).await; // Notify the main actor that we have completed shutdown. // here we also give back the rest of ourselves so the main actor can recycle us. @@ -466,6 +474,7 @@ mod main_actor { ); return None; }; + println!("ENTITY_MANAGER recycling {:?}", state.id); // re-assemble the actor from the parts let mut actor = entity_actor::Actor { main: self.internal_send.clone(), @@ -788,6 +797,10 @@ mod tests { fn reset(&mut self) { *self.0.borrow_mut() = Default::default(); } + + fn ref_count(&self) -> usize { + Arc::strong_count(&self.0) + } } struct Counters; @@ -901,6 +914,10 @@ mod tests { fn reset(&mut self) { *self.0.borrow_mut() = State::Disk; } + + fn ref_count(&self) -> usize { + 1 + } } fn get_path(root: impl AsRef, id: u64) -> PathBuf { From 94df43fa5f253413f706d18a65fbc91693149ece Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 10 Jul 2025 11:23:51 +0300 Subject: [PATCH 04/11] Remove debug statements --- src/store/fs/util/entity_manager.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/store/fs/util/entity_manager.rs b/src/store/fs/util/entity_manager.rs index 34b9919c..7987b6f2 100644 --- a/src/store/fs/util/entity_manager.rs +++ b/src/store/fs/util/entity_manager.rs @@ -474,7 +474,6 @@ mod main_actor { ); return None; }; - println!("ENTITY_MANAGER recycling {:?}", state.id); // re-assemble the actor from the parts let mut actor = entity_actor::Actor { main: self.internal_send.clone(), From 48ffd487fbab85f4f03db66ffded308dc3311962 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 10 Jul 2025 11:25:39 +0300 Subject: [PATCH 05/11] Remove unused code --- src/store/fs.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 90bd4bc2..6813999f 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -831,7 +831,6 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) { trace!("{cmd:?}"); - let hash = ctx.id; let ImportBaoMsg { inner: ImportBaoRequest { size, .. }, rx, From a5400024f81f367148a2d696b9f54656fbe13338 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 10 Jul 2025 11:46:28 +0300 Subject: [PATCH 06/11] Remove more debug code --- src/store/fs/bao_file.rs | 3 --- src/store/fs/gc.rs | 2 -- 2 files changed, 5 deletions(-) diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index f2f8a538..93edbd98 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -754,18 +754,15 @@ impl BaoFileHandle { trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); let mut res = Ok(None); self.send_if_modified(|state| { - println!("write_batch {:?}", bitfield); let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx) else { res = Err(io::Error::other("write batch failed")); return false; }; res = Ok(update); - println!("{:?}", state1); *state = state1; true }); if let Some(update) = res? { - println!("update {:?}", update); ctx.global.db.update(ctx.id, update).await?; } Ok(()) diff --git a/src/store/fs/gc.rs b/src/store/fs/gc.rs index e9275545..70333f3e 100644 --- a/src/store/fs/gc.rs +++ b/src/store/fs/gc.rs @@ -360,8 +360,6 @@ mod tests { // check that `get_bytes` returns an error. let res = store.get_bytes(hash).await; assert!(res.is_err()); - - println!("{:?}", res); assert!(matches!( res, Err(ExportBaoError::ExportBaoInner { From 6e237f82a10438be297ad25bc8074008541bd970 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 10 Jul 2025 15:19:22 +0300 Subject: [PATCH 07/11] instrument the per-hash tasks as well --- src/store/fs.rs | 50 +++++++++++++++-------------- src/store/fs/bao_file.rs | 18 +++++++---- src/store/fs/util/entity_manager.rs | 29 +++++++++-------- 3 files changed, 52 insertions(+), 45 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 6813999f..840a5e1a 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -570,9 +570,12 @@ impl Actor { } trait HashSpecificCommand: HashSpecific + Send + 'static { + /// Handle the command on success by spawning a task into the per-hash context. fn handle(self, ctx: HashContext) -> impl Future + Send + 'static; - fn on_error(self) -> impl Future + Send + 'static; + /// Opportunity to send an error if spawning fails due to the task being busy (inbox full) + /// or dead (e.g. panic in one of the running tasks). + fn on_error(self, arg: SpawnArg) -> impl Future + Send + 'static; async fn spawn( self, @@ -581,25 +584,24 @@ trait HashSpecificCommand: HashSpecific + Send + 'static { ) where Self: Sized, { + let span = tracing::Span::current(); let task = manager - .spawn_boxed( - self.hash(), - Box::new(|x| { - Box::pin(async move { - match x { - SpawnArg::Active(state) => { - self.handle(state).await; - } - SpawnArg::Busy => { - self.on_error().await; - } - SpawnArg::Dead => { - self.on_error().await; - } + .spawn(self.hash(), |arg| { + async move { + match arg { + SpawnArg::Active(state) => { + self.handle(state).await; } - }) - }), - ) + SpawnArg::Busy => { + self.on_error(arg).await; + } + SpawnArg::Dead => { + self.on_error(arg).await; + } + } + } + .instrument(span) + }) .await; if let Some(task) = task { tasks.spawn(task); @@ -611,31 +613,31 @@ impl HashSpecificCommand for ObserveMsg { async fn handle(self, ctx: HashContext) { observe(self, ctx).await } - async fn on_error(self) {} + async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportPathMsg { async fn handle(self, ctx: HashContext) { export_path(self, ctx).await } - async fn on_error(self) {} + async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportBaoMsg { async fn handle(self, ctx: HashContext) { export_bao(self, ctx).await } - async fn on_error(self) {} + async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportRangesMsg { async fn handle(self, ctx: HashContext) { export_ranges(self, ctx).await } - async fn on_error(self) {} + async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ImportBaoMsg { async fn handle(self, ctx: HashContext) { import_bao(self, ctx).await } - async fn on_error(self) {} + async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecific for (TempTag, ImportEntryMsg) { fn hash(&self) -> Hash { @@ -647,7 +649,7 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) { let (tt, cmd) = self; finish_import(cmd, tt, ctx).await } - async fn on_error(self) {} + async fn on_error(self, _arg: SpawnArg) {} } struct RtWrapper(Option); diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index 93edbd98..6502c4e6 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -285,6 +285,15 @@ fn read_size(size_file: &File) -> io::Result { } /// The storage for a bao file. This can be either in memory or on disk. +/// +/// The two initial states `Initial` and `Loading` are used to coordinate the +/// loading of the entry from the metadata database. Once that is complete, +/// you should never see these states again. +/// +/// From the remaining states you can get into `Poisoned` if there is an +/// IO error during an operation. +/// +/// `Poisioned` is also used once the handle is persisted and no longer usable. #[derive(derive_more::From, Default)] pub(crate) enum BaoFileStorage { /// Initial state, we don't know anything yet. @@ -311,13 +320,8 @@ pub(crate) enum BaoFileStorage { /// /// Writing to this is a no-op, since it is already complete. Complete(CompleteStorage), - /// We will get into that state if there is an io error in the middle of an operation - /// - /// Also, when the handle is dropped we will poison the storage, so poisoned - /// can be seen when the handle is revived during the drop. - /// - /// BaoFileHandleWeak::upgrade() will return None if the storage is poisoned, - /// treat it as dead. + /// We will get into that state if there is an io error in the middle of an operation, + /// or after the handle is persisted and no longer usable. Poisoned, } diff --git a/src/store/fs/util/entity_manager.rs b/src/store/fs/util/entity_manager.rs index 7987b6f2..7810340c 100644 --- a/src/store/fs/util/entity_manager.rs +++ b/src/store/fs/util/entity_manager.rs @@ -373,24 +373,25 @@ mod main_actor { } /// Friendly version of `spawn_boxed` that does the boxing - pub async fn spawn(&mut self, id: P::EntityId, f: F, tasks: &mut JoinSet<()>) + #[must_use = "this function may return a future that must be spawned by the caller"] + pub async fn spawn( + &mut self, + id: P::EntityId, + f: F, + ) -> Option + Send + 'static> where F: FnOnce(SpawnArg

) -> Fut + Send + 'static, Fut: Future + Send + 'static, { - let task = self - .spawn_boxed( - id, - Box::new(|x| { - Box::pin(async move { - f(x).await; - }) - }), - ) - .await; - if let Some(task) = task { - tasks.spawn(task); - } + self.spawn_boxed( + id, + Box::new(|x| { + Box::pin(async move { + f(x).await; + }) + }), + ) + .await } #[must_use = "this function may return a future that must be spawned by the caller"] From f55959867a424dd25e6f797e2e0ea64664c5b1f9 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 11 Jul 2025 10:51:03 +0300 Subject: [PATCH 08/11] Move things around in preparation for introducing traits --- src/store/fs.rs | 300 ++++++++++++++++++++++++++------------- src/store/fs/bao_file.rs | 124 +--------------- 2 files changed, 205 insertions(+), 219 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 840a5e1a..697e5564 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -74,12 +74,14 @@ use std::{ }; use bao_tree::{ + blake3, io::{ mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt}, + outboard::PreOrderOutboard, sync::ReadAt, BaoContentItem, Leaf, }, - ChunkNum, ChunkRanges, + BaoTree, ChunkNum, ChunkRanges, }; use bytes::Bytes; use delete_set::{BaoFilePart, ProtectHandle}; @@ -107,11 +109,14 @@ use crate::{ }, store::{ fs::{ - bao_file::{BaoFileStorage, CompleteStorage}, + bao_file::{ + BaoFileStorage, BaoFileStorageSubscriber, CompleteStorage, DataReader, + OutboardReader, + }, util::entity_manager::{self, ActiveEntityState}, }, util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned}, - Hash, + Hash, IROH_BLOCK_SIZE, }, util::{ channel::oneshot, @@ -216,9 +221,7 @@ impl entity_manager::Params for EmParams { state: entity_manager::ActiveEntityState, _cause: entity_manager::ShutdownCause, ) { - // println!("persisting entry {:?} {:?}", state.id, state.state.borrow().bitfield()); - state.state.clone().persist(&state); - // println!("persisting entry done {:?}", state.id); + state.persist(); } } @@ -298,7 +301,103 @@ impl HashContext { } } - pub fn db(&self) -> &meta::Db { + pub(super) fn persist(&self) { + self.state.send_if_modified(|guard| { + let hash = &self.id; + let BaoFileStorage::Partial(fs) = guard.take() else { + return false; + }; + let path = self.global.options.path.bitfield_path(hash); + trace!("writing bitfield for hash {} to {}", hash, path.display()); + if let Err(cause) = fs.sync_all(&path) { + error!( + "failed to write bitfield for {} at {}: {:?}", + hash, + path.display(), + cause + ); + } + false + }); + } + + /// Write a batch and notify the db + pub(super) async fn write_batch( + &self, + batch: &[BaoContentItem], + bitfield: &Bitfield, + ) -> io::Result<()> { + trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); + let mut res = Ok(None); + self.state.send_if_modified(|state| { + let Ok((state1, update)) = state.take().write_batch(batch, bitfield, &self) else { + res = Err(io::Error::other("write batch failed")); + return false; + }; + res = Ok(update); + *state = state1; + true + }); + if let Some(update) = res? { + self.global.db.update(self.id, update).await?; + } + Ok(()) + } + + /// An AsyncSliceReader for the data file. + /// + /// Caution: this is a reader for the unvalidated data file. Reading this + /// can produce data that does not match the hash. + pub fn data_reader(&self) -> DataReader { + DataReader(self.state.clone()) + } + + /// An AsyncSliceReader for the outboard file. + /// + /// The outboard file is used to validate the data file. It is not guaranteed + /// to be complete. + pub fn outboard_reader(&self) -> OutboardReader { + OutboardReader(self.state.clone()) + } + + /// The most precise known total size of the data file. + pub fn current_size(&self) -> io::Result { + match self.state.borrow().deref() { + BaoFileStorage::Complete(mem) => Ok(mem.size()), + BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()), + BaoFileStorage::Partial(file) => file.current_size(), + BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), + } + } + + /// The most precise known total size of the data file. + pub fn bitfield(&self) -> io::Result { + match self.state.borrow().deref() { + BaoFileStorage::Complete(mem) => Ok(mem.bitfield()), + BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()), + BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()), + BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), + } + } + + /// The outboard for the file. + pub fn outboard(&self) -> io::Result> { + let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE); + let outboard = self.outboard_reader(); + Ok(PreOrderOutboard { + root: blake3::Hash::from(self.id), + tree, + data: outboard, + }) + } + + fn db(&self) -> &meta::Db { &self.global.db } @@ -306,17 +405,18 @@ impl HashContext { &self.global.options } - pub fn protect(&self, hash: Hash, parts: impl IntoIterator) { - self.global.protect.protect(hash, parts); + pub fn protect(&self, parts: impl IntoIterator) { + self.global.protect.protect(self.id, parts); } /// Update the entry state in the database, and wait for completion. - pub async fn update_await(&self, hash: Hash, state: EntryState) -> io::Result<()> { - self.db().update_await(hash, state).await?; + pub async fn update_await(&self, state: EntryState) -> io::Result<()> { + self.db().update_await(self.id, state).await?; Ok(()) } - pub async fn get_entry_state(&self, hash: Hash) -> io::Result>> { + pub async fn get_entry_state(&self) -> io::Result>> { + let hash = self.id; if hash == Hash::EMPTY { return Ok(Some(EntryState::Complete { data_location: DataLocation::Inline(Bytes::new()), @@ -327,8 +427,8 @@ impl HashContext { } /// Update the entry state in the database, and wait for completion. - pub async fn set(&self, hash: Hash, state: EntryState) -> io::Result<()> { - self.db().set(hash, state).await + pub async fn set(&self, state: EntryState) -> io::Result<()> { + self.db().set(self.id, state).await } } @@ -611,31 +711,31 @@ trait HashSpecificCommand: HashSpecific + Send + 'static { impl HashSpecificCommand for ObserveMsg { async fn handle(self, ctx: HashContext) { - observe(self, ctx).await + observe(&ctx, self).await } async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportPathMsg { async fn handle(self, ctx: HashContext) { - export_path(self, ctx).await + export_path(&ctx, self).await } async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportBaoMsg { async fn handle(self, ctx: HashContext) { - export_bao(self, ctx).await + export_bao(&ctx, self).await } async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportRangesMsg { async fn handle(self, ctx: HashContext) { - export_ranges(self, ctx).await + export_ranges(&ctx, self).await } async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ImportBaoMsg { async fn handle(self, ctx: HashContext) { - import_bao(self, ctx).await + import_bao(&ctx, self).await } async fn on_error(self, _arg: SpawnArg) {} } @@ -647,7 +747,7 @@ impl HashSpecific for (TempTag, ImportEntryMsg) { impl HashSpecificCommand for (TempTag, ImportEntryMsg) { async fn handle(self, ctx: HashContext) { let (tt, cmd) = self; - finish_import(cmd, tt, ctx).await + finish_import(&ctx, cmd, tt).await } async fn on_error(self, _arg: SpawnArg) {} } @@ -707,9 +807,67 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) } #[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) { +async fn import_bao(ctx: &HashContext, cmd: ImportBaoMsg) { trace!("{cmd:?}"); - let res = match finish_import_impl(cmd.inner, ctx).await { + let ImportBaoMsg { + inner: ImportBaoRequest { size, .. }, + rx, + tx, + .. + } = cmd; + ctx.load().await; + let res = import_bao_impl(ctx, size, rx).await; + trace!("{res:?}"); + tx.send(res).await.ok(); +} + +#[instrument(skip_all, fields(hash = %cmd.hash_short()))] +async fn observe(ctx: &HashContext, cmd: ObserveMsg) { + trace!("{cmd:?}"); + ctx.load().await; + BaoFileStorageSubscriber::new(ctx.state.subscribe()) + .forward(cmd.tx) + .await + .ok(); +} + +#[instrument(skip_all, fields(hash = %cmd.hash_short()))] +async fn export_ranges(ctx: &HashContext, mut cmd: ExportRangesMsg) { + trace!("{cmd:?}"); + ctx.load().await; + if let Err(cause) = export_ranges_impl(ctx, cmd.inner, &mut cmd.tx).await { + cmd.tx + .send(ExportRangesItem::Error(cause.into())) + .await + .ok(); + } +} + +#[instrument(skip_all, fields(hash = %cmd.hash_short()))] +async fn export_bao(ctx: &HashContext, mut cmd: ExportBaoMsg) { + ctx.load().await; + if let Err(cause) = export_bao_impl(ctx, cmd.inner, &mut cmd.tx).await { + // if the entry is in state NonExisting, this will be an io error with + // kind NotFound. So we must not wrap this somehow but pass it on directly. + cmd.tx + .send(bao_tree::io::EncodeError::Io(cause).into()) + .await + .ok(); + } +} + +#[instrument(skip_all, fields(hash = %cmd.hash_short()))] +async fn export_path(ctx: &HashContext, cmd: ExportPathMsg) { + let ExportPathMsg { inner, mut tx, .. } = cmd; + if let Err(cause) = export_path_impl(ctx, inner, &mut tx).await { + tx.send(cause.into()).await.ok(); + } +} + +#[instrument(skip_all, fields(hash = %cmd.hash_short()))] +async fn finish_import(ctx: &HashContext, cmd: ImportEntryMsg, mut tt: TempTag) { + trace!("{cmd:?}"); + let res = match finish_import_impl(ctx, cmd.inner).await { Ok(()) => { // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag // it will be cleaned up when either the process exits or scope ends @@ -724,7 +882,7 @@ async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) { cmd.tx.send(res).await.ok(); } -async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::Result<()> { +async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> { if ctx.id == Hash::EMPTY { return Ok(()); // nothing to do for the empty hash } @@ -753,7 +911,7 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R // the entry exists in the db, but we don't have a handle // the entry does not exist at all. // convert the import source to a data location and drop the open files - ctx.protect(hash, [BaoFilePart::Data, BaoFilePart::Outboard]); + ctx.protect([BaoFilePart::Data, BaoFilePart::Outboard]); let data_location = match source { ImportSource::Memory(data) => DataLocation::Inline(data), ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size), @@ -826,25 +984,10 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R data_location, outboard_location, }; - ctx.update_await(hash, state).await?; + ctx.update_await(state).await?; Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) { - trace!("{cmd:?}"); - let ImportBaoMsg { - inner: ImportBaoRequest { size, .. }, - rx, - tx, - .. - } = cmd; - ctx.load().await; - let res = import_bao_impl(size, rx, ctx.state.clone(), ctx).await; - trace!("{res:?}"); - tx.send(res).await.ok(); -} - fn chunk_range(leaf: &Leaf) -> ChunkRanges { let start = ChunkNum::chunks(leaf.offset); let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64); @@ -852,10 +995,9 @@ fn chunk_range(leaf: &Leaf) -> ChunkRanges { } async fn import_bao_impl( + ctx: &HashContext, size: NonZeroU64, mut rx: mpsc::Receiver, - handle: BaoFileHandle, - ctx: HashContext, ) -> api::Result<()> { trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size); let mut batch = Vec::::new(); @@ -864,7 +1006,7 @@ async fn import_bao_impl( // if the batch is not empty, the last item is a leaf and the current item is a parent, write the batch if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() { let bitfield = Bitfield::new_unchecked(ranges, size.into()); - handle.write_batch(&batch, &bitfield, &ctx).await?; + ctx.write_batch(&batch, &bitfield).await?; batch.clear(); ranges = ChunkRanges::empty(); } @@ -880,43 +1022,23 @@ async fn import_bao_impl( } if !batch.is_empty() { let bitfield = Bitfield::new_unchecked(ranges, size.into()); - handle.write_batch(&batch, &bitfield, &ctx).await?; + ctx.write_batch(&batch, &bitfield).await?; } Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn observe(cmd: ObserveMsg, ctx: HashContext) { - trace!("{cmd:?}"); - ctx.load().await; - let handle = ctx.state.clone(); - handle.subscribe().forward(cmd.tx).await.ok(); -} - -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) { - trace!("{cmd:?}"); - ctx.load().await; - if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, ctx.state.clone()).await { - cmd.tx - .send(ExportRangesItem::Error(cause.into())) - .await - .ok(); - } -} - async fn export_ranges_impl( + ctx: &HashContext, cmd: ExportRangesRequest, tx: &mut mpsc::Sender, - handle: BaoFileHandle, ) -> io::Result<()> { let ExportRangesRequest { ranges, hash } = cmd; trace!( "exporting ranges: {hash} {ranges:?} size={}", - handle.current_size()? + ctx.current_size()? ); - let bitfield = handle.bitfield()?; - let data = handle.data_reader(); + let bitfield = ctx.bitfield()?; + let data = ctx.data_reader(); let size = bitfield.size(); for range in ranges.iter() { let range = match range { @@ -948,50 +1070,29 @@ async fn export_ranges_impl( Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) { - ctx.load().await; - if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, ctx.state.clone()).await { - // if the entry is in state NonExisting, this will be an io error with - // kind NotFound. So we must not wrap this somehow but pass it on directly. - cmd.tx - .send(bao_tree::io::EncodeError::Io(cause).into()) - .await - .ok(); - } -} - async fn export_bao_impl( + ctx: &HashContext, cmd: ExportBaoRequest, tx: &mut mpsc::Sender, - handle: BaoFileHandle, ) -> io::Result<()> { let ExportBaoRequest { ranges, hash, .. } = cmd; - let outboard = handle.outboard(&hash)?; + let outboard = ctx.outboard()?; let size = outboard.tree.size(); if size == 0 && cmd.hash != Hash::EMPTY { // we have no data whatsoever, so we stop here return Ok(()); } trace!("exporting bao: {hash} {ranges:?} size={size}",); - let data = handle.data_reader(); + let data = ctx.data_reader(); let tx = BaoTreeSender::new(tx); traverse_ranges_validated(data, outboard, &ranges, tx).await?; Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_path(cmd: ExportPathMsg, ctx: HashContext) { - let ExportPathMsg { inner, mut tx, .. } = cmd; - if let Err(cause) = export_path_impl(inner, &mut tx, ctx).await { - tx.send(cause.into()).await.ok(); - } -} - async fn export_path_impl( + ctx: &HashContext, cmd: ExportPathRequest, tx: &mut mpsc::Sender, - ctx: HashContext, ) -> api::Result<()> { let ExportPathRequest { mode, target, .. } = cmd; if !target.is_absolute() { @@ -1003,7 +1104,7 @@ async fn export_path_impl( if let Some(parent) = target.parent() { fs::create_dir_all(parent)?; } - let state = ctx.get_entry_state(cmd.hash).await?; + let state = ctx.get_entry_state().await?; let (data_location, outboard_location) = match state { Some(EntryState::Complete { data_location, @@ -1065,13 +1166,10 @@ async fn export_path_impl( } } } - ctx.set( - cmd.hash, - EntryState::Complete { - data_location: DataLocation::External(vec![target], size), - outboard_location, - }, - ) + ctx.set(EntryState::Complete { + data_location: DataLocation::External(vec![target], size), + outboard_location, + }) .await?; } }, diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index 6502c4e6..d0217040 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -20,7 +20,7 @@ use bytes::{Bytes, BytesMut}; use derive_more::Debug; use irpc::channel::mpsc; use tokio::sync::watch; -use tracing::{debug, error, info, trace}; +use tracing::{debug, info, trace}; use super::{ entry_state::{DataLocation, EntryState, OutboardLocation}, @@ -142,7 +142,7 @@ impl PartialFileStorage { &self.bitfield } - fn sync_all(&self, bitfield_path: &Path) -> io::Result<()> { + pub(super) fn sync_all(&self, bitfield_path: &Path) -> io::Result<()> { self.data.sync_all()?; self.outboard.sync_all()?; self.sizes.sync_all()?; @@ -235,7 +235,7 @@ impl PartialFileStorage { )) } - fn current_size(&self) -> io::Result { + pub(super) fn current_size(&self) -> io::Result { read_size(&self.sizes) } @@ -410,7 +410,7 @@ impl BaoFileStorage { } } - fn write_batch( + pub(super) fn write_batch( self, batch: &[BaoContentItem], bitfield: &Bitfield, @@ -535,7 +535,7 @@ impl entity_manager::Reset for BaoFileHandle { /// A reader for a bao file, reading just the data. #[derive(Debug)] -pub struct DataReader(BaoFileHandle); +pub struct DataReader(pub(super) BaoFileHandle); impl ReadBytesAt for DataReader { fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result { @@ -554,7 +554,7 @@ impl ReadBytesAt for DataReader { /// A reader for the outboard part of a bao file. #[derive(Debug)] -pub struct OutboardReader(BaoFileHandle); +pub struct OutboardReader(pub(super) BaoFileHandle); impl ReadAt for OutboardReader { fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result { @@ -637,31 +637,6 @@ impl BaoFileStorage { } impl BaoFileHandle { - pub(super) fn persist(&mut self, ctx: &HashContext) { - self.send_if_modified(|guard| { - let hash = &ctx.id; - // if Arc::strong_count(&self.0) > 1 { - // // no one is listening, so we don't need to persist - // println!("abort persist - not unique {} {} {}", Arc::strong_count(&self.0), self.0.receiver_count(), self.0.sender_count()); - // return false; - // } - let BaoFileStorage::Partial(fs) = guard.take() else { - return false; - }; - let path = ctx.global.options.path.bitfield_path(hash); - trace!("writing bitfield for hash {} to {}", hash, path.display()); - if let Err(cause) = fs.sync_all(&path) { - error!( - "failed to write bitfield for {} at {}: {:?}", - hash, - path.display(), - cause - ); - } - false - }); - } - /// Complete the handle pub fn complete( &self, @@ -684,93 +659,6 @@ impl BaoFileHandle { } }); } - - pub fn subscribe(&self) -> BaoFileStorageSubscriber { - BaoFileStorageSubscriber::new(self.0.subscribe()) - } - - /// True if the file is complete. - #[allow(dead_code)] - pub fn is_complete(&self) -> bool { - matches!(self.borrow().deref(), BaoFileStorage::Complete(_)) - } - - /// An AsyncSliceReader for the data file. - /// - /// Caution: this is a reader for the unvalidated data file. Reading this - /// can produce data that does not match the hash. - pub fn data_reader(&self) -> DataReader { - DataReader(self.clone()) - } - - /// An AsyncSliceReader for the outboard file. - /// - /// The outboard file is used to validate the data file. It is not guaranteed - /// to be complete. - pub fn outboard_reader(&self) -> OutboardReader { - OutboardReader(self.clone()) - } - - /// The most precise known total size of the data file. - pub fn current_size(&self) -> io::Result { - match self.borrow().deref() { - BaoFileStorage::Complete(mem) => Ok(mem.size()), - BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()), - BaoFileStorage::Partial(file) => file.current_size(), - BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), - BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), - BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), - BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), - } - } - - /// The most precise known total size of the data file. - pub fn bitfield(&self) -> io::Result { - match self.borrow().deref() { - BaoFileStorage::Complete(mem) => Ok(mem.bitfield()), - BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()), - BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()), - BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), - BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), - BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), - BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), - } - } - - /// The outboard for the file. - pub fn outboard(&self, hash: &Hash) -> io::Result> { - let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE); - let outboard = self.outboard_reader(); - Ok(PreOrderOutboard { - root: blake3::Hash::from(*hash), - tree, - data: outboard, - }) - } - - /// Write a batch and notify the db - pub(super) async fn write_batch( - &self, - batch: &[BaoContentItem], - bitfield: &Bitfield, - ctx: &HashContext, - ) -> io::Result<()> { - trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); - let mut res = Ok(None); - self.send_if_modified(|state| { - let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx) else { - res = Err(io::Error::other("write batch failed")); - return false; - }; - res = Ok(update); - *state = state1; - true - }); - if let Some(update) = res? { - ctx.global.db.update(ctx.id, update).await?; - } - Ok(()) - } } impl PartialMemStorage { From bc6bc717d14e311a47f42b2187b505b870e49a51 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 11 Jul 2025 11:07:21 +0300 Subject: [PATCH 09/11] Group high level fns. --- src/store/fs.rs | 162 ++++++++++++++++++++++++++---------------------- 1 file changed, 89 insertions(+), 73 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 697e5564..ff257ffd 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -70,7 +70,10 @@ use std::{ num::NonZeroU64, ops::Deref, path::{Path, PathBuf}, - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, }; use bao_tree::{ @@ -711,31 +714,31 @@ trait HashSpecificCommand: HashSpecific + Send + 'static { impl HashSpecificCommand for ObserveMsg { async fn handle(self, ctx: HashContext) { - observe(&ctx, self).await + ctx.observe(self).await } async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportPathMsg { async fn handle(self, ctx: HashContext) { - export_path(&ctx, self).await + ctx.export_path(self).await } async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportBaoMsg { async fn handle(self, ctx: HashContext) { - export_bao(&ctx, self).await + ctx.export_bao(self).await } async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportRangesMsg { async fn handle(self, ctx: HashContext) { - export_ranges(&ctx, self).await + ctx.export_ranges(self).await } async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ImportBaoMsg { async fn handle(self, ctx: HashContext) { - import_bao(&ctx, self).await + ctx.import_bao(self).await } async fn on_error(self, _arg: SpawnArg) {} } @@ -747,7 +750,7 @@ impl HashSpecific for (TempTag, ImportEntryMsg) { impl HashSpecificCommand for (TempTag, ImportEntryMsg) { async fn handle(self, ctx: HashContext) { let (tt, cmd) = self; - finish_import(&ctx, cmd, tt).await + ctx.finish_import(cmd, tt).await } async fn on_error(self, _arg: SpawnArg) {} } @@ -806,80 +809,87 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn import_bao(ctx: &HashContext, cmd: ImportBaoMsg) { - trace!("{cmd:?}"); - let ImportBaoMsg { - inner: ImportBaoRequest { size, .. }, - rx, - tx, - .. - } = cmd; - ctx.load().await; - let res = import_bao_impl(ctx, size, rx).await; - trace!("{res:?}"); - tx.send(res).await.ok(); -} - -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn observe(ctx: &HashContext, cmd: ObserveMsg) { - trace!("{cmd:?}"); - ctx.load().await; - BaoFileStorageSubscriber::new(ctx.state.subscribe()) - .forward(cmd.tx) - .await - .ok(); -} - -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_ranges(ctx: &HashContext, mut cmd: ExportRangesMsg) { - trace!("{cmd:?}"); - ctx.load().await; - if let Err(cause) = export_ranges_impl(ctx, cmd.inner, &mut cmd.tx).await { - cmd.tx - .send(ExportRangesItem::Error(cause.into())) +/// The high level entry point per entry. +impl HashContext { + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn import_bao(&self, cmd: ImportBaoMsg) { + trace!("{cmd:?}"); + self.load().await; + let ImportBaoMsg { + inner: ImportBaoRequest { size, .. }, + rx, + tx, + .. + } = cmd; + let res = import_bao_impl(self, size, rx).await; + trace!("{res:?}"); + tx.send(res).await.ok(); + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn observe(&self, cmd: ObserveMsg) { + trace!("{cmd:?}"); + self.load().await; + BaoFileStorageSubscriber::new(self.state.subscribe()) + .forward(cmd.tx) .await .ok(); } -} -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_bao(ctx: &HashContext, mut cmd: ExportBaoMsg) { - ctx.load().await; - if let Err(cause) = export_bao_impl(ctx, cmd.inner, &mut cmd.tx).await { - // if the entry is in state NonExisting, this will be an io error with - // kind NotFound. So we must not wrap this somehow but pass it on directly. - cmd.tx - .send(bao_tree::io::EncodeError::Io(cause).into()) - .await - .ok(); + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_ranges(&self, mut cmd: ExportRangesMsg) { + trace!("{cmd:?}"); + self.load().await; + if let Err(cause) = export_ranges_impl(self, cmd.inner, &mut cmd.tx).await { + cmd.tx + .send(ExportRangesItem::Error(cause.into())) + .await + .ok(); + } } -} -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_path(ctx: &HashContext, cmd: ExportPathMsg) { - let ExportPathMsg { inner, mut tx, .. } = cmd; - if let Err(cause) = export_path_impl(ctx, inner, &mut tx).await { - tx.send(cause.into()).await.ok(); + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_bao(&self, mut cmd: ExportBaoMsg) { + trace!("{cmd:?}"); + self.load().await; + if let Err(cause) = export_bao_impl(self, cmd.inner, &mut cmd.tx).await { + // if the entry is in state NonExisting, this will be an io error with + // kind NotFound. So we must not wrap this somehow but pass it on directly. + cmd.tx + .send(bao_tree::io::EncodeError::Io(cause).into()) + .await + .ok(); + } } -} -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn finish_import(ctx: &HashContext, cmd: ImportEntryMsg, mut tt: TempTag) { - trace!("{cmd:?}"); - let res = match finish_import_impl(ctx, cmd.inner).await { - Ok(()) => { - // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag - // it will be cleaned up when either the process exits or scope ends - if cmd.tx.is_rpc() { - trace!("leaking temp tag {}", tt.hash_and_format()); - tt.leak(); - } - AddProgressItem::Done(tt) + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_path(&self, cmd: ExportPathMsg) { + trace!("{cmd:?}"); + self.load().await; + let ExportPathMsg { inner, mut tx, .. } = cmd; + if let Err(cause) = export_path_impl(self, inner, &mut tx).await { + tx.send(cause.into()).await.ok(); } - Err(cause) => AddProgressItem::Error(cause), - }; - cmd.tx.send(res).await.ok(); + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn finish_import(&self, cmd: ImportEntryMsg, mut tt: TempTag) { + trace!("{cmd:?}"); + self.load().await; + let res = match finish_import_impl(self, cmd.inner).await { + Ok(()) => { + // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag + // it will be cleaned up when either the process exits or scope ends + if cmd.tx.is_rpc() { + trace!("leaking temp tag {}", tt.hash_and_format()); + tt.leak(); + } + AddProgressItem::Done(tt) + } + Err(cause) => AddProgressItem::Error(cause), + }; + cmd.tx.send(res).await.ok(); + } } async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> { @@ -1213,8 +1223,14 @@ impl FsStore { /// Load or create a new store with custom options, returning an additional sender for file store specific commands. pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result { + static THREAD_NR: AtomicU64 = AtomicU64::new(0); let rt = tokio::runtime::Builder::new_multi_thread() - .thread_name("iroh-blob-store") + .thread_name_fn(|| { + format!( + "iroh-blob-store-{}", + THREAD_NR.fetch_add(1, Ordering::SeqCst) + ) + }) .enable_time() .build()?; let handle = rt.handle().clone(); From 632131f147e38b13db6a9d1703c1a268dcff6f9b Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 11 Jul 2025 11:56:22 +0300 Subject: [PATCH 10/11] Introduce traits to structure the API The longer term plan for this is to give people something to implement if they want their own store, so they don't have to do everything by themselves! --- src/store/fs.rs | 182 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 135 insertions(+), 47 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index ff257ffd..b3eb6908 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -64,7 +64,8 @@ //! safely shut down as well. Any store refs you are holding will be inoperable //! after this. use std::{ - fmt, fs, + fmt::{self, Debug}, + fs, future::Future, io::Write, num::NonZeroU64, @@ -224,7 +225,7 @@ impl entity_manager::Params for EmParams { state: entity_manager::ActiveEntityState, _cause: entity_manager::ShutdownCause, ) { - state.persist(); + state.persist().await; } } @@ -248,13 +249,13 @@ struct Actor { type HashContext = ActiveEntityState; -impl HashContext { +impl SyncEntityApi for HashContext { /// Load the state from the database. /// /// If the state is Initial, this will start the load. /// If it is Loading, it will wait until loading is done. /// If it is any other state, it will be a noop. - pub async fn load(&self) { + async fn load(&self) { enum Action { Load, Wait, @@ -304,32 +305,8 @@ impl HashContext { } } - pub(super) fn persist(&self) { - self.state.send_if_modified(|guard| { - let hash = &self.id; - let BaoFileStorage::Partial(fs) = guard.take() else { - return false; - }; - let path = self.global.options.path.bitfield_path(hash); - trace!("writing bitfield for hash {} to {}", hash, path.display()); - if let Err(cause) = fs.sync_all(&path) { - error!( - "failed to write bitfield for {} at {}: {:?}", - hash, - path.display(), - cause - ); - } - false - }); - } - /// Write a batch and notify the db - pub(super) async fn write_batch( - &self, - batch: &[BaoContentItem], - bitfield: &Bitfield, - ) -> io::Result<()> { + async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()> { trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); let mut res = Ok(None); self.state.send_if_modified(|state| { @@ -351,7 +328,8 @@ impl HashContext { /// /// Caution: this is a reader for the unvalidated data file. Reading this /// can produce data that does not match the hash. - pub fn data_reader(&self) -> DataReader { + #[allow(refining_impl_trait_internal)] + fn data_reader(&self) -> DataReader { DataReader(self.state.clone()) } @@ -359,36 +337,39 @@ impl HashContext { /// /// The outboard file is used to validate the data file. It is not guaranteed /// to be complete. - pub fn outboard_reader(&self) -> OutboardReader { + #[allow(refining_impl_trait_internal)] + fn outboard_reader(&self) -> OutboardReader { OutboardReader(self.state.clone()) } /// The most precise known total size of the data file. - pub fn current_size(&self) -> io::Result { + fn current_size(&self) -> io::Result { match self.state.borrow().deref() { BaoFileStorage::Complete(mem) => Ok(mem.size()), BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()), BaoFileStorage::Partial(file) => file.current_size(), - BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), - BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), - BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), - BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), + BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => Err(io::Error::other("initial")), + BaoFileStorage::Loading => Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()), } } /// The most precise known total size of the data file. - pub fn bitfield(&self) -> io::Result { + fn bitfield(&self) -> io::Result { match self.state.borrow().deref() { BaoFileStorage::Complete(mem) => Ok(mem.bitfield()), BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()), BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()), - BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), - BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), - BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), - BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), + BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => Err(io::Error::other("initial")), + BaoFileStorage::Loading => Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()), } } +} +impl HashContext { /// The outboard for the file. pub fn outboard(&self) -> io::Result> { let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE); @@ -722,25 +703,62 @@ impl HashSpecificCommand for ExportPathMsg { async fn handle(self, ctx: HashContext) { ctx.export_path(self).await } - async fn on_error(self, _arg: SpawnArg) {} + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(ExportProgressItem::Error(api::Error::Io(err))) + .await + .ok(); + } } impl HashSpecificCommand for ExportBaoMsg { async fn handle(self, ctx: HashContext) { ctx.export_bao(self).await } - async fn on_error(self, _arg: SpawnArg) {} + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(err))) + .await + .ok(); + } } impl HashSpecificCommand for ExportRangesMsg { async fn handle(self, ctx: HashContext) { ctx.export_ranges(self).await } - async fn on_error(self, _arg: SpawnArg) {} + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(ExportRangesItem::Error(api::Error::Io(err))) + .await + .ok(); + } } impl HashSpecificCommand for ImportBaoMsg { async fn handle(self, ctx: HashContext) { ctx.import_bao(self).await } - async fn on_error(self, _arg: SpawnArg) {} + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx.send(Err(api::Error::Io(err))).await.ok(); + } } impl HashSpecific for (TempTag, ImportEntryMsg) { fn hash(&self) -> Hash { @@ -752,7 +770,14 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) { let (tt, cmd) = self; ctx.finish_import(cmd, tt).await } - async fn on_error(self, _arg: SpawnArg) {} + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.1.tx.send(AddProgressItem::Error(err)).await.ok(); + } } struct RtWrapper(Option); @@ -809,8 +834,50 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) Ok(()) } +/// The minimal API you need to implement for an entity for a store to work. +trait EntityApi { + /// Import from a stream of n0 bao encoded data. + async fn import_bao(&self, cmd: ImportBaoMsg); + /// Finish an import from a local file or memory. + async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag); + /// Observe the bitfield of the entry. + async fn observe(&self, cmd: ObserveMsg); + /// Export byte ranges of the entry as data + async fn export_ranges(&self, cmd: ExportRangesMsg); + /// Export chunk ranges of the entry as a n0 bao encoded stream. + async fn export_bao(&self, cmd: ExportBaoMsg); + /// Export the entry to a local file. + async fn export_path(&self, cmd: ExportPathMsg); + /// Persist the entry at the end of its lifecycle. + async fn persist(&self); +} + +/// A more opinionated API that can be used as a helper to save implementation +/// effort when implementing the EntityApi trait. +trait SyncEntityApi: EntityApi { + /// Load the entry state from the database. This must make sure that it is + /// not run concurrently, so if load is called multiple times, all but one + /// must wait. You can use a tokio::sync::OnceCell or similar to achieve this. + async fn load(&self); + + /// Get a synchronous reader for the data file. + fn data_reader(&self) -> impl ReadBytesAt; + + /// Get a synchronous reader for the outboard file. + fn outboard_reader(&self) -> impl ReadAt; + + /// Get the best known size of the data file. + fn current_size(&self) -> io::Result; + + /// Get the bitfield of the entry. + fn bitfield(&self) -> io::Result; + + /// Write a batch of content items to the entry. + async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()>; +} + /// The high level entry point per entry. -impl HashContext { +impl EntityApi for HashContext { #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn import_bao(&self, cmd: ImportBaoMsg) { trace!("{cmd:?}"); @@ -890,6 +957,27 @@ impl HashContext { }; cmd.tx.send(res).await.ok(); } + + #[instrument(skip_all, fields(hash = %self.id.fmt_short()))] + async fn persist(&self) { + self.state.send_if_modified(|guard| { + let hash = &self.id; + let BaoFileStorage::Partial(fs) = guard.take() else { + return false; + }; + let path = self.global.options.path.bitfield_path(hash); + trace!("writing bitfield for hash {} to {}", hash, path.display()); + if let Err(cause) = fs.sync_all(&path) { + error!( + "failed to write bitfield for {} at {}: {:?}", + hash, + path.display(), + cause + ); + } + false + }); + } } async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> { From 9a8547af4134fec25519274e1f9156a63ead2148 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 11 Jul 2025 14:57:00 +0300 Subject: [PATCH 11/11] clippy --- src/store/fs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index b3eb6908..7ab310c7 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -310,7 +310,7 @@ impl SyncEntityApi for HashContext { trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); let mut res = Ok(None); self.state.send_if_modified(|state| { - let Ok((state1, update)) = state.take().write_batch(batch, bitfield, &self) else { + let Ok((state1, update)) = state.take().write_batch(batch, bitfield, self) else { res = Err(io::Error::other("write batch failed")); return false; };