diff --git a/src/store/fs.rs b/src/store/fs.rs index 53ed163a..b323571b 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -218,9 +218,16 @@ impl entity_manager::Params for EmParams { type EntityState = Slot; async fn on_shutdown( - _state: entity_manager::ActiveEntityState, - _cause: entity_manager::ShutdownCause, + state: entity_manager::ActiveEntityState, + cause: entity_manager::ShutdownCause, ) { + // this isn't strictly necessary. Drop will run anyway as soon as the + // state is reset to its default value. Doing it here means that we + // have exact control over where it happens. NOTE(review): this change also removes `impl Drop for BaoFileHandle`, so confirm persistence still happens on drop. + if let Some(mut handle) = state.state.0.lock().await.take() { + trace!("shutting down hash: {}, cause: {cause:?}", state.id); + handle.persist(&state); + } } } @@ -291,7 +298,7 @@ impl HashContext { .get_or_create(|| async { let res = self.db().get(hash).await.map_err(io::Error::other)?; let res = match res { - Some(state) => open_bao_file(&hash, state, &self.global).await, + Some(state) => open_bao_file(state, self).await, None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")), }; Ok((res?, ())) @@ -311,11 +318,8 @@ impl HashContext { .get_or_create(|| async { let res = self.db().get(hash).await.map_err(io::Error::other)?; let res = match res { - Some(state) => open_bao_file(&hash, state, &self.global).await, - None => Ok(BaoFileHandle::new_partial_mem( - hash, - self.global.options.clone(), - )), + Some(state) => open_bao_file(state, self).await, + None => Ok(BaoFileHandle::new_partial_mem()), }; Ok((res?, ())) }) @@ -327,12 +331,9 @@ impl HashContext { } } -async fn open_bao_file( - hash: &Hash, - state: EntryState, - ctx: &TaskContext, -) -> io::Result { - let options = &ctx.options; +async fn open_bao_file(state: EntryState, ctx: &HashContext) -> io::Result { + let hash = &ctx.id; + let options = &ctx.global.options; Ok(match state { EntryState::Complete { data_location, @@ -362,9 +363,9 @@ async fn open_bao_file( MemOrFile::File(file) } }; - 
BaoFileHandle::new_complete(*hash, data, outboard, options.clone()) + BaoFileHandle::new_complete(data, outboard) } - EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?, + EntryState::Partial { .. } => BaoFileHandle::new_partial_file(ctx).await?, }) } @@ -618,12 +619,7 @@ impl Actor { options: options.clone(), db: meta::Db::new(db_send), internal_cmd_tx: fs_commands_tx, - empty: BaoFileHandle::new_complete( - Hash::EMPTY, - MemOrFile::empty(), - MemOrFile::empty(), - options, - ), + empty: BaoFileHandle::new_complete(MemOrFile::empty(), MemOrFile::empty()), protect, }); rt.spawn(db_actor.run()); @@ -925,18 +921,14 @@ async fn import_bao_impl( handle: BaoFileHandle, ctx: HashContext, ) -> api::Result<()> { - trace!( - "importing bao: {} {} bytes", - handle.hash().fmt_short(), - size - ); + trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size); let mut batch = Vec::::new(); let mut ranges = ChunkRanges::empty(); while let Some(item) = rx.recv().await? { // if the batch is not empty, the last item is a leaf and the current item is a parent, write the batch if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() { let bitfield = Bitfield::new_unchecked(ranges, size.into()); - handle.write_batch(&batch, &bitfield, &ctx.global).await?; + handle.write_batch(&batch, &bitfield, &ctx).await?; batch.clear(); ranges = ChunkRanges::empty(); } @@ -952,7 +944,7 @@ async fn import_bao_impl( } if !batch.is_empty() { let bitfield = Bitfield::new_unchecked(ranges, size.into()); - handle.write_batch(&batch, &bitfield, &ctx.global).await?; + handle.write_batch(&batch, &bitfield, &ctx).await?; } Ok(()) } @@ -992,7 +984,6 @@ async fn export_ranges_impl( "exporting ranges: {hash} {ranges:?} size={}", handle.current_size()? 
); - debug_assert!(handle.hash() == hash, "hash mismatch"); let bitfield = handle.bitfield()?; let data = handle.data_reader(); let size = bitfield.size(); @@ -1053,8 +1044,7 @@ async fn export_bao_impl( handle: BaoFileHandle, ) -> anyhow::Result<()> { let ExportBaoRequest { ranges, hash, .. } = cmd; - debug_assert!(handle.hash() == hash, "hash mismatch"); - let outboard = handle.outboard()?; + let outboard = handle.outboard(&hash)?; let size = outboard.tree.size(); if size == 0 && hash != Hash::EMPTY { // we have no data whatsoever, so we stop here diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index bf150ae8..1fe91404 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -31,7 +31,7 @@ use super::{ use crate::{ api::blobs::Bitfield, store::{ - fs::{meta::raw_outboard_size, TaskContext}, + fs::{meta::raw_outboard_size, HashContext}, util::{ read_checksummed_and_truncate, write_checksummed, FixedSize, MemOrFile, PartialMemStorage, DD, @@ -335,18 +335,16 @@ impl Default for BaoFileStorage { impl PartialMemStorage { /// Converts this storage into a complete storage, using the given hash for /// path names and the given options for decisions about inlining. 
- fn into_complete( - self, - hash: &Hash, - ctx: &TaskContext, - ) -> io::Result<(CompleteStorage, EntryState)> { + fn into_complete(self, ctx: &HashContext) -> io::Result<(CompleteStorage, EntryState)> { + let options = &ctx.global.options; + let hash = &ctx.id; let size = self.current_size(); let outboard_size = raw_outboard_size(size); - let (data, data_location) = if ctx.options.is_inlined_data(size) { + let (data, data_location) = if options.is_inlined_data(size) { let data: Bytes = self.data.to_vec().into(); (MemOrFile::Mem(data.clone()), DataLocation::Inline(data)) } else { - let data_path = ctx.options.path.data_path(hash); + let data_path = options.path.data_path(hash); let mut data_file = create_read_write(&data_path)?; self.data.persist(&mut data_file)?; ( @@ -354,7 +352,8 @@ impl PartialMemStorage { DataLocation::Owned(size), ) }; - let (outboard, outboard_location) = if ctx.options.is_inlined_outboard(outboard_size) { + let (outboard, outboard_location) = if ctx.global.options.is_inlined_outboard(outboard_size) + { if outboard_size > 0 { let outboard: Bytes = self.outboard.to_vec().into(); ( @@ -365,7 +364,7 @@ impl PartialMemStorage { (MemOrFile::empty(), OutboardLocation::NotNeeded) } } else { - let outboard_path = ctx.options.path.outboard_path(hash); + let outboard_path = ctx.global.options.path.outboard_path(hash); let mut outboard_file = create_read_write(&outboard_path)?; self.outboard.persist(&mut outboard_file)?; let outboard_location = if outboard_size == 0 { @@ -401,21 +400,20 @@ impl BaoFileStorage { self, batch: &[BaoContentItem], bitfield: &Bitfield, - ctx: &TaskContext, - hash: &Hash, + ctx: &HashContext, ) -> io::Result<(Self, Option>)> { Ok(match self { BaoFileStorage::PartialMem(mut ms) => { // check if we need to switch to file mode, otherwise write to memory - if max_offset(batch) <= ctx.options.inline.max_data_inlined { + if max_offset(batch) <= ctx.global.options.inline.max_data_inlined { ms.write_batch(bitfield.size(), batch)?; 
let changes = ms.bitfield.update(bitfield); let new = changes.new_state(); if new.complete { - let (cs, update) = ms.into_complete(hash, ctx)?; + let (cs, update) = ms.into_complete(ctx)?; (cs.into(), Some(update)) } else { - let fs = ms.persist(ctx, hash)?; + let fs = ms.persist(ctx)?; let update = EntryState::Partial { size: new.validated_size, }; @@ -428,13 +426,13 @@ impl BaoFileStorage { // a write at the end of a very large file. // // opt: we should check if we become complete to avoid going from mem to partial to complete - let mut fs = ms.persist(ctx, hash)?; + let mut fs = ms.persist(ctx)?; fs.write_batch(bitfield.size(), batch)?; let changes = fs.bitfield.update(bitfield); let new = changes.new_state(); if new.complete { let size = new.validated_size.unwrap(); - let (cs, update) = fs.into_complete(size, &ctx.options)?; + let (cs, update) = fs.into_complete(size, &ctx.global.options)?; (cs.into(), Some(update)) } else { let update = EntryState::Partial { @@ -450,7 +448,7 @@ impl BaoFileStorage { let new = changes.new_state(); if new.complete { let size = new.validated_size.unwrap(); - let (cs, update) = fs.into_complete(size, &ctx.options)?; + let (cs, update) = fs.into_complete(size, &ctx.global.options)?; (cs.into(), Some(update)) } else if changes.was_validated() { // we are still partial, but now we know the size @@ -503,48 +501,26 @@ impl BaoFileStorage { } } -/// The inner part of a bao file handle. -pub struct BaoFileHandleInner { - pub(crate) storage: watch::Sender, - hash: Hash, - options: Arc, -} - -impl fmt::Debug for BaoFileHandleInner { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let guard = self.storage.borrow(); - let storage = guard.deref(); - f.debug_struct("BaoFileHandleInner") - .field("hash", &DD(self.hash)) - .field("storage", &storage) - .finish_non_exhaustive() - } -} - /// A cheaply cloneable handle to a bao file, including the hash and the configuration. 
#[derive(Debug, Clone, derive_more::Deref)] -pub struct BaoFileHandle(Arc); +pub(crate) struct BaoFileHandle(Arc>); impl BaoFileHandle { - pub fn persist(&mut self) { - self.0.storage.send_if_modified(|guard| { + pub(super) fn persist(&mut self, ctx: &HashContext) { + self.send_if_modified(|guard| { + let hash = &ctx.id; if Arc::strong_count(&self.0) > 1 { return false; } let BaoFileStorage::Partial(fs) = guard.take() else { return false; }; - let options = &self.options; - let path = options.path.bitfield_path(&self.hash); - trace!( - "writing bitfield for hash {} to {}", - self.hash, - path.display() - ); + let path = ctx.global.options.path.bitfield_path(hash); + trace!("writing bitfield for hash {} to {}", hash, path.display()); if let Err(cause) = fs.sync_all(&path) { error!( "failed to write bitfield for {} at {}: {:?}", - self.hash, + hash, path.display(), cause ); @@ -554,19 +530,13 @@ impl BaoFileHandle { } } -impl Drop for BaoFileHandle { - fn drop(&mut self) { - self.persist(); - } -} - /// A reader for a bao file, reading just the data. #[derive(Debug)] pub struct DataReader(BaoFileHandle); impl ReadBytesAt for DataReader { fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result { - let guard = self.0.storage.borrow(); + let guard = self.0.borrow(); match guard.deref() { BaoFileStorage::PartialMem(x) => x.data.read_bytes_at(offset, size), BaoFileStorage::Partial(x) => x.data.read_bytes_at(offset, size), @@ -582,7 +552,7 @@ pub struct OutboardReader(BaoFileHandle); impl ReadAt for OutboardReader { fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result { - let guard = self.0.storage.borrow(); + let guard = self.0.borrow(); match guard.deref() { BaoFileStorage::Complete(x) => x.outboard.read_at(offset, buf), BaoFileStorage::PartialMem(x) => x.outboard.read_at(offset, buf), @@ -601,48 +571,35 @@ impl BaoFileHandle { /// Create a new bao file handle. /// /// This will create a new file handle with an empty memory storage. 
- pub fn new_partial_mem(hash: Hash, options: Arc) -> Self { + pub fn new_partial_mem() -> Self { let storage = BaoFileStorage::partial_mem(); - Self(Arc::new(BaoFileHandleInner { - storage: watch::Sender::new(storage), - hash, - options: options.clone(), - })) + Self(Arc::new(watch::Sender::new(storage))) } /// Create a new bao file handle with a partial file. - pub(super) async fn new_partial_file(hash: Hash, ctx: &TaskContext) -> io::Result { - let options = ctx.options.clone(); - let storage = PartialFileStorage::load(&hash, &options.path)?; + pub(super) async fn new_partial_file(ctx: &HashContext) -> io::Result { + let hash = &ctx.id; + let options = ctx.global.options.clone(); + let storage = PartialFileStorage::load(hash, &options.path)?; let storage = if storage.bitfield.is_complete() { let size = storage.bitfield.size; let (storage, entry_state) = storage.into_complete(size, &options)?; debug!("File was reconstructed as complete"); - ctx.db.set(hash, entry_state).await?; + ctx.global.db.set(*hash, entry_state).await?; storage.into() } else { storage.into() }; - Ok(Self(Arc::new(BaoFileHandleInner { - storage: watch::Sender::new(storage), - hash, - options, - }))) + Ok(Self(Arc::new(watch::Sender::new(storage)))) } /// Create a new complete bao file handle. 
pub fn new_complete( - hash: Hash, data: MemOrFile>, outboard: MemOrFile, - options: Arc, ) -> Self { let storage = CompleteStorage { data, outboard }.into(); - Self(Arc::new(BaoFileHandleInner { - storage: watch::Sender::new(storage), - hash, - options, - })) + Self(Arc::new(watch::Sender::new(storage))) } /// Complete the handle @@ -651,7 +608,7 @@ impl BaoFileHandle { data: MemOrFile>, outboard: MemOrFile, ) { - self.storage.send_if_modified(|guard| { + self.send_if_modified(|guard| { let res = match guard { BaoFileStorage::Complete(_) => None, BaoFileStorage::PartialMem(entry) => Some(&mut entry.bitfield), @@ -669,13 +626,13 @@ impl BaoFileHandle { } pub fn subscribe(&self) -> BaoFileStorageSubscriber { - BaoFileStorageSubscriber::new(self.0.storage.subscribe()) + BaoFileStorageSubscriber::new(self.0.subscribe()) } /// True if the file is complete. #[allow(dead_code)] pub fn is_complete(&self) -> bool { - matches!(self.storage.borrow().deref(), BaoFileStorage::Complete(_)) + matches!(self.borrow().deref(), BaoFileStorage::Complete(_)) } /// An AsyncSliceReader for the data file. @@ -696,7 +653,7 @@ impl BaoFileHandle { /// The most precise known total size of the data file. pub fn current_size(&self) -> io::Result { - match self.storage.borrow().deref() { + match self.borrow().deref() { BaoFileStorage::Complete(mem) => Ok(mem.size()), BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()), BaoFileStorage::Partial(file) => file.current_size(), @@ -706,7 +663,7 @@ impl BaoFileHandle { /// The most precise known total size of the data file. pub fn bitfield(&self) -> io::Result { - match self.storage.borrow().deref() { + match self.borrow().deref() { BaoFileStorage::Complete(mem) => Ok(mem.bitfield()), BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()), BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()), @@ -715,34 +672,27 @@ impl BaoFileHandle { } /// The outboard for the file. 
- pub fn outboard(&self) -> io::Result> { - let root = self.hash.into(); + pub fn outboard(&self, hash: &Hash) -> io::Result> { let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE); let outboard = self.outboard_reader(); Ok(PreOrderOutboard { - root, + root: blake3::Hash::from(*hash), tree, data: outboard, }) } - /// The hash of the file. - pub fn hash(&self) -> Hash { - self.hash - } - /// Write a batch and notify the db pub(super) async fn write_batch( &self, batch: &[BaoContentItem], bitfield: &Bitfield, - ctx: &TaskContext, + ctx: &HashContext, ) -> io::Result<()> { trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); let mut res = Ok(None); - self.storage.send_if_modified(|state| { - let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx, &self.hash) - else { + self.send_if_modified(|state| { + let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx) else { res = Err(io::Error::other("write batch failed")); return false; }; @@ -751,7 +701,7 @@ impl BaoFileHandle { true }); if let Some(update) = res? { - ctx.db.update(self.hash, update).await?; + ctx.global.db.update(ctx.id, update).await?; } Ok(()) } @@ -759,9 +709,10 @@ impl BaoFileHandle { impl PartialMemStorage { /// Persist the batch to disk. - fn persist(self, ctx: &TaskContext, hash: &Hash) -> io::Result { - let options = &ctx.options.path; - ctx.protect.protect( + fn persist(self, ctx: &HashContext) -> io::Result { + let options = &ctx.global.options.path; + let hash = &ctx.id; + ctx.global.protect.protect( *hash, [ BaoFilePart::Data,