diff --git a/Cargo.toml b/Cargo.toml index 017faf03..51ace78e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ [workspace.package] authors = ["Thomas Eizinger", "Artem Medvedev ", "Mervyn McCreight"] edition = "2021" -keywords = ["docker", "testcontainers"] +keywords = ["docker", "testcontainers", "docker-compose"] license = "MIT OR Apache-2.0" readme = "README.md" repository = "https://github.com/testcontainers/testcontainers-rs" diff --git a/testcontainers/Cargo.toml b/testcontainers/Cargo.toml index 6e7867c6..ef4dba3b 100644 --- a/testcontainers/Cargo.toml +++ b/testcontainers/Cargo.toml @@ -25,6 +25,7 @@ docker_credential = "1.3.1" either = "1.12.0" etcetera = "0.8.0" futures = "0.3" +itertools = "0.14" log = "0.4" memchr = "2.7.2" parse-display = "0.9.0" @@ -42,6 +43,7 @@ tokio-tar = "0.3.1" tokio-util = { version = "0.7.10", features = ["io"] } ulid = { version = "1.1.3", optional = true } url = { version = "2", features = ["serde"] } +uuid = { version = "1.8.0", features = ["v4"] } [features] default = [] diff --git a/testcontainers/src/compose/client.rs b/testcontainers/src/compose/client.rs new file mode 100644 index 00000000..330e083c --- /dev/null +++ b/testcontainers/src/compose/client.rs @@ -0,0 +1,64 @@ +use std::{fmt, path::PathBuf}; + +use crate::core::async_container::raw::RawContainer; + +pub(super) mod containerised; +pub(super) mod local; + +pub(super) enum ComposeClient { + Local(local::LocalComposeCli), + Containerised(containerised::ContainerisedComposeCli), +} + +impl ComposeClient { + pub(super) fn new_local(compose_files: Vec) -> Self { + ComposeClient::Local(local::LocalComposeCli::new(compose_files)) + } + + pub(super) async fn new_containerised(compose_files: Vec) -> Self { + ComposeClient::Containerised( + containerised::ContainerisedComposeCli::new(compose_files).await, + ) + } +} + +pub(super) struct UpCommand { + pub(super) project_name: String, + pub(super) wait_timeout: std::time::Duration, +} + +pub(super) struct DownCommand { + pub(super) project_name: String, + pub(super) rmi: bool, + pub(super) volumes: bool, +} + +pub(super) trait ComposeInterface { + async fn up(&self, command: UpCommand) -> Result<(), std::io::Error>; + async fn down(&self, command: DownCommand) -> Result<(), std::io::Error>; +} + +impl ComposeInterface for ComposeClient { + async fn up(&self, command: UpCommand) -> Result<(), std::io::Error> { + match self { + ComposeClient::Local(client) => client.up(command).await, + ComposeClient::Containerised(client) => client.up(command).await, + } + } + + async fn down(&self, command: DownCommand) -> Result<(), std::io::Error> { + match self { + ComposeClient::Local(client) => client.down(command).await, + ComposeClient::Containerised(client) => client.down(command).await, + } + } +} + +impl fmt::Debug for ComposeClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ComposeClient::Local(_) => write!(f, "LocalComposeCli"), + ComposeClient::Containerised(_) => write!(f, "ContainerisedComposeCli"), + } + } +} diff --git a/testcontainers/src/compose/client/containerised.rs b/testcontainers/src/compose/client/containerised.rs new file mode 100644 index 00000000..7e1dd7ef --- /dev/null +++ b/testcontainers/src/compose/client/containerised.rs @@ -0,0 +1,91 @@ +use std::{io::Error, path::PathBuf}; + +use crate::{ + compose::client::{ComposeInterface, DownCommand, UpCommand}, + core::{CmdWaitFor, ExecCommand, Mount}, + images::docker_cli::DockerCli, + runners::AsyncRunner, + ContainerAsync, ContainerRequest, ImageExt, +}; + +pub(crate) struct ContainerisedComposeCli { + container: ContainerAsync, + compose_files_in_container: Vec, +} + +impl ContainerisedComposeCli { + pub(super) async fn new(compose_files: Vec) -> Self { + let mut image = ContainerRequest::from(DockerCli::new("/var/run/docker.sock")); + + let compose_files_in_container: Vec = compose_files + .iter() + .enumerate() + .map(|(i, _)| format!("/docker-compose-{i}.yml")) + .collect(); + let mounts: Vec<_> = compose_files + .iter() + .zip(compose_files_in_container.iter()) + .map(|(path, file_name)| Mount::bind_mount(path.to_str().unwrap(), file_name)) + .collect(); + + for mount in mounts { + image = image.with_mount(mount); + } + + let container = image.start().await.expect("TODO: Handle error"); + + Self { + container, + compose_files_in_container, + } + } +} + +impl ComposeInterface for ContainerisedComposeCli { + async fn up(&self, command: UpCommand) -> Result<(), Error> { + let mut cmd = vec![ + "docker".to_string(), + "compose".to_string(), + "--project-name".to_string(), + command.project_name.clone(), + ]; + + for file in &self.compose_files_in_container { + cmd.push("-f".to_string()); + cmd.push(file.to_string()); + } + + cmd.push("up".to_string()); + cmd.push("--wait".to_string()); + // add timeout + cmd.push("--wait-timeout".to_string()); + cmd.push(command.wait_timeout.as_secs().to_string()); + + let exec = ExecCommand::new(cmd); + // todo: error handling + self.container.exec(exec).await.map_err(Error::other)?; + + Ok(()) + } + + async fn down(&self, command: DownCommand) -> Result<(), Error> { + let mut cmd = vec![ + "docker".to_string(), + "compose".to_string(), + "--project-name".to_string(), + command.project_name.clone(), + "down".to_string(), + ]; + + if command.volumes { + cmd.push("--volumes".to_string()); + } + if command.rmi { + cmd.push("--rmi".to_string()); + } + + let exec = ExecCommand::new(cmd).with_cmd_ready_condition(CmdWaitFor::exit_code(0)); + self.container.exec(exec).await.map_err(Error::other)?; + Ok(()) + } +} diff --git a/testcontainers/src/compose/client/local.rs b/testcontainers/src/compose/client/local.rs new file mode 100644 index 00000000..e8bcb185 --- /dev/null +++ b/testcontainers/src/compose/client/local.rs @@ -0,0 +1,74 @@ +use std::{ + io::Error, + path::{Path, PathBuf}, +}; + +use crate::compose::client::{ComposeInterface, DownCommand, UpCommand}; + +#[derive(Debug)] +pub(crate) struct LocalComposeCli { + compose_files: Vec, + working_dir: PathBuf, +} + +impl LocalComposeCli { + pub(super) fn new(compose_files: Vec) -> Self { + let working_dir = Self::extract_current_dir(&compose_files).to_path_buf(); + + Self { + compose_files, + working_dir, + } + } + + fn extract_current_dir(compose_files: &[PathBuf]) -> &Path { + // TODO: error handling + compose_files + .first() + .expect("At least one compose file is required") + .parent() + .expect("Compose file path must be absolute") + } +} + +impl ComposeInterface for LocalComposeCli { + async fn up(&self, command: UpCommand) -> Result<(), Error> { + let mut cmd = tokio::process::Command::new("docker"); + cmd.current_dir(self.working_dir.as_path()) + .arg("compose") + .arg("--project-name") + .arg(&command.project_name); + + for compose_file in &self.compose_files { + cmd.arg("-f").arg(compose_file); + } + cmd.arg("up") + .arg("--wait") + .arg("--wait-timeout") + .arg(command.wait_timeout.as_secs().to_string()); + + cmd.output().await?; + + Ok(()) + } + + async fn down(&self, command: DownCommand) -> Result<(), Error> { + let mut cmd = tokio::process::Command::new("docker"); + cmd.current_dir(self.working_dir.as_path()) + .arg("compose") + .arg("--project-name") + .arg(&command.project_name) + .arg("down"); + + if command.volumes { + cmd.arg("--volumes"); + } + if command.rmi { + cmd.arg("--rmi"); + } + + cmd.output().await?; + + Ok(()) + } +} diff --git a/testcontainers/src/compose/mod.rs b/testcontainers/src/compose/mod.rs new file mode 100644 index 00000000..cd915ce1 --- /dev/null +++ b/testcontainers/src/compose/mod.rs @@ -0,0 +1,132 @@ +use std::{path::Path, sync::Arc}; + +use crate::{compose::client::ComposeInterface, core::async_drop}; + +mod client; + +#[derive(Debug)] +pub struct DockerCompose { + project_name: String, + client: Arc, + remove_volumes: bool, + remove_images: bool, +} + +impl DockerCompose { + /// Create a new docker compose with a local client (using docker-cli installed locally) + /// If you don't have docker-cli installed, you can use `with_containerised_client` instead + pub fn with_local_client(compose_files: &[impl AsRef]) -> Self { + let compose_files = compose_files + .iter() + .map(|p| p.as_ref().to_path_buf()) + .collect(); + + let client = Arc::new(client::ComposeClient::new_local(compose_files)); + + Self::new(client) + } + + /// Create a new docker compose with a containerised client (doesn't require docker-cli installed locally) + pub async fn with_containerised_client(compose_files: &[impl AsRef]) -> Self { + let compose_files = compose_files + .iter() + .map(|p| p.as_ref().to_path_buf()) + .collect(); + + let client = Arc::new(client::ComposeClient::new_containerised(compose_files).await); + + Self::new(client) + } + + /// Start the docker compose + pub async fn up(&self) { + self.client + .up(client::UpCommand { + project_name: self.project_name.clone(), + wait_timeout: std::time::Duration::from_secs(60), + }) + .await + .expect("TODO: error handling"); + } + + /// Remove volumes when dropping the docker compose or not + pub fn with_remove_volumes(&mut self, remove_volumes: bool) -> &mut Self { + self.remove_volumes = remove_volumes; + self + } + + /// Remove images when dropping the docker compose or not + pub fn with_remove_images(&mut self, remove_images: bool) -> &mut Self { + self.remove_images = remove_images; + self + } + + fn new(client: Arc) -> Self { + let project_name = uuid::Uuid::new_v4().to_string(); + + Self { + project_name, + client, + remove_volumes: true, + remove_images: false, + } + } +} + +impl Drop for DockerCompose { + fn drop(&mut self) { + let project_name = self.project_name.clone(); + let client = self.client.clone(); + let rmi = self.remove_images; + let volumes = self.remove_volumes; + let drop_task = async move { + let res = client + .down(client::DownCommand { + project_name, + rmi, + volumes, + }) + .await; + + match res { + Ok(()) => log::info!("docker compose successfully dropped"), + Err(e) => log::error!("failed to drop docker compose: {}", e), + } + }; + + async_drop::async_drop(drop_task); + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use super::*; + + // #[tokio::test] + // async fn test_containerised_docker_compose() { + // let path_to_compose = PathBuf::from(format!( + // "{}/tests/test-compose.yml", + // env!("CARGO_MANIFEST_DIR") + // )); + // let docker_compose = + // DockerCompose::with_containerised_client(&[path_to_compose.as_path()]).await; + // docker_compose.up().await; + // tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // let res = reqwest::get("http://localhost:8081/").await.unwrap(); + // assert!(res.status().is_success()); + // } + + #[tokio::test] + async fn test_local_docker_compose() { + let path_to_compose = PathBuf::from(format!( + "{}/tests/test-compose.yml", + env!("CARGO_MANIFEST_DIR") + )); + let docker_compose = DockerCompose::with_local_client(&[path_to_compose.as_path()]); + docker_compose.up().await; + let client = reqwest::get("http://localhost:8081").await.unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(15)).await; + } +} diff --git a/testcontainers/src/core/containers/async_container.rs b/testcontainers/src/core/containers/async_container.rs index f618d74a..91535397 100644 --- a/testcontainers/src/core/containers/async_container.rs +++ b/testcontainers/src/core/containers/async_container.rs @@ -1,23 +1,14 @@ -use std::{fmt, net::IpAddr, pin::Pin, str::FromStr, sync::Arc, time::Duration}; +use std::{fmt, ops::Deref, sync::Arc}; -use tokio::io::{AsyncBufRead, AsyncReadExt}; use tokio_stream::StreamExt; use crate::{ - core::{ - async_drop, - client::Client, - env, - error::{ContainerMissingInfo, ExecError, Result, TestcontainersError}, - network::Network, - ports::Ports, - wait::WaitStrategy, - CmdWaitFor, ContainerPort, ContainerState, ExecCommand, WaitFor, - }, + core::{async_drop, client::Client, env, error::Result, network::Network, ContainerState}, ContainerRequest, Image, }; pub(super) mod exec; +pub(crate) mod raw; /// Represents a running docker container that has been started using an async client. /// @@ -36,10 +27,8 @@ pub(super) mod exec; /// /// [drop_impl]: struct.ContainerAsync.html#impl-Drop pub struct ContainerAsync { - id: String, + pub(super) raw: raw::RawContainer, image: ContainerRequest, - pub(super) docker_client: Arc, - #[allow(dead_code)] network: Option>, dropped: bool, #[cfg(feature = "reusable-containers")] @@ -79,9 +68,8 @@ where let log_consumers = std::mem::take(&mut container_req.log_consumers); let container = ContainerAsync { - id, + raw: raw::RawContainer::new(id, docker_client), image: container_req, - docker_client, network, dropped: false, #[cfg(feature = "reusable-containers")] @@ -89,8 +77,8 @@ where }; if !log_consumers.is_empty() { - let mut logs = container.docker_client.logs(&container.id, true); - let container_id = container.id.clone(); + let mut logs = container.docker_client().logs(container.id(), true); + let container_id = container.id().to_string(); tokio::spawn(async move { while let Some(result) = logs.next().await { match result { @@ -113,11 +101,6 @@ where container } - /// Returns the id of this container. - pub fn id(&self) -> &str { - &self.id - } - /// Returns a reference to the [`Image`] of this container. /// /// [`Image`]: trait.Image.html @@ -125,233 +108,28 @@ where self.image.image() } - pub async fn ports(&self) -> Result { - self.docker_client.ports(&self.id).await.map_err(Into::into) - } - - /// Returns the mapped host port for an internal port of this docker container, on the host's - /// IPv4 interfaces. - /// - /// By default, `u16` is considered as TCP port. Also, you can convert `u16` to [`ContainerPort`] port - /// by using [`crate::core::IntoContainerPort`] trait. - /// - /// This method does **not** magically expose the given port, it simply performs a mapping on - /// the already exposed ports. If a docker container does not expose a port, this method will return an error. - pub async fn get_host_port_ipv4(&self, internal_port: impl Into) -> Result { - let internal_port = internal_port.into(); - self.ports() - .await? - .map_to_host_port_ipv4(internal_port) - .ok_or_else(|| TestcontainersError::PortNotExposed { - id: self.id.clone(), - port: internal_port, - }) - } - - /// Returns the mapped host port for an internal port of this docker container, on the host's - /// IPv6 interfaces. - /// - /// By default, `u16` is considered as TCP port. Also, you can convert `u16` to [`ContainerPort`] port - /// by using [`crate::core::IntoContainerPort`] trait. - /// - /// This method does **not** magically expose the given port, it simply performs a mapping on - /// the already exposed ports. If a docker container does not expose a port, this method will return an error. - pub async fn get_host_port_ipv6(&self, internal_port: impl Into) -> Result { - let internal_port = internal_port.into(); - self.ports() - .await? - .map_to_host_port_ipv6(internal_port) - .ok_or_else(|| TestcontainersError::PortNotExposed { - id: self.id.clone(), - port: internal_port, - }) - } - - /// Returns the bridge ip address of docker container as specified in NetworkSettings.Networks.IPAddress - pub async fn get_bridge_ip_address(&self) -> Result { - let container_id = &self.id; - let container_settings = self.docker_client.inspect(container_id).await?; - - let host_config = container_settings - .host_config - .ok_or_else(|| ContainerMissingInfo::new(container_id, "HostConfig"))?; - - let network_mode = host_config - .network_mode - .ok_or_else(|| ContainerMissingInfo::new(container_id, "HostConfig.NetworkMode"))?; - - let network_settings = self.docker_client.inspect_network(&network_mode).await?; - - network_settings.driver.ok_or_else(|| { - TestcontainersError::other(format!("network {} is not in bridge mode", network_mode)) - })?; - - let container_network_settings = container_settings - .network_settings - .ok_or_else(|| ContainerMissingInfo::new(container_id, "NetworkSettings"))?; - - let mut networks = container_network_settings - .networks - .ok_or_else(|| ContainerMissingInfo::new(container_id, "NetworkSettings.Networks"))?; - - let ip = networks - .remove(&network_mode) - .and_then(|network| network.ip_address) - .ok_or_else(|| { - ContainerMissingInfo::new(container_id, "NetworkSettings.Networks.IpAddress") - })?; - - IpAddr::from_str(&ip).map_err(TestcontainersError::other) - } - - /// Returns the host that this container may be reached on (may not be the local machine) - /// Suitable for use in URL - pub async fn get_host(&self) -> Result { - self.docker_client - .docker_hostname() - .await - .map_err(Into::into) - } - - /// Executes a command in the container. - pub async fn exec(&self, cmd: ExecCommand) -> Result { - let ExecCommand { - cmd, - container_ready_conditions, - cmd_ready_condition, - } = cmd; - - log::debug!("Executing command {:?}", cmd); - - let mut exec = self.docker_client.exec(&self.id, cmd).await?; - self.block_until_ready(container_ready_conditions).await?; - - match cmd_ready_condition { - CmdWaitFor::StdOutMessage { message } => { - exec.stdout() - .wait_for_message(&message, 1) - .await - .map_err(ExecError::from)?; - } - CmdWaitFor::StdErrMessage { message } => { - exec.stderr() - .wait_for_message(&message, 1) - .await - .map_err(ExecError::from)?; - } - CmdWaitFor::Exit { code } => { - let exec_id = exec.id().to_string(); - loop { - let inspect = self.docker_client.inspect_exec(&exec_id).await?; - - if let Some(actual) = inspect.exit_code { - if let Some(expected) = code { - if actual != expected { - Err(ExecError::ExitCodeMismatch { expected, actual })?; - } - } - break; - } else { - tokio::time::sleep(Duration::from_millis(500)).await; - } - } - } - CmdWaitFor::Duration { length } => { - tokio::time::sleep(length).await; - } - _ => {} - } - - Ok(exec::ExecResult { - client: self.docker_client.clone(), - id: exec.id, - stdout: exec.stdout.into_inner(), - stderr: exec.stderr.into_inner(), - }) - } - /// Starts the container. pub async fn start(&self) -> Result<()> { - self.docker_client.start(&self.id).await?; + self.raw.start().await?; let state = ContainerState::from_container(self).await?; for cmd in self.image.exec_after_start(state)? { - self.exec(cmd).await?; + self.raw.exec(cmd).await?; } Ok(()) } - /// Stops the container (not the same with `pause`). - pub async fn stop(&self) -> Result<()> { - log::debug!("Stopping docker container {}", self.id); - - self.docker_client.stop(&self.id).await?; - Ok(()) - } - /// Removes the container. pub async fn rm(mut self) -> Result<()> { - log::debug!("Deleting docker container {}", self.id); + log::debug!("Deleting docker container {}", self.id()); - self.docker_client.rm(&self.id).await?; + self.raw.docker_client().rm(self.id()).await?; #[cfg(feature = "watchdog")] - crate::watchdog::unregister(&self.id); + crate::watchdog::unregister(self.id()); self.dropped = true; Ok(()) } - - /// Returns an asynchronous reader for stdout. - /// - /// Accepts a boolean parameter to follow the logs: - /// - pass `true` to read logs from the moment the container starts until it stops (returns I/O error with kind [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) if container removed). - /// - pass `false` to read logs from startup to present. - pub fn stdout(&self, follow: bool) -> Pin> { - let stdout = self.docker_client.stdout_logs(&self.id, follow); - Box::pin(tokio_util::io::StreamReader::new(stdout)) - } - - /// Returns an asynchronous reader for stderr. - /// - /// Accepts a boolean parameter to follow the logs: - /// - pass `true` to read logs from the moment the container starts until it stops (returns I/O error with [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) if container removed). - /// - pass `false` to read logs from startup to present. - pub fn stderr(&self, follow: bool) -> Pin> { - let stderr = self.docker_client.stderr_logs(&self.id, follow); - Box::pin(tokio_util::io::StreamReader::new(stderr)) - } - - /// Returns stdout as a vector of bytes available at the moment of call (from container startup to present). - /// - /// If you want to read stdout in asynchronous manner, use [`ContainerAsync::stdout`] instead. - pub async fn stdout_to_vec(&self) -> Result> { - let mut stdout = Vec::new(); - self.stdout(false).read_to_end(&mut stdout).await?; - Ok(stdout) - } - - /// Returns stderr as a vector of bytes available at the moment of call (from container startup to present). - /// - /// If you want to read stderr in asynchronous manner, use [`ContainerAsync::stderr`] instead. - pub async fn stderr_to_vec(&self) -> Result> { - let mut stderr = Vec::new(); - self.stderr(false).read_to_end(&mut stderr).await?; - Ok(stderr) - } - - pub(crate) async fn block_until_ready(&self, ready_conditions: Vec) -> Result<()> { - log::debug!("Waiting for container {} to be ready", self.id); - let id = self.id(); - - for condition in ready_conditions { - condition - .wait_until_ready(&self.docker_client, self) - .await?; - } - - log::debug!("Container {id} is now ready!"); - Ok(()) - } } impl fmt::Debug for ContainerAsync @@ -361,9 +139,9 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut repr = f.debug_struct("ContainerAsync"); - repr.field("id", &self.id) + repr.field("id", &self.id()) .field("image", &self.image) - .field("command", &self.docker_client.config.command()) + .field("command", &self.docker_client().config.command()) .field("network", &self.network) .field("dropped", &self.dropped); @@ -384,16 +162,19 @@ where use crate::ReuseDirective::{Always, CurrentSession}; if !self.dropped && matches!(self.reuse, Always | CurrentSession) { - log::debug!("Declining to reap container marked for reuse: {}", &self.id); + log::debug!( + "Declining to reap container marked for reuse: {}", + &self.id() + ); return; } } if !self.dropped { - let id = self.id.clone(); - let client = self.docker_client.clone(); - let command = self.docker_client.config.command(); + let id = self.id().to_string(); + let client = self.docker_client().clone(); + let command = self.docker_client().config.command(); let drop_task = async move { log::trace!("Drop was called for container {id}, cleaning up"); @@ -412,11 +193,18 @@ where }; async_drop::async_drop(drop_task); - // async_drop::block_on!(drop_task, "failed to remove container on drop"); } } } +impl Deref for ContainerAsync { + type Target = raw::RawContainer; + + fn deref(&self) -> &Self::Target { + &self.raw + } +} + #[cfg(test)] mod tests { use tokio::io::AsyncBufReadExt; diff --git a/testcontainers/src/core/containers/async_container/raw.rs b/testcontainers/src/core/containers/async_container/raw.rs new file mode 100644 index 00000000..818054a5 --- /dev/null +++ b/testcontainers/src/core/containers/async_container/raw.rs @@ -0,0 +1,258 @@ +use std::{fmt, net::IpAddr, pin::Pin, str::FromStr, sync::Arc, time::Duration}; + +use tokio::io::{AsyncBufRead, AsyncReadExt}; + +use super::{exec, Client}; +use crate::{ + core::{ + error::{ContainerMissingInfo, ExecError, Result}, + ports::Ports, + wait::WaitStrategy, + CmdWaitFor, ContainerPort, ExecCommand, WaitFor, + }, + TestcontainersError, +}; + +/// Represents a docker container without any additional functionality. +/// It basically wraps a docker container id and a docker client to expose some common functionality. +pub struct RawContainer { + id: String, + docker_client: Arc, +} + +impl RawContainer { + pub(crate) fn new(id: String, docker_client: Arc) -> Self { + Self { id, docker_client } + } + + pub(crate) fn docker_client(&self) -> &Arc { + &self.docker_client + } + + /// Returns the id of this container. + pub fn id(&self) -> &str { + &self.id + } + + pub async fn ports(&self) -> Result { + self.docker_client.ports(&self.id).await.map_err(Into::into) + } + + /// Returns the mapped host port for an internal port of this docker container, on the host's + /// IPv4 interfaces. + /// + /// By default, `u16` is considered as TCP port. Also, you can convert `u16` to [`ContainerPort`] port + /// by using [`crate::core::IntoContainerPort`] trait. + /// + /// This method does **not** magically expose the given port, it simply performs a mapping on + /// the already exposed ports. If a docker container does not expose a port, this method will return an error. + pub async fn get_host_port_ipv4(&self, internal_port: impl Into) -> Result { + let internal_port = internal_port.into(); + self.ports() + .await? + .map_to_host_port_ipv4(internal_port) + .ok_or_else(|| TestcontainersError::PortNotExposed { + id: self.id.clone(), + port: internal_port, + }) + } + + /// Returns the mapped host port for an internal port of this docker container, on the host's + /// IPv6 interfaces. + /// + /// By default, `u16` is considered as TCP port. Also, you can convert `u16` to [`ContainerPort`] port + /// by using [`crate::core::IntoContainerPort`] trait. + /// + /// This method does **not** magically expose the given port, it simply performs a mapping on + /// the already exposed ports. If a docker container does not expose a port, this method will return an error. + pub async fn get_host_port_ipv6(&self, internal_port: impl Into) -> Result { + let internal_port = internal_port.into(); + self.ports() + .await? + .map_to_host_port_ipv6(internal_port) + .ok_or_else(|| TestcontainersError::PortNotExposed { + id: self.id.clone(), + port: internal_port, + }) + } + + /// Returns the bridge ip address of docker container as specified in NetworkSettings.Networks.IPAddress + pub async fn get_bridge_ip_address(&self) -> Result { + let container_id = &self.id; + let container_settings = self.docker_client.inspect(container_id).await?; + + let host_config = container_settings + .host_config + .ok_or_else(|| ContainerMissingInfo::new(container_id, "HostConfig"))?; + + let network_mode = host_config + .network_mode + .ok_or_else(|| ContainerMissingInfo::new(container_id, "HostConfig.NetworkMode"))?; + + let network_settings = self.docker_client.inspect_network(&network_mode).await?; + + network_settings.driver.ok_or_else(|| { + TestcontainersError::other(format!("network {} is not in bridge mode", network_mode)) + })?; + + let container_network_settings = container_settings + .network_settings + .ok_or_else(|| ContainerMissingInfo::new(container_id, "NetworkSettings"))?; + + let mut networks = container_network_settings + .networks + .ok_or_else(|| ContainerMissingInfo::new(container_id, "NetworkSettings.Networks"))?; + + let ip = networks + .remove(&network_mode) + .and_then(|network| network.ip_address) + .ok_or_else(|| { + ContainerMissingInfo::new(container_id, "NetworkSettings.Networks.IpAddress") + })?; + + IpAddr::from_str(&ip).map_err(TestcontainersError::other) + } + + /// Returns the host that this container may be reached on (may not be the local machine) + /// Suitable for use in URL + pub async fn get_host(&self) -> Result { + self.docker_client + .docker_hostname() + .await + .map_err(Into::into) + } + + /// Executes a command in the container. + pub async fn exec(&self, cmd: ExecCommand) -> Result { + let ExecCommand { + cmd, + container_ready_conditions, + cmd_ready_condition, + } = cmd; + + log::debug!("Executing command {:?}", cmd); + + let mut exec = self.docker_client.exec(&self.id, cmd).await?; + self.block_until_ready(container_ready_conditions).await?; + + match cmd_ready_condition { + CmdWaitFor::StdOutMessage { message } => { + exec.stdout() + .wait_for_message(&message, 1) + .await + .map_err(ExecError::from)?; + } + CmdWaitFor::StdErrMessage { message } => { + exec.stderr() + .wait_for_message(&message, 1) + .await + .map_err(ExecError::from)?; + } + CmdWaitFor::Exit { code } => { + let exec_id = exec.id().to_string(); + loop { + let inspect = self.docker_client.inspect_exec(&exec_id).await?; + + if let Some(actual) = inspect.exit_code { + if let Some(expected) = code { + if actual != expected { + Err(ExecError::ExitCodeMismatch { expected, actual })?; + } + } + break; + } else { + tokio::time::sleep(Duration::from_millis(500)).await; + } + } + } + CmdWaitFor::Duration { length } => { + tokio::time::sleep(length).await; + } + _ => {} + } + + Ok(exec::ExecResult { + client: self.docker_client.clone(), + id: exec.id, + stdout: exec.stdout.into_inner(), + stderr: exec.stderr.into_inner(), + }) + } + + /// Starts the container. + pub async fn start(&self) -> Result<()> { + self.docker_client.start(&self.id).await?; + Ok(()) + } + + /// Stops the container (not the same with `pause`). + pub async fn stop(&self) -> Result<()> { + log::debug!("Stopping docker container {}", self.id); + + self.docker_client.stop(&self.id).await?; + Ok(()) + } + + /// Returns an asynchronous reader for stdout. + /// + /// Accepts a boolean parameter to follow the logs: + /// - pass `true` to read logs from the moment the container starts until it stops (returns I/O error with kind [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) if container removed). + /// - pass `false` to read logs from startup to present. + pub fn stdout(&self, follow: bool) -> Pin> { + let stdout = self.docker_client.stdout_logs(&self.id, follow); + Box::pin(tokio_util::io::StreamReader::new(stdout)) + } + + /// Returns an asynchronous reader for stderr. + /// + /// Accepts a boolean parameter to follow the logs: + /// - pass `true` to read logs from the moment the container starts until it stops (returns I/O error with [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) if container removed). + /// - pass `false` to read logs from startup to present. + pub fn stderr(&self, follow: bool) -> Pin> { + let stderr = self.docker_client.stderr_logs(&self.id, follow); + Box::pin(tokio_util::io::StreamReader::new(stderr)) + } + + /// Returns stdout as a vector of bytes available at the moment of call (from container startup to present). + /// + /// If you want to read stdout in asynchronous manner, use [`ContainerAsync::stdout`] instead. + pub async fn stdout_to_vec(&self) -> Result> { + let mut stdout = Vec::new(); + self.stdout(false).read_to_end(&mut stdout).await?; + Ok(stdout) + } + + /// Returns stderr as a vector of bytes available at the moment of call (from container startup to present). + /// + /// If you want to read stderr in asynchronous manner, use [`ContainerAsync::stderr`] instead. + pub async fn stderr_to_vec(&self) -> Result> { + let mut stderr = Vec::new(); + self.stderr(false).read_to_end(&mut stderr).await?; + Ok(stderr) + } + + pub(crate) async fn block_until_ready(&self, ready_conditions: Vec) -> Result<()> { + log::debug!("Waiting for container {} to be ready", self.id); + let id = self.id(); + + for condition in ready_conditions { + condition + .wait_until_ready(&self.docker_client, self) + .await?; + } + + log::debug!("Container {id} is now ready!"); + Ok(()) + } +} + +impl fmt::Debug for RawContainer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut repr = f.debug_struct("ContainerAsync"); + + repr.field("id", &self.id) + .field("command", &self.docker_client.config.command()); + + repr.finish() + } +} diff --git a/testcontainers/src/core/containers/sync_container.rs b/testcontainers/src/core/containers/sync_container.rs index 7b26e36f..9f4cdc77 100644 --- a/testcontainers/src/core/containers/sync_container.rs +++ b/testcontainers/src/core/containers/sync_container.rs @@ -41,7 +41,10 @@ where .field("id", &self.id()) .field("image", &self.image()) .field("ports", &self.ports()) - .field("command", &self.async_impl().docker_client.config.command()) + .field( + "command", + &self.async_impl().docker_client().config.command(), + ) .finish() } } @@ -202,7 +205,7 @@ impl Drop for Container { fn drop(&mut self) { if let Some(active) = self.inner.take() { active.runtime.block_on(async { - match active.async_impl.docker_client.config.command() { + match active.async_impl.docker_client().config.command() { env::Command::Remove => { if let Err(e) = active.async_impl.rm().await { log::error!("Failed to remove container on drop: {}", e); diff --git a/testcontainers/src/core/ports.rs b/testcontainers/src/core/ports.rs index 0a8e377c..c12fd8d5 100644 --- a/testcontainers/src/core/ports.rs +++ b/testcontainers/src/core/ports.rs @@ -75,6 +75,14 @@ impl Ports { pub fn map_to_host_port_ipv6(&self, container_port: impl Into) -> Option { self.ipv6_mapping.get(&container_port.into()).cloned() } + + pub(crate) fn ipv4_mapping(&self) -> &HashMap { + &self.ipv4_mapping + } + + pub(crate) fn ipv6_mapping(&self) -> &HashMap { + &self.ipv6_mapping + } } impl TryFrom for Ports { diff --git a/testcontainers/src/core/wait/exit_strategy.rs b/testcontainers/src/core/wait/exit_strategy.rs index eed8f3c8..fb6fea7a 100644 --- a/testcontainers/src/core/wait/exit_strategy.rs +++ b/testcontainers/src/core/wait/exit_strategy.rs @@ -1,9 +1,7 @@ use std::time::Duration; -use crate::{ - core::{client::Client, error::WaitContainerError, wait::WaitStrategy}, - ContainerAsync, Image, -}; +use super::RawContainer; +use crate::core::{client::Client, error::WaitContainerError, wait::WaitStrategy}; #[derive(Debug, Clone)] pub struct ExitWaitStrategy { @@ -34,10 +32,10 @@ impl ExitWaitStrategy { } impl WaitStrategy for ExitWaitStrategy { - async fn wait_until_ready( + async fn wait_until_ready( self, client: &Client, - container: &ContainerAsync, + container: &RawContainer, ) -> crate::core::error::Result<()> { loop { let container_state = client diff --git a/testcontainers/src/core/wait/health_strategy.rs b/testcontainers/src/core/wait/health_strategy.rs index 58c5f4ff..381600f7 100644 --- a/testcontainers/src/core/wait/health_strategy.rs +++ b/testcontainers/src/core/wait/health_strategy.rs @@ -2,10 +2,8 @@ use std::time::Duration; use bollard::models::HealthStatusEnum::*; -use crate::{ - core::{client::Client, error::WaitContainerError, wait::WaitStrategy}, - ContainerAsync, Image, -}; +use super::RawContainer; +use crate::core::{client::Client, error::WaitContainerError, wait::WaitStrategy}; #[derive(Debug, Clone)] pub struct HealthWaitStrategy { @@ -28,10 +26,10 @@ impl HealthWaitStrategy { } impl WaitStrategy for HealthWaitStrategy { - async fn wait_until_ready( + async fn wait_until_ready( self, client: &Client, - container: &ContainerAsync, + container: &RawContainer, ) -> crate::core::error::Result<()> { loop { let health_status = client diff --git a/testcontainers/src/core/wait/http_strategy.rs b/testcontainers/src/core/wait/http_strategy.rs index ea0715d7..5c20fd91 100644 --- a/testcontainers/src/core/wait/http_strategy.rs +++ b/testcontainers/src/core/wait/http_strategy.rs @@ -3,9 +3,10 @@ use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; use bytes::Bytes; use url::{Host, Url}; +use super::RawContainer; use crate::{ core::{client::Client, error::WaitContainerError, wait::WaitStrategy, ContainerPort}, - ContainerAsync, Image, TestcontainersError, + TestcontainersError, }; /// Error type for waiting for container readiness based on HTTP response. @@ -205,18 +206,27 @@ impl HttpWaitStrategy { } impl WaitStrategy for HttpWaitStrategy { - async fn wait_until_ready( + async fn wait_until_ready( self, _client: &Client, - container: &ContainerAsync, + container: &RawContainer, ) -> crate::core::error::Result<()> { let host = container.get_host().await?; - let container_port = self - .port - .or_else(|| container.image().expose_ports().first().copied()) - .ok_or(WaitContainerError::from( - HttpWaitError::NoExposedPortsForHttpWait, - ))?; + + let container_port = match self.port { + Some(port) => port, + None => { + let ports = container.ports().await?; + *ports + .ipv4_mapping() + .keys() + .next() + .or(ports.ipv6_mapping().keys().next()) + .ok_or(WaitContainerError::from( + HttpWaitError::NoExposedPortsForHttpWait, + ))? + } + }; let host_port = match host { Host::Domain(ref domain) => match container.get_host_port_ipv4(container_port).await { diff --git a/testcontainers/src/core/wait/log_strategy.rs b/testcontainers/src/core/wait/log_strategy.rs index 6347bdbf..2f5d298a 100644 --- a/testcontainers/src/core/wait/log_strategy.rs +++ b/testcontainers/src/core/wait/log_strategy.rs @@ -1,13 +1,11 @@ use bytes::Bytes; -use crate::{ - core::{ - client::Client, - error::WaitContainerError, - logs::{LogSource, WaitingStreamWrapper}, - wait::WaitStrategy, - }, - ContainerAsync, Image, +use super::RawContainer; +use crate::core::{ + client::Client, + error::WaitContainerError, + logs::{LogSource, WaitingStreamWrapper}, + wait::WaitStrategy, }; #[derive(Debug, Clone)] @@ -48,10 +46,10 @@ impl LogWaitStrategy { } impl WaitStrategy for LogWaitStrategy { - async fn wait_until_ready( + async fn wait_until_ready( self, client: &Client, - container: &ContainerAsync, + container: &RawContainer, ) -> crate::core::error::Result<()> { let log_stream = match self.source { LogSource::StdOut => client.stdout_logs(container.id(), true), diff --git a/testcontainers/src/core/wait/mod.rs b/testcontainers/src/core/wait/mod.rs index afb258d7..67cd6601 100644 --- a/testcontainers/src/core/wait/mod.rs +++ b/testcontainers/src/core/wait/mod.rs @@ -7,10 +7,7 @@ pub use health_strategy::HealthWaitStrategy; pub use http_strategy::HttpWaitStrategy; pub use log_strategy::LogWaitStrategy; -use crate::{ - core::{client::Client, logs::LogSource}, - ContainerAsync, Image, -}; +use crate::core::{async_container::raw::RawContainer, client::Client, logs::LogSource}; pub(crate) mod cmd_wait; pub(crate) mod exit_strategy; @@ -20,10 +17,10 @@ pub(crate) mod http_strategy; pub(crate) mod log_strategy; pub(crate) trait WaitStrategy { - async fn wait_until_ready( + async fn wait_until_ready( self, client: &Client, - container: &ContainerAsync, + container: &RawContainer, ) -> crate::core::error::Result<()>; } @@ -126,10 +123,10 @@ impl From for WaitFor { } impl WaitStrategy for WaitFor { - async fn wait_until_ready( + async fn wait_until_ready( self, client: &Client, - container: &ContainerAsync, + container: &RawContainer, ) -> crate::core::error::Result<()> { match self { WaitFor::Log(strategy) => strategy.wait_until_ready(client, container).await?, diff --git a/testcontainers/src/images/docker_cli.rs b/testcontainers/src/images/docker_cli.rs new file mode 100644 index 00000000..209827de --- /dev/null +++ b/testcontainers/src/images/docker_cli.rs @@ -0,0 +1,56 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use crate::{ + core::{mounts::Mount, WaitFor}, + Image, +}; + +const IMAGE_NAME: &str = "docker"; +const IMAGE_TAG: &str = "26.1-cli"; + +pub(crate) struct DockerCli { + envs: BTreeMap, + mounts: Vec, +} + +impl DockerCli { + pub(crate) fn new(docker_socket: &str) -> Self { + let mounts = vec![Mount::bind_mount(docker_socket, "/docker.sock")]; + let envs = BTreeMap::from([("DOCKER_HOST".to_string(), "unix:///docker.sock".to_string())]); + + DockerCli { envs, mounts } + } +} + +impl Image for DockerCli { + fn name(&self) -> &str { + IMAGE_NAME + } + + fn tag(&self) -> &str { + IMAGE_TAG + } + + fn ready_conditions(&self) -> Vec { + vec![] + } + + fn env_vars( + &self, + ) -> impl IntoIterator>, impl Into>)> { + Box::new(self.envs.iter()) + } + + fn mounts(&self) -> impl IntoIterator { + Box::new(self.mounts.iter()) + } + + fn entrypoint(&self) -> Option<&str> { + Some("/bin/sh") + } + + fn cmd(&self) -> impl IntoIterator>> { + // keep container alive until dropped + ["-c", "sleep infinity"] + } +} diff --git a/testcontainers/src/images/mod.rs b/testcontainers/src/images/mod.rs index 0add7dbc..4130bbbd 100644 --- a/testcontainers/src/images/mod.rs +++ b/testcontainers/src/images/mod.rs @@ -1 +1,3 @@ +pub(crate) mod docker_cli; pub mod generic; +pub(crate) mod socat; diff --git a/testcontainers/src/images/socat.rs b/testcontainers/src/images/socat.rs new file mode 100644 index 00000000..6686b3d3 --- /dev/null +++ b/testcontainers/src/images/socat.rs @@ -0,0 +1,55 @@ +// Socat can be useful if we would want to allow exposing not exposed ports of a container. +// But for now it's unused, so dead code is allowed. +#![allow(dead_code)] + +use std::{borrow::Cow, collections::HashMap}; + +use itertools::Itertools; + +use crate::{core::WaitFor, Image}; + +/// A socat container is used as a TCP proxy, enabling any port of another container to be exposed +/// publicly, even if that container does not make the port public itself. +pub(crate) struct Socat { + targets: HashMap, +} + +impl Socat { + pub(crate) fn new() -> Self { + Self { + targets: HashMap::new(), + } + } + + pub(crate) fn add_target(&mut self, port: u16, value: String) { + self.targets.insert(port, value); + } +} + +impl Image for Socat { + fn name(&self) -> &str { + "alpine/socat" + } + + fn tag(&self) -> &str { + "1.8.0.1" + } + + fn ready_conditions(&self) -> Vec { + vec![WaitFor::healthcheck()] + } + + fn entrypoint(&self) -> Option<&str> { + Some("/bin/sh") + } + + fn cmd(&self) -> impl IntoIterator>> { + [ + "-c".to_string(), + self.targets + .iter() + .map(|(port, value)| format!("socat TCP-LISTEN:{port},fork,reuseaddr TCP:{value}")) + .join(" & "), + ] + } +} diff --git a/testcontainers/src/lib.rs b/testcontainers/src/lib.rs index 978ffdb2..6927c3ec 100644 --- a/testcontainers/src/lib.rs +++ b/testcontainers/src/lib.rs @@ -94,6 +94,7 @@ pub(crate) mod watchdog; mod images; pub use images::generic::GenericImage; +pub mod compose; pub mod runners; /// Re-export of the `bollard` crate to allow direct interaction with the Docker API. diff --git a/testcontainers/src/runners/async_runner.rs b/testcontainers/src/runners/async_runner.rs index 0c81030e..1d45dbdd 100644 --- a/testcontainers/src/runners/async_runner.rs +++ b/testcontainers/src/runners/async_runner.rs @@ -289,8 +289,7 @@ where res => res, }?; - let copy_to_sources: Vec<&CopyToContainer> = - container_req.copy_to_sources().map(Into::into).collect(); + let copy_to_sources: Vec<&CopyToContainer> = container_req.copy_to_sources().collect(); for copy_to_source in copy_to_sources { client diff --git a/testcontainers/tests/test-compose.yml b/testcontainers/tests/test-compose.yml new file mode 100644 index 00000000..8097437c --- /dev/null +++ b/testcontainers/tests/test-compose.yml @@ -0,0 +1,9 @@ +services: + redis: + image: redis + ports: + - "6383:6379" + web1: + image: simple_web_server + ports: + - "8081:80" \ No newline at end of file