diff --git a/ampup/Cargo.toml b/ampup/Cargo.toml index 18c319f..060c59e 100644 --- a/ampup/Cargo.toml +++ b/ampup/Cargo.toml @@ -13,8 +13,11 @@ name = "ampup" [dependencies] anyhow = "1.0.80" clap = { version = "4.5.2", features = ["derive", "env"] } +console = "0.16" +dialoguer = "0.12" fs-err = "3.0.0" futures = "0.3" +indicatif = "0.18" reqwest = { version = "0.13", default-features = false, features = [ "json", "query", @@ -26,13 +29,11 @@ serde = { version = "1.0", features = ["derive"] } tempfile = "3.13.0" tokio = { version = "1.36.0", features = [ "macros", - "test-util", - "rt-multi-thread", "parking_lot", + "rt-multi-thread", + "sync", + "test-util", ] } -indicatif = "0.18" -console = "0.16" -dialoguer = "0.12" [build-dependencies] vergen-gitcl = { version = "9.0.0", features = ["build"] } diff --git a/ampup/src/commands/install.rs b/ampup/src/commands/install.rs index 8575d1b..7d3f5c8 100644 --- a/ampup/src/commands/install.rs +++ b/ampup/src/commands/install.rs @@ -2,6 +2,7 @@ use anyhow::Result; use crate::{ config::Config, + download_manager::DownloadManager, github::GitHubClient, install::Installer, platform::{Architecture, Platform}, @@ -19,8 +20,7 @@ pub async fn run( jobs: Option, ) -> Result<()> { let config = Config::new(install_dir)?; - // Will be passed to DownloadManager for bounded-concurrent downloads - let _max_concurrent = jobs.unwrap_or(4); + let max_concurrent = jobs.unwrap_or(4); // Resolve token with fallback chain: explicit → gh auth token → unauthenticated let resolved_token = token::resolve_github_token(github_token); @@ -89,7 +89,8 @@ pub async fn run( ui::detail!("Platform: {}, Architecture: {}", platform, arch); // Install the binary - let installer = Installer::new(version_manager, github); + let download_manager = DownloadManager::new(github, max_concurrent); + let installer = Installer::new(version_manager, download_manager); installer .install_from_release(&version, platform, arch) .await?; diff --git 
a/ampup/src/commands/update.rs b/ampup/src/commands/update.rs index 0af3c74..f053404 100644 --- a/ampup/src/commands/update.rs +++ b/ampup/src/commands/update.rs @@ -1,12 +1,13 @@ use anyhow::{Context, Result}; use semver::Version; -use crate::{github::GitHubClient, ui, updater::Updater}; +use crate::{github::GitHubClient, token, ui, updater::Updater}; pub async fn run(repo: String, github_token: Option) -> Result<()> { ui::info!("Checking for updates"); - let github = GitHubClient::new(repo, github_token)?; + let resolved_token = token::resolve_github_token(github_token); + let github = GitHubClient::new(repo, resolved_token)?; let updater = Updater::new(github); let current_version = updater.get_current_version(); diff --git a/ampup/src/download_manager.rs b/ampup/src/download_manager.rs new file mode 100644 index 0000000..7000984 --- /dev/null +++ b/ampup/src/download_manager.rs @@ -0,0 +1,932 @@ +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::{Context, Result}; +use fs_err as fs; +use tokio::{sync::Semaphore, task::JoinSet}; + +use crate::github::{GitHubClient, ResolvedAsset}; + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +/// A single artifact to download from a GitHub release. +pub struct DownloadTask { + /// GitHub release asset name (e.g., "ampd-linux-x86_64") + pub artifact_name: String, + /// Destination filename inside the version directory (e.g., "ampd") + pub dest_filename: String, +} + +/// Errors that occur during bounded-concurrent download operations. +/// +/// Used by [`DownloadManager::download_all`] and its helper functions. +/// Each variant represents a distinct failure mode in the download-verify-stage +/// pipeline. +#[derive(Debug)] +pub enum DownloadError { + /// A download task failed after one automatic retry. 
+ /// + /// The download was attempted twice (initial + one retry) and both attempts + /// failed. The `source` error contains the retry failure with the initial + /// failure chained as context. + /// + /// Possible causes: + /// - Network connectivity issues (DNS, timeout, connection reset) + /// - GitHub API errors (5xx, asset removed mid-download) + /// - Rate limiting that persisted through the HTTP-layer retry + TaskFailed { + artifact_name: String, + source: anyhow::Error, + }, + + /// Downloaded artifact was empty (zero bytes). + /// + /// The HTTP request succeeded but the response body contained no data. + /// This typically indicates a problem with the release packaging on GitHub + /// rather than a network issue. + EmptyArtifact { artifact_name: String }, + + /// Failed to write an artifact to the staging directory. + /// + /// The download and verification succeeded, but writing the artifact data + /// to the temporary staging directory failed. This usually indicates a + /// filesystem issue (permissions, disk full, path too long). + StagingWrite { + artifact_name: String, + path: PathBuf, + source: std::io::Error, + }, + + /// Internal error: the concurrency semaphore was unexpectedly closed. + /// + /// This should not occur under normal operation. It indicates a logic bug + /// where the semaphore was dropped or closed while tasks were still waiting + /// to acquire permits. 
+ SemaphoreClosed { artifact_name: String }, +} + +impl std::fmt::Display for DownloadError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::TaskFailed { + artifact_name, + source, + } => { + writeln!(f, "Download failed for artifact")?; + writeln!(f, " Artifact: {}", artifact_name)?; + writeln!(f, " Error: {}", source)?; + writeln!(f)?; + writeln!(f, " The download was retried once and still failed.")?; + write!(f, " Check your network connection and try again.")?; + } + Self::EmptyArtifact { artifact_name } => { + writeln!(f, "Downloaded artifact is empty")?; + writeln!(f, " Artifact: {}", artifact_name)?; + writeln!(f)?; + writeln!( + f, + " The release asset was downloaded but contains no data." + )?; + write!( + f, + " This may indicate a problem with the release packaging." + )?; + } + Self::StagingWrite { + artifact_name, + path, + source, + } => { + writeln!(f, "Failed to write artifact to staging directory")?; + writeln!(f, " Artifact: {}", artifact_name)?; + writeln!(f, " Path: {}", path.display())?; + write!(f, " Error: {}", source)?; + } + Self::SemaphoreClosed { artifact_name } => { + writeln!(f, "Internal error: concurrency semaphore closed")?; + write!(f, " Artifact: {}", artifact_name)?; + } + } + Ok(()) + } +} + +impl std::error::Error for DownloadError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::TaskFailed { source, .. } => Some(source.as_ref()), + Self::StagingWrite { source, .. } => Some(source), + Self::EmptyArtifact { .. } | Self::SemaphoreClosed { .. } => None, + } + } +} + +// --------------------------------------------------------------------------- +// DownloadManager +// --------------------------------------------------------------------------- + +/// Manages bounded-concurrent downloads of release artifacts. +/// +/// Downloads proceed in parallel up to `max_concurrent` tasks. 
Each task +/// downloads an artifact, verifies it (currently: non-empty check), and +/// writes it to a staging directory. Only after all tasks succeed does +/// the staging directory get atomically renamed to the final version +/// directory. +/// +/// If any task fails (after one retry), all in-flight tasks are cancelled +/// and the staging directory is cleaned up automatically via `TempDir` drop. +pub struct DownloadManager { + github: GitHubClient, + max_concurrent: usize, +} + +impl DownloadManager { + /// Create a new download manager. + /// + /// `max_concurrent` is clamped to a minimum of 1 to prevent deadlocks. + /// Pass 1 for sequential downloads (useful for debugging). + pub fn new(github: GitHubClient, max_concurrent: usize) -> Self { + Self { + github, + max_concurrent: max_concurrent.max(1), + } + } + + /// Download all tasks concurrently and write results to `version_dir`. + /// + /// Fetches release metadata once, then spawns bounded-concurrent download + /// tasks. Uses a staging directory (sibling of `version_dir`) for + /// atomicity. If all downloads succeed and pass verification, the staging + /// directory is renamed to `version_dir`. If any download fails, all + /// in-flight tasks are cancelled and the staging directory is cleaned up. + pub async fn download_all( + &self, + tasks: Vec, + version: &str, + version_dir: PathBuf, + ) -> Result<()> { + // Resolve all asset metadata with a single API call so that each + // spawned task can download directly without re-fetching the release. 
+ let asset_names: Vec<&str> = tasks.iter().map(|t| t.artifact_name.as_str()).collect(); + let resolved = self + .github + .resolve_release_assets(version, &asset_names) + .await?; + + let parent = version_dir.parent().ok_or_else(|| { + anyhow::anyhow!("version_dir has no parent: {}", version_dir.display()) + })?; + + // Staging dir in the same parent ensures same filesystem for atomic rename + let staging_dir = + tempfile::tempdir_in(parent).context("Failed to create staging directory")?; + + let semaphore = Arc::new(Semaphore::new(self.max_concurrent)); + let mut join_set: JoinSet> = JoinSet::new(); + + for (task, asset) in tasks.into_iter().zip(resolved) { + let github = self.github.clone(); + let sem = semaphore.clone(); + let staging_path = staging_dir.path().to_path_buf(); + + join_set.spawn(async move { + let _permit = sem + .acquire() + .await + .map_err(|_| DownloadError::SemaphoreClosed { + artifact_name: task.artifact_name.clone(), + })?; + + let data = download_with_retry(&github, &asset).await?; + verify_artifact(&task.artifact_name, &data)?; + write_to_staging(&staging_path, &task.dest_filename, &data)?; + + Ok(()) + }); + } + + // Collect results — fail fast on first error + while let Some(result) = join_set.join_next().await { + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => { + join_set.shutdown().await; + return Err(e.into()); + } + Err(join_err) => { + join_set.shutdown().await; + return Err(anyhow::anyhow!("download task panicked: {}", join_err)); + } + } + } + + // Set executable permissions on all staged files + #[cfg(unix)] + set_executable_permissions(staging_dir.path())?; + + // Safe replacement: rename existing dir to backup, swap staging in, then + // remove backup. If the swap fails, restore from backup so the previous + // install is never lost. + // + // NOTE: `with_extension("old")` is wrong for semver names because it + // replaces the last dotted segment (v0.1.0 → v0.1.old), causing + // collisions across patch versions. 
Instead, append ".old" to the full + // directory name. + let backup_dir = append_extension(&version_dir, "old"); + // Remove stale backup from a previous crashed install so the backup + // rename below doesn't fail with ENOTEMPTY. + if backup_dir.exists() { + let _ = fs::remove_dir_all(&backup_dir); + } + if version_dir.exists() { + fs::rename(&version_dir, &backup_dir).with_context(|| { + format!( + "failed to back up existing version directory {}", + version_dir.display() + ) + })?; + } + + // Atomic move: staging → version_dir. + // `keep()` detaches the TempDir so `drop` won't remove it. If the + // rename below fails we must clean up `staging_path` ourselves. + let staging_path = staging_dir.keep(); + if let Err(err) = fs::rename(&staging_path, &version_dir) { + // Clean up the orphaned staging directory + let _ = fs::remove_dir_all(&staging_path); + + // Restore backup so the previous install is not lost + if backup_dir.exists() + && let Err(restore_err) = fs::rename(&backup_dir, &version_dir) + { + crate::ui::warn!( + "Failed to restore previous install from {}: {}", + backup_dir.display(), + restore_err + ); + } + return Err(err).with_context(|| { + format!( + "failed to move staging directory to {}", + version_dir.display() + ) + }); + } + + // Best-effort cleanup of the backup + if backup_dir.exists() { + let _ = fs::remove_dir_all(&backup_dir); + } + + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// Private helpers +// --------------------------------------------------------------------------- + +/// Append an extension to a path without replacing an existing dotted segment. +/// +/// Unlike [`Path::with_extension`], which replaces the last dotted segment +/// (e.g. `v0.1.0` → `v0.1.old`), this function appends to the full name +/// (e.g. `v0.1.0` → `v0.1.0.old`). This is critical for semver-style +/// directory names where dot-separated components are not file extensions. 
+fn append_extension(path: &Path, ext: &str) -> PathBuf { + let mut s = path.as_os_str().to_os_string(); + s.push("."); + s.push(ext); + PathBuf::from(s) +} + +/// Download a resolved asset with one retry on failure. +/// +/// This targets network and transient errors. Rate-limit retries (429) are +/// already handled at the HTTP layer by `GitHubClient::send_with_rate_limit`, +/// so a rate-limited request will have been retried there before surfacing +/// as an error here. +async fn download_with_retry( + github: &GitHubClient, + asset: &ResolvedAsset, +) -> std::result::Result, DownloadError> { + // `false` suppresses per-file progress bars — DownloadManager will + // provide aggregate progress reporting in a future PR. + match github.download_resolved_asset(asset, false).await { + Ok(data) => Ok(data), + Err(first_err) => { + crate::ui::warn!("Download failed for {}, retrying once...", asset.name); + + github + .download_resolved_asset(asset, false) + .await + .map_err(|retry_err| DownloadError::TaskFailed { + artifact_name: asset.name.clone(), + source: retry_err + .context(format!("retry also failed (first error: {})", first_err)), + }) + } + } +} + +/// Verify a downloaded artifact. Currently checks non-empty. +/// Per-artifact checksum/attestation verification will be added in a follow-up PR. +fn verify_artifact(artifact_name: &str, data: &[u8]) -> std::result::Result<(), DownloadError> { + if data.is_empty() { + return Err(DownloadError::EmptyArtifact { + artifact_name: artifact_name.to_string(), + }); + } + Ok(()) +} + +/// Write artifact data to the staging directory. 
+fn write_to_staging( + staging_path: &Path, + dest_filename: &str, + data: &[u8], +) -> std::result::Result<(), DownloadError> { + let dest = staging_path.join(dest_filename); + fs::write(&dest, data).map_err(|err| DownloadError::StagingWrite { + artifact_name: dest_filename.to_string(), + path: dest, + source: err, + }) +} + +/// Set executable permissions (0o755) on all files in a directory. +#[cfg(unix)] +fn set_executable_permissions(dir: &Path) -> Result<()> { + use std::os::unix::fs::PermissionsExt; + + for entry in fs::read_dir(dir).context("failed to list staging directory")? { + let entry = entry.context("failed to read staging directory entry")?; + let path = entry.path(); + let mut perms = fs::metadata(&path) + .with_context(|| format!("failed to read metadata for {}", path.display()))? + .permissions(); + perms.set_mode(0o755); + fs::set_permissions(&path, perms) + .with_context(|| format!("failed to set permissions on {}", path.display()))?; + } + Ok(()) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + mod verify_artifact { + use super::*; + + /// Rejects zero-byte downloads so corrupt installs are caught early. + #[test] + fn verify_artifact_with_empty_data_returns_empty_artifact_error() { + //* Given + let data: Vec = vec![]; + + //* When + let result = verify_artifact("ampd-linux-x86_64", &data); + + //* Then + let err = result.expect_err("should return DownloadError for empty data"); + assert!( + matches!(err, DownloadError::EmptyArtifact { .. }), + "expected EmptyArtifact error, got: {:?}", + err + ); + } + } + + #[cfg(unix)] + mod set_executable_permissions { + use std::os::unix::fs::PermissionsExt; + + use super::*; + + /// Ensures every file in the version directory is marked executable. 
+ #[test] + fn set_executable_permissions_with_multiple_files_sets_0o755_on_all() { + //* Given + let dir = tempfile::tempdir().expect("should create temp directory"); + fs::write(dir.path().join("ampd"), b"ampd-binary").expect("should write ampd"); + fs::write(dir.path().join("ampctl"), b"ampctl-binary").expect("should write ampctl"); + + //* When + let result = set_executable_permissions(dir.path()); + + //* Then + assert!(result.is_ok(), "should set permissions on all files"); + for name in ["ampd", "ampctl"] { + let perms = fs::metadata(dir.path().join(name)) + .expect("should read metadata") + .permissions(); + assert_eq!( + perms.mode() & 0o777, + 0o755, + "{} should have 0o755 permissions", + name + ); + } + } + } + + /// Tests the backup-swap-cleanup pattern used by `download_all` to replace + /// an existing version directory safely. These exercise the filesystem + /// operations directly (not through `download_all`) because the full method + /// requires a live `GitHubClient`. + mod staging_swap { + use super::*; + + /// Verifies that an existing version directory is atomically replaced + /// with fresh content via the backup-swap pattern. 
+ #[test] + fn backup_swap_with_existing_version_dir_replaces_contents() { + //* Given + let parent = tempfile::tempdir().expect("should create parent directory"); + let version_dir = parent.path().join("v1.0.0"); + let backup_dir = append_extension(&version_dir, "old"); + fs::create_dir_all(&version_dir).expect("should create stale version directory"); + fs::write(version_dir.join("stale-file"), b"stale").expect("should write stale file"); + + let staging_dir = + tempfile::tempdir_in(parent.path()).expect("should create staging directory"); + fs::write(staging_dir.path().join("ampd"), b"fresh-binary") + .expect("should write to staging"); + + //* When — mirrors the backup-swap logic in download_all + let backup_result = fs::rename(&version_dir, &backup_dir); + let staging_path = staging_dir.keep(); + let swap_result = fs::rename(&staging_path, &version_dir); + + //* Then + assert!(backup_result.is_ok(), "should back up existing version_dir"); + assert!(swap_result.is_ok(), "should swap staging into version_dir"); + let content = fs::read(version_dir.join("ampd")).expect("should read installed binary"); + assert_eq!( + content, b"fresh-binary", + "version_dir should contain the fresh binary, not stale data" + ); + + // Cleanup backup + if backup_dir.exists() { + let _ = fs::remove_dir_all(&backup_dir); + } + } + + /// Handles a leftover `.old` directory from a previously crashed install. 
+ #[test] + fn backup_swap_with_stale_backup_dir_succeeds() { + //* Given — a leftover .old dir from a previous crashed install + let parent = tempfile::tempdir().expect("should create parent directory"); + let version_dir = parent.path().join("v1.0.0"); + let backup_dir = append_extension(&version_dir, "old"); + fs::create_dir_all(&version_dir).expect("should create version directory"); + fs::write(version_dir.join("ampd"), b"current").expect("should write current binary"); + fs::create_dir_all(&backup_dir).expect("should create stale backup directory"); + fs::write(backup_dir.join("ampd"), b"stale-backup").expect("should write stale backup"); + + //* When — mirrors the stale backup cleanup + backup-swap logic + if backup_dir.exists() { + let _ = fs::remove_dir_all(&backup_dir); + } + let backup_result = fs::rename(&version_dir, &backup_dir); + + //* Then + assert!( + backup_result.is_ok(), + "should succeed after removing stale backup directory" + ); + assert!( + !version_dir.exists(), + "version_dir should be moved to backup" + ); + assert!( + backup_dir.exists(), + "backup_dir should contain the previous install" + ); + + // Cleanup + let _ = fs::remove_dir_all(&backup_dir); + } + } + + mod append_extension { + use super::*; + + /// Guards against `with_extension` truncating semver names like `v0.1.0`. + #[test] + fn append_extension_with_semver_name_preserves_full_name() { + //* Given + let path = PathBuf::from("/versions/v0.1.0"); + + //* When + let result = append_extension(&path, "old"); + + //* Then + assert_eq!( + result, + PathBuf::from("/versions/v0.1.0.old"), + "should append .old to the full directory name, not replace .0" + ); + } + } + + /// End-to-end tests for `download_all` using a mock HTTP server. + /// Exercises bounded concurrency, fail-fast cancellation, retry + /// behavior, and the staging-to-version-dir swap. 
+ mod it_download_all { + use std::sync::atomic::{AtomicUsize, Ordering}; + + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + use super::*; + + /// Route configuration for the mock HTTP server. + #[derive(Clone)] + struct Route { + /// Path substring to match against the request path. + prefix: &'static str, + /// Response body to return on success. + body: Vec, + /// Number of times to return 500 before succeeding. + /// Shared across connections so retries see the updated count. + fail_count: Arc, + } + + impl Route { + /// Create a route that always succeeds. + fn ok(prefix: &'static str, body: Vec) -> Self { + Self { + prefix, + body, + fail_count: Arc::new(AtomicUsize::new(0)), + } + } + + /// Create a route that returns 500 for the first `n` requests, + /// then succeeds. + fn fail_then_ok(prefix: &'static str, body: Vec, n: usize) -> Self { + Self { + prefix, + body, + fail_count: Arc::new(AtomicUsize::new(n)), + } + } + } + + /// Spawn a mock HTTP server on a pre-bound listener. + /// + /// `routes` maps path substrings to response bodies. The server accepts + /// connections in a loop, reads the request line to extract the path, + /// and sends back the matching response with 200 OK — or 404 if no + /// route matches. Routes with a non-zero `fail_count` return 500 until + /// the count is exhausted. 
+ fn start_mock_server( + listener: tokio::net::TcpListener, + routes: Vec, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + break; + }; + let routes = routes.clone(); + + tokio::spawn(async move { + let mut buf = [0u8; 4096]; + let n = stream.read(&mut buf).await.expect("should read request"); + let request = String::from_utf8_lossy(&buf[..n]); + let path = request + .lines() + .next() + .and_then(|line| line.split_whitespace().nth(1)) + .unwrap_or("/"); + + let response = routes + .iter() + .find(|r| path.contains(r.prefix)) + .map(|route| { + // Atomically decrement the fail counter. Uses + // compare_exchange in a loop to avoid the race + // where fetch_sub wraps 0 → usize::MAX. + let should_fail = loop { + let current = route.fail_count.load(Ordering::Relaxed); + if current == 0 { + break false; + } + if route + .fail_count + .compare_exchange( + current, + current - 1, + Ordering::Relaxed, + Ordering::Relaxed, + ) + .is_ok() + { + break true; + } + }; + + if should_fail { + b"HTTP/1.1 500 Internal Server Error\r\n\ + Content-Length: 0\r\n\r\n" + .to_vec() + } else { + format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", + route.body.len() + ) + .into_bytes() + .into_iter() + .chain(route.body.iter().copied()) + .collect::>() + } + }) + .unwrap_or_else(|| { + b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n".to_vec() + }); + + stream + .write_all(&response) + .await + .expect("should write response"); + }); + } + }) + } + + /// Build release JSON for the mock server with the given assets. 
+ fn release_json(addr: std::net::SocketAddr, asset_names: &[&str]) -> Vec { + let assets: Vec = asset_names + .iter() + .enumerate() + .map(|(i, name)| { + format!( + r#"{{"id":{},"name":"{}","browser_download_url":"http://{}/download/{}"}}"#, + i + 1, + name, + addr, + name, + ) + }) + .collect(); + format!(r#"{{"tag_name":"v1.0.0","assets":[{}]}}"#, assets.join(",")).into_bytes() + } + + /// Common test setup: bind a mock server, create a `DownloadManager`, + /// and prepare a temp directory with a version path. + struct TestFixture { + manager: DownloadManager, + version_dir: PathBuf, + _tmp: tempfile::TempDir, + server_handle: tokio::task::JoinHandle<()>, + } + + impl TestFixture { + /// Create a fixture with the given release assets, download routes, + /// and concurrency level. The release metadata route is prepended + /// automatically. + async fn new( + release_assets: &[&str], + download_routes: Vec, + max_concurrent: usize, + ) -> Self { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("should bind to a random port"); + let addr = listener.local_addr().expect("should have a local address"); + + let release_body = release_json(addr, release_assets); + let mut routes = vec![Route::ok("tags/v1.0.0", release_body)]; + routes.extend(download_routes); + + let server_handle = start_mock_server(listener, routes); + + let api_base = format!("http://{}", addr); + let github = + GitHubClient::with_api_base(api_base).expect("should create test client"); + let manager = DownloadManager::new(github, max_concurrent); + + let tmp = tempfile::tempdir().expect("should create temp directory"); + let version_dir = tmp.path().join("v1.0.0"); + + Self { + manager, + version_dir, + _tmp: tmp, + server_handle, + } + } + + /// Run `download_all` with the given tasks. 
+ async fn download(&self, tasks: Vec) -> Result<()> { + self.manager + .download_all(tasks, "v1.0.0", self.version_dir.clone()) + .await + } + } + + impl Drop for TestFixture { + fn drop(&mut self) { + self.server_handle.abort(); + } + } + + /// The standard two-artifact task list (ampd + ampctl). + fn standard_tasks() -> Vec { + vec![ + DownloadTask { + artifact_name: "ampd-linux-x86_64".to_string(), + dest_filename: "ampd".to_string(), + }, + DownloadTask { + artifact_name: "ampctl-linux-x86_64".to_string(), + dest_filename: "ampctl".to_string(), + }, + ] + } + + /// Happy path: both artifacts download and land in the version directory. + #[tokio::test] + async fn download_all_with_two_assets_writes_both_to_version_dir() { + //* Given + let ampd_data = b"fake-ampd-binary".to_vec(); + let ampctl_data = b"fake-ampctl-binary".to_vec(); + + let fixture = TestFixture::new( + &["ampd-linux-x86_64", "ampctl-linux-x86_64"], + vec![ + Route::ok("download/ampd-linux-x86_64", ampd_data.clone()), + Route::ok("download/ampctl-linux-x86_64", ampctl_data.clone()), + ], + 4, + ) + .await; + + //* When + let result = fixture.download(standard_tasks()).await; + + //* Then + assert!( + result.is_ok(), + "download_all should succeed: {:?}", + result.err() + ); + assert_eq!( + fs::read(fixture.version_dir.join("ampd")).expect("should read ampd"), + ampd_data, + "ampd binary should match downloaded content" + ); + assert_eq!( + fs::read(fixture.version_dir.join("ampctl")).expect("should read ampctl"), + ampctl_data, + "ampctl binary should match downloaded content" + ); + } + + /// A missing asset fails the whole batch and leaves no partial install. 
+ #[tokio::test] + async fn download_all_with_missing_asset_fails_without_partial_install() { + //* Given — release only contains ampd; ampctl is missing + let fixture = TestFixture::new( + &["ampd-linux-x86_64"], + vec![Route::ok( + "download/ampd-linux-x86_64", + b"fake-ampd-binary".to_vec(), + )], + 4, + ) + .await; + + //* When + let result = fixture.download(standard_tasks()).await; + + //* Then + assert!( + result.is_err(), + "download_all should fail when an asset is missing from the release" + ); + assert!( + !fixture.version_dir.exists(), + "version_dir should not exist after failed download (no partial install)" + ); + } + + /// `-j 1` (sequential) mode still produces a correct install. + #[tokio::test] + async fn download_all_with_sequential_mode_succeeds() { + //* Given — same as happy path but with max_concurrent = 1 + let fixture = TestFixture::new( + &["ampd-linux-x86_64", "ampctl-linux-x86_64"], + vec![ + Route::ok("download/ampd-linux-x86_64", b"ampd-bytes".to_vec()), + Route::ok("download/ampctl-linux-x86_64", b"ampctl-bytes".to_vec()), + ], + 1, + ) + .await; + + //* When + let result = fixture.download(standard_tasks()).await; + + //* Then + assert!( + result.is_ok(), + "download_all with -j 1 should succeed: {:?}", + result.err() + ); + assert!( + fixture.version_dir.join("ampd").exists(), + "ampd should be installed in version_dir" + ); + assert!( + fixture.version_dir.join("ampctl").exists(), + "ampctl should be installed in version_dir" + ); + } + + /// A single 500 is retried and the download ultimately succeeds. 
+ #[tokio::test] + async fn download_all_with_transient_failure_succeeds_on_retry() { + //* Given — ampd download returns 500 on the first request, then 200 + let ampd_data = b"ampd-after-retry".to_vec(); + + let fixture = TestFixture::new( + &["ampd-linux-x86_64"], + vec![Route::fail_then_ok( + "download/ampd-linux-x86_64", + ampd_data.clone(), + 1, // fail once, then succeed + )], + 4, + ) + .await; + + let tasks = vec![DownloadTask { + artifact_name: "ampd-linux-x86_64".to_string(), + dest_filename: "ampd".to_string(), + }]; + + //* When + let result = fixture.download(tasks).await; + + //* Then + assert!( + result.is_ok(), + "download_all should succeed after retry: {:?}", + result.err() + ); + assert_eq!( + fs::read(fixture.version_dir.join("ampd")).expect("should read ampd"), + ampd_data, + "ampd binary should contain data from the successful retry" + ); + } + + /// Persistent 500s exhaust all retries and fail with no partial install. + #[tokio::test] + async fn download_all_with_persistent_failure_fails_after_retry() { + //* Given — ampd download returns 500 on all attempts. + // Each download attempt triggers 2 HTTP requests (initial + 5xx + // retry in send_with_rate_limit), and download_with_retry makes 2 + // attempts, so 4 failures are needed for persistent failure. 
+ let fixture = TestFixture::new( + &["ampd-linux-x86_64"], + vec![Route::fail_then_ok( + "download/ampd-linux-x86_64", + b"should-never-be-read".to_vec(), + 4, // 2 attempts × 2 HTTP requests each (initial + 5xx retry) + )], + 4, + ) + .await; + + let tasks = vec![DownloadTask { + artifact_name: "ampd-linux-x86_64".to_string(), + dest_filename: "ampd".to_string(), + }]; + + //* When + let result = fixture.download(tasks).await; + + //* Then + assert!( + result.is_err(), + "download_all should fail when both initial and retry fail" + ); + assert!( + !fixture.version_dir.exists(), + "version_dir should not exist after permanent failure" + ); + } + } +} diff --git a/ampup/src/github.rs b/ampup/src/github.rs index 1da2fad..1b929ac 100644 --- a/ampup/src/github.rs +++ b/ampup/src/github.rs @@ -171,6 +171,22 @@ impl std::fmt::Display for GitHubError { impl std::error::Error for GitHubError {} +/// A release asset resolved from GitHub metadata, ready to download. +/// +/// Produced by [`GitHubClient::resolve_release_assets`] and consumed by +/// [`GitHubClient::download_resolved_asset`]. This allows fetching release +/// metadata once and then downloading multiple assets without redundant API +/// calls. +#[derive(Clone, Debug)] +pub struct ResolvedAsset { + /// Asset ID on GitHub (used for API-based downloads of private repos). + pub id: u64, + /// Asset name (e.g. "ampd-linux-x86_64"). + pub name: String, + /// Direct browser download URL (used for public repos). + pub url: String, +} + #[derive(Debug, Deserialize)] struct Release { #[serde(rename = "tag_name")] @@ -186,8 +202,9 @@ struct Asset { url: String, } -/// Clone is cheap: `reqwest::Client` and `rate_limiter` are both `Arc`-backed. -/// Needed so `DownloadManager` can move a handle into each spawned download task. +/// Cloneable so `DownloadManager` can move a handle into each spawned task. +/// `reqwest::Client` and `rate_limiter` are `Arc`-backed; `repo` and `token` +/// are small strings cloned by value. 
#[derive(Clone)] pub struct GitHubClient { client: reqwest::Client, @@ -238,6 +255,26 @@ impl GitHubClient { }) } + /// Create a client with a custom API base URL for testing. + /// + /// `api_base` replaces the standard GitHub API URL so requests go to a + /// local mock server instead. + #[cfg(test)] + pub(crate) fn with_api_base(api_base: String) -> Result { + let client = reqwest::Client::builder() + .build() + .context("Failed to create request client")?; + let rate_limiter = Arc::new(GitHubRateLimiter::new(false)); + + Ok(Self { + client, + repo: "test/repo".to_string(), + token: None, + api: api_base, + rate_limiter, + }) + } + /// Get the latest release version pub async fn get_latest_version(&self) -> Result { let release = self.get_latest_release().await?; @@ -266,7 +303,81 @@ impl GitHubClient { Ok(()) } - /// Send a request with rate-limit awareness and one retry on 429. + /// Find an asset by name within a release, returning `AssetNotFound` if + /// no match exists. + fn find_asset<'a>( + &self, + release: &'a Release, + asset_name: &str, + version: &str, + ) -> Result<&'a Asset> { + release + .assets + .iter() + .find(|a| a.name == asset_name) + .ok_or_else(|| { + GitHubError::AssetNotFound { + repo: self.repo.clone(), + asset_name: asset_name.to_string(), + version: version.to_string(), + available_assets: release.assets.iter().map(|a| a.name.clone()).collect(), + } + .into() + }) + } + + /// Resolve multiple asset names from a single release, fetching the release + /// metadata only once. + /// + /// Returns a `ResolvedAsset` for each requested name. Fails with + /// `GitHubError::AssetNotFound` on the first name that does not match any + /// asset in the release. 
+ pub async fn resolve_release_assets( + &self, + version: &str, + asset_names: &[&str], + ) -> Result> { + let release = self.get_tagged_release(version).await?; + + let mut resolved = Vec::with_capacity(asset_names.len()); + for &name in asset_names { + let asset = self.find_asset(&release, name, version)?; + resolved.push(ResolvedAsset { + id: asset.id, + name: asset.name.clone(), + url: asset.url.clone(), + }); + } + Ok(resolved) + } + + /// Download a previously resolved asset without re-fetching release + /// metadata. + pub async fn download_resolved_asset( + &self, + asset: &ResolvedAsset, + show_progress: bool, + ) -> Result> { + if self.token.is_some() { + self.download_asset_via_api(asset.id, &asset.name, show_progress) + .await + } else { + self.download_asset_direct(&asset.url, &asset.name, show_progress) + .await + } + } + + /// Send a request with rate-limit awareness, one retry on 429, and one + /// retry on transient server/transport errors. + /// + /// Retry order: + /// 1. Rate-limit (429/403-rate-limited) — wait for `Retry-After`, retry once + /// 2. Server error (5xx) — 1-second delay, retry once + /// 3. Transport error (connection reset, DNS, timeout) — 1-second delay, retry once + /// + /// These retries protect metadata fetches (`get_release`, + /// `resolve_release_assets`) which have no outer retry layer. Download + /// paths have an additional retry in `DownloadManager::download_with_retry`. 
async fn send_with_rate_limit( &self, build_request: impl Fn() -> reqwest::RequestBuilder, @@ -274,16 +385,56 @@ ) -> Result<reqwest::Response> { self.check_rate_limit_pause().await?; - let response = build_request() - .send() - .await - .with_context(|| context_msg.to_string())?; + let response = match build_request().send().await { + Ok(resp) => resp, + Err(first_err) => { + // One retry on transport errors (connection reset, DNS, timeout) + crate::ui::warn!("Request failed ({}), retrying once...", first_err); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + self.check_rate_limit_pause().await?; + + build_request().send().await.with_context(|| { + format!( + "{} (retry also failed, first error: {})", + context_msg, first_err + ) + })? + } + }; + + let response = + if let Some(retry_after) = self.rate_limiter.update_from_response(&response).await { + crate::ui::warn!( + "Rate limited by GitHub API, retrying in {} seconds...", + retry_after + ); + self.check_rate_limit_pause().await?; + + let response = build_request() + .send() + .await + .with_context(|| context_msg.to_string())?; + + if let Some(retry_after) = self.rate_limiter.update_from_response(&response).await { + return Err(GitHubError::RateLimited { + retry_after_secs: retry_after, + has_token: self.token.is_some(), + } + .into()); + } - if let Some(retry_after) = self.rate_limiter.update_from_response(&response).await { + response + } else { + response + }; + + // One retry on server errors (5xx) — transient GitHub/CDN blips + if response.status().is_server_error() { crate::ui::warn!( - "Rate limited by GitHub API, retrying in {} seconds...", - retry_after + "Server error (HTTP {}), retrying once...", + response.status().as_u16() ); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; self.check_rate_limit_pause().await?; let response = build_request() @@ -291,14 +442,7 @@ .send() .await .with_context(|| context_msg.to_string())?; - if let Some(retry_after) =
self.rate_limiter.update_from_response(&response).await { - return Err(GitHubError::RateLimited { - retry_after_secs: retry_after, - has_token: self.token.is_some(), - } - .into()); - } - + self.rate_limiter.update_from_response(&response).await; return Ok(response); } @@ -361,33 +505,38 @@ Ok(release) } - /// Download a release asset by name - pub async fn download_release_asset(&self, version: &str, asset_name: &str) -> Result<Vec<u8>> { + /// Download a release asset by name. + /// + /// When `show_progress` is `true`, an indicatif progress bar is rendered for + /// this individual download. Set to `false` when downloads are managed by + /// `DownloadManager` (which will provide aggregate progress in the future). + pub async fn download_release_asset( + &self, + version: &str, + asset_name: &str, + show_progress: bool, + ) -> Result<Vec<u8>> { let release = self.get_tagged_release(version).await?; - - // Find the asset - let asset = release - .assets - .iter() - .find(|a| a.name == asset_name) - .ok_or_else(|| GitHubError::AssetNotFound { - repo: self.repo.clone(), - asset_name: asset_name.to_string(), - version: version.to_string(), - available_assets: release.assets.iter().map(|a| a.name.clone()).collect(), - })?; + let asset = self.find_asset(&release, asset_name, version)?; if self.token.is_some() { // For private repositories, we need to use the API to download - self.download_asset_via_api(asset.id, asset_name).await + self.download_asset_via_api(asset.id, asset_name, show_progress) + .await } else { // For public repositories, use direct download URL - self.download_asset_direct(&asset.url, asset_name).await + self.download_asset_direct(&asset.url, asset_name, show_progress) + .await } } /// Download asset via GitHub API (for private repos) - async fn download_asset_via_api(&self, asset_id: u64, asset_name: &str) -> Result<Vec<u8>> { + async fn download_asset_via_api( + &self, + asset_id: u64, + asset_name: &str, + show_progress: bool, + ) -> Result<Vec<u8>> { let
url = format!( "https://api.github.com/repos/{}/releases/assets/{}", self.repo, asset_id @@ -404,25 +553,35 @@ ) .await?; - self.download_with_progress(response, &url, asset_name) + self.download_with_progress(response, &url, asset_name, show_progress) .await } /// Download asset directly (for public repos) - async fn download_asset_direct(&self, url: &str, asset_name: &str) -> Result<Vec<u8>> { + async fn download_asset_direct( + &self, + url: &str, + asset_name: &str, + show_progress: bool, + ) -> Result<Vec<u8>> { let response = self .send_with_rate_limit(|| self.client.get(url), "Failed to download asset") .await?; - self.download_with_progress(response, url, asset_name).await + self.download_with_progress(response, url, asset_name, show_progress) + .await } - /// Download with progress bar from a response + /// Download with optional progress bar from a response. + /// + /// When `show_progress` is `false`, bytes are collected silently (used by + /// `DownloadManager` which manages its own aggregate progress reporting). async fn download_with_progress( &self, response: reqwest::Response, url: &str, asset_name: &str, + show_progress: bool, ) -> Result<Vec<u8>> { if !response.status().is_success() { let status = response.status(); @@ -435,29 +594,31 @@ .into()); } - // Get content length for progress bar - let total_size = response.content_length(); - - // Setup progress bar - let pb = if let Some(size) = total_size { - let pb = ProgressBar::new(size); - pb.set_style( - ProgressStyle::default_bar() - .template( - "{msg} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})", - ) - .context("Invalid progress bar template")?
- .progress_chars("#>-"), - ); - pb.set_message(format!("{} Downloading", console::style("→").cyan())); - pb + // Setup progress bar (hidden when DownloadManager handles progress) + let pb = if show_progress { + let total_size = response.content_length(); + if let Some(size) = total_size { + let pb = ProgressBar::new(size); + pb.set_style( + ProgressStyle::default_bar() + .template( + "{msg} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})", + ) + .context("Invalid progress bar template")? + .progress_chars("#>-"), + ); + pb.set_message(format!("{} Downloading", console::style("→").cyan())); + pb + } else { + let pb = ProgressBar::new_spinner(); + pb.set_message(format!( + "{} Downloading (size unknown)", + console::style("→").cyan() + )); + pb + } } else { - let pb = ProgressBar::new_spinner(); - pb.set_message(format!( - "{} Downloading (size unknown)", - console::style("→").cyan() - )); - pb + ProgressBar::hidden() }; // Stream and collect chunks @@ -472,7 +633,9 @@ impl GitHubClient { pb.set_position(downloaded); } - pb.finish_with_message(format!("{} Downloaded", console::style("✓").green().bold())); + if show_progress { + pb.finish_with_message(format!("{} Downloaded", console::style("✓").green().bold())); + } Ok(buffer) } diff --git a/ampup/src/install.rs b/ampup/src/install.rs index 1af9bd0..f598ec4 100644 --- a/ampup/src/install.rs +++ b/ampup/src/install.rs @@ -1,59 +1,26 @@ -use anyhow::{Context, Result}; -use fs_err as fs; +use anyhow::Result; use crate::{ - github::GitHubClient, + download_manager::{DownloadManager, DownloadTask}, platform::{Architecture, Platform}, ui, version_manager::VersionManager, }; -#[derive(Debug)] -pub enum InstallError { - EmptyBinary { version: String }, -} - -impl std::fmt::Display for InstallError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::EmptyBinary { version } => { - writeln!(f, "Downloaded binary is empty")?; - writeln!(f, " Version: {}", 
version)?; - writeln!(f)?; - writeln!( - f, - " The release asset was downloaded but contains no data." - )?; - writeln!( - f, - " This may indicate a problem with the release packaging." - )?; - writeln!( - f, - " Try downloading a different version or report this issue." - )?; - } - } - Ok(()) - } -} - -impl std::error::Error for InstallError {} - pub struct Installer { version_manager: VersionManager, - github: GitHubClient, + download_manager: DownloadManager, } impl Installer { - pub fn new(version_manager: VersionManager, github: GitHubClient) -> Self { + pub fn new(version_manager: VersionManager, download_manager: DownloadManager) -> Self { Self { version_manager, - github, + download_manager, } } - /// Install ampd and ampctl from a GitHub release + /// Install ampd and ampctl from a GitHub release. pub async fn install_from_release( &self, version: &str, @@ -62,91 +29,34 @@ ) -> Result<()> { self.version_manager.config().ensure_dirs()?; - // Download and install ampd let ampd_artifact = format!("ampd-{}-{}", platform.as_str(), arch.as_str()); - ui::info!("Downloading {} for {}", ui::version(version), ampd_artifact); - - let ampd_data = self - .github - .download_release_asset(version, &ampd_artifact) - .await?; - - if ampd_data.is_empty() { - return Err(InstallError::EmptyBinary { - version: version.to_string(), - } - .into()); - } - - ui::detail!("Downloaded {} bytes for ampd", ampd_data.len()); - - // Download and install ampctl let ampctl_artifact = format!("ampctl-{}-{}", platform.as_str(), arch.as_str()); + ui::info!( - "Downloading {} for {}", + "Downloading {} ({}, {})", ui::version(version), + ampd_artifact, ampctl_artifact ); - let ampctl_data = self - .github - .download_release_asset(version, &ampctl_artifact) + let tasks = vec![ + DownloadTask { + artifact_name: ampd_artifact, + dest_filename: "ampd".to_string(), + }, + DownloadTask { + artifact_name: ampctl_artifact, + dest_filename: "ampctl".to_string(), + }, + ]; + + let
version_dir = self.version_manager.config().versions_dir.join(version); + + self.download_manager + .download_all(tasks, version, version_dir) .await?; - if ampctl_data.is_empty() { - return Err(InstallError::EmptyBinary { - version: version.to_string(), - } - .into()); - } - - ui::detail!("Downloaded {} bytes for ampctl", ampctl_data.len()); - - // Install both binaries - self.install_binaries(version, &ampd_data, &ampctl_data)?; - - Ok(()) - } - - /// Install both ampd and ampctl binaries to the version directory - fn install_binaries(&self, version: &str, ampd_data: &[u8], ampctl_data: &[u8]) -> Result<()> { - let config = self.version_manager.config(); - - // Create version directory - let version_dir = config.versions_dir.join(version); - fs::create_dir_all(&version_dir).context("Failed to create version directory")?; - - // Install ampd - let ampd_path = version_dir.join("ampd"); - fs::write(&ampd_path, ampd_data).context("Failed to write ampd binary")?; - - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let mut perms = fs::metadata(&ampd_path) - .context("Failed to get ampd metadata")? - .permissions(); - perms.set_mode(0o755); - fs::set_permissions(&ampd_path, perms) - .context("Failed to set executable permissions on ampd")?; - } - - // Install ampctl - let ampctl_path = version_dir.join("ampctl"); - fs::write(&ampctl_path, ampctl_data).context("Failed to write ampctl binary")?; - - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let mut perms = fs::metadata(&ampctl_path) - .context("Failed to get ampctl metadata")?
- .permissions(); - perms.set_mode(0o755); - fs::set_permissions(&ampctl_path, perms) - .context("Failed to set executable permissions on ampctl")?; - } - - // Activate this version using the version manager + // Activation barrier: all downloads succeeded, now create symlinks self.version_manager.activate(version)?; Ok(()) diff --git a/ampup/src/lib.rs b/ampup/src/lib.rs index 8484923..5caaac0 100644 --- a/ampup/src/lib.rs +++ b/ampup/src/lib.rs @@ -1,6 +1,7 @@ pub mod builder; pub mod commands; pub mod config; +pub mod download_manager; pub mod github; pub mod install; pub mod platform; diff --git a/ampup/src/rate_limiter.rs b/ampup/src/rate_limiter.rs index 4d6cb15..d244e47 100644 --- a/ampup/src/rate_limiter.rs +++ b/ampup/src/rate_limiter.rs @@ -152,31 +152,8 @@ mod tests { //! following the project convention for modules with 10+ tests (see //! docs/code/test-files.md "Module Structure Within cfg(test)"). - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - use super::*; - /// Spawn a one-shot TCP server that returns a raw HTTP response. - /// Accepts one connection, drains the request, writes `response_bytes`, then closes. - async fn mock_http_response(response_bytes: Vec<u8>) -> std::net::SocketAddr { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0") - .await - .expect("should bind to a random port"); - let addr = listener.local_addr().expect("should have a local address"); - - tokio::spawn(async move { - let (mut stream, _) = listener.accept().await.expect("should accept a connection"); - let mut buf = [0u8; 1024]; - let _ = stream.read(&mut buf).await; - stream - .write_all(&response_bytes) - .await - .expect("should write response"); - }); - - addr - } - /// Tests for the blocking gate that callers use before making HTTP requests.
mod wait_if_paused { use super::*; @@ -246,8 +223,7 @@ let limiter = GitHubRateLimiter::new(false); { let mut state = limiter.inner.lock().await; - state.paused_until = - Some(Instant::now() + Duration::from_secs(60 + 1)); + state.paused_until = Some(Instant::now() + Duration::from_secs(60 + 1)); } //* When @@ -351,9 +327,32 @@ /// End-to-end tests that send real HTTP through reqwest to verify that the /// header names (`X-RateLimit-Remaining`, `X-RateLimit-Reset`, `Retry-After`) /// are parsed correctly from actual HTTP responses. - mod update_from_response { + mod it_update_from_response { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use super::*; + /// Spawn a one-shot TCP server that returns a raw HTTP response. + /// Accepts one connection, drains the request, writes `response_bytes`, then closes. + async fn mock_http_response(response_bytes: Vec<u8>) -> std::net::SocketAddr { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("should bind to a random port"); + let addr = listener.local_addr().expect("should have a local address"); + + tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.expect("should accept a connection"); + let mut buf = [0u8; 1024]; + let _ = stream.read(&mut buf).await; + stream - .write_all(&response_bytes) + .await + .expect("should write response"); + }); + + addr + } + #[tokio::test] async fn with_ok_status_parses_remaining_header() { //* Given diff --git a/ampup/src/token.rs b/ampup/src/token.rs index 44763c4..c0e32e0 100644 --- a/ampup/src/token.rs +++ b/ampup/src/token.rs @@ -9,8 +9,12 @@ use std::process::Command; /// Note: `--github-token` values may be visible in process listings (`ps aux`). /// Prefer `GITHUB_TOKEN` env var or `gh auth token` for sensitive environments.
pub fn resolve_github_token(explicit: Option<String>) -> Option<String> { - if explicit.is_some() { - return explicit; + // Filter out empty/whitespace-only tokens so they fall through to the + // gh CLI fallback instead of sending a useless `Bearer ` header. + if let Some(token) = explicit + && !token.trim().is_empty() + { + return Some(token); } try_gh_auth_token() @@ -63,13 +67,34 @@ mod tests { } #[test] - fn resolve_github_token_with_none_exercises_fallback_without_panicking() { - //* Given — no explicit token provided + fn resolve_github_token_with_empty_string_falls_through_to_fallback() { + //* Given + let explicit = Some("".to_string()); + + //* When — empty token should be treated as absent, not as a valid credential + let result = resolve_github_token(explicit); - //* When — the result depends on whether `gh` is installed and - //* authenticated, so we only assert it doesn't panic. - let _result = resolve_github_token(None); + //* Then — result depends on gh CLI availability, but must NOT be Some("") + assert_ne!( + result, + Some("".to_string()), + "should not return an empty string as a valid token" + ); + } - //* Then — reaching this point means the fallback chain completed + #[test] + fn resolve_github_token_with_whitespace_only_falls_through_to_fallback() { + //* Given + let explicit = Some(" ".to_string()); + + //* When + let result = resolve_github_token(explicit); + + //* Then + assert_ne!( + result, + Some(" ".to_string()), + "should not return a whitespace-only string as a valid token" + ); } } diff --git a/ampup/src/updater.rs b/ampup/src/updater.rs index cdf48b4..5b1e86b 100644 --- a/ampup/src/updater.rs +++ b/ampup/src/updater.rs @@ -40,7 +40,7 @@ impl Updater { let binary_data = self .github - .download_release_asset(version, &artifact_name) + .download_release_asset(version, &artifact_name, true) .await .context("Failed to download ampup binary")?; diff --git a/docs/features/app-ampup.md b/docs/features/app-ampup.md index fc67bcb..634caa3 100644 ---
a/docs/features/app-ampup.md +++ b/docs/features/app-ampup.md @@ -50,6 +50,12 @@ ampup install v0.1.0 # Install with custom directory ampup install --install-dir ~/.custom/amp v0.2.0 + +# Install with parallel downloads (default: 4 concurrent) +ampup install -j 8 + +# Install sequentially (useful for debugging) +ampup install -j 1 ``` ### List Installed Versions @@ -113,6 +119,9 @@ Clones the repository (or uses local path), runs `cargo build --release`, and in # Update ampd/ampctl to latest release ampup update +# Update with parallel downloads +ampup update -j 8 + # Equivalent to: ampup install ``` @@ -162,12 +171,16 @@ The self-update performs atomic in-place replacement of the running executable. ### Installation Flow -1. User runs `ampup install [version]` -2. Detect platform (Linux/Darwin) and architecture (x86_64/aarch64) -3. Query GitHub API for release (latest or specific tag) -4. Download artifact: `ampd-{platform}-{arch}` -5. Extract to `~/.amp/versions/<version>/` -6. Activate version (create symlinks) +1. User runs `ampup install [version] [-j N]` +2. Resolve GitHub token (explicit `--github-token` → `gh auth token` → unauthenticated) +3. Detect platform (Linux/Darwin) and architecture (x86_64/aarch64) +4. Query GitHub API for release (latest or specific tag) +5. Download artifacts concurrently (bounded by `-j`, default 4): `ampd-{platform}-{arch}`, `ampctl-{platform}-{arch}` + - Downloads write to a staging directory (sibling of version dir for atomic rename) + - Each download is verified (non-empty) and retried once on failure + - If any download fails, in-flight downloads are cancelled and the staging directory is cleaned up +6. Atomically move staging directory to `~/.amp/versions/<version>/` +7.
Activate version (create symlinks) — only after all downloads succeed ### Build Flow @@ -227,3 +240,15 @@ All commands accept `--install-dir` to override the default installation directo ampup --install-dir /opt/amp install v0.1.0 ampup --install-dir /opt/amp list ``` + +The `install` and `update` commands accept `--jobs` / `-j` to control download concurrency: + +```bash +# Use 8 concurrent downloads +ampup install -j 8 + +# Sequential mode (useful for debugging) +ampup install -j 1 +``` + +Default concurrency is 4. Setting `-j 1` disables parallelism.