diff --git a/src/api/remote.rs b/src/api/remote.rs index 7a4055fb..5de12d5f 100644 --- a/src/api/remote.rs +++ b/src/api/remote.rs @@ -1064,8 +1064,15 @@ mod tests { use testresult::TestResult; use crate::{ + api::blobs::Blobs, protocol::{ChunkRangesSeq, GetRequest}, - store::fs::{tests::INTERESTING_SIZES, FsStore}, + store::{ + fs::{ + tests::{create_n0_bao, test_data, INTERESTING_SIZES}, + FsStore, + }, + mem::MemStore, + }, tests::{add_test_hash_seq, add_test_hash_seq_incomplete}, util::ChunkRangesExt, }; @@ -1117,6 +1124,38 @@ mod tests { Ok(()) } + async fn test_observe_partial(blobs: &Blobs) -> TestResult<()> { + let sizes = INTERESTING_SIZES; + for size in sizes { + let data = test_data(size); + let ranges = ChunkRanges::chunk(0); + let (hash, bao) = create_n0_bao(&data, &ranges)?; + blobs.import_bao_bytes(hash, ranges.clone(), bao).await?; + let bitfield = blobs.observe(hash).await?; + if size > 1024 { + assert_eq!(bitfield.ranges, ranges); + } else { + assert_eq!(bitfield.ranges, ChunkRanges::all()); + } + } + Ok(()) + } + + #[tokio::test] + async fn test_observe_partial_mem() -> TestResult<()> { + let store = MemStore::new(); + test_observe_partial(store.blobs()).await?; + Ok(()) + } + + #[tokio::test] + async fn test_observe_partial_fs() -> TestResult<()> { + let td = tempfile::tempdir()?; + let store = FsStore::load(td.path()).await?; + test_observe_partial(store.blobs()).await?; + Ok(()) + } + #[tokio::test] async fn test_local_info_hash_seq() -> TestResult<()> { let sizes = INTERESTING_SIZES; diff --git a/src/store/fs.rs b/src/store/fs.rs index b323571b..7ab310c7 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -64,22 +64,28 @@ //! safely shut down as well. Any store refs you are holding will be inoperable //! after this. use std::{ - fmt, fs, + fmt::{self, Debug}, + fs, future::Future, io::Write, num::NonZeroU64, ops::Deref, path::{Path, PathBuf}, - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, }; use bao_tree::{ + blake3, io::{ mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt}, + outboard::PreOrderOutboard, sync::ReadAt, BaoContentItem, Leaf, }, - ChunkNum, ChunkRanges, + BaoTree, ChunkNum, ChunkRanges, }; use bytes::Bytes; use delete_set::{BaoFilePart, ProtectHandle}; @@ -106,9 +112,15 @@ use crate::{ ApiClient, }, store::{ - fs::util::entity_manager::{self, ActiveEntityState}, + fs::{ + bao_file::{ + BaoFileStorage, BaoFileStorageSubscriber, CompleteStorage, DataReader, + OutboardReader, + }, + util::entity_manager::{self, ActiveEntityState}, + }, util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned}, - Hash, + Hash, IROH_BLOCK_SIZE, }, util::{ channel::oneshot, @@ -186,8 +198,6 @@ struct TaskContext { pub db: meta::Db, // Handle to send internal commands pub internal_cmd_tx: tokio::sync::mpsc::Sender, - /// The file handle for the empty hash. - pub empty: BaoFileHandle, /// Handle to protect files from deletion. pub protect: ProtectHandle, } @@ -201,12 +211,6 @@ impl TaskContext { } } -impl entity_manager::Reset for Slot { - fn reset(&mut self) { - self.0 = Arc::new(tokio::sync::Mutex::new(None)); - } -} - #[derive(Debug)] struct EmParams; @@ -215,19 +219,13 @@ impl entity_manager::Params for EmParams { type GlobalState = Arc; - type EntityState = Slot; + type EntityState = BaoFileHandle; async fn on_shutdown( state: entity_manager::ActiveEntityState, - cause: entity_manager::ShutdownCause, + _cause: entity_manager::ShutdownCause, ) { - // this isn't strictly necessary. 
Drop will run anyway as soon as the - // state is reset to it's default value. Doing it here means that we - // have exact control over where it happens. - if let Some(mut handle) = state.state.0.lock().await.take() { - trace!("shutting down hash: {}, cause: {cause:?}", state.id); - handle.persist(&state); - } + state.persist().await; } } @@ -251,8 +249,139 @@ struct Actor { type HashContext = ActiveEntityState; +impl SyncEntityApi for HashContext { + /// Load the state from the database. + /// + /// If the state is Initial, this will start the load. + /// If it is Loading, it will wait until loading is done. + /// If it is any other state, it will be a noop. + async fn load(&self) { + enum Action { + Load, + Wait, + None, + } + let mut action = Action::None; + self.state.send_if_modified(|guard| match guard.deref() { + BaoFileStorage::Initial => { + *guard = BaoFileStorage::Loading; + action = Action::Load; + true + } + BaoFileStorage::Loading => { + action = Action::Wait; + false + } + _ => false, + }); + match action { + Action::Load => { + let state = if self.id == Hash::EMPTY { + BaoFileStorage::Complete(CompleteStorage { + data: MemOrFile::Mem(Bytes::new()), + outboard: MemOrFile::empty(), + }) + } else { + // we must assign a new state even in the error case, otherwise + // tasks waiting for loading would stall! + match self.global.db.get(self.id).await { + Ok(state) => match BaoFileStorage::open(state, self).await { + Ok(handle) => handle, + Err(_) => BaoFileStorage::Poisoned, + }, + Err(_) => BaoFileStorage::Poisoned, + } + }; + self.state.send_replace(state); + } + Action::Wait => { + // we are in state loading already, so we just need to wait for the + // other task to complete loading. + while matches!(self.state.borrow().deref(), BaoFileStorage::Loading) { + self.state.0.subscribe().changed().await.ok(); + } + } + Action::None => {} + } + } + + /// Write a batch and notify the db + async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()> { + trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); + let mut res = Ok(None); + self.state.send_if_modified(|state| { + let Ok((state1, update)) = state.take().write_batch(batch, bitfield, self) else { + res = Err(io::Error::other("write batch failed")); + return false; + }; + res = Ok(update); + *state = state1; + true + }); + if let Some(update) = res? { + self.global.db.update(self.id, update).await?; + } + Ok(()) + } + + /// An AsyncSliceReader for the data file. + /// + /// Caution: this is a reader for the unvalidated data file. Reading this + /// can produce data that does not match the hash. + #[allow(refining_impl_trait_internal)] + fn data_reader(&self) -> DataReader { + DataReader(self.state.clone()) + } + + /// An AsyncSliceReader for the outboard file. + /// + /// The outboard file is used to validate the data file. It is not guaranteed + /// to be complete. + #[allow(refining_impl_trait_internal)] + fn outboard_reader(&self) -> OutboardReader { + OutboardReader(self.state.clone()) + } + + /// The most precise known total size of the data file. 
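+    ///
+    /// Entries still in `Initial` or `Loading` report an error, and a
+    /// `NonExisting` entry reports `NotFound`, mirroring the match arms below.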
+    fn current_size(&self) -> io::Result<u64> {
+        match self.state.borrow().deref() {
+            BaoFileStorage::Complete(mem) => Ok(mem.size()),
+            BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()),
+            BaoFileStorage::Partial(file) => file.current_size(),
+            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
+            BaoFileStorage::Initial => Err(io::Error::other("initial")),
+            BaoFileStorage::Loading => Err(io::Error::other("loading")),
+            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
+        }
+    }
+
+    /// The bitfield tracking which chunk ranges of the entry are present.
+    fn bitfield(&self) -> io::Result<Bitfield> {
+        match self.state.borrow().deref() {
+            BaoFileStorage::Complete(mem) => Ok(mem.bitfield()),
+            BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()),
+            BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()),
+            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
+            BaoFileStorage::Initial => Err(io::Error::other("initial")),
+            BaoFileStorage::Loading => Err(io::Error::other("loading")),
+            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
+        }
+    }
+}
+
 impl HashContext {
-    pub fn db(&self) -> &meta::Db {
+    /// The outboard for the file.
+    pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
+        let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
+        let outboard = self.outboard_reader();
+        Ok(PreOrderOutboard {
+            root: blake3::Hash::from(self.id),
+            tree,
+            data: outboard,
+        })
+    }
+
+    fn db(&self) -> &meta::Db {
         &self.global.db
     }

@@ -260,21 +389,18 @@ impl HashContext {
         &self.global.options
     }

-    pub async fn lock(&self) -> tokio::sync::MutexGuard<'_, Option<BaoFileHandle>> {
-        self.state.0.lock().await
-    }
-
-    pub fn protect(&self, hash: Hash, parts: impl IntoIterator<Item = BaoFilePart>) {
-        self.global.protect.protect(hash, parts);
+    pub fn protect(&self, parts: impl IntoIterator<Item = BaoFilePart>) {
+        self.global.protect.protect(self.id, parts);
     }

     /// Update the entry state in the database, and wait for completion.
-    pub async fn update_await(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
-        self.db().update_await(hash, state).await?;
+    pub async fn update_await(&self, state: EntryState<Bytes>) -> io::Result<()> {
+        self.db().update_await(self.id, state).await?;
         Ok(())
     }

-    pub async fn get_entry_state(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> {
+    pub async fn get_entry_state(&self) -> io::Result<Option<EntryState<Bytes>>> {
+        let hash = self.id;
         if hash == Hash::EMPTY {
             return Ok(Some(EntryState::Complete {
                 data_location: DataLocation::Inline(Bytes::new()),
@@ -285,115 +411,8 @@
     }

     /// Update the entry state in the database, and wait for completion.
- pub async fn set(&self, hash: Hash, state: EntryState) -> io::Result<()> { - self.db().set(hash, state).await - } - - pub async fn get(&self, hash: Hash) -> api::Result { - if hash == Hash::EMPTY { - return Ok(self.global.empty.clone()); - } - let res = self - .state - .get_or_create(|| async { - let res = self.db().get(hash).await.map_err(io::Error::other)?; - let res = match res { - Some(state) => open_bao_file(state, self).await, - None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")), - }; - Ok((res?, ())) - }) - .await - .map_err(api::Error::from); - let (res, _) = res?; - Ok(res) - } - - pub async fn get_or_create(&self, hash: Hash) -> api::Result { - if hash == Hash::EMPTY { - return Ok(self.global.empty.clone()); - } - let res = self - .state - .get_or_create(|| async { - let res = self.db().get(hash).await.map_err(io::Error::other)?; - let res = match res { - Some(state) => open_bao_file(state, self).await, - None => Ok(BaoFileHandle::new_partial_mem()), - }; - Ok((res?, ())) - }) - .await - .map_err(api::Error::from); - trace!("{res:?}"); - let (res, _) = res?; - Ok(res) - } -} - -async fn open_bao_file(state: EntryState, ctx: &HashContext) -> io::Result { - let hash = &ctx.id; - let options = &ctx.global.options; - Ok(match state { - EntryState::Complete { - data_location, - outboard_location, - } => { - let data = match data_location { - DataLocation::Inline(data) => MemOrFile::Mem(data), - DataLocation::Owned(size) => { - let path = options.path.data_path(hash); - let file = fs::File::open(&path)?; - MemOrFile::File(FixedSize::new(file, size)) - } - DataLocation::External(paths, size) => { - let Some(path) = paths.into_iter().next() else { - return Err(io::Error::other("no external data path")); - }; - let file = fs::File::open(&path)?; - MemOrFile::File(FixedSize::new(file, size)) - } - }; - let outboard = match outboard_location { - OutboardLocation::NotNeeded => MemOrFile::empty(), - OutboardLocation::Inline(data) => MemOrFile::Mem(data), - OutboardLocation::Owned => { - let path = options.path.outboard_path(hash); - let file = fs::File::open(&path)?; - MemOrFile::File(file) - } - }; - BaoFileHandle::new_complete(data, outboard) - } - EntryState::Partial { .. } => BaoFileHandle::new_partial_file(ctx).await?, - }) -} - -/// An entry for each hash, containing a weak reference to a BaoFileHandle -/// wrapped in a tokio mutex so handle creation is sequential. -#[derive(Debug, Clone, Default)] -pub(crate) struct Slot(Arc>>); - -impl Slot { - /// Get the handle if it exists and is still alive, otherwise load it from the database. - /// If there is nothing in the database, create a new in-memory handle. - /// - /// `make` will be called if the a live handle does not exist. 
- pub async fn get_or_create(&self, make: F) -> io::Result<(BaoFileHandle, T)> - where - F: FnOnce() -> Fut, - Fut: std::future::Future>, - T: Default, - { - let mut slot = self.0.lock().await; - if let Some(handle) = &*slot { - return Ok((handle.clone(), Default::default())); - } - let handle = make().await; - if let Ok((handle, _)) = &handle { - *slot = Some(handle.clone()); - } - handle + pub async fn set(&self, state: EntryState) -> io::Result<()> { + self.db().set(self.id, state).await } } @@ -619,7 +638,6 @@ impl Actor { options: options.clone(), db: meta::Db::new(db_send), internal_cmd_tx: fs_commands_tx, - empty: BaoFileHandle::new_complete(MemOrFile::empty(), MemOrFile::empty()), protect, }); rt.spawn(db_actor.run()); @@ -636,9 +654,12 @@ impl Actor { } trait HashSpecificCommand: HashSpecific + Send + 'static { + /// Handle the command on success by spawning a task into the per-hash context. fn handle(self, ctx: HashContext) -> impl Future + Send + 'static; - fn on_error(self) -> impl Future + Send + 'static; + /// Opportunity to send an error if spawning fails due to the task being busy (inbox full) + /// or dead (e.g. panic in one of the running tasks). + fn on_error(self, arg: SpawnArg) -> impl Future + Send + 'static; async fn spawn( self, @@ -647,25 +668,24 @@ trait HashSpecificCommand: HashSpecific + Send + 'static { ) where Self: Sized, { + let span = tracing::Span::current(); let task = manager - .spawn_boxed( - self.hash(), - Box::new(|x| { - Box::pin(async move { - match x { - SpawnArg::Active(state) => { - self.handle(state).await; - } - SpawnArg::Busy => { - self.on_error().await; - } - SpawnArg::Dead => { - self.on_error().await; - } + .spawn(self.hash(), |arg| { + async move { + match arg { + SpawnArg::Active(state) => { + self.handle(state).await; } - }) - }), - ) + SpawnArg::Busy => { + self.on_error(arg).await; + } + SpawnArg::Dead => { + self.on_error(arg).await; + } + } + } + .instrument(span) + }) .await; if let Some(task) = task { tasks.spawn(task); @@ -675,33 +695,70 @@ trait HashSpecificCommand: HashSpecific + Send + 'static { impl HashSpecificCommand for ObserveMsg { async fn handle(self, ctx: HashContext) { - observe(self, ctx).await + ctx.observe(self).await } - async fn on_error(self) {} + async fn on_error(self, _arg: SpawnArg) {} } impl HashSpecificCommand for ExportPathMsg { async fn handle(self, ctx: HashContext) { - export_path(self, ctx).await + ctx.export_path(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(ExportProgressItem::Error(api::Error::Io(err))) + .await + .ok(); } - async fn on_error(self) {} } impl HashSpecificCommand for ExportBaoMsg { async fn handle(self, ctx: HashContext) { - export_bao(self, ctx).await + ctx.export_bao(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(err))) + .await + .ok(); } - async fn on_error(self) {} } impl HashSpecificCommand for ExportRangesMsg { async fn handle(self, ctx: HashContext) { - export_ranges(self, ctx).await + ctx.export_ranges(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + 
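+            // Dead: the per-hash entity task has died (e.g. it panicked);
+            // Busy: its inbox was merely full.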
SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(ExportRangesItem::Error(api::Error::Io(err))) + .await + .ok(); } - async fn on_error(self) {} } impl HashSpecificCommand for ImportBaoMsg { async fn handle(self, ctx: HashContext) { - import_bao(self, ctx).await + ctx.import_bao(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx.send(Err(api::Error::Io(err))).await.ok(); } - async fn on_error(self) {} } impl HashSpecific for (TempTag, ImportEntryMsg) { fn hash(&self) -> Hash { @@ -711,9 +768,16 @@ impl HashSpecific for (TempTag, ImportEntryMsg) { impl HashSpecificCommand for (TempTag, ImportEntryMsg) { async fn handle(self, ctx: HashContext) { let (tt, cmd) = self; - finish_import(cmd, tt, ctx).await + ctx.finish_import(cmd, tt).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.1.tx.send(AddProgressItem::Error(err)).await.ok(); } - async fn on_error(self) {} } struct RtWrapper(Option); @@ -770,24 +834,156 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) { - let res = match finish_import_impl(cmd.inner, ctx).await { - Ok(()) => { - // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag - // it will be cleaned up when either the process exits or scope ends - if cmd.tx.is_rpc() { - trace!("leaking temp tag {}", tt.hash_and_format()); - tt.leak(); - } - AddProgressItem::Done(tt) +/// The minimal API you need to implement for an entity for a store to work. +trait EntityApi { + /// Import from a stream of n0 bao encoded data. + async fn import_bao(&self, cmd: ImportBaoMsg); + /// Finish an import from a local file or memory. + async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag); + /// Observe the bitfield of the entry. + async fn observe(&self, cmd: ObserveMsg); + /// Export byte ranges of the entry as data + async fn export_ranges(&self, cmd: ExportRangesMsg); + /// Export chunk ranges of the entry as a n0 bao encoded stream. + async fn export_bao(&self, cmd: ExportBaoMsg); + /// Export the entry to a local file. + async fn export_path(&self, cmd: ExportPathMsg); + /// Persist the entry at the end of its lifecycle. + async fn persist(&self); +} + +/// A more opinionated API that can be used as a helper to save implementation +/// effort when implementing the EntityApi trait. +trait SyncEntityApi: EntityApi { + /// Load the entry state from the database. This must make sure that it is + /// not run concurrently, so if load is called multiple times, all but one + /// must wait. You can use a tokio::sync::OnceCell or similar to achieve this. + async fn load(&self); + + /// Get a synchronous reader for the data file. + fn data_reader(&self) -> impl ReadBytesAt; + + /// Get a synchronous reader for the outboard file. + fn outboard_reader(&self) -> impl ReadAt; + + /// Get the best known size of the data file. + fn current_size(&self) -> io::Result; + + /// Get the bitfield of the entry. + fn bitfield(&self) -> io::Result; + + /// Write a batch of content items to the entry. 
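+    ///
+    /// `bitfield` describes the chunk ranges covered by the leaves in
+    /// `batch`, so implementations can update their metadata accordingly.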
+ async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()>; +} + +/// The high level entry point per entry. +impl EntityApi for HashContext { + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn import_bao(&self, cmd: ImportBaoMsg) { + trace!("{cmd:?}"); + self.load().await; + let ImportBaoMsg { + inner: ImportBaoRequest { size, .. }, + rx, + tx, + .. + } = cmd; + let res = import_bao_impl(self, size, rx).await; + trace!("{res:?}"); + tx.send(res).await.ok(); + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn observe(&self, cmd: ObserveMsg) { + trace!("{cmd:?}"); + self.load().await; + BaoFileStorageSubscriber::new(self.state.subscribe()) + .forward(cmd.tx) + .await + .ok(); + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_ranges(&self, mut cmd: ExportRangesMsg) { + trace!("{cmd:?}"); + self.load().await; + if let Err(cause) = export_ranges_impl(self, cmd.inner, &mut cmd.tx).await { + cmd.tx + .send(ExportRangesItem::Error(cause.into())) + .await + .ok(); } - Err(cause) => AddProgressItem::Error(cause), - }; - cmd.tx.send(res).await.ok(); + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_bao(&self, mut cmd: ExportBaoMsg) { + trace!("{cmd:?}"); + self.load().await; + if let Err(cause) = export_bao_impl(self, cmd.inner, &mut cmd.tx).await { + // if the entry is in state NonExisting, this will be an io error with + // kind NotFound. So we must not wrap this somehow but pass it on directly. + cmd.tx + .send(bao_tree::io::EncodeError::Io(cause).into()) + .await + .ok(); + } + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_path(&self, cmd: ExportPathMsg) { + trace!("{cmd:?}"); + self.load().await; + let ExportPathMsg { inner, mut tx, .. 
} = cmd; + if let Err(cause) = export_path_impl(self, inner, &mut tx).await { + tx.send(cause.into()).await.ok(); + } + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn finish_import(&self, cmd: ImportEntryMsg, mut tt: TempTag) { + trace!("{cmd:?}"); + self.load().await; + let res = match finish_import_impl(self, cmd.inner).await { + Ok(()) => { + // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag + // it will be cleaned up when either the process exits or scope ends + if cmd.tx.is_rpc() { + trace!("leaking temp tag {}", tt.hash_and_format()); + tt.leak(); + } + AddProgressItem::Done(tt) + } + Err(cause) => AddProgressItem::Error(cause), + }; + cmd.tx.send(res).await.ok(); + } + + #[instrument(skip_all, fields(hash = %self.id.fmt_short()))] + async fn persist(&self) { + self.state.send_if_modified(|guard| { + let hash = &self.id; + let BaoFileStorage::Partial(fs) = guard.take() else { + return false; + }; + let path = self.global.options.path.bitfield_path(hash); + trace!("writing bitfield for hash {} to {}", hash, path.display()); + if let Err(cause) = fs.sync_all(&path) { + error!( + "failed to write bitfield for {} at {}: {:?}", + hash, + path.display(), + cause + ); + } + false + }); + } } -async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::Result<()> { +async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> { + if ctx.id == Hash::EMPTY { + return Ok(()); // nothing to do for the empty hash + } let ImportEntry { source, hash, @@ -806,14 +1002,14 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R debug_assert!(!options.is_inlined_data(*size)); } } - let guard = ctx.lock().await; - let handle = guard.as_ref().map(|x| x.clone()); + ctx.load().await; + let handle = &ctx.state; // if I do have an existing handle, I have to possibly deal with observers. // if I don't have an existing handle, there are 2 cases: // the entry exists in the db, but we don't have a handle // the entry does not exist at all. 
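    // In all of these cases, any observers are notified via the watch
    // channel once `handle.complete` swaps the state below.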
// convert the import source to a data location and drop the open files - ctx.protect(hash, [BaoFilePart::Data, BaoFilePart::Outboard]); + ctx.protect([BaoFilePart::Data, BaoFilePart::Outboard]); let data_location = match source { ImportSource::Memory(data) => DataLocation::Inline(data), ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size), @@ -857,58 +1053,39 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R OutboardLocation::Owned } }; - if let Some(handle) = handle { - let data = match &data_location { - DataLocation::Inline(data) => MemOrFile::Mem(data.clone()), - DataLocation::Owned(size) => { - let path = ctx.options().path.data_path(&hash); - let file = fs::File::open(&path)?; - MemOrFile::File(FixedSize::new(file, *size)) - } - DataLocation::External(paths, size) => { - let Some(path) = paths.iter().next() else { - return Err(io::Error::other("no external data path")); - }; - let file = fs::File::open(path)?; - MemOrFile::File(FixedSize::new(file, *size)) - } - }; - let outboard = match &outboard_location { - OutboardLocation::NotNeeded => MemOrFile::empty(), - OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()), - OutboardLocation::Owned => { - let path = ctx.options().path.outboard_path(&hash); - let file = fs::File::open(&path)?; - MemOrFile::File(file) - } - }; - handle.complete(data, outboard); - } + let data = match &data_location { + DataLocation::Inline(data) => MemOrFile::Mem(data.clone()), + DataLocation::Owned(size) => { + let path = ctx.options().path.data_path(&hash); + let file = fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, *size)) + } + DataLocation::External(paths, size) => { + let Some(path) = paths.iter().next() else { + return Err(io::Error::other("no external data path")); + }; + let file = fs::File::open(path)?; + MemOrFile::File(FixedSize::new(file, *size)) + } + }; + let outboard = match &outboard_location { + OutboardLocation::NotNeeded => MemOrFile::empty(), + OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()), + OutboardLocation::Owned => { + let path = ctx.options().path.outboard_path(&hash); + let file = fs::File::open(&path)?; + MemOrFile::File(file) + } + }; + handle.complete(data, outboard); let state = EntryState::Complete { data_location, outboard_location, }; - ctx.update_await(hash, state).await?; + ctx.update_await(state).await?; Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) { - trace!("{cmd:?}"); - let ImportBaoMsg { - inner: ImportBaoRequest { size, hash }, - rx, - tx, - .. 
-    } = cmd;
-    let res = match ctx.get_or_create(hash).await {
-        Ok(handle) => import_bao_impl(size, rx, handle, ctx).await,
-        Err(cause) => Err(cause),
-    };
-    trace!("{res:?}");
-    tx.send(res).await.ok();
-}
-
 fn chunk_range(leaf: &Leaf) -> ChunkRanges {
     let start = ChunkNum::chunks(leaf.offset);
     let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
@@ -916,10 +1093,9 @@ fn chunk_range(leaf: &Leaf) -> ChunkRanges {
 }

 async fn import_bao_impl(
+    ctx: &HashContext,
     size: NonZeroU64,
     mut rx: mpsc::Receiver<BaoContentItem>,
-    handle: BaoFileHandle,
-    ctx: HashContext,
 ) -> api::Result<()> {
     trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size);
     let mut batch = Vec::<BaoContentItem>::new();
@@ -928,7 +1104,7 @@ async fn import_bao_impl(
         // if the batch is not empty, the last item is a leaf and the current item is a parent, write the batch
         if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
             let bitfield = Bitfield::new_unchecked(ranges, size.into());
-            handle.write_batch(&batch, &bitfield, &ctx).await?;
+            ctx.write_batch(&batch, &bitfield).await?;
             batch.clear();
             ranges = ChunkRanges::empty();
         }
@@ -944,48 +1120,23 @@ async fn import_bao_impl(
     }
     if !batch.is_empty() {
         let bitfield = Bitfield::new_unchecked(ranges, size.into());
-        handle.write_batch(&batch, &bitfield, &ctx).await?;
+        ctx.write_batch(&batch, &bitfield).await?;
     }
     Ok(())
 }

-#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
-async fn observe(cmd: ObserveMsg, ctx: HashContext) {
-    let Ok(handle) = ctx.get_or_create(cmd.hash).await else {
-        return;
-    };
-    handle.subscribe().forward(cmd.tx).await.ok();
-}
-
-#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
-async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) {
-    match ctx.get(cmd.hash).await {
-        Ok(handle) => {
-            if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await {
-                cmd.tx
-                    .send(ExportRangesItem::Error(cause.into()))
-                    .await
-                    .ok();
-            }
-        }
-        Err(cause) => {
-            cmd.tx.send(ExportRangesItem::Error(cause)).await.ok();
-        }
-    }
-}
-
 async fn export_ranges_impl(
+    ctx: &HashContext,
     cmd: ExportRangesRequest,
     tx: &mut mpsc::Sender<ExportRangesItem>,
-    handle: BaoFileHandle,
 ) -> io::Result<()> {
     let ExportRangesRequest { ranges, hash } = cmd;
     trace!(
         "exporting ranges: {hash} {ranges:?} size={}",
-        handle.current_size()?
+        ctx.current_size()?
     );
-    let bitfield = handle.bitfield()?;
-    let data = handle.data_reader();
+    let bitfield = ctx.bitfield()?;
+    let data = ctx.data_reader();
     let size = bitfield.size();
     for range in ranges.iter() {
         let range = match range {
@@ -1017,58 +1168,29 @@ async fn export_ranges_impl(
     Ok(())
 }

-#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
-async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
-    match ctx.get(cmd.hash).await {
-        Ok(handle) => {
-            if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await {
-                cmd.tx
-                    .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into())
-                    .await
-                    .ok();
-            }
-        }
-        Err(cause) => {
-            let crate::api::Error::Io(cause) = cause;
-            cmd.tx
-                .send(bao_tree::io::EncodeError::Io(cause).into())
-                .await
-                .ok();
-        }
-    }
-}
-
 async fn export_bao_impl(
+    ctx: &HashContext,
     cmd: ExportBaoRequest,
     tx: &mut mpsc::Sender<EncodedItem>,
-    handle: BaoFileHandle,
-) -> anyhow::Result<()> {
+) -> io::Result<()> {
     let ExportBaoRequest { ranges, hash, ..
} = cmd; - let outboard = handle.outboard(&hash)?; + let outboard = ctx.outboard()?; let size = outboard.tree.size(); - if size == 0 && hash != Hash::EMPTY { + if size == 0 && cmd.hash != Hash::EMPTY { // we have no data whatsoever, so we stop here return Ok(()); } trace!("exporting bao: {hash} {ranges:?} size={size}",); - let data = handle.data_reader(); + let data = ctx.data_reader(); let tx = BaoTreeSender::new(tx); traverse_ranges_validated(data, outboard, &ranges, tx).await?; Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_path(cmd: ExportPathMsg, ctx: HashContext) { - let ExportPathMsg { inner, mut tx, .. } = cmd; - if let Err(cause) = export_path_impl(inner, &mut tx, ctx).await { - tx.send(cause.into()).await.ok(); - } -} - async fn export_path_impl( + ctx: &HashContext, cmd: ExportPathRequest, tx: &mut mpsc::Sender, - ctx: HashContext, ) -> api::Result<()> { let ExportPathRequest { mode, target, .. } = cmd; if !target.is_absolute() { @@ -1080,8 +1202,7 @@ async fn export_path_impl( if let Some(parent) = target.parent() { fs::create_dir_all(parent)?; } - let _guard = ctx.lock().await; - let state = ctx.get_entry_state(cmd.hash).await?; + let state = ctx.get_entry_state().await?; let (data_location, outboard_location) = match state { Some(EntryState::Complete { data_location, @@ -1143,13 +1264,10 @@ async fn export_path_impl( } } } - ctx.set( - cmd.hash, - EntryState::Complete { - data_location: DataLocation::External(vec![target], size), - outboard_location, - }, - ) + ctx.set(EntryState::Complete { + data_location: DataLocation::External(vec![target], size), + outboard_location, + }) .await?; } }, @@ -1193,8 +1311,14 @@ impl FsStore { /// Load or create a new store with custom options, returning an additional sender for file store specific commands. 
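+    ///
+    /// The store runs on its own multi-threaded tokio runtime; its worker
+    /// threads are named `iroh-blob-store-{N}` so they can be told apart in
+    /// stack traces and debuggers.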
     pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
+        static THREAD_NR: AtomicU64 = AtomicU64::new(0);
         let rt = tokio::runtime::Builder::new_multi_thread()
-            .thread_name("iroh-blob-store")
+            .thread_name_fn(|| {
+                format!(
+                    "iroh-blob-store-{}",
+                    THREAD_NR.fetch_add(1, Ordering::SeqCst)
+                )
+            })
             .enable_time()
             .build()?;
         let handle = rt.handle().clone();
@@ -1412,7 +1536,7 @@ pub mod tests {
     // import data via import_bytes, check that we can observe it and that it is complete
     #[tokio::test]
-    async fn test_import_bytes() -> TestResult<()> {
+    async fn test_import_bytes_simple() -> TestResult<()> {
         tracing_subscriber::fmt::try_init().ok();
         let testdir = tempfile::tempdir()?;
         let db_dir = testdir.path().join("db");
@@ -1943,7 +2067,6 @@ pub mod tests {
         if path.is_file() {
             if let Some(file_ext) = path.extension() {
                 if file_ext.to_string_lossy().to_lowercase() == ext {
-                    println!("Deleting: {}", path.display());
                     fs::remove_file(path)?;
                 }
             }
diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs
index 1fe91404..d0217040 100644
--- a/src/store/fs/bao_file.rs
+++ b/src/store/fs/bao_file.rs
@@ -4,7 +4,6 @@ use std::{
     io,
     ops::Deref,
     path::Path,
-    sync::Arc,
 };

 use bao_tree::{
@@ -21,7 +20,7 @@ use bytes::{Bytes, BytesMut};
 use derive_more::Debug;
 use irpc::channel::mpsc;
 use tokio::sync::watch;
-use tracing::{debug, error, info, trace};
+use tracing::{debug, info, trace};

 use super::{
     entry_state::{DataLocation, EntryState, OutboardLocation},
@@ -31,7 +30,7 @@ use crate::{
     api::blobs::Bitfield,
     store::{
-        fs::{meta::raw_outboard_size, HashContext},
+        fs::{meta::raw_outboard_size, util::entity_manager, HashContext},
         util::{
             read_checksummed_and_truncate, write_checksummed, FixedSize, MemOrFile,
             PartialMemStorage, DD,
@@ -143,7 +142,7 @@ impl PartialFileStorage {
         &self.bitfield
     }

-    fn sync_all(&self, bitfield_path: &Path) -> io::Result<()> {
+    pub(super) fn sync_all(&self, bitfield_path: &Path) -> io::Result<()> {
         self.data.sync_all()?;
         self.outboard.sync_all()?;
         self.sizes.sync_all()?;
@@ -236,7 +235,7 @@ impl PartialFileStorage {
         ))
     }

-    fn current_size(&self) -> io::Result<u64> {
+    pub(super) fn current_size(&self) -> io::Result<u64> {
         read_size(&self.sizes)
     }

@@ -286,8 +285,24 @@ fn read_size(size_file: &File) -> io::Result<u64> {
 }

 /// The storage for a bao file. This can be either in memory or on disk.
-#[derive(derive_more::From)]
+///
+/// The two initial states `Initial` and `Loading` are used to coordinate the
+/// loading of the entry from the metadata database. Once that is complete,
+/// you should never see these states again.
+///
+/// From the remaining states you can get into `Poisoned` if there is an
+/// IO error during an operation.
+///
+/// `Poisoned` is also used once the handle is persisted and no longer usable.
+#[derive(derive_more::From, Default)]
 pub(crate) enum BaoFileStorage {
+    /// Initial state, we don't know anything yet.
+    #[default]
+    Initial,
+    /// Currently loading the entry from the metadata.
+    Loading,
+    /// There is no info about this hash in the metadata db.
+    NonExisting,
     /// The entry is incomplete and in memory.
     ///
     /// Since it is incomplete, it must be writeable.
@@ -305,13 +320,8 @@ pub(crate) enum BaoFileStorage {
     ///
     /// Writing to this is a no-op, since it is already complete.
Complete(CompleteStorage), - /// We will get into that state if there is an io error in the middle of an operation - /// - /// Also, when the handle is dropped we will poison the storage, so poisoned - /// can be seen when the handle is revived during the drop. - /// - /// BaoFileHandleWeak::upgrade() will return None if the storage is poisoned, - /// treat it as dead. + /// We will get into that state if there is an io error in the middle of an operation, + /// or after the handle is persisted and no longer usable. Poisoned, } @@ -322,16 +332,13 @@ impl fmt::Debug for BaoFileStorage { BaoFileStorage::Partial(x) => x.fmt(f), BaoFileStorage::Complete(x) => x.fmt(f), BaoFileStorage::Poisoned => f.debug_struct("Poisoned").finish(), + BaoFileStorage::Initial => f.debug_struct("Initial").finish(), + BaoFileStorage::Loading => f.debug_struct("Loading").finish(), + BaoFileStorage::NonExisting => f.debug_struct("NonExisting").finish(), } } } -impl Default for BaoFileStorage { - fn default() -> Self { - BaoFileStorage::Complete(Default::default()) - } -} - impl PartialMemStorage { /// Converts this storage into a complete storage, using the given hash for /// path names and the given options for decisions about inlining. @@ -387,22 +394,32 @@ impl PartialMemStorage { impl BaoFileStorage { pub fn bitfield(&self) -> Bitfield { match self { - BaoFileStorage::Complete(x) => Bitfield::complete(x.data.size()), + BaoFileStorage::Initial => { + panic!("initial storage should not be used") + } + BaoFileStorage::Loading => { + panic!("loading storage should not be used") + } + BaoFileStorage::NonExisting => Bitfield::empty(), BaoFileStorage::PartialMem(x) => x.bitfield.clone(), BaoFileStorage::Partial(x) => x.bitfield.clone(), + BaoFileStorage::Complete(x) => Bitfield::complete(x.data.size()), BaoFileStorage::Poisoned => { panic!("poisoned storage should not be used") } } } - fn write_batch( + pub(super) fn write_batch( self, batch: &[BaoContentItem], bitfield: &Bitfield, ctx: &HashContext, ) -> io::Result<(Self, Option>)> { Ok(match self { + BaoFileStorage::NonExisting => { + Self::new_partial_mem().write_batch(batch, bitfield, ctx)? + } BaoFileStorage::PartialMem(mut ms) => { // check if we need to switch to file mode, otherwise write to memory if max_offset(batch) <= ctx.global.options.inline.max_data_inlined { @@ -465,7 +482,7 @@ impl BaoFileStorage { // unless there is a bug, this would just write the exact same data (self, None) } - BaoFileStorage::Poisoned => { + _ => { // we are poisoned, so just ignore the write (self, None) } @@ -473,7 +490,7 @@ impl BaoFileStorage { } /// Create a new mutable mem storage. - pub fn partial_mem() -> Self { + pub fn new_partial_mem() -> Self { Self::PartialMem(Default::default()) } @@ -483,13 +500,14 @@ impl BaoFileStorage { match self { Self::Complete(_) => Ok(()), Self::PartialMem(_) => Ok(()), + Self::NonExisting => Ok(()), Self::Partial(file) => { file.data.sync_all()?; file.outboard.sync_all()?; file.sizes.sync_all()?; Ok(()) } - Self::Poisoned => { + Self::Poisoned | Self::Initial | Self::Loading => { // we are poisoned, so just ignore the sync Ok(()) } @@ -502,37 +520,22 @@ impl BaoFileStorage { } /// A cheaply cloneable handle to a bao file, including the hash and the configuration. 
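+/// Internally this wraps a `tokio::sync::watch` sender, so all clones share
+/// the same [`BaoFileStorage`] and observers can subscribe to state changes.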
-#[derive(Debug, Clone, derive_more::Deref)] -pub(crate) struct BaoFileHandle(Arc>); +#[derive(Debug, Clone, Default, derive_more::Deref)] +pub(crate) struct BaoFileHandle(pub(super) watch::Sender); -impl BaoFileHandle { - pub(super) fn persist(&mut self, ctx: &HashContext) { - self.send_if_modified(|guard| { - let hash = &ctx.id; - if Arc::strong_count(&self.0) > 1 { - return false; - } - let BaoFileStorage::Partial(fs) = guard.take() else { - return false; - }; - let path = ctx.global.options.path.bitfield_path(hash); - trace!("writing bitfield for hash {} to {}", hash, path.display()); - if let Err(cause) = fs.sync_all(&path) { - error!( - "failed to write bitfield for {} at {}: {:?}", - hash, - path.display(), - cause - ); - } - false - }); +impl entity_manager::Reset for BaoFileHandle { + fn reset(&mut self) { + self.send_replace(BaoFileStorage::Initial); + } + + fn ref_count(&self) -> usize { + self.0.receiver_count() + self.0.sender_count() } } /// A reader for a bao file, reading just the data. #[derive(Debug)] -pub struct DataReader(BaoFileHandle); +pub struct DataReader(pub(super) BaoFileHandle); impl ReadBytesAt for DataReader { fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result { @@ -542,13 +545,16 @@ impl ReadBytesAt for DataReader { BaoFileStorage::Partial(x) => x.data.read_bytes_at(offset, size), BaoFileStorage::Complete(x) => x.data.read_bytes_at(offset, size), BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), } } } /// A reader for the outboard part of a bao file. #[derive(Debug)] -pub struct OutboardReader(BaoFileHandle); +pub struct OutboardReader(pub(super) BaoFileHandle); impl ReadAt for OutboardReader { fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result { @@ -558,22 +564,51 @@ impl ReadAt for OutboardReader { BaoFileStorage::PartialMem(x) => x.outboard.read_at(offset, buf), BaoFileStorage::Partial(x) => x.outboard.read_at(offset, buf), BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), } } } -impl BaoFileHandle { - #[allow(dead_code)] - pub fn id(&self) -> usize { - Arc::as_ptr(&self.0) as usize - } - - /// Create a new bao file handle. - /// - /// This will create a new file handle with an empty memory storage. 
- pub fn new_partial_mem() -> Self { - let storage = BaoFileStorage::partial_mem(); - Self(Arc::new(watch::Sender::new(storage))) +impl BaoFileStorage { + pub async fn open(state: Option>, ctx: &HashContext) -> io::Result { + let hash = &ctx.id; + let options = &ctx.global.options; + Ok(match state { + Some(EntryState::Complete { + data_location, + outboard_location, + }) => { + let data = match data_location { + DataLocation::Inline(data) => MemOrFile::Mem(data), + DataLocation::Owned(size) => { + let path = options.path.data_path(hash); + let file = std::fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, size)) + } + DataLocation::External(paths, size) => { + let Some(path) = paths.into_iter().next() else { + return Err(io::Error::other("no external data path")); + }; + let file = std::fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, size)) + } + }; + let outboard = match outboard_location { + OutboardLocation::NotNeeded => MemOrFile::empty(), + OutboardLocation::Inline(data) => MemOrFile::Mem(data), + OutboardLocation::Owned => { + let path = options.path.outboard_path(hash); + let file = std::fs::File::open(&path)?; + MemOrFile::File(file) + } + }; + Self::new_complete(data, outboard) + } + Some(EntryState::Partial { .. }) => Self::new_partial_file(ctx).await?, + None => Self::NonExisting, + }) } /// Create a new bao file handle with a partial file. @@ -581,7 +616,7 @@ impl BaoFileHandle { let hash = &ctx.id; let options = ctx.global.options.clone(); let storage = PartialFileStorage::load(hash, &options.path)?; - let storage = if storage.bitfield.is_complete() { + Ok(if storage.bitfield.is_complete() { let size = storage.bitfield.size; let (storage, entry_state) = storage.into_complete(size, &options)?; debug!("File was reconstructed as complete"); @@ -589,8 +624,7 @@ impl BaoFileHandle { storage.into() } else { storage.into() - }; - Ok(Self(Arc::new(watch::Sender::new(storage)))) + }) } /// Create a new complete bao file handle. @@ -598,10 +632,11 @@ impl BaoFileHandle { data: MemOrFile>, outboard: MemOrFile, ) -> Self { - let storage = CompleteStorage { data, outboard }.into(); - Self(Arc::new(watch::Sender::new(storage))) + CompleteStorage { data, outboard }.into() } +} +impl BaoFileHandle { /// Complete the handle pub fn complete( &self, @@ -609,14 +644,14 @@ impl BaoFileHandle { outboard: MemOrFile, ) { self.send_if_modified(|guard| { - let res = match guard { - BaoFileStorage::Complete(_) => None, - BaoFileStorage::PartialMem(entry) => Some(&mut entry.bitfield), - BaoFileStorage::Partial(entry) => Some(&mut entry.bitfield), - BaoFileStorage::Poisoned => None, + let needs_complete = match guard { + BaoFileStorage::NonExisting => true, + BaoFileStorage::Complete(_) => false, + BaoFileStorage::PartialMem(_) => true, + BaoFileStorage::Partial(_) => true, + _ => false, }; - if let Some(bitfield) = res { - bitfield.update(&Bitfield::complete(data.size())); + if needs_complete { *guard = BaoFileStorage::Complete(CompleteStorage { data, outboard }); true } else { @@ -624,87 +659,6 @@ impl BaoFileHandle { } }); } - - pub fn subscribe(&self) -> BaoFileStorageSubscriber { - BaoFileStorageSubscriber::new(self.0.subscribe()) - } - - /// True if the file is complete. - #[allow(dead_code)] - pub fn is_complete(&self) -> bool { - matches!(self.borrow().deref(), BaoFileStorage::Complete(_)) - } - - /// An AsyncSliceReader for the data file. - /// - /// Caution: this is a reader for the unvalidated data file. 
Reading this
-    /// can produce data that does not match the hash.
-    pub fn data_reader(&self) -> DataReader {
-        DataReader(self.clone())
-    }
-
-    /// An AsyncSliceReader for the outboard file.
-    ///
-    /// The outboard file is used to validate the data file. It is not guaranteed
-    /// to be complete.
-    pub fn outboard_reader(&self) -> OutboardReader {
-        OutboardReader(self.clone())
-    }
-
-    /// The most precise known total size of the data file.
-    pub fn current_size(&self) -> io::Result<u64> {
-        match self.borrow().deref() {
-            BaoFileStorage::Complete(mem) => Ok(mem.size()),
-            BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()),
-            BaoFileStorage::Partial(file) => file.current_size(),
-            BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")),
-        }
-    }
-
-    /// The most precise known total size of the data file.
-    pub fn bitfield(&self) -> io::Result<Bitfield> {
-        match self.borrow().deref() {
-            BaoFileStorage::Complete(mem) => Ok(mem.bitfield()),
-            BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()),
-            BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()),
-            BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")),
-        }
-    }
-
-    /// The outboard for the file.
-    pub fn outboard(&self, hash: &Hash) -> io::Result<PreOrderOutboard<OutboardReader>> {
-        let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
-        let outboard = self.outboard_reader();
-        Ok(PreOrderOutboard {
-            root: blake3::Hash::from(*hash),
-            tree,
-            data: outboard,
-        })
-    }
-
-    /// Write a batch and notify the db
-    pub(super) async fn write_batch(
-        &self,
-        batch: &[BaoContentItem],
-        bitfield: &Bitfield,
-        ctx: &HashContext,
-    ) -> io::Result<()> {
-        trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len());
-        let mut res = Ok(None);
-        self.send_if_modified(|state| {
-            let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx) else {
-                res = Err(io::Error::other("write batch failed"));
-                return false;
-            };
-            res = Ok(update);
-            *state = state1;
-            true
-        });
-        if let Some(update) = res? {
-            ctx.global.db.update(ctx.id, update).await?;
-        }
-        Ok(())
-    }
 }

 impl PartialMemStorage {
diff --git a/src/store/fs/util/entity_manager.rs b/src/store/fs/util/entity_manager.rs
index f9628434..64fd9f85 100644
--- a/src/store/fs/util/entity_manager.rs
+++ b/src/store/fs/util/entity_manager.rs
@@ -12,6 +12,12 @@ use tokio::sync::{mpsc, oneshot};
 pub trait Reset: Default {
     /// Reset the state to its default value.
     fn reset(&mut self);
+
+    /// A ref count used to check that the state is unique when shutting down.
+    ///
+    /// You must not clone the state out of a task, even though that is
+    /// technically possible.
+    fn ref_count(&self) -> usize;
 }

 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -213,6 +219,7 @@ mod entity_actor {
             )
             .await
             .ok();
+            assert_eq!(self.state.state.ref_count(), 1);
             P::on_shutdown(self.state.clone(), ShutdownCause::Idle).await;
             // Notify the main actor that we have completed shutdown.
             // here we also give back the rest of ourselves so the main actor can recycle us.
@@ -839,6 +846,10 @@ mod tests {
         fn reset(&mut self) {
             *self.0.borrow_mut() = Default::default();
         }
+
+        fn ref_count(&self) -> usize {
+            Arc::strong_count(&self.0)
+        }
     }

     #[derive(Debug, Default)]
@@ -1095,6 +1106,10 @@ mod tests {
         fn reset(&mut self) {
             *self.0.borrow_mut() = Default::default();
         }
+
+        fn ref_count(&self) -> usize {
+            1
+        }
     }

     fn get_path(root: impl AsRef<Path>, id: u64) -> PathBuf {
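Reviewer note: the partial-observe behavior exercised by `test_observe_partial` above boils down to the sketch below. This is a minimal illustration, assuming the `MemStore`/`Blobs` API and the `test_data`/`create_n0_bao` helpers from `store::fs::tests` used in the tests; it is not part of this diff.

    use bao_tree::ChunkRanges;

    use crate::{
        store::{
            fs::tests::{create_n0_bao, test_data},
            mem::MemStore,
        },
        util::ChunkRangesExt,
    };

    async fn demo_observe_partial() -> anyhow::Result<()> {
        let store = MemStore::new();
        let blobs = store.blobs();
        // eight chunks (8 KiB) of deterministic test data
        let data = test_data(8 * 1024);
        // encode and import only chunk 0 of the blob
        let ranges = ChunkRanges::chunk(0);
        let (hash, bao) = create_n0_bao(&data, &ranges)?;
        blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
        // the observed bitfield reports exactly the chunk that was imported
        let bitfield = blobs.observe(hash).await?;
        assert_eq!(bitfield.ranges, ranges);
        Ok(())
    }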