Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use concrete error types for ipld/* and shared #463

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 56 additions & 32 deletions fvm/src/blockstore/buffered.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2019-2022 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use anyhow::{anyhow, Result};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use cid::Cid;
use fvm_ipld_blockstore::{Blockstore, Buffered};
Expand Down Expand Up @@ -38,33 +37,66 @@ where
}
}

/// Error type returned by `BufferedBlockstore` operations.
///
/// `E` is the error type of the wrapped base blockstore (the impls in this
/// file instantiate it as `BS::Error`).
#[derive(thiserror::Error, Debug)]
pub enum Error<E> {
/// Walking the buffered blocks during a flush failed; converted
/// automatically from `FlushError` by the `?` operator.
#[error("flush: {0}")]
Flush(#[from] FlushError),
/// The underlying base blockstore returned an error. There is no `#[from]`
/// here, so call sites wrap it explicitly with `map_err(Error::Blockstore)`.
#[error("blockstore: {0}")]
Blockstore(E),
}

impl<BS> Buffered for BufferedBlockstore<BS>
where
BS: Blockstore,
{
/// Flushes the buffered cache based on the root node.
/// This will recursively traverse the cache and write all data connected by links to this
/// root Cid.
fn flush(&self, root: &Cid) -> Result<()> {
fn flush(&self, root: &Cid) -> Result<(), Error<BS::Error>> {
let mut buffer = Vec::new();
let mut s = self.write.borrow_mut();
copy_rec(&s, *root, &mut buffer)?;

self.base.put_many_keyed(buffer)?;
self.base
.put_many_keyed(buffer)
.map_err(Error::Blockstore)?;
*s = Default::default();

Ok(())
}
}

#[derive(thiserror::Error, Debug)]
pub enum FlushError {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd reduce this to:

  • MissingBlock (can't find a block)
  • IPLD (encoding error)

(how can we even have an IO error that's not a blockstore error?)

If we're super-specific, we'll have to break this error type every time we change some detail (switch to libipld's link enumeration function, support new IPLD codecs, etc.).

#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error("cid: {0}")]
Cid(#[from] cid::Error),
#[error("cbor input was not canonical (lval {0} with value < {1})")]
HeaderNotCanonical(usize, &'static str),
#[error("invalid header cbor_read_header_buf")]
HeaderInvalid,
#[error("expected cbor type byte string in input")]
UnexpectedByteString,
#[error("string in cbor input too long")]
StringTooLong,
#[error("Invalid link ({0}) in flushing buffered store")]
InvalidLink(Cid),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MissingBlock?

#[error("unhandled cbor type: {0}")]
UnhandledCborType(u8),
}

/// Given a CBOR encoded Buffer, returns a tuple of:
/// the type of the CBOR object along with extra
/// elements we expect to read. More info on this can be found in
/// Appendix C. of RFC 7049 which defines the CBOR specification.
/// This was implemented because the CBOR library we use does not expose low-level
/// methods like this, requiring us to deserialize the whole CBOR payload, which
/// is unnecessary and quite inefficient for our use case here.
fn cbor_read_header_buf<B: Read>(br: &mut B, scratch: &mut [u8]) -> anyhow::Result<(u8, usize)> {
fn cbor_read_header_buf<B: Read>(
br: &mut B,
scratch: &mut [u8],
) -> Result<(u8, usize), FlushError> {
let first = br.read_u8()?;
let maj = (first & 0xe0) >> 5;
let low = first & 0x1f;
Expand All @@ -74,49 +106,41 @@ fn cbor_read_header_buf<B: Read>(br: &mut B, scratch: &mut [u8]) -> anyhow::Resu
} else if low == 24 {
let val = br.read_u8()?;
if val < 24 {
return Err(anyhow!(
"cbor input was not canonical (lval 24 with value < 24)"
));
return Err(FlushError::HeaderNotCanonical(24, "24"));
}
Ok((maj, val as usize))
} else if low == 25 {
br.read_exact(&mut scratch[..2])?;
let val = BigEndian::read_u16(&scratch[..2]);
if val <= u8::MAX as u16 {
return Err(anyhow!(
"cbor input was not canonical (lval 25 with value <= MaxUint8)"
));
return Err(FlushError::HeaderNotCanonical(25, "MaxUint8"));
}
Ok((maj, val as usize))
} else if low == 26 {
br.read_exact(&mut scratch[..4])?;
let val = BigEndian::read_u32(&scratch[..4]);
if val <= u16::MAX as u32 {
return Err(anyhow!(
"cbor input was not canonical (lval 26 with value <= MaxUint16)"
));
return Err(FlushError::HeaderNotCanonical(26, "MaxUint16"));
}
Ok((maj, val as usize))
} else if low == 27 {
br.read_exact(&mut scratch[..8])?;
let val = BigEndian::read_u64(&scratch[..8]);
if val <= u32::MAX as u64 {
return Err(anyhow!(
"cbor input was not canonical (lval 27 with value <= MaxUint32)"
));
return Err(FlushError::HeaderNotCanonical(27, "MaxUint32"));
}
Ok((maj, val as usize))
} else {
Err(anyhow!("invalid header cbor_read_header_buf"))
Err(FlushError::HeaderInvalid)
}
}

/// Given a CBOR serialized IPLD buffer, read through all of it and return all the Links.
/// This function is useful because it is quite a bit faster than doing this recursively on a
/// deserialized IPLD object.
fn scan_for_links<B: Read + Seek, F>(buf: &mut B, mut callback: F) -> Result<()>
fn scan_for_links<B: Read + Seek, F>(buf: &mut B, mut callback: F) -> Result<(), FlushError>
where
F: FnMut(Cid) -> anyhow::Result<()>,
F: FnMut(Cid) -> Result<(), FlushError>,
{
let mut scratch: [u8; 100] = [0; 100];
let mut remaining = 1;
Expand All @@ -136,10 +160,10 @@ where
let (maj, extra) = cbor_read_header_buf(buf, &mut scratch)?;
// The actual CID is expected to be a byte string
if maj != 2 {
return Err(anyhow!("expected cbor type byte string in input"));
return Err(FlushError::UnexpectedByteString);
}
if extra > 100 {
return Err(anyhow!("string in cbor input too long"));
return Err(FlushError::StringTooLong);
}
buf.read_exact(&mut scratch[..extra])?;
let c = Cid::try_from(&scratch[1..extra])?;
Expand All @@ -157,7 +181,7 @@ where
remaining += extra * 2;
}
_ => {
return Err(anyhow!("unhandled cbor type: {}", maj));
return Err(FlushError::UnhandledCborType(maj));
}
}
remaining -= 1;
Expand All @@ -170,16 +194,14 @@ fn copy_rec<'a>(
cache: &'a HashMap<Cid, Vec<u8>>,
root: Cid,
buffer: &mut Vec<(Cid, &'a [u8])>,
) -> Result<()> {
) -> Result<(), FlushError> {
// TODO: Make this non-recursive.
// Skip identity and Filecoin commitment Cids
if root.codec() != DAG_CBOR {
return Ok(());
}

let block = &*cache
.get(&root)
.ok_or_else(|| anyhow!("Invalid link ({}) in flushing buffered store", root))?;
let block = &*cache.get(&root).ok_or(FlushError::InvalidLink(root))?;

scan_for_links(&mut Cursor::new(block), |link| {
if link.codec() != DAG_CBOR {
Expand All @@ -205,28 +227,30 @@ impl<BS> Blockstore for BufferedBlockstore<BS>
where
BS: Blockstore,
{
fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
type Error = Error<BS::Error>;

fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, Self::Error> {
Ok(if let Some(data) = self.write.borrow().get(cid) {
Some(data.clone())
} else {
self.base.get(cid)?
self.base.get(cid).map_err(Error::Blockstore)?
})
}

fn put_keyed(&self, cid: &Cid, buf: &[u8]) -> Result<()> {
fn put_keyed(&self, cid: &Cid, buf: &[u8]) -> Result<(), Self::Error> {
self.write.borrow_mut().insert(*cid, Vec::from(buf));
Ok(())
}

fn has(&self, k: &Cid) -> Result<bool> {
fn has(&self, k: &Cid) -> Result<bool, Self::Error> {
if self.write.borrow().contains_key(k) {
Ok(true)
} else {
Ok(self.base.has(k)?)
Ok(self.base.has(k).map_err(Error::Blockstore)?)
}
}

fn put_many_keyed<D, I>(&self, blocks: I) -> Result<()>
fn put_many_keyed<D, I>(&self, blocks: I) -> Result<(), Self::Error>
where
Self: Sized,
D: AsRef<[u8]>,
Expand Down
2 changes: 1 addition & 1 deletion fvm/src/state_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ where
where
F: FnMut(Address, &ActorState) -> anyhow::Result<()>,
{
self.hamt.for_each(|k, v| {
self.hamt.try_for_each(|k, v| {
let addr = Address::from_bytes(&k.0)?;
f(addr, v)
})?;
Expand Down
1 change: 0 additions & 1 deletion ipld/amt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ thiserror = "1.0"
once_cell = "1.5"
ahash = { version = "0.7", optional = true }
itertools = "0.10"
anyhow = "1.0.51"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

fvm_ipld_blockstore = { version = "0.1", path = "../blockstore" }
fvm_ipld_encoding = { version = "0.1", path = "../encoding" }

Expand Down
4 changes: 3 additions & 1 deletion ipld/amt/benches/amt_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ fn for_each(c: &mut Criterion) {
c.bench_function("AMT for_each function", |b| {
b.iter(|| {
let a = Amt::load(&cid, &db).unwrap();
black_box(a).for_each(|_, _v: &u64| Ok(())).unwrap();
black_box(a)
.try_for_each(|_, _v: &u64| Ok::<_, ()>(()))
.unwrap();
})
});
}
Expand Down
Loading