Skip to content

Commit

Permalink
expose piece-writing function which expects caller to bin pack (#935)
Browse files Browse the repository at this point in the history
* feat(pieces): add write_and_preprocess to API

- comment the behavior of the add_piece function
- add write_and_preprocess function, which will ultimately replace add_piece

* fix(typo): fix typo in comment

* feat(commp): generate comm_p while writing and preprocessing
  • Loading branch information
laser authored Nov 4, 2019
1 parent 6629732 commit a7170e5
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 26 deletions.
2 changes: 2 additions & 0 deletions filecoin-proofs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ tar = "0.4.26"
rayon = "1.1.0"
blake2s_simd = "0.5.8"
hex = "0.4.0"
tee = "0.1.0"
os_pipe = "0.9.1"

[dependencies.reqwest]
version = "0.9"
Expand Down
163 changes: 139 additions & 24 deletions filecoin-proofs/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::path::{Path, PathBuf};

use storage_proofs::drgraph::DefaultTreeHasher;
use storage_proofs::hasher::Hasher;
use storage_proofs::pieces::generate_piece_commitment_bytes_from_source;
use storage_proofs::porep::PoRep;
use storage_proofs::sector::SectorId;
use storage_proofs::stacked::{generate_replica_id, StackedDrg};
Expand All @@ -30,6 +29,8 @@ pub(crate) mod util;

pub use self::post::*;
pub use self::seal::*;
use std::io;
use storage_proofs::pieces::generate_piece_commitment_bytes_from_source;

/// Unseals the sector at `sealed_path` and returns the bytes for a piece
/// whose first (unpadded) byte begins at `offset` and ends at `offset` plus
Expand Down Expand Up @@ -75,62 +76,176 @@ pub fn get_unsealed_range<T: Into<PathBuf> + AsRef<Path>>(
Ok(UnpaddedBytesAmount(written as u64))
}

// Takes a piece and the size of the piece and returns the comm_p.
// Generates a piece commitment for the provided byte source. Returns an error
// if the byte source produced more than `piece_size` bytes.
pub fn generate_piece_commitment<T: std::io::Read>(
unpadded_piece_file: T,
unpadded_piece_size: UnpaddedBytesAmount,
source: T,
piece_size: UnpaddedBytesAmount,
) -> error::Result<PieceInfo> {
let mut padded_piece_file = tempfile()?;
let unpadded_piece_size_with_alignment = add_piece(
unpadded_piece_file,
&mut padded_piece_file,
unpadded_piece_size,
&[],
)?;
ensure_piece_size(piece_size)?;

let mut temp_piece_file = tempfile()?;

// send the source through the preprocessor, writing output to temp file
let n = write_padded(source, &temp_piece_file)
.map_err(|err| format_err!("failed to write and preprocess bytes: {:?}", err))?;

let n = UnpaddedBytesAmount(n as u64);

if n != piece_size {
return Err(format_err!(
"wrote more bytes ({:?}) than expected ({:?}) when preprocessing",
n,
piece_size
));
}

let _ = padded_piece_file.seek(SeekFrom::Start(0))?;
temp_piece_file.seek(SeekFrom::Start(0))?;

let commitment =
generate_piece_commitment_bytes_from_source::<DefaultPieceHasher>(&mut padded_piece_file)?;
generate_piece_commitment_bytes_from_source::<DefaultPieceHasher>(&mut temp_piece_file)?;

Ok(PieceInfo {
commitment,
size: unpadded_piece_size_with_alignment,
size: piece_size,
})
}

/// Write a piece. Returns the unpadded, but aligned size of the piece.
/// Computes a NUL-byte prefix and/or suffix for `source` using the provided
/// `piece_lengths` and `piece_size` (such that the `source`, after
/// preprocessing, will occupy a subtree of a merkle tree built using the bytes
/// from `target`), runs the resultant byte stream through the preprocessor,
/// and writes the result to `target`. Returns a tuple containing the number of
/// bytes written to `target` (`source` plus alignment) and the commitment.
///
/// WARNING: Depending on the ordering and size of the pieces in
/// `piece_lengths`, this function could write a prefix of NUL bytes which
/// wastes ($SIZESECTORSIZE/2)-$MINIMUM_PIECE_SIZE space. This function will be
/// deprecated in favor of `write_and_preprocess`, and miners will be prevented
/// from sealing sectors containing more than $TOOMUCH alignment bytes.
pub fn add_piece<R, W>(
source: R,
target: W,
piece_size: UnpaddedBytesAmount,
piece_lengths: &[UnpaddedBytesAmount],
) -> error::Result<UnpaddedBytesAmount>
) -> error::Result<(UnpaddedBytesAmount, Commitment)>
where
R: Read,
W: Read + Write + Seek,
{
ensure_piece_size(piece_size)?;

let (aligned_source_size, alignment, aligned_source) =
get_aligned_source(source, &piece_lengths, piece_size);

// allows us to tee the source byte stream
let (mut pipe_r, pipe_w) =
os_pipe::pipe().map_err(|err| format_err!("failed to create pipe: {:?}", err))?;

// all bytes read from the TeeReader are written to its writer, no bytes
// will be read from the TeeReader before they are written to its writer
let tee_r = tee::TeeReader::new(aligned_source, pipe_w);

// reads from tee_r block until the tee's source bytes can be written to its
// writer, so to prevent write_padded from blocking indefinitely, we need
// to spin up a separate thread (to read from the pipe which receives writes
// from the TeeReader)
let t_handle = std::thread::spawn(move || {
// discard n left-alignment bytes
let n = alignment.left_bytes.into();
io::copy(&mut pipe_r.by_ref().take(n), &mut io::sink())
.map_err(|err| format_err!("failed to skip alignment bytes: {:?}", err))?;

// generate commitment for piece bytes
let result =
generate_piece_commitment(&mut pipe_r.by_ref().take(piece_size.into()), piece_size);

// drain the remaining bytes (all alignment) from the reader
std::io::copy(&mut pipe_r.by_ref(), &mut io::sink())
.map_err(|err| format_err!("failed to drain reader: {:?}", err))
.and_then(|_| result)
});

// send the source through the preprocessor, writing output to target
let write_rslt = write_padded(tee_r, target)
.map_err(|err| format_err!("failed to write and preprocess bytes: {:?}", err));

// block until piece commitment-generating thread returns
let join_rslt = t_handle
.join()
.map_err(|err| format_err!("join piece commitment-generating thread failed: {:?}", err));

match (write_rslt, join_rslt) {
(Ok(n), Ok(Ok(r))) => {
let n = UnpaddedBytesAmount(n as u64);

ensure!(
aligned_source_size == n,
"expected to write {:?} source bytes, but actually wrote {:?}",
aligned_source_size,
n
);

Ok((n, r.commitment))
}
(Ok(n), Ok(Err(err))) => {
let e = format_err!(
"wrote {:?} to target but then failed to generate piece commitment: {:?}",
n,
err
);
Err(e)
}
(Ok(n), Err(err)) => {
let e = format_err!(
"wrote {:?} to target but then failed to generate piece commitment: {:?}",
n,
err
);
Err(e)
}
(Err(err), _) => {
let e = format_err!("failed to write and preprocess: {:?}", err);
Err(e)
}
}
}

fn ensure_piece_size(piece_size: UnpaddedBytesAmount) -> error::Result<()> {
ensure!(
piece_size >= UnpaddedBytesAmount(MINIMUM_PIECE_SIZE),
"Piece must be at least {} bytes",
MINIMUM_PIECE_SIZE
);

let padded_piece_size: PaddedBytesAmount = piece_size.into();
ensure!(
u64::from(padded_piece_size).is_power_of_two(),
"Bit-padded piece size must be a power of 2 ({:?})",
padded_piece_size,
);

let (bytes_with_alignment, aligned_source) =
get_aligned_source(source, &piece_lengths, piece_size);
let written = write_padded(aligned_source, target)?;
ensure!(
u64::from(bytes_with_alignment) == written as u64,
"Invalid write"
);
Ok(())
}

Ok(bytes_with_alignment)
/// Writes bytes from `source` to `target`, adding bit-padding ("preprocessing")
/// as needed. Returns a tuple containing the number of bytes written to
/// `target` and the commitment.
///
/// WARNING: This function neither prepends nor appends alignment bytes to the
/// `target`; it is the caller's responsibility to ensure properly sized
/// and ordered writes to `target` such that `source`-bytes occupy whole
/// subtrees of the final merkle tree built over `target`.
pub fn write_and_preprocess<R, W>(
source: R,
target: W,
piece_size: UnpaddedBytesAmount,
) -> error::Result<(UnpaddedBytesAmount, Commitment)>
where
R: Read,
W: Read + Write + Seek,
{
add_piece(source, target, piece_size, Default::default())
}

#[cfg(test)]
Expand Down
5 changes: 3 additions & 2 deletions filecoin-proofs/src/pieces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn compute_comm_d(sector_size: SectorSize, piece_infos: &[PieceInfo]) -> Res
"Too many pieces"
);

// make sure the pice sizes are at most a sector size large
// make sure the piece sizes are at most a sector size large
let piece_size: u64 = piece_infos
.iter()
.map(|info| u64::from(PaddedBytesAmount::from(info.size)))
Expand Down Expand Up @@ -267,14 +267,15 @@ pub fn get_aligned_source<T: Read>(
source: T,
pieces: &[UnpaddedBytesAmount],
piece_bytes: UnpaddedBytesAmount,
) -> (UnpaddedBytesAmount, impl Read) {
) -> (UnpaddedBytesAmount, PieceAlignment, impl Read) {
let written_bytes = sum_piece_bytes_with_alignment(pieces);
let piece_alignment = get_piece_alignment(written_bytes, piece_bytes);
let expected_num_bytes_written =
piece_alignment.left_bytes + piece_bytes + piece_alignment.right_bytes;

(
expected_num_bytes_written,
piece_alignment.clone(),
with_alignment(source, piece_alignment),
)
}
Expand Down

0 comments on commit a7170e5

Please sign in to comment.