From 1c725ee0a0c09dd0c756df6ae94daa3276d8824c Mon Sep 17 00:00:00 2001 From: draguve Date: Mon, 7 Jul 2025 23:03:58 -0700 Subject: [PATCH 1/8] Init working compression --- Cargo.lock | 62 ++++++++++++++++++++ Cargo.toml | 5 +- src/main.rs | 161 ++++++++++++++++++++++++++++++++++++++++------------ 3 files changed, 192 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 57f95a9..bb22fe9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-compression" +version = "0.4.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40f6024f3f856663b45fd0c9b6f2024034a702f453549449e0d84a305900dad4" +dependencies = [ + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "zstd", + "zstd-safe", +] + [[package]] name = "async-trait" version = "0.1.86" @@ -344,6 +358,8 @@ version = "1.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c3d1b2e905a3a7b00a6141adb0e4c0bb941d11caf55349d863942a1cc44e3c9" dependencies = [ + "jobserver", + "libc", "shlex", ] @@ -2043,6 +2059,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" +[[package]] +name = "jobserver" +version = "0.1.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a" +dependencies = [ + "getrandom 0.3.3", + "libc", +] + [[package]] name = "js-sys" version = "0.3.77" @@ -2730,6 +2756,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "pnet_base" version = "0.34.0" @@ -3521,6 +3553,7 @@ name = "sendme" version = "0.26.0" dependencies = [ "anyhow", + "async-compression", "base64", "clap", "console", @@ -3540,6 +3573,7 @@ dependencies = [ "serde_json", "tempfile", "tokio", + "tokio-util", "tracing", "tracing-subscriber", "walkdir", @@ -5273,3 +5307,31 @@ dependencies = [ "quote", "syn 2.0.98", ] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.15+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index 268ea45..e9cdb42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,8 @@ data-encoding = "2.6.0" n0-future = "0.1.2" base64 = { version = "0.22.1", optional = true } hex = "0.4.3" +async-compression = { version = "0.4.25", features = ["tokio", "zstd"], optional = true } +tokio-util = { version = "0.7.15",optional = true } [dev-dependencies] duct = "0.13.6" @@ -47,4 +49,5 @@ tempfile = "3.8.1" [features] clipboard = ["dep:base64"] -default = ["clipboard"] +zstd = ["async-compression","tokio-util"] +default = ["clipboard","zstd"] diff --git a/src/main.rs 
b/src/main.rs index b562069..1365ff3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use std::{ }; use anyhow::Context; +use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder}; use clap::{ error::{ContextKind, ErrorKind}, CommandFactory, Parser, Subcommand, @@ -24,11 +25,12 @@ use iroh::{ discovery::{dns::DnsDiscovery, pkarr::PkarrPublisher}, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey, Watcher, }; +use iroh_blobs::protocol::ChunkRanges; use iroh_blobs::{ api::{ blobs::{ - AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgressItem, - ImportMode, + AddPathOptions, AddProgress, AddProgressItem, ExportMode, ExportOptions, + ExportProgressItem, ImportMode, }, remote::GetProgressItem, Store, TempTag, @@ -41,10 +43,15 @@ use iroh_blobs::{ ticket::BlobTicket, BlobFormat, Hash, }; +use iroh_blobs::api::blobs::EncodedItem; use n0_future::{task::AbortOnDropHandle, StreamExt}; use rand::Rng; use serde::{Deserialize, Serialize}; -use tokio::{select, sync::mpsc}; +use tokio::fs::File; +use tokio::io::{BufReader, BufWriter}; +use tokio::{io, select, sync::mpsc}; +use tokio_util::io::ReaderStream; +use tokio_util::io::StreamReader; use tracing::{error, trace}; use walkdir::WalkDir; @@ -142,6 +149,11 @@ pub struct CommonArgs { #[clap(long)] pub show_secret: bool, + + /// Compress the stream before sending it + #[cfg(feature = "zstd")] + #[clap(short = 'Z', long)] + pub zstd: bool, } /// Available command line options for configuring relays. @@ -355,6 +367,7 @@ async fn import( path: PathBuf, db: &Store, mp: &mut MultiProgress, + do_compress: bool, ) -> anyhow::Result<(TempTag, u64, Collection)> { let parallelism = num_cpus::get(); let path = path.canonicalize()?; @@ -391,11 +404,22 @@ async fn import( op.inc(1); let pb = mp.add(make_import_item_progress()); pb.set_message(format!("copying {name}")); - let import = db.add_path_with_opts(AddPathOptions { - path, - mode: ImportMode::TryReference, - format: BlobFormat::Raw, - }); + let import: AddProgress; + if do_compress { + pb.set_message(format!("compressing {name}")); + let file_stream = File::open(&path).await?; + let reader = BufReader::new(file_stream); + let encoder = ZstdEncoder::new(reader); + let compressed_stream = ReaderStream::new(encoder); + import = db.add_stream(compressed_stream).await; + } else { + import = db.add_path_with_opts(AddPathOptions { + path, + mode: ImportMode::TryReference, + format: BlobFormat::Raw, + }); + } + let mut stream = import.stream().await; let mut item_size = 0; let temp_tag = loop { @@ -464,7 +488,12 @@ fn get_export_path(root: &Path, name: &str) -> anyhow::Result { Ok(path) } -async fn export(db: &Store, collection: Collection, mp: &mut MultiProgress) -> anyhow::Result<()> { +async fn export( + db: &Store, + collection: Collection, + mp: &mut MultiProgress, + decompress: bool, +) -> anyhow::Result<()> { let root = std::env::current_dir()?; let op = mp.add(make_export_overall_progress()); op.set_length(collection.len() as u64); @@ -479,30 +508,79 @@ async fn export(db: &Store, collection: Collection, mp: &mut MultiProgress) -> a eprintln!("You can remove the file or directory and try again. 
The download will not be repeated."); anyhow::bail!("target {} already exists", target.display()); } - let mut stream = db - .export_with_opts(ExportOptions { - hash: *hash, - target, - mode: ExportMode::TryReference, - }) - .stream() - .await; - let pb = mp.add(make_export_item_progress()); - pb.set_message(format!("exporting {name}")); - while let Some(item) = stream.next().await { - match item { - ExportProgressItem::Size(size) => { - pb.set_length(size); - } - ExportProgressItem::CopyProgress(offset) => { - pb.set_position(offset); - } - ExportProgressItem::Done => { - pb.finish_and_clear(); - } - ExportProgressItem::Error(cause) => { - pb.finish_and_clear(); - anyhow::bail!("error exporting {}: {}", name, cause); + + if decompress { + + // let bao_stream = db + // .export_bao(*hash, ChunkRanges::all()) + // .into_byte_stream() + // .map(|res| res.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))); + + let mut i = 0; + + let bao_stream = db.export_bao(*hash, ChunkRanges::all()).stream() + .for_each(|res| { + i += 1; + match res { + EncodedItem::Size(size) => { + println!("Size {size}",) + } + EncodedItem::Parent(_) => { + + } + EncodedItem::Leaf(leaf) => { + if i%10 == 0{ + println!("Leaf {}",leaf.offset) + } + } + EncodedItem::Error(cause) => { + println!("Error {cause}") + } + EncodedItem::Done => { + } + } + }).await; + + let bao_stream = db.export_bao(*hash, ChunkRanges::all()).stream() + .filter_map(|res| { + + }) + + + // let mut stream_reader = StreamReader::new(bao_stream); + // let mut decoder = ZstdDecoder::new(stream_reader); + // let target_file = File::create(&target).await?; + // let mut output_writer = BufWriter::new(target_file); + // + // tokio::io::copy(&mut stream_reader, &mut output_writer).await?; + + } else { + let mut stream = db + .export_with_opts(ExportOptions { + hash: *hash, + target, + mode: ExportMode::TryReference, + }) + .stream() + .await; + + let pb = mp.add(make_export_item_progress()); + pb.set_message(format!("exporting {name}")); + while let Some(item) = stream.next().await { + match item { + ExportProgressItem::Size(size) => { + pb.set_length(size); + } + ExportProgressItem::CopyProgress(offset) => { + pb.set_position(offset); + } + ExportProgressItem::Done => { + pb.finish_and_clear(); + } + ExportProgressItem::Error(cause) => { + pb.finish_and_clear(); + anyhow::bail!("error exporting {}: {}", name, cause); + } } } } @@ -697,7 +775,13 @@ async fn send(args: SendArgs) -> anyhow::Result<()> { let store = FsStore::load(&blobs_data_dir2).await?; let blobs = Blobs::new(&store, endpoint.clone(), Some(progress_tx)); - let import_result = import(path2, blobs.store(), &mut mp).await?; + #[cfg(feature = "zstd")] + let do_compress = args.common.zstd; + + #[cfg(not(feature = "zstd"))] + let do_compress = false; + + let import_result = import(path2, blobs.store(), &mut mp, do_compress).await?; let dt = t0.elapsed(); let router = iroh::protocol::Router::builder(endpoint) @@ -1029,7 +1113,14 @@ async fn receive(args: ReceiveArgs) -> anyhow::Result<()> { println!("exporting to {first}"); } } - export(&db, collection, &mut mp).await?; + + #[cfg(feature = "zstd")] + let do_decompress = args.common.zstd; + + #[cfg(not(feature = "zstd"))] + let do_decompress = false; + + export(&db, collection, &mut mp, do_decompress).await?; anyhow::Ok((total_files, payload_size, stats)) }; let (total_files, payload_size, stats) = select! 
{ From fa3f984fcba36ade710bee084ec4a7860edff1b6 Mon Sep 17 00:00:00 2001 From: draguve Date: Mon, 7 Jul 2025 23:30:08 -0700 Subject: [PATCH 2/8] Working decompress --- src/main.rs | 66 ++++++++++++++++++++--------------------------------- 1 file changed, 25 insertions(+), 41 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1365ff3..20e4974 100644 --- a/src/main.rs +++ b/src/main.rs @@ -510,50 +510,34 @@ async fn export( } if decompress { - - // let bao_stream = db - // .export_bao(*hash, ChunkRanges::all()) - // .into_byte_stream() - // .map(|res| res.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))); - - let mut i = 0; - - let bao_stream = db.export_bao(*hash, ChunkRanges::all()).stream() - .for_each(|res| { - i += 1; - match res { - EncodedItem::Size(size) => { - println!("Size {size}",) - } - EncodedItem::Parent(_) => { - - } - EncodedItem::Leaf(leaf) => { - if i%10 == 0{ - println!("Leaf {}",leaf.offset) - } - } - EncodedItem::Error(cause) => { - println!("Error {cause}") - } - EncodedItem::Done => { - } + let pb = mp.add(make_export_item_progress()); + pb.set_message(format!("Decompressing {name}")); + let byte_stream = db.export_bao(*hash, ChunkRanges::all()).stream() + .inspect(|res| match res { + EncodedItem::Size(size) => { + pb.set_length(*size); } - }).await; - - let bao_stream = db.export_bao(*hash, ChunkRanges::all()).stream() - .filter_map(|res| { - + EncodedItem::Leaf(leaf) => { + pb.set_position(leaf.offset); + } + EncodedItem::Done => { + pb.finish_and_clear(); + } + _ => {} }) + .filter_map(|res| { + match res { + EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)), + EncodedItem::Error(err) => Some(Err(io::Error::new(io::ErrorKind::Other, err.to_string()))), + _ => None, + } + }); - - // let mut stream_reader = StreamReader::new(bao_stream); - // let mut decoder = ZstdDecoder::new(stream_reader); - // let target_file = File::create(&target).await?; - // let mut output_writer = BufWriter::new(target_file); - // - // tokio::io::copy(&mut stream_reader, &mut output_writer).await?; - + let reader = StreamReader::new(byte_stream); + let mut decoder = ZstdDecoder::new(reader); + let target_file = File::create(&target).await?; + let mut output_writer = BufWriter::new(target_file); + io::copy(&mut decoder, &mut output_writer).await?; } else { let mut stream = db .export_with_opts(ExportOptions { From 81d0482d4489920bbcceba59082641c08ee83d9a Mon Sep 17 00:00:00 2001 From: draguve Date: Tue, 8 Jul 2025 01:13:58 -0700 Subject: [PATCH 3/8] Need to fix case when not building with zstd --- src/main.rs | 106 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 76 insertions(+), 30 deletions(-) diff --git a/src/main.rs b/src/main.rs index 20e4974..048c813 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use std::{ }; use anyhow::Context; +#[cfg(feature = "zstd")] use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder}; use clap::{ error::{ContextKind, ErrorKind}, @@ -29,7 +30,7 @@ use iroh_blobs::protocol::ChunkRanges; use iroh_blobs::{ api::{ blobs::{ - AddPathOptions, AddProgress, AddProgressItem, ExportMode, ExportOptions, + AddPathOptions, AddProgress, AddProgressItem, EncodedItem, ExportMode, ExportOptions, ExportProgressItem, ImportMode, }, remote::GetProgressItem, @@ -43,15 +44,17 @@ use iroh_blobs::{ ticket::BlobTicket, BlobFormat, Hash, }; -use iroh_blobs::api::blobs::EncodedItem; use n0_future::{task::AbortOnDropHandle, StreamExt}; use rand::Rng; use serde::{Deserialize, Serialize}; -use tokio::fs::File; + 
+#[cfg(feature = "zstd")] +use tokio::fs::{create_dir_all, File}; +#[cfg(feature = "zstd")] use tokio::io::{BufReader, BufWriter}; use tokio::{io, select, sync::mpsc}; -use tokio_util::io::ReaderStream; -use tokio_util::io::StreamReader; +#[cfg(feature = "zstd")] +use tokio_util::io::{ReaderStream, StreamReader}; use tracing::{error, trace}; use walkdir::WalkDir; @@ -154,6 +157,10 @@ pub struct CommonArgs { #[cfg(feature = "zstd")] #[clap(short = 'Z', long)] pub zstd: bool, + + #[cfg(not(feature = "zstd"))] + #[clap(long, hide = true)] + pub zstd: bool, } /// Available command line options for configuring relays. @@ -367,7 +374,7 @@ async fn import( path: PathBuf, db: &Store, mp: &mut MultiProgress, - do_compress: bool, + _do_compress: bool, ) -> anyhow::Result<(TempTag, u64, Collection)> { let parallelism = num_cpus::get(); let path = path.canonicalize()?; @@ -405,7 +412,9 @@ async fn import( let pb = mp.add(make_import_item_progress()); pb.set_message(format!("copying {name}")); let import: AddProgress; - if do_compress { + + #[cfg(feature = "zstd")] + if _do_compress { pb.set_message(format!("compressing {name}")); let file_stream = File::open(&path).await?; let reader = BufReader::new(file_stream); @@ -420,6 +429,14 @@ async fn import( }); } + #[cfg(not(feature = "zstd"))] { + import = db.add_path_with_opts(AddPathOptions { + path, + mode: ImportMode::TryReference, + format: BlobFormat::Raw, + }); + } + let mut stream = import.stream().await; let mut item_size = 0; let temp_tag = loop { @@ -512,7 +529,9 @@ async fn export( if decompress { let pb = mp.add(make_export_item_progress()); pb.set_message(format!("Decompressing {name}")); - let byte_stream = db.export_bao(*hash, ChunkRanges::all()).stream() + let byte_stream = db + .export_bao(*hash, ChunkRanges::all()) + .stream() .inspect(|res| match res { EncodedItem::Size(size) => { pb.set_length(*size); @@ -525,16 +544,19 @@ async fn export( } _ => {} }) - .filter_map(|res| { - match res { - EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)), - EncodedItem::Error(err) => Some(Err(io::Error::new(io::ErrorKind::Other, err.to_string()))), - _ => None, + .filter_map(|res| match res { + EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)), + EncodedItem::Error(err) => { + Some(Err(io::Error::new(io::ErrorKind::Other, err.to_string()))) } + _ => None, }); let reader = StreamReader::new(byte_stream); let mut decoder = ZstdDecoder::new(reader); + if let Some(parent) = target.parent() { + create_dir_all(parent).await?; + } let target_file = File::create(&target).await?; let mut output_writer = BufWriter::new(target_file); io::copy(&mut decoder, &mut output_writer).await?; @@ -629,7 +651,7 @@ async fn show_provide_progress( ProgressStyle::with_template( "{msg}{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes}", )? - .progress_chars("#>-"), + .progress_chars("#>-"), ); pb.set_message(format!("{request_id} {hash}")); let Some(connection) = connections.get_mut(&connection_id) else { @@ -702,6 +724,27 @@ async fn show_provide_progress( Ok(()) } +fn zstd_enabled(zstd_requested: bool, _is_sending: bool) -> bool { + #[cfg(feature = "zstd")] + { + return zstd_requested; + } + + #[cfg(not(feature = "zstd"))] + { + if zstd_requested { + if _is_sending { + eprintln!( + "Warning: --zstd ignored (no support in this build). Sending uncompressed." + ); + } else { + eprintln!("Warning: This build does not support zstd decompression. Files will be saved with a `.zst` extension. 
You can manually decompress them using `unzstd `."); + } + } + return false; + } +} + async fn send(args: SendArgs) -> anyhow::Result<()> { let secret_key = get_or_create_secret(args.common.verbose > 0)?; if args.common.show_secret { @@ -734,6 +777,7 @@ async fn send(args: SendArgs) -> anyhow::Result<()> { ); std::process::exit(1); } + let do_compress = zstd_enabled(args.common.zstd, true); let mut mp = MultiProgress::new(); let mp2 = mp.clone(); @@ -759,12 +803,6 @@ async fn send(args: SendArgs) -> anyhow::Result<()> { let store = FsStore::load(&blobs_data_dir2).await?; let blobs = Blobs::new(&store, endpoint.clone(), Some(progress_tx)); - #[cfg(feature = "zstd")] - let do_compress = args.common.zstd; - - #[cfg(not(feature = "zstd"))] - let do_compress = false; - let import_result = import(path2, blobs.store(), &mut mp, do_compress).await?; let dt = t0.elapsed(); @@ -807,7 +845,11 @@ async fn send(args: SendArgs) -> anyhow::Result<()> { } println!("to get this data, use"); - println!("sendme receive {ticket}"); + println!( + "sendme receive{} {}", + if do_compress { " -Z" } else { "" }, + ticket + ); #[cfg(feature = "clipboard")] { @@ -943,8 +985,8 @@ fn make_export_item_progress() -> ProgressBar { ProgressStyle::with_template( "{msg}{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes}", ) - .unwrap() - .progress_chars("#>-"), + .unwrap() + .progress_chars("#>-"), ); pb } @@ -1008,6 +1050,9 @@ async fn receive(args: ReceiveArgs) -> anyhow::Result<()> { let iroh_data_dir = std::env::current_dir()?.join(dir_name); let db = iroh_blobs::store::fs::FsStore::load(&iroh_data_dir).await?; let db2 = db.clone(); + + let do_decompress = zstd_enabled(args.common.zstd, false); + trace!("load done!"); let fut = async move { trace!("running"); @@ -1094,16 +1139,17 @@ async fn receive(args: ReceiveArgs) -> anyhow::Result<()> { } if let Some((name, _)) = collection.iter().next() { if let Some(first) = name.split('/').next() { - println!("exporting to {first}"); + println!( + "exporting to {first}{}", + if do_decompress != args.common.zstd { + " -Z" + } else { + "" + } + ); } } - #[cfg(feature = "zstd")] - let do_decompress = args.common.zstd; - - #[cfg(not(feature = "zstd"))] - let do_decompress = false; - export(&db, collection, &mut mp, do_decompress).await?; anyhow::Ok((total_files, payload_size, stats)) }; From 44e89778cec6a38ce90477146a548a33e36b212f Mon Sep 17 00:00:00 2001 From: draguve Date: Tue, 8 Jul 2025 02:08:44 -0700 Subject: [PATCH 4/8] Fix code when not compiled with zstd --- src/main.rs | 136 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 91 insertions(+), 45 deletions(-) diff --git a/src/main.rs b/src/main.rs index 048c813..ee1a416 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,11 +26,14 @@ use iroh::{ discovery::{dns::DnsDiscovery, pkarr::PkarrPublisher}, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey, Watcher, }; +#[cfg(feature = "zstd")] +use iroh_blobs::api::blobs::EncodedItem; +#[cfg(feature = "zstd")] use iroh_blobs::protocol::ChunkRanges; use iroh_blobs::{ api::{ blobs::{ - AddPathOptions, AddProgress, AddProgressItem, EncodedItem, ExportMode, ExportOptions, + AddPathOptions, AddProgress, AddProgressItem, ExportMode, ExportOptions, ExportProgressItem, ImportMode, }, remote::GetProgressItem, @@ -52,7 +55,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs::{create_dir_all, File}; #[cfg(feature = "zstd")] use tokio::io::{BufReader, BufWriter}; -use tokio::{io, select, sync::mpsc}; +use tokio::{select, sync::mpsc}; 
#[cfg(feature = "zstd")] use tokio_util::io::{ReaderStream, StreamReader}; use tracing::{error, trace}; @@ -153,13 +156,13 @@ pub struct CommonArgs { #[clap(long)] pub show_secret: bool, - /// Compress the stream before sending it + /// Use zstd to compress outgoing and decompress incoming data #[cfg(feature = "zstd")] #[clap(short = 'Z', long)] pub zstd: bool, #[cfg(not(feature = "zstd"))] - #[clap(long, hide = true)] + #[clap(short = 'Z', long, hide = true)] pub zstd: bool, } @@ -412,7 +415,7 @@ async fn import( let pb = mp.add(make_import_item_progress()); pb.set_message(format!("copying {name}")); let import: AddProgress; - + #[cfg(feature = "zstd")] if _do_compress { pb.set_message(format!("compressing {name}")); @@ -429,14 +432,15 @@ async fn import( }); } - #[cfg(not(feature = "zstd"))] { + #[cfg(not(feature = "zstd"))] + { import = db.add_path_with_opts(AddPathOptions { path, mode: ImportMode::TryReference, format: BlobFormat::Raw, }); } - + let mut stream = import.stream().await; let mut item_size = 0; let temp_tag = loop { @@ -505,11 +509,52 @@ fn get_export_path(root: &Path, name: &str) -> anyhow::Result { Ok(path) } +async fn export_single_file( + db: &Store, + mp: &MultiProgress, + hash: &Hash, + target: PathBuf, + name: &String, +) -> anyhow::Result<()> { + let mut stream = db + .export_with_opts(ExportOptions { + hash: *hash, + target: target.clone(), + mode: ExportMode::TryReference, + }) + .stream() + .await; + + let pb = mp.add(make_export_item_progress()); + pb.set_message(format!("exporting {name}")); + + while let Some(item) = stream.next().await { + match item { + ExportProgressItem::Size(size) => { + pb.set_length(size); + } + ExportProgressItem::CopyProgress(offset) => { + pb.set_position(offset); + } + ExportProgressItem::Done => { + pb.finish_and_clear(); + } + ExportProgressItem::Error(cause) => { + pb.finish_and_clear(); + anyhow::bail!("error exporting {}: {}", name, cause); + } + } + } + + Ok(()) +} + async fn export( db: &Store, collection: Collection, mp: &mut MultiProgress, - decompress: bool, + _decompress: bool, + _postfix_target: bool, ) -> anyhow::Result<()> { let root = std::env::current_dir()?; let op = mp.add(make_export_overall_progress()); @@ -517,6 +562,22 @@ async fn export( for (i, (name, hash)) in collection.iter().enumerate() { op.set_position(i as u64); let target = get_export_path(&root, name)?; + + #[cfg(not(feature = "zstd"))] + let target = if _postfix_target { + let file_name = target + .file_name() + .and_then(|n| n.to_str()) + .map(|n| format!("{}.zst", n)) + .ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::Other, "Invalid file name") + })?; + + target.with_file_name(file_name) + } else { + target + }; + if target.exists() { eprintln!( "target {} already exists. 
Export stopped.", @@ -526,7 +587,8 @@ async fn export( anyhow::bail!("target {} already exists", target.display()); } - if decompress { + #[cfg(feature = "zstd")] + if _decompress { let pb = mp.add(make_export_item_progress()); pb.set_message(format!("Decompressing {name}")); let byte_stream = db @@ -546,9 +608,10 @@ async fn export( }) .filter_map(|res| match res { EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)), - EncodedItem::Error(err) => { - Some(Err(io::Error::new(io::ErrorKind::Other, err.to_string()))) - } + EncodedItem::Error(err) => Some(Err(tokio::io::Error::new( + tokio::io::ErrorKind::Other, + err.to_string(), + ))), _ => None, }); @@ -559,36 +622,14 @@ async fn export( } let target_file = File::create(&target).await?; let mut output_writer = BufWriter::new(target_file); - io::copy(&mut decoder, &mut output_writer).await?; + tokio::io::copy(&mut decoder, &mut output_writer).await?; } else { - let mut stream = db - .export_with_opts(ExportOptions { - hash: *hash, - target, - mode: ExportMode::TryReference, - }) - .stream() - .await; + export_single_file(db, mp, hash, target, name).await?; + } - let pb = mp.add(make_export_item_progress()); - pb.set_message(format!("exporting {name}")); - while let Some(item) = stream.next().await { - match item { - ExportProgressItem::Size(size) => { - pb.set_length(size); - } - ExportProgressItem::CopyProgress(offset) => { - pb.set_position(offset); - } - ExportProgressItem::Done => { - pb.finish_and_clear(); - } - ExportProgressItem::Error(cause) => { - pb.finish_and_clear(); - anyhow::bail!("error exporting {}: {}", name, cause); - } - } - } + #[cfg(not(feature = "zstd"))] + { + export_single_file(db, mp, hash, target, name).await?; } } op.finish_and_clear(); @@ -726,9 +767,7 @@ async fn show_provide_progress( fn zstd_enabled(zstd_requested: bool, _is_sending: bool) -> bool { #[cfg(feature = "zstd")] - { - return zstd_requested; - } + return zstd_requested; #[cfg(not(feature = "zstd"))] { @@ -1142,7 +1181,7 @@ async fn receive(args: ReceiveArgs) -> anyhow::Result<()> { println!( "exporting to {first}{}", if do_decompress != args.common.zstd { - " -Z" + ".zst" } else { "" } @@ -1150,7 +1189,14 @@ async fn receive(args: ReceiveArgs) -> anyhow::Result<()> { } } - export(&db, collection, &mut mp, do_decompress).await?; + export( + &db, + collection, + &mut mp, + do_decompress, + do_decompress != args.common.zstd, + ) + .await?; anyhow::Ok((total_files, payload_size, stats)) }; let (total_files, payload_size, stats) = select! 
{ From 08cf06928751f09ee4d238dcf19564715d44ccc8 Mon Sep 17 00:00:00 2001 From: draguve Date: Tue, 8 Jul 2025 02:29:04 -0700 Subject: [PATCH 5/8] Fix clipboard string --- src/main.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index ee1a416..c7e253a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -896,7 +896,7 @@ async fn send(args: SendArgs) -> anyhow::Result<()> { // Add command to the clipboard if args.clipboard { - add_to_clipboard(&ticket); + add_to_clipboard(&ticket, do_compress); } let _keyboard = tokio::task::spawn(async move { @@ -904,7 +904,7 @@ async fn send(args: SendArgs) -> anyhow::Result<()> { println!("press c to copy command to clipboard, or use the --clipboard argument"); loop { if let Ok(Key::Char('c')) = term.read_key() { - add_to_clipboard(&ticket); + add_to_clipboard(&ticket, do_compress); } } }); @@ -926,7 +926,7 @@ async fn send(args: SendArgs) -> anyhow::Result<()> { } #[cfg(feature = "clipboard")] -fn add_to_clipboard(ticket: &BlobTicket) { +fn add_to_clipboard(ticket: &BlobTicket, add_decompress_tag: bool) { use std::io::{stdout, Write}; use base64::prelude::{Engine, BASE64_STANDARD}; @@ -934,7 +934,10 @@ fn add_to_clipboard(ticket: &BlobTicket) { // Use OSC 52 to copy content to clipboard. print!( "\x1B]52;c;{}\x07", - BASE64_STANDARD.encode(format!("sendme receive {ticket}")) + BASE64_STANDARD.encode(format!( + "sendme receive{} {ticket}", + if add_decompress_tag { " -Z" } else { "" } + )) ); stdout() From d636880c62ac08e442237152d3c2b6c22ba08ba0 Mon Sep 17 00:00:00 2001 From: draguve Date: Tue, 8 Jul 2025 14:54:33 -0700 Subject: [PATCH 6/8] compression quality --- src/main.rs | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index c7e253a..8e37baf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,8 @@ use std::{ use anyhow::Context; #[cfg(feature = "zstd")] use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder}; +#[cfg(feature = "zstd")] +use async_compression::Level; use clap::{ error::{ContextKind, ErrorKind}, CommandFactory, Parser, Subcommand, @@ -158,11 +160,16 @@ pub struct CommonArgs { /// Use zstd to compress outgoing and decompress incoming data #[cfg(feature = "zstd")] - #[clap(short = 'Z', long)] + #[clap(short = 'z', long)] pub zstd: bool, + /// Compression level for zstd + #[cfg(feature = "zstd")] + #[clap(short = 'q', long, default_value_t = 3, requires("zstd"))] + pub compression_quality: u8, + #[cfg(not(feature = "zstd"))] - #[clap(short = 'Z', long, hide = true)] + #[clap(short = 'z', long, hide = true)] pub zstd: bool, } @@ -378,6 +385,7 @@ async fn import( db: &Store, mp: &mut MultiProgress, _do_compress: bool, + _compression_level: u8, ) -> anyhow::Result<(TempTag, u64, Collection)> { let parallelism = num_cpus::get(); let path = path.canonicalize()?; @@ -418,10 +426,13 @@ async fn import( #[cfg(feature = "zstd")] if _do_compress { - pb.set_message(format!("compressing {name}")); let file_stream = File::open(&path).await?; + pb.set_message(format!("Compressing {name}")); + pb.set_length(file_stream.metadata().await?.len()); let reader = BufReader::new(file_stream); - let encoder = ZstdEncoder::new(reader); + let encoder = + ZstdEncoder::with_quality(reader, Level::Precise(_compression_level as _)); + let compressed_stream = ReaderStream::new(encoder); import = db.add_stream(compressed_stream).await; } else { @@ -842,7 +853,20 @@ async fn send(args: SendArgs) -> anyhow::Result<()> { let 
store = FsStore::load(&blobs_data_dir2).await?; let blobs = Blobs::new(&store, endpoint.clone(), Some(progress_tx)); - let import_result = import(path2, blobs.store(), &mut mp, do_compress).await?; + #[cfg(feature = "zstd")] + let compression_quality = args.common.compression_quality.clamp(1, 22); + + #[cfg(not(feature = "zstd"))] + let compression_level = 0; + + let import_result = import( + path2, + blobs.store(), + &mut mp, + do_compress, + compression_quality, + ) + .await?; let dt = t0.elapsed(); let router = iroh::protocol::Router::builder(endpoint) @@ -886,7 +910,7 @@ async fn send(args: SendArgs) -> anyhow::Result<()> { println!("to get this data, use"); println!( "sendme receive{} {}", - if do_compress { " -Z" } else { "" }, + if do_compress { " -z" } else { "" }, ticket ); @@ -936,7 +960,7 @@ fn add_to_clipboard(ticket: &BlobTicket, add_decompress_tag: bool) { "\x1B]52;c;{}\x07", BASE64_STANDARD.encode(format!( "sendme receive{} {ticket}", - if add_decompress_tag { " -Z" } else { "" } + if add_decompress_tag { " -z" } else { "" } )) ); From f5a797c08947ab5ecb7e1b7ddef7bc4cd011c388 Mon Sep 17 00:00:00 2001 From: draguve Date: Tue, 8 Jul 2025 15:03:20 -0700 Subject: [PATCH 7/8] Fix lint --- src/main.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index 8e37baf..7aac06a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -619,10 +619,7 @@ async fn export( }) .filter_map(|res| match res { EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)), - EncodedItem::Error(err) => Some(Err(tokio::io::Error::new( - tokio::io::ErrorKind::Other, - err.to_string(), - ))), + EncodedItem::Error(err) => Some(Err(tokio::io::Error::other(err.to_string()))), _ => None, }); From a127696013dca68507e67853f5e5dfcb732a40b8 Mon Sep 17 00:00:00 2001 From: draguve Date: Tue, 8 Jul 2025 15:43:37 -0700 Subject: [PATCH 8/8] Fix export print --- src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 7aac06a..f8df3b0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -854,7 +854,7 @@ async fn send(args: SendArgs) -> anyhow::Result<()> { let compression_quality = args.common.compression_quality.clamp(1, 22); #[cfg(not(feature = "zstd"))] - let compression_level = 0; + let compression_quality = 0; let import_result = import( path2, @@ -1204,7 +1204,7 @@ async fn receive(args: ReceiveArgs) -> anyhow::Result<()> { if let Some(first) = name.split('/').next() { println!( "exporting to {first}{}", - if do_decompress != args.common.zstd { + if do_decompress != args.common.zstd && collection.len() == 1 { ".zst" } else { ""
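
Taken together, the series compresses on the sending side by wrapping the file reader in a ZstdEncoder and importing the resulting byte stream, and decompresses on the receiving side by turning the exported byte stream back into an AsyncRead for a ZstdDecoder. Below is a minimal, self-contained sketch of that round-trip, assuming async-compression 0.4 with the "tokio" and "zstd" features plus tokio-util's io helpers, as in the patches; the file names and the fixed level 3 are illustrative, not part of the series.

    use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
    use async_compression::Level;
    use tokio::fs::File;
    use tokio::io::{self, BufReader, BufWriter};
    use tokio_util::io::{ReaderStream, StreamReader};

    #[tokio::main]
    async fn main() -> io::Result<()> {
        // Send side (cf. import() in patches 1 and 6): buffer the file,
        // compress at an explicit level -- the CLI default is 3, clamped to
        // zstd's 1..=22 range -- and expose the encoder as a stream of
        // Bytes, which is the shape db.add_stream() consumes.
        let input = BufReader::new(File::open("input.bin").await?);
        let encoder = ZstdEncoder::with_quality(input, Level::Precise(3));
        let compressed = ReaderStream::new(encoder);

        // Receive side (cf. export() in patch 2): StreamReader turns any
        // Stream<Item = io::Result<Bytes>> back into an AsyncRead -- the
        // same adapter the patches apply to the filter_map'd EncodedItem
        // stream from export_bao() -- so ZstdDecoder can read from it.
        let reader = StreamReader::new(compressed);
        let mut decoder = ZstdDecoder::new(reader);
        let mut output = BufWriter::new(File::create("output.bin").await?);
        io::copy(&mut decoder, &mut output).await?;
        Ok(())
    }

The bufread encoder and decoder variants are the natural fit here: they wrap anything implementing AsyncBufRead and themselves implement AsyncRead, so both ends slot directly into the ReaderStream/StreamReader pipeline without extra buffering glue.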