Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion core/testcontainers/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import platform
import subprocess
import sys
from collections.abc import Generator
from contextlib import AbstractContextManager, ExitStack, contextmanager
from pathlib import Path
from typing import Any, Final, Optional
from typing import Any, Final, Optional, TypeVar

LINUX = "linux"
MAC = "mac"
Expand Down Expand Up @@ -98,3 +100,57 @@ def get_running_in_container_id() -> Optional[str]:
if path.startswith("/docker"):
return path.removeprefix("/docker/")
return None


T = TypeVar("T")


@contextmanager
def run_containers(*containers: AbstractContextManager[T]) -> Generator[tuple[T, ...], None, None]:
"""
Context manager that runs multiple container instances and ensures proper cleanup.

Each container is started in the order provided and yields control once all containers
are running. When the context exits, containers are stopped in reverse order (LIFO).

This is particularly useful for integration tests or resource setups where multiple
containers need to run together and be reliably cleaned up afterward.

Parameters
----------
*containers : iterable
One or more container instancesto.

Yields
------
list
A list of the started container instances, in the same order they were provided.

Examples
--------
>>> import sqlalchemy.engine
>>> from testcontainers.core.network import Network
>>> from testcontainers.core.utils import run_containers
>>> from testcontainers.postgres import PostgresContainer
>>>
>>> network = Network()
>>> network.create()
>>>
>>> with run_containers(
... PostgresContainer(network=network),
... PostgresContainer(image='postgres:16', network=network),
... ) as containers:
... c1, c2 = containers
... conn1 = sqlalchemy.engine.create_engine(c1.get_connection_url()).connect()
... conn2 = sqlalchemy.engine.create_engine(c2.get_connection_url()).connect()
...
... result1 = conn1.execute(sqlalchemy.text("select version()")).fetchone()
... result2 = conn2.execute(sqlalchemy.text("select version()")).fetchone()
...
... print(result1, result2, sep='\\n')
...
>>> # The network is removed only after all containers have been stopped.
>>> network.remove()
"""
with ExitStack() as stack:
yield tuple(stack.enter_context(container) for container in containers)
20 changes: 20 additions & 0 deletions core/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from pytest import MonkeyPatch, raises, mark

from testcontainers.core import utils
from testcontainers.core.container import DockerContainer
from testcontainers.core.utils import run_containers

import docker


def test_setup_logger() -> None:
Expand Down Expand Up @@ -76,3 +80,19 @@ def test_get_running_container_id(fake_cgroup: Path) -> None:
container_id = "b78eebb08f89158ed6e2ed2fe"
fake_cgroup.write_text(f"13:cpuset:/docker/{container_id}")
assert utils.get_running_in_container_id() == container_id


def test_run_container():
client = docker.from_env()
running_containers = len(client.containers.list())

with run_containers(DockerContainer("hello-world"), DockerContainer("hello-world")) as containers:
assert len(client.containers.list()) == running_containers + 2

for container in containers:
stdout, stderr = container.get_logs()
assert isinstance(stdout, bytes)
assert isinstance(stderr, bytes)
assert "Hello from Docker".encode() in stdout, "There should be something on stdout"

assert len(client.containers.list()) == running_containers + 1 # account for ryuk