Skip to content

refactor: simplify state of fs store entry #102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: entity-manager
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 22 additions & 32 deletions src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,16 @@ impl entity_manager::Params for EmParams {
type EntityState = Slot;

async fn on_shutdown(
_state: entity_manager::ActiveEntityState<Self>,
_cause: entity_manager::ShutdownCause,
state: entity_manager::ActiveEntityState<Self>,
cause: entity_manager::ShutdownCause,
) {
// this isn't strictly necessary. Drop will run anyway as soon as the
// state is reset to it's default value. Doing it here means that we
// have exact control over where it happens.
if let Some(mut handle) = state.state.0.lock().await.take() {
trace!("shutting down hash: {}, cause: {cause:?}", state.id);
handle.persist(&state);
}
}
}

Expand Down Expand Up @@ -291,7 +298,7 @@ impl HashContext {
.get_or_create(|| async {
let res = self.db().get(hash).await.map_err(io::Error::other)?;
let res = match res {
Some(state) => open_bao_file(&hash, state, &self.global).await,
Some(state) => open_bao_file(state, self).await,
None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")),
};
Ok((res?, ()))
Expand All @@ -311,11 +318,8 @@ impl HashContext {
.get_or_create(|| async {
let res = self.db().get(hash).await.map_err(io::Error::other)?;
let res = match res {
Some(state) => open_bao_file(&hash, state, &self.global).await,
None => Ok(BaoFileHandle::new_partial_mem(
hash,
self.global.options.clone(),
)),
Some(state) => open_bao_file(state, self).await,
None => Ok(BaoFileHandle::new_partial_mem()),
};
Ok((res?, ()))
})
Expand All @@ -327,12 +331,9 @@ impl HashContext {
}
}

async fn open_bao_file(
hash: &Hash,
state: EntryState<Bytes>,
ctx: &TaskContext,
) -> io::Result<BaoFileHandle> {
let options = &ctx.options;
async fn open_bao_file(state: EntryState<Bytes>, ctx: &HashContext) -> io::Result<BaoFileHandle> {
let hash = &ctx.id;
let options = &ctx.global.options;
Ok(match state {
EntryState::Complete {
data_location,
Expand Down Expand Up @@ -362,9 +363,9 @@ async fn open_bao_file(
MemOrFile::File(file)
}
};
BaoFileHandle::new_complete(*hash, data, outboard, options.clone())
BaoFileHandle::new_complete(data, outboard)
}
EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?,
EntryState::Partial { .. } => BaoFileHandle::new_partial_file(ctx).await?,
})
}

Expand Down Expand Up @@ -618,12 +619,7 @@ impl Actor {
options: options.clone(),
db: meta::Db::new(db_send),
internal_cmd_tx: fs_commands_tx,
empty: BaoFileHandle::new_complete(
Hash::EMPTY,
MemOrFile::empty(),
MemOrFile::empty(),
options,
),
empty: BaoFileHandle::new_complete(MemOrFile::empty(), MemOrFile::empty()),
protect,
});
rt.spawn(db_actor.run());
Expand Down Expand Up @@ -925,18 +921,14 @@ async fn import_bao_impl(
handle: BaoFileHandle,
ctx: HashContext,
) -> api::Result<()> {
trace!(
"importing bao: {} {} bytes",
handle.hash().fmt_short(),
size
);
trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size);
let mut batch = Vec::<BaoContentItem>::new();
let mut ranges = ChunkRanges::empty();
while let Some(item) = rx.recv().await? {
// if the batch is not empty, the last item is a leaf and the current item is a parent, write the batch
if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
let bitfield = Bitfield::new_unchecked(ranges, size.into());
handle.write_batch(&batch, &bitfield, &ctx.global).await?;
handle.write_batch(&batch, &bitfield, &ctx).await?;
batch.clear();
ranges = ChunkRanges::empty();
}
Expand All @@ -952,7 +944,7 @@ async fn import_bao_impl(
}
if !batch.is_empty() {
let bitfield = Bitfield::new_unchecked(ranges, size.into());
handle.write_batch(&batch, &bitfield, &ctx.global).await?;
handle.write_batch(&batch, &bitfield, &ctx).await?;
}
Ok(())
}
Expand Down Expand Up @@ -992,7 +984,6 @@ async fn export_ranges_impl(
"exporting ranges: {hash} {ranges:?} size={}",
handle.current_size()?
);
debug_assert!(handle.hash() == hash, "hash mismatch");
let bitfield = handle.bitfield()?;
let data = handle.data_reader();
let size = bitfield.size();
Expand Down Expand Up @@ -1053,8 +1044,7 @@ async fn export_bao_impl(
handle: BaoFileHandle,
) -> anyhow::Result<()> {
let ExportBaoRequest { ranges, hash, .. } = cmd;
debug_assert!(handle.hash() == hash, "hash mismatch");
let outboard = handle.outboard()?;
let outboard = handle.outboard(&hash)?;
let size = outboard.tree.size();
if size == 0 && hash != Hash::EMPTY {
// we have no data whatsoever, so we stop here
Expand Down
Loading
Loading