@@ -38,6 +38,7 @@ use futures::stream::{self, StreamExt as _, TryStreamExt as _};
3838use human_repr:: HumanCount as _;
3939use humantime:: format_duration;
4040use md5:: { Digest as _, Md5 } ;
41+ use std:: sync:: atomic:: Ordering ;
4142use std:: {
4243 ffi:: OsStr ,
4344 fs:: File ,
@@ -299,9 +300,11 @@ async fn download_http_parallel(
299300 // Progress tracking - log every 5 seconds like the forest::progress system
300301 let bytes_downloaded = Arc :: new ( std:: sync:: atomic:: AtomicU64 :: new ( 0 ) ) ;
301302 let last_logged_bytes = Arc :: new ( std:: sync:: atomic:: AtomicU64 :: new ( 0 ) ) ;
302- let last_logged_time = Arc :: new ( parking_lot:: Mutex :: new ( Instant :: now ( ) ) ) ;
303+ // Store elapsed millis since start_time to avoid needing a Mutex<Instant>.
304+ let last_logged_millis = Arc :: new ( std:: sync:: atomic:: AtomicU64 :: new ( 0 ) ) ;
303305 let start_time = Instant :: now ( ) ;
304306 const UPDATE_FREQUENCY : Duration = Duration :: from_secs ( 5 ) ;
307+ const UPDATE_FREQUENCY_MS : u64 = UPDATE_FREQUENCY . as_millis ( ) as u64 ;
305308
306309 // Download chunks in parallel
307310 let download_tasks = ( 0 ..effective_connections) . map ( |i| {
@@ -310,7 +313,7 @@ async fn download_http_parallel(
310313 let tmp_path = tmp_dst_path. clone ( ) ;
311314 let bytes_downloaded = Arc :: clone ( & bytes_downloaded) ;
312315 let last_logged_bytes = Arc :: clone ( & last_logged_bytes) ;
313- let last_logged_time = Arc :: clone ( & last_logged_time ) ;
316+ let last_logged_millis = Arc :: clone ( & last_logged_millis ) ;
314317 let callback = callback. clone ( ) ;
315318
316319 let start = i * chunk_size;
@@ -347,65 +350,78 @@ async fn download_http_parallel(
347350
348351 // Stream bytes and update progress incrementally
349352 let mut stream = response. bytes_stream ( ) ;
350- let mut chunk_bytes_written = 0usize ;
351-
352- while let Some ( chunk_result) = stream. try_next ( ) . await ? {
353- // Write this chunk of data
354- file. write_all ( & chunk_result) . await ?;
355- chunk_bytes_written += chunk_result. len ( ) ;
356-
357- // Update global progress counter
358- let downloaded = bytes_downloaded. fetch_add (
359- chunk_result. len ( ) as u64 ,
360- std:: sync:: atomic:: Ordering :: Relaxed ,
361- ) + chunk_result. len ( ) as u64 ;
362-
363- // Log progress every 5 seconds (forest::progress format)
364- let now = Instant :: now ( ) ;
365- let mut last_logged = last_logged_time. lock ( ) ;
366- if ( now - * last_logged) > UPDATE_FREQUENCY {
367- let last_bytes =
368- last_logged_bytes. load ( std:: sync:: atomic:: Ordering :: Relaxed ) ;
369- let elapsed_secs = ( now - start_time) . as_secs_f64 ( ) ;
370- let seconds_since_last = ( now - * last_logged) . as_secs_f64 ( ) . max ( 0.1 ) ;
371- let speed = ( downloaded - last_bytes) as f64 / seconds_since_last;
372- let percent = if total_size > 0 {
373- downloaded * 100 / total_size
374- } else {
375- 0
376- } ;
377-
378- tracing:: info!(
379- target: "forest::progress" ,
380- "Loading {} / {}, {}%, {}/s, elapsed time: {}" ,
381- downloaded. human_count_bytes( ) ,
382- total_size. human_count_bytes( ) ,
383- percent,
384- speed. human_count_bytes( ) ,
385- format_duration( Duration :: from_secs( elapsed_secs as u64 ) )
386- ) ;
387-
388- * last_logged = now;
389- last_logged_bytes. store ( downloaded, std:: sync:: atomic:: Ordering :: Relaxed ) ;
390- }
391-
392- // Also call user callback if provided (for RPC state tracking)
393- call_progress_callback ( callback. as_deref ( ) , downloaded, total_size) ;
394- }
353+ let mut chunk_bytes_written = 0u64 ;
354+
355+ let result: anyhow:: Result < ( ) > = async {
356+ while let Some ( chunk_result) = stream. try_next ( ) . await ? {
357+ file. write_all ( & chunk_result) . await ?;
358+ chunk_bytes_written += chunk_result. len ( ) as u64 ;
359+
360+ let downloaded = bytes_downloaded
361+ . fetch_add ( chunk_result. len ( ) as u64 , Ordering :: Relaxed )
362+ + chunk_result. len ( ) as u64 ;
363+
364+ // Log progress every 5 seconds (lockless fast path)
365+ let elapsed_ms = start_time. elapsed ( ) . as_millis ( ) as u64 ;
366+ let prev_ms = last_logged_millis. load ( Ordering :: Relaxed ) ;
367+ if elapsed_ms. saturating_sub ( prev_ms) >= UPDATE_FREQUENCY_MS
368+ && last_logged_millis
369+ // Spurious failure is fine — another task logs instead.
370+ . compare_exchange_weak (
371+ prev_ms,
372+ elapsed_ms,
373+ Ordering :: Relaxed ,
374+ Ordering :: Relaxed ,
375+ )
376+ . is_ok ( )
377+ {
378+ let last_bytes = last_logged_bytes. load ( Ordering :: Relaxed ) ;
379+ let elapsed_secs = elapsed_ms as f64 / 1000.0 ;
380+ let seconds_since_last = ( elapsed_ms - prev_ms) as f64 / 1000.0 ;
381+ let speed = downloaded. saturating_sub ( last_bytes) as f64
382+ / seconds_since_last. max ( 0.1 ) ;
383+ let percent = if total_size > 0 {
384+ downloaded * 100 / total_size
385+ } else {
386+ 0
387+ } ;
388+
389+ tracing:: info!(
390+ target: "forest::progress" ,
391+ "Loading {} / {}, {}%, {}/s, elapsed time: {}" ,
392+ downloaded. human_count_bytes( ) ,
393+ total_size. human_count_bytes( ) ,
394+ percent,
395+ speed. human_count_bytes( ) ,
396+ format_duration( Duration :: from_secs(
397+ elapsed_secs as u64
398+ ) )
399+ ) ;
400+
401+ last_logged_bytes. store ( downloaded, Ordering :: Relaxed ) ;
402+ }
395403
396- file. flush ( ) . await ?;
404+ call_progress_callback ( callback. as_deref ( ) , downloaded, total_size) ;
405+ }
397406
398- // Verify we got the expected amount of data
399- if chunk_bytes_written != expected_size {
400- anyhow:: bail!(
401- "Chunk {} size mismatch: expected {} bytes, got {}" ,
402- i,
403- expected_size,
404- chunk_bytes_written
407+ file. flush ( ) . await ?;
408+ ensure ! (
409+ chunk_bytes_written == expected_size as u64 ,
410+ "Chunk {i} size mismatch: expected {expected_size} \
411+ bytes, got {chunk_bytes_written}"
405412 ) ;
413+ Ok ( ( ) )
406414 }
415+ . await ;
407416
408- Ok :: < _ , anyhow:: Error > ( ( ) )
417+ // On failure, undo progress so retries don't push past 100%.
418+ result. inspect_err ( |e| {
419+ tracing:: warn!(
420+ "Chunk {i} download failed after {}: {e:#}" ,
421+ chunk_bytes_written. human_count_bytes( ) ,
422+ ) ;
423+ bytes_downloaded. fetch_sub ( chunk_bytes_written, Ordering :: Relaxed ) ;
424+ } )
409425 } ;
410426
411427 download_chunk
0 commit comments