From a7170e5b5b56ea5227ca85bfbc7acaf55cfebace Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Mon, 4 Nov 2019 09:19:23 -0800 Subject: [PATCH] expose piece-writing function which expects caller to bin pack (#935) * feat(pieces): add write_and_preprocess to API - comment the behavior of the add_piece function - add write_and_preprocess function, which will ultimately replace add_piece * fix(typo): fix typo in comment * feat(commp): generate comm_p while writing and preprocessing --- filecoin-proofs/Cargo.toml | 2 + filecoin-proofs/src/api/mod.rs | 163 ++++++++++++++++++++++++++++----- filecoin-proofs/src/pieces.rs | 5 +- 3 files changed, 144 insertions(+), 26 deletions(-) diff --git a/filecoin-proofs/Cargo.toml b/filecoin-proofs/Cargo.toml index 7d633d9e3..eea6cd17e 100644 --- a/filecoin-proofs/Cargo.toml +++ b/filecoin-proofs/Cargo.toml @@ -41,6 +41,8 @@ tar = "0.4.26" rayon = "1.1.0" blake2s_simd = "0.5.8" hex = "0.4.0" +tee = "0.1.0" +os_pipe = "0.9.1" [dependencies.reqwest] version = "0.9" diff --git a/filecoin-proofs/src/api/mod.rs b/filecoin-proofs/src/api/mod.rs index 29159630e..f6e0032d5 100644 --- a/filecoin-proofs/src/api/mod.rs +++ b/filecoin-proofs/src/api/mod.rs @@ -4,7 +4,6 @@ use std::path::{Path, PathBuf}; use storage_proofs::drgraph::DefaultTreeHasher; use storage_proofs::hasher::Hasher; -use storage_proofs::pieces::generate_piece_commitment_bytes_from_source; use storage_proofs::porep::PoRep; use storage_proofs::sector::SectorId; use storage_proofs::stacked::{generate_replica_id, StackedDrg}; @@ -30,6 +29,8 @@ pub(crate) mod util; pub use self::post::*; pub use self::seal::*; +use std::io; +use storage_proofs::pieces::generate_piece_commitment_bytes_from_source; /// Unseals the sector at `sealed_path` and returns the bytes for a piece /// whose first (unpadded) byte begins at `offset` and ends at `offset` plus @@ -75,46 +76,148 @@ pub fn get_unsealed_range + AsRef>( Ok(UnpaddedBytesAmount(written as u64)) } -// Takes a piece and the size of the piece and returns the comm_p. +// Generates a piece commitment for the provided byte source. Returns an error +// if the byte source produced more than `piece_size` bytes. pub fn generate_piece_commitment( - unpadded_piece_file: T, - unpadded_piece_size: UnpaddedBytesAmount, + source: T, + piece_size: UnpaddedBytesAmount, ) -> error::Result { - let mut padded_piece_file = tempfile()?; - let unpadded_piece_size_with_alignment = add_piece( - unpadded_piece_file, - &mut padded_piece_file, - unpadded_piece_size, - &[], - )?; + ensure_piece_size(piece_size)?; + + let mut temp_piece_file = tempfile()?; + + // send the source through the preprocessor, writing output to temp file + let n = write_padded(source, &temp_piece_file) + .map_err(|err| format_err!("failed to write and preprocess bytes: {:?}", err))?; + + let n = UnpaddedBytesAmount(n as u64); + + if n != piece_size { + return Err(format_err!( + "wrote more bytes ({:?}) than expected ({:?}) when preprocessing", + n, + piece_size + )); + } - let _ = padded_piece_file.seek(SeekFrom::Start(0))?; + temp_piece_file.seek(SeekFrom::Start(0))?; let commitment = - generate_piece_commitment_bytes_from_source::(&mut padded_piece_file)?; + generate_piece_commitment_bytes_from_source::(&mut temp_piece_file)?; Ok(PieceInfo { commitment, - size: unpadded_piece_size_with_alignment, + size: piece_size, }) } -/// Write a piece. Returns the unpadded, but aligned size of the piece. +/// Computes a NUL-byte prefix and/or suffix for `source` using the provided +/// `piece_lengths` and `piece_size` (such that the `source`, after +/// preprocessing, will occupy a subtree of a merkle tree built using the bytes +/// from `target`), runs the resultant byte stream through the preprocessor, +/// and writes the result to `target`. Returns a tuple containing the number of +/// bytes written to `target` (`source` plus alignment) and the commitment. +/// +/// WARNING: Depending on the ordering and size of the pieces in +/// `piece_lengths`, this function could write a prefix of NUL bytes which +/// wastes ($SIZESECTORSIZE/2)-$MINIMUM_PIECE_SIZE space. This function will be +/// deprecated in favor of `write_and_preprocess`, and miners will be prevented +/// from sealing sectors containing more than $TOOMUCH alignment bytes. pub fn add_piece( source: R, target: W, piece_size: UnpaddedBytesAmount, piece_lengths: &[UnpaddedBytesAmount], -) -> error::Result +) -> error::Result<(UnpaddedBytesAmount, Commitment)> where R: Read, W: Read + Write + Seek, { + ensure_piece_size(piece_size)?; + + let (aligned_source_size, alignment, aligned_source) = + get_aligned_source(source, &piece_lengths, piece_size); + + // allows us to tee the source byte stream + let (mut pipe_r, pipe_w) = + os_pipe::pipe().map_err(|err| format_err!("failed to create pipe: {:?}", err))?; + + // all bytes read from the TeeReader are written to its writer, no bytes + // will be read from the TeeReader before they are written to its writer + let tee_r = tee::TeeReader::new(aligned_source, pipe_w); + + // reads from tee_r block until the tee's source bytes can be written to its + // writer, so to prevent write_padded from blocking indefinitely, we need + // to spin up a separate thread (to read from the pipe which receives writes + // from the TeeReader) + let t_handle = std::thread::spawn(move || { + // discard n left-alignment bytes + let n = alignment.left_bytes.into(); + io::copy(&mut pipe_r.by_ref().take(n), &mut io::sink()) + .map_err(|err| format_err!("failed to skip alignment bytes: {:?}", err))?; + + // generate commitment for piece bytes + let result = + generate_piece_commitment(&mut pipe_r.by_ref().take(piece_size.into()), piece_size); + + // drain the remaining bytes (all alignment) from the reader + std::io::copy(&mut pipe_r.by_ref(), &mut io::sink()) + .map_err(|err| format_err!("failed to drain reader: {:?}", err)) + .and_then(|_| result) + }); + + // send the source through the preprocessor, writing output to target + let write_rslt = write_padded(tee_r, target) + .map_err(|err| format_err!("failed to write and preprocess bytes: {:?}", err)); + + // block until piece commitment-generating thread returns + let join_rslt = t_handle + .join() + .map_err(|err| format_err!("join piece commitment-generating thread failed: {:?}", err)); + + match (write_rslt, join_rslt) { + (Ok(n), Ok(Ok(r))) => { + let n = UnpaddedBytesAmount(n as u64); + + ensure!( + aligned_source_size == n, + "expected to write {:?} source bytes, but actually wrote {:?}", + aligned_source_size, + n + ); + + Ok((n, r.commitment)) + } + (Ok(n), Ok(Err(err))) => { + let e = format_err!( + "wrote {:?} to target but then failed to generate piece commitment: {:?}", + n, + err + ); + Err(e) + } + (Ok(n), Err(err)) => { + let e = format_err!( + "wrote {:?} to target but then failed to generate piece commitment: {:?}", + n, + err + ); + Err(e) + } + (Err(err), _) => { + let e = format_err!("failed to write and preprocess: {:?}", err); + Err(e) + } + } +} + +fn ensure_piece_size(piece_size: UnpaddedBytesAmount) -> error::Result<()> { ensure!( piece_size >= UnpaddedBytesAmount(MINIMUM_PIECE_SIZE), "Piece must be at least {} bytes", MINIMUM_PIECE_SIZE ); + let padded_piece_size: PaddedBytesAmount = piece_size.into(); ensure!( u64::from(padded_piece_size).is_power_of_two(), @@ -122,15 +225,27 @@ where padded_piece_size, ); - let (bytes_with_alignment, aligned_source) = - get_aligned_source(source, &piece_lengths, piece_size); - let written = write_padded(aligned_source, target)?; - ensure!( - u64::from(bytes_with_alignment) == written as u64, - "Invalid write" - ); + Ok(()) +} - Ok(bytes_with_alignment) +/// Writes bytes from `source` to `target`, adding bit-padding ("preprocessing") +/// as needed. Returns a tuple containing the number of bytes written to +/// `target` and the commitment. +/// +/// WARNING: This function neither prepends nor appends alignment bytes to the +/// `target`; it is the caller's responsibility to ensure properly sized +/// and ordered writes to `target` such that `source`-bytes occupy whole +/// subtrees of the final merkle tree built over `target`. +pub fn write_and_preprocess( + source: R, + target: W, + piece_size: UnpaddedBytesAmount, +) -> error::Result<(UnpaddedBytesAmount, Commitment)> +where + R: Read, + W: Read + Write + Seek, +{ + add_piece(source, target, piece_size, Default::default()) } #[cfg(test)] diff --git a/filecoin-proofs/src/pieces.rs b/filecoin-proofs/src/pieces.rs index 3191ef619..f21612535 100644 --- a/filecoin-proofs/src/pieces.rs +++ b/filecoin-proofs/src/pieces.rs @@ -36,7 +36,7 @@ pub fn compute_comm_d(sector_size: SectorSize, piece_infos: &[PieceInfo]) -> Res "Too many pieces" ); - // make sure the pice sizes are at most a sector size large + // make sure the piece sizes are at most a sector size large let piece_size: u64 = piece_infos .iter() .map(|info| u64::from(PaddedBytesAmount::from(info.size))) @@ -267,7 +267,7 @@ pub fn get_aligned_source( source: T, pieces: &[UnpaddedBytesAmount], piece_bytes: UnpaddedBytesAmount, -) -> (UnpaddedBytesAmount, impl Read) { +) -> (UnpaddedBytesAmount, PieceAlignment, impl Read) { let written_bytes = sum_piece_bytes_with_alignment(pieces); let piece_alignment = get_piece_alignment(written_bytes, piece_bytes); let expected_num_bytes_written = @@ -275,6 +275,7 @@ pub fn get_aligned_source( ( expected_num_bytes_written, + piece_alignment.clone(), with_alignment(source, piece_alignment), ) }