diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 74546f0b..514b0b36 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,3 +33,4 @@ repos: - execnet>=2.1.0 - types-psutil - setproctitle + - filelock>=3.13.1 diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2f29201a..915a3a30 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,10 @@ +pytest-xdist 3.ZZZ.ZZZ (2024-zz-zz) +=================================== + +Features +-------- +- `#1126 `_: New ``isoscope`` scheduler. + pytest-xdist 3.6.1 (2024-04-28) =============================== diff --git a/docs/distribution.rst b/docs/distribution.rst index ebbfa088..b00b44a0 100644 --- a/docs/distribution.rst +++ b/docs/distribution.rst @@ -49,6 +49,19 @@ The test distribution algorithm is configured with the ``--dist`` command-line o .. _distribution modes: +* ``--dist isoscope``: Scope Isolation Scheduler. Tests are grouped by module for + test functions and by class for test methods. Tests are executed one group at a + time, distributed across available workers. This groupwise isolation guarantees + that all tests in one group complete execution before running another group of + tests. This can be useful when module-level or class-level fixtures of one group + could create undesirable side-effects for tests in other test groups, while + taking advantage of distributed execution of tests within each group. Grouping + by class takes priority over grouping by module. NOTE: the use of this scheduler + requires distributed coordination for setup and teardown such as provided by + the ``iso_scheduling`` fixture or an alternate implementation of distributed + coordination - see the ``iso_scheduling.coordinate_setup_teardown`` usage example + in iso_scheduling_plugin.py. + * ``--dist load`` **(default)**: Sends pending tests to any worker that is available, without any guaranteed order. Scheduling can be fine-tuned with the `--maxschedchunk` option, see output of `pytest --help`. 
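The ``isoscope`` entry above points readers at the ``iso_scheduling.coordinate_setup_teardown`` usage example in iso_scheduling_plugin.py. As a quick reference, the condensed sketch below is based on that example and shows how a class-scoped fixture coordinates one-time setup/teardown across workers when the suite runs with ``pytest -n <numprocesses> --dist isoscope``; ``TestSomething``, ``patch_system_under_test`` and ``revert_system_under_test`` are illustrative placeholder names.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

import pytest

if TYPE_CHECKING:
    from xdist.iso_scheduling_utils import (
        DistributedSetupContext,
        DistributedTeardownContext,
        IsoSchedulingFixture,
    )


class TestSomething:
    @classmethod
    @pytest.fixture(scope="class", autouse=True)
    def distributed_setup_and_teardown(
        cls,
        iso_scheduling: IsoSchedulingFixture,
        request: pytest.FixtureRequest,
    ):
        # Coordinate distributed setup and teardown across xdist workers.
        with iso_scheduling.coordinate_setup_teardown(
            setup_request=request
        ) as coordinator:
            # Runs the callback in exactly one worker: the first to reach
            # this test class.
            coordinator.maybe_call_setup(cls.patch_system_under_test)
            try:
                # Yield control back to the xdist worker so its share of the
                # class's test cases can run.
                yield
            finally:
                # Runs the callback only in the last worker to finish this
                # class's tests.
                coordinator.maybe_call_teardown(cls.revert_system_under_test)

    @classmethod
    def patch_system_under_test(cls, setup_context: DistributedSetupContext) -> None:
        # One-time setup for all test cases in this class; persist any shared
        # state under `setup_context.client_dir`.
        ...

    @classmethod
    def revert_system_under_test(cls, teardown_context: DistributedTeardownContext) -> None:
        # Read shared state from `teardown_context.client_dir` and revert the
        # changes made by `patch_system_under_test()`.
        ...

    def test_case1(self) -> None: ...

    def test_case2(self) -> None: ...
```

Here ``maybe_call_setup`` invokes its callback only in the first worker to acquire the scope's lock, and ``maybe_call_teardown`` invokes its callback only in the last participating worker to finish the scope; both rely on a ``filelock``-guarded state file under the session's shared temporary directory.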
diff --git a/pyproject.toml b/pyproject.toml index 70cb2b00..c6f703d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ classifiers = [ requires-python = ">=3.8" dependencies = [ "execnet>=2.1", + "filelock>=3.13.1", "pytest>=7.0.0", ] dynamic = ["version"] @@ -48,6 +49,7 @@ Tracker = "https://github.com/pytest-dev/pytest-xdist/issues" [project.entry-points.pytest11] xdist = "xdist.plugin" +"xdist.iso_scheduling_plugin" = "xdist.iso_scheduling_plugin" "xdist.looponfail" = "xdist.looponfail" [project.optional-dependencies] diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 62079a28..c27390bd 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -15,6 +15,7 @@ from xdist.remote import Producer from xdist.remote import WorkerInfo from xdist.scheduler import EachScheduling +from xdist.scheduler import IsoScopeScheduling from xdist.scheduler import LoadFileScheduling from xdist.scheduler import LoadGroupScheduling from xdist.scheduler import LoadScheduling @@ -113,6 +114,8 @@ def pytest_xdist_make_scheduler( dist = config.getvalue("dist") if dist == "each": return EachScheduling(config, log) + if dist == "isoscope": + return IsoScopeScheduling(config, log) if dist == "load": return LoadScheduling(config, log) if dist == "loadscope": diff --git a/src/xdist/iso_scheduling_plugin.py b/src/xdist/iso_scheduling_plugin.py new file mode 100644 index 00000000..7a17f604 --- /dev/null +++ b/src/xdist/iso_scheduling_plugin.py @@ -0,0 +1,606 @@ +# This code was contributed to pytest-xdist by Akamai Technologies Inc. +# Copyright 2024 Akamai Technologies, Inc. +# Developed by Vitaly Kruglikov at Akamai Technologies, Inc. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Pytest Fixtures for supporting users of isoscope scheduling. + +NOTE: These fixtures are NOT compatible with any other xdist schedulers. + +NOTE: DO NOT IMPORT this module. It needs to be loaded via pytest's +`conftest.pytest_plugins` mechanism. Pytest doc discourages importing fixtures +directly from other modules - see +https://docs.pytest.org/en/7.1.x/how-to/fixtures.html: +> "Sometimes users will import fixtures from other projects for use, however this +is not recommended: importing fixtures into a module will register them in +pytest as defined in that module". 
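+
+For example, a top-level `conftest.py` can load it with something like the
+following (illustrative; the plugin is also registered as a `pytest11` entry
+point in `pyproject.toml`):
+
+    pytest_plugins = ["xdist.iso_scheduling_plugin"]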
+""" + +from __future__ import annotations + +import contextlib +import functools +import json +import logging +import pathlib +from typing import TYPE_CHECKING + +import filelock +import pytest + +from xdist.iso_scheduling_utils import CoordinationTimeoutError +from xdist.iso_scheduling_utils import DistributedSetupContext +from xdist.iso_scheduling_utils import DistributedSetupCoordinator +from xdist.iso_scheduling_utils import DistributedTeardownContext +from xdist.iso_scheduling_utils import IsoSchedulingFixture + + +if TYPE_CHECKING: + from collections.abc import Callable + from collections.abc import Generator + + +_LOGGER = logging.getLogger(__name__) + + +@pytest.fixture(scope="session") +def iso_scheduling( + tmp_path_factory: pytest.TempPathFactory, testrun_uid: str, worker_id: str +) -> IsoSchedulingFixture: + """A session-scoped pytest fixture for coordinating setup/teardown of test + scope/class which is executing under isoscope scheduling. + + Based on the filelock idea described in section + "Making session-scoped fixtures execute only once" of + https://pytest-xdist.readthedocs.io/en/stable/how-to.html. + + NOTE: Each XDist remote worker is running its own Pytest Session, so we want + only the worker that starts its session first to execute the setup logic and + only the worker that finishes its session last to execute the teardown logic + using a form of distributed coordination. This way, setup is executed exactly + once before any worker executes any of the scope's tests, and teardown is + executed only after the last worker finishes test execution. + + USAGE EXAMPLE: + + ``` + from __future__ import annotations + from typing import TYPE_CHECKING + import pytest + + if TYPE_CHECKING: + from xdist.iso_scheduling_utils import ( + IsoSchedulingFixture, + DistributedSetupContext, + DistributedTeardownContext + ) + + class TestSomething: + + @classmethod + @pytest.fixture(scope='class', autouse=True) + def distributed_setup_and_teardown( + cls, + iso_scheduling: IsoSchedulingFixture: + request: pytest.FixtureRequest): + + # Distributed Setup and Teardown + with iso_scheduling.coordinate_setup_teardown( + setup_request=request) as coordinator: + # Distributed Setup + coordinator.maybe_call_setup(cls.patch_system_under_test) + + try: + # Yield control back to the XDist Worker to allow the + # test cases to run + yield + finally: + # Distributed Teardown + coordinator.maybe_call_teardown(cls.revert_system_under_test) + + @classmethod + def patch_system_under_test( + cls, + setup_context: DistributedSetupContext) -> None: + # Initialize the System Under Test for all the test cases in + # this test class and store state in `setup_context.client_dir`. + + @classmethod + def revert_system_under_test( + cls, + teardown_context: DistributedTeardownContext): + # Fetch state from `teardown_context.client_dir` and revert + # changes made by `patch_system_under_test()`. + + def test_case1(self) + ... + + def test_case2(self) + ... + + def test_case3(self) + ... + ``` + + :param tmp_path_factory: (pytest fixture) interface for temporary + directories and files. + :param testrun_uid: (pytest-xdist fixture) Unique id of the current test + run. This value is common to all XDist worker Pytest Sessions in the + current test run. + :param worker_id: (pytest-xdist fixture) Remote XDist worker ID which is + executing this Pytest Session. 
+ :return: A callable that takes no args and returns a context manager which + yields an instance of `DistributedSetupCoordinator` for the current + Pytest Session. + """ + return _IsoSchedulingFixtureImpl( + tmp_path_factory=tmp_path_factory, testrun_uid=testrun_uid, worker_id=worker_id + ) + + +class _IsoSchedulingFixtureImpl(IsoSchedulingFixture): + """Context manager yielding a new instance of the implementation of the + `DistributedSetupCoordinator` interface. + + An instance of _IsoSchedulingFixtureImpl is returned by our pytest + fixture `iso_scheduling`. + """ + + # pylint: disable=too-few-public-methods + + def __init__( + self, tmp_path_factory: pytest.TempPathFactory, testrun_uid: str, worker_id: str + ): + """ + :param tmp_path_factory: pytest interface for temporary directories. + :param testrun_uid: Unique id of the current test run. This value is + common to all XDist worker Pytest Sessions in the current test run. + :param worker_id: Remote XDist worker ID which is executing this Pytest + Session. NOTE: Each XDist remote worker is running its own Pytest + Session for the subset of test cases assigned to it. + """ + self._tmp_path_factory = tmp_path_factory + self._testrun_uid = testrun_uid + self._worker_id = worker_id + + @contextlib.contextmanager + def coordinate_setup_teardown( + self, setup_request: pytest.FixtureRequest + ) -> Generator[DistributedSetupCoordinator, None, None]: + """Context manager that yields an instance of + `DistributedSetupCoordinator` for distributed coordination of Setup + and Teardown. + + NOTE: In python3.9 and later, a more appropriate return type would be + `contextlib.AbstractContextManager[DistributedSetupCoordinator]`. + + :param setup_request: Value of the pytest `request` fixture obtained + directly by the calling setup-teardown fixture. + """ + # __enter__ + coordinator = _DistributedSetupCoordinatorImpl( + setup_request=setup_request, + tmp_path_factory=self._tmp_path_factory, + testrun_uid=self._testrun_uid, + worker_id=self._worker_id, + ) + + # Yield control to the managed code block + yield coordinator + + # __exit__ + # We can do some cleanup or validation here, but nothing for now + + +class _DistributedSetupCoordinatorImpl(DistributedSetupCoordinator): + """Distributed scope/class setup/teardown coordination for isoscope + scheduling. + + NOTE: do not instantiate this class directly. Use the + `iso_scheduling` fixture instead! + + """ + + _DISTRIBUTED_SETUP_ROOT_DIR_LINK_NAME = "distributed_setup" + + def __init__( + self, + setup_request: pytest.FixtureRequest, + tmp_path_factory: pytest.TempPathFactory, + testrun_uid: str, + worker_id: str, + ): + """ + :param setup_request: Value of the pytest `request` fixture obtained + directly by the calling setup-teardown fixture. + :param tmp_path_factory: Pytest interface for temporary directories and + files. + :param testrun_uid: Unique id of the current test run. + This is common to all XDist worker Pytest Sessions in the + current test run. NOTE: each XDist worker is running its own Pytest + Session. + :param worker_id: Remote XDist worker ID which is executing this Pytest + Session. + """ + self._setup_request: pytest.FixtureRequest = setup_request + + # NOTE: `tmp_path_factory.getbasetemp()` returns worker-specific temp + # directory. `tmp_path_factory.getbasetemp().parent` is common to all + # workers in the current PyTest test run. 
+        self._root_context_base_dir: pathlib.Path = (
+            tmp_path_factory.getbasetemp().parent
+            / self._DISTRIBUTED_SETUP_ROOT_DIR_LINK_NAME
+            / testrun_uid
+        )
+
+        self._worker_id: str = worker_id
+
+        self._setup_context: DistributedSetupContext | None = None
+        self._teardown_context: DistributedTeardownContext | None = None
+
+    def maybe_call_setup(
+        self,
+        setup_callback: Callable[[DistributedSetupContext], None],
+        timeout: float = DistributedSetupCoordinator.DEFAULT_TIMEOUT_SEC,
+    ) -> None:
+        """Invoke the Setup callback only if distributed setup has not been
+        performed yet from any other XDist worker for your test scope.
+        Process-safe.
+
+        Call `maybe_call_setup` from the pytest setup-teardown fixture of your
+        isoscope-scheduled test (typically a test class) if it needs to
+        initialize a resource which is common to all of its test cases, which
+        may be executing in different XDist worker processes (such as a subnet
+        in `subnet.xml`).
+
+        `maybe_call_setup` MUST ALWAYS be called in conjunction with
+        `maybe_call_teardown`. The resulting `DistributedSetupContext` is
+        retained by this coordinator and reused by the corresponding call to
+        `maybe_call_teardown`.
+
+        :param setup_callback: Callback for performing Setup that is common to
+            the pytest scope from which `maybe_call_setup` is invoked.
+        :param timeout: Lock acquisition timeout in seconds.
+
+        :raise CoordinationTimeoutError: If attempt to acquire the lock times out.
+        """
+        # `maybe_call_setup()` may be called only once per instance of
+        # `_SetupCoordinator`
+        assert (
+            self._setup_context is None
+        ), f"`maybe_call_setup()` already called {self._setup_context=}"
+
+        node_path: pathlib.Path = self._setup_request.node.path
+
+        root_context_dir: pathlib.Path = (
+            self._root_context_base_dir
+            / node_path.relative_to(node_path.parts[0])
+            / self._setup_request.node.name
+        )
+
+        with _DistributedSetupCoordinationImpl.acquire_distributed_setup(
+            root_context_dir=root_context_dir,
+            worker_id=self._worker_id,
+            setup_request=self._setup_request,
+            timeout=timeout,
+        ) as setup_context:
+            self._setup_context = setup_context
+            if self._setup_context.distributed_setup_allowed:
+                setup_callback(self._setup_context)
+
+    def maybe_call_teardown(
+        self,
+        teardown_callback: Callable[[DistributedTeardownContext], None],
+        timeout: float = DistributedSetupCoordinator.DEFAULT_TIMEOUT_SEC,
+    ) -> None:
+        """Invoke the Teardown callback only when called in the context of the
+        final XDist Worker process to finish executing the tests for your test
+        scope. Process-safe.
+
+        Call `maybe_call_teardown` from the pytest setup-teardown fixture of
+        your isoscope-scheduled test (typically a test class) if it needs to
+        clean up a resource which is common to all of its test cases and which
+        may have been initialized via `maybe_call_setup` (such as a subnet in
+        `subnet.xml`).
+
+        NOTE: `maybe_call_teardown` MUST ALWAYS be called in conjunction with
+        `maybe_call_setup`.
+
+        :param teardown_callback: Callback for performing Teardown that is
+            common to the pytest scope from which `maybe_call_teardown` is
+            invoked.
+        :param timeout: Lock acquisition timeout in seconds.
+
+        :raise CoordinationTimeoutError: If attempt to acquire the lock times out.
+ """ + # Make sure `maybe_call_setup()` was already called on this instance + # of `_SetupCoordinator` + assert ( + self._setup_context is not None + ), f"maybe_call_setup() not called yet {self._setup_context=}" + + # Make sure `maybe_call_teardown()` hasn't been called on this instance + # of `_SetupCoordinator` yet + assert ( + self._teardown_context is None + ), f"maybe_call_teardown() already called {self._teardown_context=}" + + with _DistributedSetupCoordinationImpl.acquire_distributed_teardown( + setup_context=self._setup_context, timeout=timeout + ) as teardown_context: + self._teardown_context = teardown_context + if self._teardown_context.distributed_teardown_allowed: + teardown_callback(self._teardown_context) + + +def _map_file_lock_exception(f: Callable): # type: ignore[no-untyped-def, type-arg] + """Decorator: map `FileLock` exceptions of interest to our own exceptions.""" + + @functools.wraps(f) + def wrapper(*args, **kwargs): # type: ignore[no-untyped-def] + try: + return f(*args, **kwargs) + except filelock.Timeout as err: + raise CoordinationTimeoutError( + f"Another instance of this test scope/class is holding the " + f"lock too long or timeout value is too short: {err}" + ) from err + + return wrapper + + +class _DistributedSetupCoordinationImpl: + """Low-level implementation of Context Managers for Coordinating + Distributed Setup and Teardown for users of isoscope scheduling. + """ + + _ROOT_STATE_FILE_NAME = "root_state.json" + _ROOT_LOCK_FILE_NAME = "lock" + + class DistributedState: + """State of the Distributed Setup-Teardown Coordination.""" + + def __init__(self, setup_count: int, teardown_count: int) -> None: + self.setup_count = setup_count + self.teardown_count = teardown_count + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__qualname__}: " + f"setup_count={self.setup_count}; " + f"teardown_count={self.teardown_count}>" + ) + + @classmethod + def load_from_file_path( + cls, state_file_path: pathlib.Path + ) -> _DistributedSetupCoordinationImpl.DistributedState: + """Load the state instance from the given file path. + + :param state_file_path: + :return: Instance of the state constructed from the contents of the + given file. + """ + return cls(**json.loads(state_file_path.read_text())) + + @property + def as_json_kwargs_dict(self) -> dict[str, int]: + """ + :return: JSON-compatible representation of the instance that is also + suitable for constructing the instance after fetching from file. + as in the following example: + + ``` + state_kwargs = json.load(open(state_file_path)) + DistributedState(**state_kwargs) + ``` + """ + return { + "setup_count": self.setup_count, + "teardown_count": self.teardown_count, + } + + def save_to_file_path(self, state_file_path: pathlib.Path) -> None: + """Save this state instance to the given file path. + + :param state_file_path: + :return: + """ + state_file_path.write_text(json.dumps(self.as_json_kwargs_dict)) + + @classmethod + @contextlib.contextmanager + @_map_file_lock_exception + def acquire_distributed_setup( + cls, + root_context_dir: pathlib.Path, + worker_id: str, + setup_request: pytest.FixtureRequest, + timeout: float, + ) -> Generator[DistributedSetupContext, None, None]: + """Low-level implementation of Context Manager for Coordinating + Distributed Setup for isoscope scheduling. + + :param root_context_dir: Scope/class-specific root directory for + saving this context manager's state. This directory is common to + all xdist workers for the given test scope/class. 
+ :param worker_id: XDist worker ID for logging. + :param setup_request: Value of the pytest `request` fixture obtained + directly by the calling setup-teardown fixture. + :param timeout: Lock acquisition timeout in seconds + + :raise CoordinationTimeoutError: If attempt to acquire the lock times out. + """ + # + # Before control passes to the managed code block + # + setup_context = DistributedSetupContext( + setup_allowed=False, + root_context_dir=root_context_dir, + worker_id=worker_id, + setup_request=setup_request, + ) + + state_file_path = cls._get_root_state_file_path(root_context_dir) + + # Acquire resource + with filelock.FileLock( + str(cls._get_root_lock_file_path(root_context_dir)), timeout=timeout + ): + if state_file_path.is_file(): + state = cls.DistributedState.load_from_file_path(state_file_path) + # We never save state with setup_count <= 0 + assert state.setup_count > 0, ( + f"acquire_distributed_setup: non-positive setup " + f"count read from state file - {state_file_path=}; " + f"{worker_id=}; {state}" + ) + # No Teardowns should be executing before all Setups + # complete + assert state.teardown_count == 0, ( + f"acquire_distributed_setup: non-zero teardown " + f"count read from state file - {state_file_path=}; " + f"{worker_id=}; {state}" + ) + else: + # State file not created yet + state = cls.DistributedState(setup_count=0, teardown_count=0) + + state.setup_count += 1 + + setup_context.distributed_setup_allowed = state.setup_count == 1 + + # + # Yield control to the managed code block + # + _LOGGER.info( # pylint: disable=logging-fstring-interpolation + f"acquire_distributed_setup: yielding control to " + f"managed block - {worker_id=}; {setup_context=}" + ) + yield setup_context + + # + # Control returns from the managed code block, unless control + # left managed code with an exception + # + + # Save state to file + state.save_to_file_path(state_file_path) + + @classmethod + @contextlib.contextmanager + @_map_file_lock_exception + def acquire_distributed_teardown( + cls, setup_context: DistributedSetupContext, timeout: float + ) -> Generator[DistributedTeardownContext, None, None]: + """Low-level implementation of Context Manager for Coordinating + Distributed Teardown for the isoscope scheduling. + + :param setup_context: The instance of `DistributedSetupContext` that was + yielded by the corresponding use of the + `_distributed_setup_permission` context manager. + :param timeout: Lock acquisition timeout in seconds + + :raise CoordinationTimeoutError: If attempt to acquire the lock times out. 
+ """ + # + # Before control passes to the managed code block + # + teardown_context = DistributedTeardownContext( + teardown_allowed=False, setup_context=setup_context + ) + + # NOTE: Friend-of-class protected member access + root_context_dir = teardown_context._root_context_dir # pylint: disable=protected-access + + worker_id = teardown_context.worker_id + + state_file_path = cls._get_root_state_file_path(root_context_dir) + + # Acquire resource + with filelock.FileLock( + str(cls._get_root_lock_file_path(root_context_dir)), timeout=timeout + ): + if state_file_path.is_file(): + state = cls.DistributedState.load_from_file_path(state_file_path) + assert state.setup_count > 0, ( + f"acquire_distributed_teardown: non-positive " + f"setup_count read from state file - {state_file_path=}; " + f"{worker_id=}; {state.setup_count=} <= 0; {state}" + ) + assert state.teardown_count < state.setup_count, ( + f"acquire_distributed_teardown: teardown_count " + f"already >= setup_count read from state file - " + f"{state_file_path=}; {worker_id=}; " + f"{state.teardown_count=} >= {state.setup_count=}" + ) + else: + raise RuntimeError( + f"acquire_distributed_teardown: state file not found: " + f"{state_file_path=}; {worker_id=}" + ) + + state.teardown_count += 1 + + teardown_context.distributed_teardown_allowed = ( + state.teardown_count == state.setup_count + ) + + # + # Yield control to the managed code block + # + _LOGGER.info( # pylint: disable=logging-fstring-interpolation + f"acquire_distributed_teardown: yielding control to " + f"managed block - {worker_id=}; {teardown_context=}" + ) + yield teardown_context + + # + # Control returns from the managed code block, unless control left + # managed code with an exception + # + + # Save state to file + state.save_to_file_path(state_file_path) + + @classmethod + def _get_root_state_file_path(cls, root_state_dir: pathlib.Path) -> pathlib.Path: + """Return the path of the file for storing the root state, creating all + parent directories if they don't exist yet. + + :param root_state_dir: Directory where root state should be stored. + :return: The file path of the root state. + """ + root_state_dir.mkdir(parents=True, exist_ok=True) + return root_state_dir / cls._ROOT_STATE_FILE_NAME + + @classmethod + def _get_root_lock_file_path(cls, root_lock_dir: pathlib.Path) -> pathlib.Path: + """Return the path of the lock file, creating all parent directories if + they don't exist yet. + + :param root_lock_dir: Directory where lock file should be stored. + :return: The file path of the lock file. + """ + root_lock_dir.mkdir(parents=True, exist_ok=True) + return root_lock_dir / cls._ROOT_LOCK_FILE_NAME diff --git a/src/xdist/iso_scheduling_utils.py b/src/xdist/iso_scheduling_utils.py new file mode 100644 index 00000000..bf44cc90 --- /dev/null +++ b/src/xdist/iso_scheduling_utils.py @@ -0,0 +1,323 @@ +# This code was contributed to pytest-xdist by Akamai Technologies Inc. +# Copyright 2024 Akamai Technologies, Inc. +# Developed by Vitaly Kruglikov at Akamai Technologies, Inc. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Utilities for supporting isoscope scheduling. + +NOTE: These utilities are NOT compatible with any other xdist schedulers. + +See also `iso_scheduling_plugin.py` for fixtures specific to isoscope scheduling. +""" + +from __future__ import annotations + +import abc +import pathlib +from typing import TYPE_CHECKING + + +if TYPE_CHECKING: + from collections.abc import Callable + from contextlib import AbstractContextManager + + import pytest + + +class CoordinationTimeoutError(Exception): + """When attempt to acquire the distributed lock times out.""" + + +class IsoSchedulingFixture(abc.ABC): + """Interface of the context manager which is returned by our pytest fixture + `iso_scheduling`. + + An instance of the implementation of this interface is a context manager + which yields an instance of the implementation of the + `DistributedSetupCoordinator` interface. + """ + + # pylint: disable=too-few-public-methods + + @abc.abstractmethod + def coordinate_setup_teardown( + self, setup_request: pytest.FixtureRequest + ) -> AbstractContextManager[DistributedSetupCoordinator]: + """Context manager that yields an instance of + `DistributedSetupCoordinator` for distributed coordination of Setup + and Teardown. + + NOTE: In python3.9 and later, a more appropriate return type would be + `contextlib.AbstractContextManager[DistributedSetupCoordinator]`. + + :param setup_request: Value of the pytest `request` fixture obtained + directly by the calling setup-teardown fixture. + """ + + +class DistributedSetupCoordinator(abc.ABC): + """Interface for use with the `iso_scheduling` fixture for + distributed coordination of Setup and Teardown workflows. For example, + inserting a subnet into `subnet.xml` and reverting it upon Teardown. + + The `iso_scheduling` fixture returns an implementation of this + interface. See the `iso_scheduling` fixture in + `iso_scheduling_plugin.py` for additional information. + + NOTE: Each XDist remote worker is running its own Pytest Session, so we want + only the worker that starts its session first to execute the setup logic and + only the worker that finishes its session last to execute the teardown logic + using a form of distributed coordination. This way, setup is executed exactly + once before any worker executes any of the scope's tests, and teardown is + executed only after the last worker finishes test execution. 
+ + USAGE EXAMPLE: + + ``` + from __future__ import annotations + from typing import TYPE_CHECKING + import pytest + + if TYPE_CHECKING: + from xdist.iso_scheduling_utils import ( + IsoSchedulingFixture, + DistributedSetupContext, + DistributedTeardownContext + ) + + class TestSomething: + + @classmethod + @pytest.fixture(scope='class', autouse=True) + def distributed_setup_and_teardown( + cls, + iso_scheduling: IsoSchedulingFixture: + request: pytest.FixtureRequest): + + # Distributed Setup and Teardown + with iso_scheduling.coordinate_setup_teardown( + setup_request=request) as coordinator: + # Distributed Setup + coordinator.maybe_call_setup(cls.patch_system_under_test) + + try: + # Yield control back to the XDist Worker to allow the + # test cases to run + yield + finally: + # Distributed Teardown + coordinator.maybe_call_teardown(cls.revert_system_under_test) + + @classmethod + def patch_system_under_test( + cls, + setup_context: DistributedSetupContext) -> None: + # Initialize the System Under Test for all the test cases in + # this test class and store state in `setup_context.client_dir`. + + @classmethod + def revert_system_under_test( + cls, + teardown_context: DistributedTeardownContext): + # Fetch state from `teardown_context.client_dir` and revert + # changes made by `patch_system_under_test()`. + + def test_case1(self) + ... + + def test_case2(self) + ... + + def test_case3(self) + ... + ``` + """ + + # Default lock acquisition timeout in seconds + DEFAULT_TIMEOUT_SEC = 90 + + @abc.abstractmethod + def maybe_call_setup( + self, + setup_callback: Callable[[DistributedSetupContext], None], + timeout: float = DEFAULT_TIMEOUT_SEC, + ) -> None: + """Invoke the Setup callback only if distributed setup has not been + performed yet from any other XDist worker for your test scope. + Process-safe. + + Call `maybe_call_setup` from the pytest setup-teardown fixture of your + isoscope-scheduled test (typically test class) if it needs to + initialize a resource which is common to all of its test cases which may + be executing in different XDist worker processes (such as a subnet in + `subnet.xml`). + + `maybe_call_setup` MUST ALWAYS be called in conjunction with + `maybe_call_teardown`. + + :param setup_callback: Callback for performing Setup that is common to + the pytest scope from which `maybe_call_setup` is invoked. + :param timeout: Lock acquisition timeout in seconds + + :return: An instance of `DistributedSetupContext` which MUST be passed + in the corresponding call to `maybe_call_teardown`. + + :raise CoordinationTimeoutError: If attempt to acquire the lock times + out. + """ + + @abc.abstractmethod + def maybe_call_teardown( + self, + teardown_callback: Callable[[DistributedTeardownContext], None], + timeout: float = DEFAULT_TIMEOUT_SEC, + ) -> None: + """Invoke the Teardown callback only in when called in the context of + the final XDist Worker process to have finished the execution of the + tests for your test scope. Process-safe. + + Call `maybe_call_teardown` from the pytest setup-teardown fixture of + your isoscope-scheduled test (typically test class) if it needs to + initialize a resource which is common to all of its test cases which may + be executing in different XDist worker processes (such as a subnet in + `subnet.xml`). + + NOTE: `maybe_call_teardown` MUST ALWAYS be called in conjunction with + `maybe_call_setup`. + + :param teardown_callback: Callback for performing Teardown that is + common to the pytest scope from which `maybe_call_teardown` is + invoked. 
+ :param timeout: Lock acquisition timeout in seconds + + :raise CoordinationTimeoutError: If attempt to acquire the lock times + out. + """ + + +class _DistributedSetupTeardownContextMixin: # pylint: disable=too-few-public-methods + """Mixin for `DistributedSetupContext` and DistributedTeardownContext`.""" + + # Expected instance members in derived class + _root_context_dir: pathlib.Path + _setup_node_name: str + + _CLIENT_SUBDIRECTORY_LINK = "client-workspace" + + @property + def client_dir(self) -> pathlib.Path: + """ + :return: The directory where client should save/retrieve + client-specific state, creating the directory if not already + created. + """ + client_dir_path = self._root_context_dir / self._CLIENT_SUBDIRECTORY_LINK + client_dir_path.mkdir(parents=True, exist_ok=True) + + return client_dir_path + + +class DistributedSetupContext(_DistributedSetupTeardownContextMixin): + """Setup context provided by the `acquire_distributed_setup` context + manager. + """ + + def __init__( + self, + setup_allowed: bool, + root_context_dir: pathlib.Path, + worker_id: str, + setup_request: pytest.FixtureRequest, + ): + """ + :param setup_allowed: Whether distributed setup may be performed by the + current process. + :param root_context_dir: Scope/class-specific root directory for + saving this context manager's state. This directory is common to + all xdist workers for the given test scope/class. + :param worker_id: XDist worker ID which is executing tests in the + current process. + :param setup_request: Value of the pytest `request` fixture obtained + directly by the calling setup-teardown fixture. + """ + self._root_context_dir = root_context_dir + + # XDist worker ID which is executing tests in the current process + self.worker_id = worker_id + + # Pytest setup node name (e.g., name of test class being setup) + self._setup_node_name = setup_request.node.name + + # Managed code MUST obey the value of `distributed_setup_allowed`! + # + # If True, the client is designated for performing the distributed Setup + # actions. + # If False, the client MUST NOT perform the distributed Setup actions, + # in which case someone else has already performed them + self.distributed_setup_allowed = setup_allowed + + def __repr__(self) -> str: + return ( + f"< {self.__class__.__name__}: " + f"node_name={self._setup_node_name}; " + f"setup_allowed={self.distributed_setup_allowed}; " + f"worker_id={self.worker_id}; " + f"client_dir={self.client_dir} >" + ) + + +class DistributedTeardownContext(_DistributedSetupTeardownContextMixin): + """Teardown context provided by the `acquire_distributed_teardown` context + manager. + """ + + def __init__(self, teardown_allowed: bool, setup_context: DistributedSetupContext): + """ + :param teardown_allowed: Whether Distributed Teardown may be performed + by the current process. + :param setup_context: Setup Context from the Setup phase. + """ + # Managed code MUST obey the value of `distributed_teardown_allowed`! + # + # If True, the client is designated for performing the distributed + # Teardown actions. + # If False, the client MUST NOT perform the distributed Teardown + # actions, in which case someone else will perform them. 
+ self.distributed_teardown_allowed = teardown_allowed + + # NOTE: Friend-of-class protected member access + self._root_context_dir = setup_context._root_context_dir # pylint: disable=protected-access + + # XDist worker ID which is executing tests in the current process + self.worker_id = setup_context.worker_id + + # NOTE: Friend-of-class protected member access + self._setup_node_name = setup_context._setup_node_name # pylint: disable=protected-access + + def __repr__(self) -> str: + return ( + f"< {self.__class__.__name__}: " + f"node_name={self._setup_node_name}; " + f"teardown_allowed={self.distributed_teardown_allowed}; " + f"worker_id={self.worker_id}; " + f"client_dir={self.client_dir} >" + ) diff --git a/src/xdist/plugin.py b/src/xdist/plugin.py index f670d9de..b68b7811 100644 --- a/src/xdist/plugin.py +++ b/src/xdist/plugin.py @@ -103,6 +103,7 @@ def pytest_addoption(parser: pytest.Parser) -> None: action="store", choices=[ "each", + "isoscope", "load", "loadscope", "loadfile", @@ -115,6 +116,14 @@ def pytest_addoption(parser: pytest.Parser) -> None: help=( "Set mode for distributing tests to exec environments.\n\n" "each: Send each test to all available environments.\n\n" + "isoscope: Scope Isolation Scheduler." + " Tests are grouped by module for test functions and by class for test methods." + " Tests are executed one group at a time, distributed across available workers. This " + " groupwise isolation guarantees that all tests in one group complete execution before" + " running another group of tests. This can be useful when module-level or class-level" + " fixtures of one group could create undesirable side-effects for tests in other test groups," + " while taking advantage of distributed execution of tests within each group. Grouping by" + " class takes priority over grouping by module.\n\n" "load: Load balance by sending any pending test to any" " available environment.\n\n" "loadscope: Load balance by sending pending groups of tests in" diff --git a/src/xdist/scheduler/__init__.py b/src/xdist/scheduler/__init__.py index b4894732..f11ef5ca 100644 --- a/src/xdist/scheduler/__init__.py +++ b/src/xdist/scheduler/__init__.py @@ -1,4 +1,5 @@ from xdist.scheduler.each import EachScheduling as EachScheduling +from xdist.scheduler.isoscope import IsoScopeScheduling as IsoScopeScheduling from xdist.scheduler.load import LoadScheduling as LoadScheduling from xdist.scheduler.loadfile import LoadFileScheduling as LoadFileScheduling from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling diff --git a/src/xdist/scheduler/isoscope.py b/src/xdist/scheduler/isoscope.py new file mode 100644 index 00000000..959999b7 --- /dev/null +++ b/src/xdist/scheduler/isoscope.py @@ -0,0 +1,1399 @@ +# This code was contributed to pytest-xdist by Akamai Technologies Inc. +# Copyright 2024 Akamai Technologies, Inc. +# Developed by Vitaly Kruglikov at Akamai Technologies, Inc. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. 
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +""" +Implementation of the Distributed Scope Isolation scheduler for pytest-xdist. + +Properties of this scheduler: + 1. Executes one test scope/class at a time. + 2. Distributes tests of the executing scope/class to the configured XDist + Workers. + 3. Guarantees that the Setup of the executing scope/class completes in all + XDist Workers BEFORE any of those Workers start processing the + Teardown of that test scope/class. + 4. Guarantees that the Teardown phase of the executing test scope/class + completes in all XDist Workers before the Setup phase begins for the + next test scope/class. + +Credits: +* Implementation of `_split_scope()` and public method documentation: + - borrowed from the builtin `loadscope` scheduler +""" # pylint: disable=too-many-lines + +from __future__ import annotations + +from collections import OrderedDict +import enum +from math import ceil +import random +from typing import cast +from typing import Tuple +from typing import TYPE_CHECKING + +import pytest + +from xdist.report import report_collection_diff +from xdist.workermanage import parse_spec_config + + +if TYPE_CHECKING: + from collections.abc import Generator + from collections.abc import Iterable + from collections.abc import ValuesView + from typing import NoReturn + from typing import Sequence + + import xdist.remote + from xdist.workermanage import WorkerController + + +class IsoScopeScheduling: # pylint: disable=too-many-instance-attributes + """Distributed Scope Isolation Scheduling: Implement scheduling across + remote workers, distributing and executing one scope at a time, such that + each scope is executed in isolation from tests in other scopes. + + Ensures that all tests in a given scope complete execution before starting + execution of the tests in the subsequent scope. This way, scoped + setup/teardown fixtures can be orchestrated to execute global setup/teardown + once per scope (vs. per worker) using `FileLock` or similar for + coordination. + """ + + class _State(str, enum.Enum): + # Waiting for scheduler to be ready to distribute the next Scope. When + # the Workset Queue is NOT empty AND all workers which are shutting down + # reach zero pending tests AND all other workers have no more than one + # pending tests AND at least one worker is available for the distribution + # of the next scope, then transition to `ACTIVATE_SCOPE` + WAIT_READY_TO_ACTIVATE_SCOPE = "WAIT-READY-TO-ACTIVATE-SCOPE" + + # Activate (i.e., distribute) tests from the next Scope, if any. If a + # scope was distributed, then transition to `WAIT_READY_TO_FENCE`. + # Workers that are available for distribution are those that already + # contain a fence test belonging to this scope as well as empty workers + # which are not shutting down. Workers with matching fence tests have + # priority over empty workers (to satisfy that "at least two + # active-Scope tests per worker" Rule) + ACTIVATE_SCOPE = "ACTIVATE-SCOPE" + + # Waiting for scheduler to be ready to fence the active (i.e., + # distributed) scope. 
Wait until each non-empty worker has only one + # pending test remaining. Then, if at least one of those non-empty + # and non-shutting-down workers contains a pending test belonging to the + # current active Scope, transition to the `FENCE` state. If none of + # these workers contains a pending test belonging to the current active + # Scope, then reset current active scope and transition to + # `WAIT-READY-TO-ACTIVATE-SCOPE` (this means that all workers containing + # active-Scope tests crashed) + WAIT_READY_TO_FENCE = "WAIT-READY-TO-FENCE" + + # Fence the workers containing the final active-Scope tests in + # order to allow those final pending tests to complete execution. Fence + # tests are dequeued from subsequent scopes, making sure that those + # scopes will be able to satisfy the "at least two active-Scope tests + # per worker" Rule when they are activated. When subsequent scopes run + # out of tests for fencing, then send "shutdown" to the balance of those + # workers instead of a fence test. Finally, transition to + # `WAIT_READY_TO_ACTIVATE_SCOPE`. + FENCE = "FENCE" + + def __init__(self, config: pytest.Config, log: xdist.remote.Producer): + self._config = config + self._log: xdist.remote.Producer = log.distscopeisosched + + # Current scheduling state + self._state: IsoScopeScheduling._State = ( + self._State.WAIT_READY_TO_ACTIVATE_SCOPE + ) + + # Scope ID of tests that are currently executing; `None` prior to the + # initial distribution + self._active_scope_id: str | None = None + + # The initial expected number of remote workers taking part. + # The actual number of workers will vary during the scheduler's + # lifetime as nodes are added by the DSession as they are brought up and + # removed either because of a dead node or normal shutdown. This number + # is primarily used to know when the initial collection is completed. + self._expected_num_workers = len(parse_spec_config(config)) + + # The final list of test IDs collected by all nodes once + # it's validated to be identical between all the nodes. The validation + # is performed once the number of registered node collections reaches + # `_expected_num_workers`. It is initialized to None and then updated + # after validation succeeds. + self._official_test_collection: tuple[str, ...] | None = None + # Remote worker node having `_official_test_collection` as its test + # collection (for reporting failed collection validations) + self._official_test_collection_node: WorkerController | None = None + + # Ordered collection of Scope Worksets. Each Scope Workset is an ordered + # collection of tests belonging to the given scope. Initially empty, + # it will be populated once we establish the final test collection + # (see `_official_test_collection`). + self._workset_queue = _WorksetQueue() + + # Workers available for test distribution (aka, active workers). It's + # the mapping of `WorkerController` nodes to the corresponding + # `_WorkerProxy` instances. Initially empty, it will be populated by + # our `add_node_collection` implementation as it's called by xdist's + # `DSession` and the corresponding test collection passes validation. + self._worker_by_node: OrderedDict[WorkerController, _WorkerProxy] = ( + OrderedDict() + ) + + # Workers pending validation of their Test collections that have not + # been admitted to `_worker_by_node` yet. + # + # A worker is added to `_pending_worker_by_node` with its collection + # value initialized to `None` when xdist Controller invokes + # `add_node()`. 
+ # + # The worker's test collection value is updated when + # `add_node_collection` receives the corresponding test collection. + # + # A worker is moved from `_pending_worker_by_node` to `_worker_by_node` + # when its collection is validated. + # + # A worker is removed from `_pending_worker_by_node` when the xdist + # controller invokes `remove_node()` with the corresponding node. + self._pending_worker_by_node: OrderedDict[WorkerController, _WorkerProxy] = ( + OrderedDict() + ) + + @property + def nodes(self) -> list[WorkerController]: + """A new list of all active `WorkerController` nodes. + + Called by xdist `DSession`. + """ + return list(self._worker_by_node.keys()) + list( + self._pending_worker_by_node.keys() + ) + + @property + def collection_is_completed(self) -> bool: + """Indication whether initial test collection is completed. + + Indicates that all initial participating remote workers have finished + test collection. + + Called by xdist `DSession`: + - when it gets notified that a remote worker completed collection as + a prelude to calling our `schedule()` method. + """ + return self._official_test_collection is not None + + @property + def tests_finished(self) -> bool: + """True if all tests have completed execution. + + Called by xdist `DSession`: + - periodically as a prelude to triggering shutdown + """ + if not self.collection_is_completed: + return False + + if self.has_pending: + return False + + return True + + @property + def has_pending(self) -> bool: + """True if there are pending test items. + + This indicates that collection has finished and nodes are still + processing test items, so this can be thought of as + "the scheduler is active". + + Called by xdist `DSession`. + """ + if not self._workset_queue.empty: + return True + + for worker in self._workers: + if not worker.empty: + return True + + return False + + def add_node(self, node: WorkerController) -> None: + """Add a new node to the scheduler's pending worker collection. + + The node will be activated and assigned tests to be executed only after + its test collection is received by `add_node_collection()` and + validated. + + Called by the ``DSession.worker_workerready`` hook + - when it successfully bootstraps a new remote worker. + """ + self._log(f"Registering remote worker node {node}") + + assert ( + node not in self._pending_worker_by_node + ), f"{node=} already in pending workers" + + self._pending_worker_by_node[node] = _WorkerProxy(node) + + def remove_node(self, node: WorkerController) -> str | None: + """Remove a Remote Worker node from the scheduler. + + This should be called either when the node crashed or at node shutdown + time. + + NOTE: If the worker still has pending tests assigned to it, this + method will return those pending tests back to the Workset Queue for + later execution. + + IMPORTANT: If the remote worker experienced an ungraceful shutdown, it + may create an imbalance between the execution of the setup and teardown + fixture(s). THIS MAY LEAVE THE SYSTEM UNDER TEST IN AN UNEXPECTED STATE, + COMPROMISING EXECUTION OF ALL SUBSEQUENT TESTS IN CURRENT AND FUTURE + SESSIONS. + + Called by the hooks: + + - ``DSession.worker_workerfinished``. + - ``DSession.worker_errordown``. + + :return: the test ID being executed while the node crashed or None if + the node has no more pending items. + + :raise KeyError: if the Remote Worker node has not been registered with + the scheduler. 
(NOTE: xdist's `DSession` expects this behavior) + """ + self._log(f"Removing remote worker node {node}") + + if node in self._pending_worker_by_node: + # Worker was not admitted to active workers yet, remove it from the + # pending worker collection. + self._pending_worker_by_node.pop(node) + assert ( + node not in self._worker_by_node + ), f"{node=} in both pending and active workers" + return None + + # Worker was admitted to active workers already + + worker = self._worker_by_node.pop(node) + + if worker.empty: + return None + + # The remote worker node crashed; identify the test that crashed + # + # IMPORTANT: The remote worker might have experienced an ungraceful + # shutdown, possibly creating an imbalance between the execution of + # the setup and teardown fixture(s). THIS MAY LEAVE THE + # SYSTEM UNDER TEST IN AN UNEXPECTED STATE, COMPROMISING EXECUTION OF + # ALL SUBSEQUENT TESTS IN CURRENT AND FUTURE SESSIONS. + + first_pending_test = worker.head_pending_test + crashed_test_id = first_pending_test.test_id + + self._log( + f"Remote Worker {worker!r} shut down ungracefully. It " + f"may have crashed while executing the pending test " + f"{first_pending_test}. " + f"NOTE: The ungraceful shutdown may create an imbalance " + f"between the execution of the setup and teardown " + f"fixture(s). THIS MAY LEAVE THE SYSTEM UNDER TEST IN AN " + f"UNEXPECTED STATE, COMPROMISING EXECUTION OF ALL SUBSEQUENT " + f"TESTS IN CURRENT AND FUTURE SESSIONS." + ) + + # Return the pending tests back to the workset queue + for test in worker.release_pending_tests(): + self._workset_queue.add_test(test) + + return crashed_test_id + + def add_node_collection( + self, node: WorkerController, collection: Sequence[str] + ) -> None: + """Register the collected test items from a Remote Worker node. + + If the official test collection has been established already, validate + the given worker's test collection against the official node; if valid, + then activate the worker, making it available for scheduling. + + If the official test collection has not been established yet, and we + now have at least the expected number of pending workers with a test + collection, and all these test collections are identical, then: + 1. Record the reference node and collection for subsequent + validations of future worker collections + 2. Activate all these workers, making them available for scheduling. + 2. Organize tests into a queue of worksets grouped by test scope ID + + Called by the hook: + + - ``DSession.worker_collectionfinish``. + """ + self._log(f"Adding collection for node {node}: {len(collection)=}") + + # Check that add_node() was called on the node before + assert node in self._pending_worker_by_node, ( + f"Received test collection for {node=} which is not in pending " f"workers" + ) + + worker = self._pending_worker_by_node[node] + + collection = worker.collection = tuple(collection) + + if self.collection_is_completed: + # A new node has been added after final collection establishment, + # perhaps an original one died. 
+
+            # Check that the new collection matches the official collection
+            if self._do_two_nodes_have_same_collection(
+                reference_node=cast(
+                    WorkerController, self._official_test_collection_node
+                ),
+                reference_collection=cast(
+                    Tuple[str, ...], self._official_test_collection
+                ),
+                node=node,
+                collection=collection,
+            ):
+                # The worker's collection is valid, so activate the new worker
+                self._pending_worker_by_node.pop(node)
+                self._worker_by_node[node] = worker
+
+            return
+
+        #
+        # The final collection has not been established yet
+        #
+
+        # Check if we now have enough collections to establish a final one
+
+        # Get all pending workers with a registered test collection
+        workers_with_collection = [
+            w for w in self._pending_worker_by_node.values() if w.collection is not None
+        ]
+
+        if len(workers_with_collection) < self._expected_num_workers:
+            # Not enough test collections registered yet
+            return
+
+        # Check that all nodes collected the same tests
+        same_collection = True
+        reference_worker = workers_with_collection[0]
+        for pending_worker in workers_with_collection[1:]:
+            if not self._do_two_nodes_have_same_collection(
+                reference_node=reference_worker.node,
+                reference_collection=cast(Tuple[str, ...], reference_worker.collection),
+                node=pending_worker.node,
+                collection=cast(Tuple[str, ...], pending_worker.collection),
+            ):
+                same_collection = False
+
+        if not same_collection:
+            self._log("**Different tests collected, aborting worker activation**")
+            return
+
+        # Collections are identical!
+
+        # Record the reference node and collection for subsequent validations of
+        # future worker collections
+        self._official_test_collection_node = reference_worker.node
+        self._official_test_collection = reference_worker.collection
+
+        # Activate these workers
+        for worker in workers_with_collection:
+            # Activate the worker
+            self._pending_worker_by_node.pop(worker.node)
+            self._worker_by_node[worker.node] = worker
+
+        # Shuffle the tests to break any inherent ordering relationships for
+        # distribution across workers (e.g., a sub-sequence of tests that are
+        # particularly slow)
+        all_tests = [
+            _TestProxy(test_id=test_id, test_index=test_index)
+            for test_index, test_id in enumerate(
+                cast(Tuple[str, ...], self._official_test_collection)
+            )
+        ]
+        shuffled_test_collection = random.sample(all_tests, k=len(all_tests))
+
+        # Organize tests into a queue of worksets grouped by test scope ID
+        for test in shuffled_test_collection:
+            self._workset_queue.add_test(test)
+
+    def mark_test_complete(
+        self, node: WorkerController, item_index: int, duration: float = 0
+    ) -> None:
+        """Mark the test item as completed by the node, remove it from the
+        worker's pending tests, and reschedule.
+
+        Called by the hook:
+
+        - ``DSession.worker_runtest_protocol_complete``.
+ """ + # Suppress "unused parameter" warning + assert duration is duration # pylint: disable=comparison-with-itself + + worker = self._worker_by_node[node] + + if self._log.enabled: + self._log( + f"Marking test complete: " + f"test_id={cast(Tuple[str, ...], self._official_test_collection)[item_index]}; " + f"{item_index=}; {worker}" + ) + + worker.handle_test_completion(test_index=item_index) + + self._reschedule_workers() + + def mark_test_pending(self, item: str) -> NoReturn: + """Not supported""" + raise NotImplementedError() + + def remove_pending_tests_from_node( + self, + node: WorkerController, + indices: Sequence[int], + ) -> NoReturn: + """Not supported""" + raise NotImplementedError() + + def schedule(self) -> None: + """Initiate distribution of the test collection. + + Initiate scheduling of the items across the nodes. If this gets called + again later it behaves the same as calling ``._reschedule()`` on all + nodes so that newly added nodes will start to be used. + + If ``.collection_is_completed`` is True, this is called by the hook: + + - ``DSession.worker_collectionfinish``. + """ + assert ( + self.collection_is_completed + ), "schedule() called before test collection completed" + + # Test collection has been completed, so reschedule if needed + self._reschedule_workers() + + @staticmethod + def split_scope(test_id: str) -> str: + """Determine the scope (grouping) of a test ID (aka, "nodeid"). + + There are usually 3 cases for a nodeid:: + + example/loadsuite/test/test_beta.py::test_beta0 + example/loadsuite/test/test_delta.py::Delta1::test_delta0 + example/loadsuite/epsilon/__init__.py::epsilon.epsilon + + #. Function in a test module. + #. Method of a class in a test module. + #. Doctest in a function in a package. + + This function will group tests with the scope determined by splitting + the first ``::`` from the right. That is, classes will be grouped in a + single work unit, and functions from a test module will be grouped by + their module. In the above example, scopes will be:: + + example/loadsuite/test/test_beta.py + example/loadsuite/test/test_delta.py::Delta1 + example/loadsuite/epsilon/__init__.py + """ + return test_id.rsplit("::", 1)[0] + + @property + def _workers(self) -> Iterable[_WorkerProxy]: + """An iterable of all active worker proxies in this scheduler, + including those that have initiated, but not yet completed shutdown. 
+ """ + return self._worker_by_node.values() + + def _reschedule_workers(self) -> None: + """Distribute work to workers if needed at this time.""" + assert self._state is not None + + traversed_states: list[IsoScopeScheduling._State] = [] + previous_state = None + while self._state != previous_state: + # NOTE: This loop will terminate because completion of tests and + # worker availability are reported outside the scope of this + # function, and our state transitions are limited by those factors + assert len(traversed_states) <= len(self._State), ( + f"Too many traversed states - {len(traversed_states)}: " + f"{traversed_states}" + ) + traversed_states.append(self._state) + + previous_state = self._state + + if self._state is self._State.WAIT_READY_TO_ACTIVATE_SCOPE: + self._handle_state_wait_ready_to_activate_scope() + elif self._state is self._State.ACTIVATE_SCOPE: + self._handle_state_activate_scope() + elif self._state is self._State.WAIT_READY_TO_FENCE: + self._handle_state_wait_ready_to_fence() + elif self._state is self._State.FENCE: + self._handle_state_fence() + else: + raise RuntimeError(f"Unhandled state: {self._state}") + + def _handle_state_wait_ready_to_activate_scope(self) -> None: + """Handle the `WAIT_READY_TO_ACTIVATE_SCOPE` state. + + Waiting for scheduler to be ready to distribute the next Scope. When + the Workset Queue is NOT empty AND all workers which are shutting down + reach zero pending tests AND all other workers have no more than one + pending tests AND at least one worker is available for the distribution + of the next scope, then transition to `ACTIVATE_SCOPE` + """ + assert ( + self._state is self._State.WAIT_READY_TO_ACTIVATE_SCOPE + ), f"{self._state=} != {self._State.WAIT_READY_TO_ACTIVATE_SCOPE}" + + if self._workset_queue.empty: + # No more scopes are available + return + + # First check if all workers satisfy the pending test thresholds + for worker in self._workers: + if worker.num_pending_tests > 1: + # A worker has too many pending tests + return + if worker.shutting_down and worker.num_pending_tests != 0: + # A worker is shutting down, but is not empty yet + return + + # Check whether at least one worker is available for the next Scope. + # + # In the event none are available, we'll have to wait for crashed + # worker(s) to be restarted. + # + # NOTE: xdist will either replace crashed workers or terminate the + # session. + + next_scope_id = self._workset_queue.head_workset.scope_id + if not self._get_workers_available_for_distribution(scope_id=next_scope_id): + # No workers are available for distribution of the next scope. + # It appears that some workers have crashed. xdist will either + # replace crashed workers or terminate the session. + if self._log.enabled: + self._log( + f"No workers are available for {next_scope_id=}, " + f"they likely crashed; staying in {self._state=}" + ) + return + + # Conditions are satisfied for transition to next state + previous_state = self._state + self._state = self._State.ACTIVATE_SCOPE + self._log(f"Transitioned from {previous_state!s} to " f"{self._state!s}") + + def _handle_state_activate_scope(self) -> None: + """Handle the `ACTIVATE_SCOPE` state. + + Activate (i.e., distribute) tests from the next Scope, if any. If we + distributed a scope, then transition to `WAIT_READY_TO_FENCE`. + Workers that are available for distribution are those that already + contain fence tests belonging to this scope as well as empty workers + which are not shutting down. 
Workers with matching fence tests have + priority over empty workers (to satisfy the "at least two + active-Scope tests per worker" Rule) + """ + assert ( + self._state is self._State.ACTIVATE_SCOPE + ), f"{self._state=} != {self._State.ACTIVATE_SCOPE}" + + # The previous state is responsible for ensuring that the workset queue + # is not empty + assert not self._workset_queue.empty, f"Empty {self._workset_queue}" + + workset = self._workset_queue.dequeue_workset() + + # Get workers that are available for distribution: those that already + # contain a fence test belonging to this scope as well as empty workers + # which are not shutting down + available_workers = self._get_workers_available_for_distribution( + scope_id=workset.scope_id + ) + + # The previous state is responsible for ensuring that workers are + # available for this Scope + assert ( + available_workers + ), f"No workers available for {workset.scope_id=} in {self._state=}" + + # Distribute the workset to the available workers + self._distribute_workset(workset=workset, workers=available_workers) + + # Update Active Scope ID + self._active_scope_id = workset.scope_id + + # Conditions are satisfied for transition to next state + previous_state = self._state + self._state = self._State.WAIT_READY_TO_FENCE + self._log( + f"Transitioned from {previous_state!s} to " + f"{self._state!s}. " + f"Activated scope={self._active_scope_id}" + ) + + def _handle_state_wait_ready_to_fence(self) -> None: + """Handle the `WAIT_READY_TO_FENCE` state. + + Waiting for scheduler to be ready to fence the active (i.e., + distributed) scope. Wait until each non-empty worker has only one + pending test remaining. Then, if at least one of those non-empty + and non-shutting-down workers contains a pending test belonging to the + current active Scope, transition to the `FENCE` state. If none of + these workers contains a pending test belonging to the current active + Scope, then reset current active scope and transition to + `WAIT-READY-TO-ACTIVATE-SCOPE` (this means that all workers containing + active-Scope tests crashed) + """ + assert ( + self._state is self._State.WAIT_READY_TO_FENCE + ), f"{self._state=} != {self._State.WAIT_READY_TO_FENCE}" + + assert self._active_scope_id is not None, f"{self._active_scope_id=} is None" + + for worker in self._workers: + if worker.num_pending_tests > 1: + # A worker has too many pending tests + return + + workers_to_fence = self._get_workers_ready_for_fencing( + scope_id=self._active_scope_id + ) + + # Conditions are satisfied for transition to next state + previous_state = self._state + + if workers_to_fence: + # There are pending active-Scope tests that need to be fenced + self._state = self._State.FENCE + else: + # No active-Scope tests pending, so nothing to fence. Their + # worker(s) must have crashed? + self._state = self._State.WAIT_READY_TO_ACTIVATE_SCOPE + self._log( + f"Nothing to fence! No active-scope tests pending - " + f"workers crashed? {self._active_scope_id=}" + ) + + self._log(f"Transitioned from {previous_state!s} to " f"{self._state!s}") + + def _handle_state_fence(self) -> None: + """Handle the `FENCE` state. + + Fence the workers containing the final active-Scope tests in + order to allow those final pending tests to complete execution. Fence + tests are dequeued from subsequent scopes, making sure that those + scopes will be able to satisfy the "at least two active-Scope tests + per worker" Rule when they are activated. 
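+        (Illustrative example: a subsequent scope whose workset still holds
+        all 5 of its tests can lend at most 2 of them as fence tests before
+        that scope itself is activated.)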
When subsequent scopes run + out of tests for fencing, then send "shutdown" to the balance of those + workers instead of a fence test. Finally, transition to + `WAIT_READY_TO_ACTIVATE_SCOPE`. + """ + assert ( + self._state is self._State.FENCE + ), f"{self._state=} is not {self._State.FENCE}" + + workers_to_fence = self._get_workers_ready_for_fencing( + scope_id=cast(str, self._active_scope_id) + ) + + # The prior state should have ensured that there is at least one worker + # that needs to be fenced + assert workers_to_fence, ( + f"No workers ready to fence {self._active_scope_id=} " + f"in {self._state=}; " + f"active workers: {[w.verbose_repr() for w in self._workers]}" + ) + + # We will take Fence tests from subsequent worksets. + # NOTE: A given workset may be used to fence multiple preceding active + # Scopes + fence_item_generator = self._generate_fence_items( + source_worksets=self._workset_queue.worksets + ) + + # Start fencing + for worker in workers_to_fence: + fence_item = next(fence_item_generator) + if fence_item is not None: + worker.run_some_tests([fence_item]) + self._log( + f"Fenced {worker} with {fence_item}. " + f"Active scope={self._active_scope_id}" + ) + else: + # No more fence items, so send the "shutdown" message to + # the worker to force it to execute its final pending test and + # shut down. We won't need this worker any more - the remaining + # fence items are already occupying the necessary number of + # workers + worker.shutdown() + + # Transition to next state + previous_state = self._state + self._state = self._State.WAIT_READY_TO_ACTIVATE_SCOPE + self._log(f"Transitioned from {previous_state!s} to " f"{self._state!s}") + + def _distribute_workset( + self, workset: _ScopeWorkset, workers: list[_WorkerProxy] + ) -> None: + """Distribute the tests in the given workset to the given workers. + + Adhere to the "at least two active-Scope tests per worker" Rule. + + Note that each of the non-empty workers, if any, contains exactly one + Fence test that belongs to the scope of the given workset. + + :param workset: The workset to distribute. NOTE that some of its tests + may have already been dequeued and applied as fences for a prior + scope. + :param workers: Workers to receive the distribution of tests from the + given workset. NOTE that some of the workers may be non-empty, in + which case they contain exactly one Fence test that belongs to the + scope of the given workset. 
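+
+        Illustrative example (hypothetical numbers): for a workset with
+        ``high_water == 5`` from which one test was already borrowed as a
+        fence (so ``num_tests == 4``), and two available workers (the fenced
+        worker plus one empty worker), the fenced worker receives
+        ``ceil(5/2) - 1 == 2`` additional tests and the empty worker receives
+        the remaining 2, so each worker holds at least two active-Scope tests.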
+ """ + # Workers with matching fence tests have priority over empty workers (to + # satisfy the "at least two active-Scope tests per worker" Rule) + # + # Sort workers such that non-empty ones (those containing Fence items) + # are at the beginning to make sure each receive at least one additional + # test item from the workset + workers = list(sorted(workers, key=lambda w: w.empty)) + + num_workers_with_fences = sum(1 for w in workers if not w.empty) + + # Remaining tests in the workset plus the number borrowed as fences + # must add up to the original total tests in the workset + assert workset.num_tests + num_workers_with_fences == workset.high_water, ( + f"{workset}.num_tests + {num_workers_with_fences=} " + f"!= {workset.high_water=}; {workers=}" + ) + + # Determine the number of workers we will use for this distribution + num_workers_to_use = min( + self._get_max_workers_for_num_tests(workset.high_water), len(workers) + ) + + # At minimum, all workers fenced from the given Scope Workset must be + # included in the distribution + assert num_workers_to_use >= num_workers_with_fences, ( + f"{num_workers_to_use=} < {num_workers_with_fences=} " + f"for {workset} and available {len(workers)=}" + ) + # We should only be called when there is work to be done + assert num_workers_to_use > 0, f"{num_workers_to_use=} <= 0" + # Our workset's footprint should not exceed available workers + assert num_workers_to_use <= len( + workers + ), f"{num_workers_to_use=} > {len(workers)=} for {workset}" + + # Distribute the tests to the selected workers + self._log( + f"Distributing {workset} to {num_workers_to_use=}: " + f"{workers[:num_workers_to_use]}" + ) + + num_tests_remaining = workset.high_water + worker: _WorkerProxy + num_available_workers: int + for worker, num_available_workers in zip( + workers, range(num_workers_to_use, 0, -1) + ): + # Workers ready for distribution must have no more than one pending + # test + assert ( + worker.num_pending_tests <= 1 + ), f"{worker.verbose_repr()} num_pending_tests > 1" + + if not worker.empty: + # The single pending test in the worker must be a Fence test + # borrowed from the given workset + assert ( + worker.head_pending_test.scope_id == workset.scope_id + ), f"Scope IDs of {worker.verbose_repr()} and {workset} differ" + + # Determine the target number of tests for this worker (including + # a matching Fence test, if any) + target_num_tests = ceil(num_tests_remaining / num_available_workers) + num_tests_remaining -= target_num_tests + + # Number of tests we'll be dequeuing from the workset and adding to + # the worker + num_tests_to_add = target_num_tests - worker.num_pending_tests + + # Send tests to the worker + if num_tests_to_add: + tests_to_add = workset.dequeue_tests(num_tests=num_tests_to_add) + worker.run_some_tests(tests_to_add) + self._log( + f"Distributed {len(tests_to_add)} tests to {worker} " + f"from {workset}" + ) + else: + # NOTE: A Workset with a high watermark of just one item becomes + # empty if a Fence item was withdrawn from it + assert workset.high_water == 1, ( + f"Attempted to distribute 0 tests to {worker} " f"from {workset}" + ) + self._log(f"No more tests to distribute from {workset} " f"to {worker}") + + # Workset should be empty now + assert workset.empty, ( + f"{workset} is not empty after distribution to {num_workers_to_use} " + f"workers: {workers[:num_workers_to_use]}." 
+ ) + + @classmethod + def _generate_fence_items( + cls, source_worksets: Iterable[_ScopeWorkset] + ) -> Generator[_TestProxy | None, None, None]: + """Generator that withdraws (i.e., dequeues) Fence test items from the + given ordered Scope Worksets and yields them until it runs out of the + fence items per limits described below, and will thereafter yield + `None`. + + Details: + Withdraws (i.e., dequeues) Fence test items - one test per yield - from + the given ordered Scope Worksets and yields these + test items, while making sure not to exceed each Workset's withdrawal + limit to be in compliance with the "at least two active-Scope tests per + worker" Rule when these Worksets are activated. + + The withdrawals are made from the first Workset until it reaches its + Fence limit, then the next Workset, and so on. + + If all Fence items available in the given Source Worksets become + exhausted, the generator yields `None` indefinitely. + + NOTE: The Worksets may have been used to fence multiple preceding active + Scopes, so they may not have their full capacity of Fence items. + + NOTE: ASSUME that all previously withdrawn items were used for Fencing. + + NOTE: Worksets with high watermark of just one item become empty when + a Fence item is withdrawn. + + NOTE: Worksets with original capacity of more than one Test item will + not be completely emptied out by Fencing in order to adhere with the + "at least two active-Scope tests per worker" Rule when these Worksets + are eventually activated. + + :param source_worksets: A (possibly-empty) ordered Iterable of Scope + Worksets from which to withdraw Fence test items. + + :return: this generator. + """ + for workset in source_worksets: + # Determine the maximum number of items we can withdraw from this + # Workset for Fencing. + # + # ASSUME that all previously withdrawn items were used for Fencing + num_fence_items = cls._get_fence_capacity_of_workset(workset) + for _ in range(num_fence_items): + yield workset.dequeue_tests(num_tests=1)[0] + + # The given Worksets ran out of Fence items, so yield `None` from now on + while True: + yield None + + @classmethod + def _get_fence_capacity_of_workset(cls, workset: _ScopeWorkset) -> int: + """Determine the maximum number of items we can withdraw from this + Workset for Fencing. + + NOTE: The Worksets may have been used to fence multiple preceding active + Scopes, so they may not have their full capacity of Fence items. + + NOTE: ASSUME that all previously withdrawn items were used for Fencing. + + NOTE: Worksets with high watermark of just one item become empty when + a Fence item is withdrawn. + + :param workset: The given Scope Workset + :return: + """ + num_fence_items = cls._get_max_workers_for_num_tests( + num_tests=workset.high_water + ) - (workset.high_water - workset.num_tests) + + assert num_fence_items >= 0, ( + f"Number of fences below zero " f"({num_fence_items}) in {workset}" + ) + + return num_fence_items + + @staticmethod + def _get_max_workers_for_num_tests(num_tests: int) -> int: + """Determine the maximum number of workers to which the given number of + tests can be distributed, adhering to the "at least two active-Scope + tests per worker" Rule. + + f(0) = 0 + f(1) = 1 + f(2) = 1 + f(3) = 1 + f(4) = 2 + f(5) = 2 + f(6) = 3 + f(7) = 3 + f(8) = 4 + + :param num_tests: Number of tests. + :return: The maximum number of workers to which the given number of + tests can be distributed, adhering to the "at least two active-Scope + tests per worker" Rule. 
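+
+        Equivalently (matching the implementation below): ``0`` when there
+        are no tests, ``1`` for a single test, and ``num_tests // 2``
+        otherwise.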
+ """ + if num_tests == 1: + return 1 + + return num_tests // 2 + + def _get_workers_available_for_distribution( + self, scope_id: str + ) -> list[_WorkerProxy]: + """Return workers available for distribution of the given Scope. + + Available workers are non-shutting-down workers that either + * contain a single pending test which is a fence + test belonging to the given scope + * or are empty workers (no pending tests) + + ASSUMPTION: the caller is responsible for making sure that no worker + contains more than one pending test before calling this method. + + :param scope_id: The scope ID of the test Scope being distributed. + + :return: A (possibly empty) list of workers available for distribution. + """ + return [ + worker + for worker in self._workers + if ( + not worker.shutting_down + and (worker.empty or worker.tail_pending_test.scope_id == scope_id) + ) + ] + + def _get_workers_ready_for_fencing(self, scope_id: str) -> list[_WorkerProxy]: + """Return workers that are ready to be Fenced for the given test Scope. + + A worker that needs to be Fenced satisfies all the following conditions: + * is not shutting down + * contains exactly one pending test + * this test belongs to the given Scope. + + :param scope_id: Scope ID of the test Scope that needs to be Fenced + + :return: A (possibly empty) list of workers to Fence. + """ + return [ + worker + for worker in self._workers + if ( + not worker.shutting_down + and worker.num_pending_tests == 1 + and worker.head_pending_test.scope_id == scope_id + ) + ] + + def _do_two_nodes_have_same_collection( + self, + reference_node: WorkerController, + reference_collection: tuple[str, ...], + node: WorkerController, + collection: tuple[str, ...], + ) -> bool: + """ + If collections differ, this method returns False while logging + the collection differences and posting collection errors to + pytest_collectreport hook. + + :param reference_node: Node of test collection believed to be correct. + :param reference_collection: Test collection believed to be correct. + :param node: Node of the other collection. + :param collection: The other collection to be compared with + `reference_collection` + :return: True if both nodes have collected the same test items. False + otherwise. + """ + msg = report_collection_diff( + reference_collection, collection, reference_node.gateway.id, node.gateway.id + ) + if not msg: + return True + + self._log(msg) + + if self._config is not None: + # NOTE: Not sure why/when `_config` would be `None`. Copied check + # from the `loadscope` scheduler. + + report = pytest.CollectReport( + node.gateway.id, "failed", longrepr=msg, result=[] + ) + self._config.hook.pytest_collectreport(report=report) + + return False + + +class _WorkerProxy: + """Our proxy of a xdist Remote Worker. + + NOTE: tests are added to the pending queue and sent to the remote worker. + NOTE: a test is removed from the pending queue when pytest-xdist controller + reports that the test has completed + """ + + def __init__(self, node: WorkerController): + """:param node: The corresponding xdist worker node.""" + # node: node instance for communication with remote worker, + # provided by pytest-xdist controller + self._node: WorkerController = node + + # An ordered collection of test IDs collected by the remote worker. + # Initially None, until assigned by the Scheduler + self._collection: tuple[str, ...] 
| None = None + + self._pending_test_by_index: OrderedDict[int, _TestProxy] = OrderedDict() + + def __repr__(self) -> str: + return self.verbose_repr(verbose=False) + + @property + def node(self) -> WorkerController: + """:return: The corresponding xdist worker node.""" + return self._node + + @property + def collection(self) -> tuple[str, ...] | None: + """ + :return: An ordered collection of test IDs collected by the remote + worker; `None` if the collection is not available yet. + """ + return self._collection + + @collection.setter + def collection(self, collection: tuple[str, ...]) -> None: + """ + :param collection: An ordered collection of test IDs collected by the + remote worker. Must not be `None`. Also, MUST NOT be set already. + """ + assert collection is not None, f"None test collection passed to {self}" + + assert ( + self._collection is None + ), f"Test collection passed when one already exists to {self}" + + self._collection = collection + + @property + def pending_tests(self) -> ValuesView[_TestProxy]: + """Pending tests""" + return self._pending_test_by_index.values() + + @property + def head_pending_test(self) -> _TestProxy: + """ + :return: The head pending test + + :raise StopIteration: If there are no pending tests + """ + return next(iter(self.pending_tests)) + + @property + def tail_pending_test(self) -> _TestProxy: + """ + :return: The tail pending test + + :raise StopIteration: If there are no pending tests + """ + return next(reversed(self.pending_tests)) + + @property + def empty(self) -> bool: + """ + `True` if no tests have been enqueued for this worker + `False` is at least one Test remains on the pending queue + """ + return not self._pending_test_by_index + + @property + def num_pending_tests(self) -> int: + """Count of tests in the pending queue""" + return len(self._pending_test_by_index) + + @property + def shutting_down(self) -> bool: + """ + :return: `True` if the worker is already down or shutdown was sent to + the remote worker; `False` otherwise. + """ + return self._node.shutting_down + + def verbose_repr(self, verbose: bool = True) -> str: + """Return a possibly verbose `repr` of the instance. + + :param verbose: `True` to return verbose `repr`; `False` for terse + `repr` content. Defaults to `True`. + + :return: `repr` of the instance. + """ + items = [ + "<", + f"{self.__class__.__name__}:", + f"{self._node}", + f"shutting_down={self.shutting_down}", + f"num_pending={self.num_pending_tests}", + ] + + if verbose: + if self.num_pending_tests == 1: + items.append(f"head_scope_id={self.head_pending_test.scope_id}") + if self.num_pending_tests > 1: + items.append(f"tail_scope_id={self.tail_pending_test.scope_id}") + + items.append(">") + + return " ".join(items) + + def run_some_tests(self, tests: Iterable[_TestProxy]) -> None: + """ + Add given tests to the pending queue and + send their indexes to the remote worker + """ + self._node.send_runtest_some([test.test_index for test in tests]) + self._pending_test_by_index.update((t.test_index, t) for t in tests) + + def handle_test_completion(self, test_index: int) -> None: + """Remove completed test from the worker's pending tests. + + :param test_index: The stable index of the corresponding test. 
+ """ + # Test assumption: tests should be completed in the order they are sent + # to the remote worker + head_pending_test_index = next(iter(self._pending_test_by_index.keys())) + + # Completion should be reported in same order the tests were sent to + # the remote worker + assert ( + head_pending_test_index == test_index + ), f"{head_pending_test_index=} != {test_index}" + + # Remove the test from the worker's pending queue + self._pending_test_by_index.pop(test_index) + + def release_pending_tests(self) -> list[_TestProxy]: + """Reset the worker's pending tests, returning those pending tests. + + :return: A (possibly empty) list of pending tests. + """ + pending_tests = list(self.pending_tests) + self._pending_test_by_index.clear() + return pending_tests + + def shutdown(self) -> None: + """ + Send the "shutdown" message to the remote worker. This + will cause the remote worker to shut down after executing + any remaining pending tests assigned to it. + """ + self._node.shutdown() + + +class _TestProxy: + """ + Represents a single test from the overall test + collection to be executed + """ + + # There can be a large number of tests, so economize memory by declaring + # `__slots__` (see https://wiki.python.org/moin/UsingSlots) + __slots__ = ( + "test_id", + "test_index", + ) + + def __init__(self, test_id: str, test_index: int): + """ + :param test_id: Test ID of this test; + :param test_index: The stable index of the corresponding test + for assigning to remote worker. + + """ + self.test_id: str = test_id + self.test_index: int = test_index + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__}: test_index={self.test_index} " + f"scope_id={self.scope_id} test_id={self.test_id}>" + ) + + @property + def scope_id(self) -> str: + """Scope ID to which this test belongs.""" + return IsoScopeScheduling.split_scope(self.test_id) + + +class _ScopeWorkset: + """Ordered collection of Tests for the given scope.""" + + __slots__ = ( + "scope_id", + "_high_water", + "_test_by_index", + ) + + def __init__(self, scope_id: str): + """:param scope_id: Test Scope to which the tests in this workset belong;""" + self.scope_id = scope_id + + # High watermark for number of tests in the workset + self._high_water: int = 0 + + self._test_by_index: OrderedDict[int, _TestProxy] = OrderedDict() + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__}: scope_id={self.scope_id} " + f"num_tests={self.num_tests} high_water={self.high_water}>" + ) + + @property + def empty(self) -> bool: + """`True` if workset is empty; `False` otherwise.""" + return not self._test_by_index + + @property + def high_water(self) -> int: + """:return: High Watermark of the number of tests in the workset.""" + return self._high_water + + @property + def num_tests(self) -> int: + """Number of tests in this workset""" + return len(self._test_by_index) + + def enqueue_test(self, test: _TestProxy) -> None: + """Append given test to ordered test collection""" + assert test.scope_id == self.scope_id, f"Wrong {test.scope_id=} for {self}" + + assert ( + test.test_index not in self._test_by_index + ), f"{test.test_index=} was already assigned to {self}" + + self._test_by_index[test.test_index] = test + + # Update high watermark + self._high_water = max(self._high_water, len(self._test_by_index)) + + def dequeue_tests(self, num_tests: int) -> list[_TestProxy]: + """ + Remove and return the given number of tests from the head of the + collection. 
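+        Tests are returned in FIFO (insertion) order. Illustrative example
+        (hypothetical indexes): if the workset currently holds tests with
+        indexes ``[4, 7, 9]``, dequeuing two tests returns the tests with
+        indexes ``4`` and ``7``.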
+ + :param num_tests: a positive number of tests to dequeue; must not exceed + available tests. + @raise IndexError: If `num_tests` exceeds available tests. + """ + assert num_tests > 0, f"Non-positive {num_tests=} requested." + + if num_tests > len(self._test_by_index): + raise IndexError(f"{num_tests=} exceeds {len(self._test_by_index)=}") + + key_iter = iter(self._test_by_index.keys()) + test_indexes_to_dequeue = [next(key_iter) for _ in range(num_tests)] + + return [ + self._test_by_index.pop(test_index) + for test_index in test_indexes_to_dequeue + ] + + +class _WorksetQueue: + """Ordered collection of Scope Worksets grouped by scope id.""" + + def __init__(self) -> None: + self._workset_by_scope: OrderedDict[str, _ScopeWorkset] = OrderedDict() + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__}: " + f"num_worksets={len(self._workset_by_scope)}>" + ) + + @property + def empty(self) -> bool: + """`True` if work queue is empty; `False` otherwise.""" + return not self._workset_by_scope + + @property + def head_workset(self) -> _ScopeWorkset: + """ + :return: The head workset + + :raise StopIteration: If the Workset Queue is empty + """ + return next(iter(self.worksets)) + + @property + def worksets(self) -> ValuesView[_ScopeWorkset]: + """ + :return: An iterable of this queue's ordered collection of + `_ScopeWorkset` instances. + """ + return self._workset_by_scope.values() + + def add_test(self, test: _TestProxy) -> None: + """Adds given test to its Scope Workset, creating the corresponding + workset as needed. Newly-created Worksets are always added at + the end of the Workset Queue(appended). + """ + scope_id = test.scope_id + + if (workset := self._workset_by_scope.get(scope_id)) is not None: + # Add to an existing Scope Workset + workset.enqueue_test(test) + else: + # Create a new Scope Workset + new_workset = _ScopeWorkset(scope_id=scope_id) + new_workset.enqueue_test(test) + self._workset_by_scope[scope_id] = new_workset + + def dequeue_workset(self) -> _ScopeWorkset: + """Dequeue and return the scope workset at the head of the queue. + + @raise IndexError: If queue is empty. + """ + if self.empty: + raise IndexError("Attempted dequeue from empty Workset Queue.") + + return self._workset_by_scope.pop(next(iter(self._workset_by_scope.keys()))) diff --git a/testing/acceptance_test.py b/testing/acceptance_test.py index 3ef10cc9..e77d408e 100644 --- a/testing/acceptance_test.py +++ b/testing/acceptance_test.py @@ -1,9 +1,11 @@ from __future__ import annotations import os +import pathlib import re import shutil from typing import cast +from uuid import uuid1 import pytest @@ -1198,6 +1200,330 @@ def pytest_collection_modifyitems(): result.stdout.fnmatch_lines(["*RuntimeError: Some runtime error*"]) +@pytest.fixture(scope="session") +def shared_scope_setup_status_path( + tmp_path_factory: pytest.TempPathFactory, testrun_uid: str +) -> pathlib.Path: + return ( + tmp_path_factory.getbasetemp().parent + / "test_distributed_setup_teardown_coordination" + / testrun_uid + / uuid1().hex + / "scope_setup_status.txt" + ) + + +class TestIsoScope: + def test_distributed_setup_teardown_coordination( + self, pytester: pytest.Pytester, shared_scope_setup_status_path: pathlib.Path + ) -> None: + """ + The isoscope scheduler provides a distributed coordination mechanism + for Scope-level Resource Setup/Teardown with the following guarantees: + 1. Resource Setup is performed exactly once per test Scope. + 2. Resource Teardown is performed exactly once per test Scope. + 3. 
Resource Setup of the executing test Scope completes BEFORE + execution of the Scope's tests. + 4. Resource Teardown phase of the executing test Scope begins after + completion of all tests of the Scope. + 5. Resource Setup of the next test Scope begins after completion of + the previous test Scope's Resource Teardown. + """ + test_file = f""" + from __future__ import annotations + import pathlib + from typing import TYPE_CHECKING + import pytest + if TYPE_CHECKING: + from xdist.iso_scheduling_utils import ( + IsoSchedulingFixture, + DistributedSetupContext, + DistributedTeardownContext + ) + + _SHARED_SCOPE_SETUP_STATUS_PATH = pathlib.Path( + r"{shared_scope_setup_status_path!s}") + + class TestScopeA: + @classmethod + @pytest.fixture(scope='class', autouse=True) + def distributed_setup_and_teardown( + cls, + iso_scheduling: IsoSchedulingFixture, + request: pytest.FixtureRequest): + with iso_scheduling.coordinate_setup_teardown( + setup_request=request) as coordinator: + # Distributed Setup + coordinator.maybe_call_setup(cls.patch_system_under_test) + try: + # Yield control back to the XDist Worker to allow the + # test cases to run + yield + finally: + # Distributed Teardown + coordinator.maybe_call_teardown(cls.revert_system_under_test) + @classmethod + def patch_system_under_test( + cls, + setup_context: DistributedSetupContext) -> None: + # Initialize the System Under Test for all the test cases in + # this test class and store state in `setup_context.client_dir`. + assert _SHARED_SCOPE_SETUP_STATUS_PATH.read_text() == "TEARDOWN_COMPLETE" + _SHARED_SCOPE_SETUP_STATUS_PATH.write_text("SETUP_COMPLETE") + + @classmethod + def revert_system_under_test( + cls, + teardown_context: DistributedTeardownContext): + # Fetch state from `teardown_context.client_dir` and revert + # changes made by `patch_system_under_test()`. 
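+                # (In this example the shared status file stands in for real
+                # system-under-test state, so the test can verify the
+                # setup/teardown ordering guarantees across workers.)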
+ assert _SHARED_SCOPE_SETUP_STATUS_PATH.read_text() == "SETUP_COMPLETE" + _SHARED_SCOPE_SETUP_STATUS_PATH.write_text("TEARDOWN_COMPLETE") + + @pytest.mark.parametrize('i', range(5)) + def test(self, i): + assert _SHARED_SCOPE_SETUP_STATUS_PATH.read_text() == "SETUP_COMPLETE" + + class TestScopeB(TestScopeA): + pass + """ + # Initialize the status file used by underlying test + shared_scope_setup_status_path.parent.mkdir(parents=True, exist_ok=True) + shared_scope_setup_status_path.write_text("TEARDOWN_COMPLETE") + + pytester.makepyfile(test_a=test_file) + result = pytester.runpytest("-n2", "--dist=isoscope", "-v") + + assert ( + sum( + get_workers_and_test_count_by_prefix( + "test_a.py::TestScopeA", result.outlines + ).values() + ) + == 5 + ) + assert ( + sum( + get_workers_and_test_count_by_prefix( + "test_a.py::TestScopeB", result.outlines + ).values() + ) + == 5 + ) + + def test_by_module(self, pytester: pytest.Pytester) -> None: + test_file = """ + import pytest + @pytest.mark.parametrize('i', range(10)) + def test(i): + pass + """ + pytester.makepyfile(test_a=test_file, test_b=test_file) + result = pytester.runpytest("-n2", "--dist=isoscope", "-v") + assert get_workers_and_test_count_by_prefix( + "test_a.py::test", result.outlines + ) == {"gw0": 5, "gw1": 5} + assert get_workers_and_test_count_by_prefix( + "test_b.py::test", result.outlines + ) == {"gw0": 5, "gw1": 5} + + def test_by_class(self, pytester: pytest.Pytester) -> None: + pytester.makepyfile( + test_a=""" + import pytest + class TestA: + @pytest.mark.parametrize('i', range(10)) + def test(self, i): + pass + + class TestB: + @pytest.mark.parametrize('i', range(10)) + def test(self, i): + pass + """ + ) + result = pytester.runpytest("-n2", "--dist=isoscope", "-v") + assert get_workers_and_test_count_by_prefix( + "test_a.py::TestA", result.outlines + ) == {"gw0": 5, "gw1": 5} + assert get_workers_and_test_count_by_prefix( + "test_a.py::TestB", result.outlines + ) == {"gw0": 5, "gw1": 5} + + def test_module_single_start(self, pytester: pytest.Pytester) -> None: + """Ensure test suite is finishing in case all workers start with a single test (#277).""" + test_file1 = """ + import pytest + def test(): + pass + """ + test_file2 = """ + import pytest + def test_1(): + pass + def test_2(): + pass + """ + pytester.makepyfile(test_a=test_file1, test_b=test_file1, test_c=test_file2) + result = pytester.runpytest("-n2", "--dist=isoscope", "-v") + a = get_workers_and_test_count_by_prefix("test_a.py::test", result.outlines) + b = get_workers_and_test_count_by_prefix("test_b.py::test", result.outlines) + c1 = get_workers_and_test_count_by_prefix("test_c.py::test_1", result.outlines) + c2 = get_workers_and_test_count_by_prefix("test_c.py::test_2", result.outlines) + assert a in ({"gw0": 1}, {"gw1": 1}) + assert b in ({"gw0": 1}, {"gw1": 1}) + assert a.items() == b.items() + assert c1 == c2 + + def test_single_scope_all_workers_utilized(self, pytester: pytest.Pytester) -> None: + """ + With single scope, there are no fence tests from another scope, so + this scheduler resorts to shutting down the workers in order to execute + the final tests in each worker. isoscope allocates at least two tests + per worker from the active scope, unless the scope has only one test. 
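+
+        With 5 tests and 2 workers, the expected split is 3/2 (in either
+        worker order), which is what the assertions below check.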
+ """ + test_file = """ + import pytest + @pytest.mark.parametrize('i', range(5)) + def test(i): + pass + """ + pytester.makepyfile(test_a=test_file) + result = pytester.runpytest("-n2", "--dist=isoscope", "-v") + counts_by_worker = get_workers_and_test_count_by_prefix( + "test_a.py::test", result.outlines + ) + assert counts_by_worker["gw0"] in (2, 3) + assert counts_by_worker["gw1"] in (2, 3) + assert counts_by_worker["gw0"] + counts_by_worker["gw1"] == 5 + + @pytest.mark.parametrize("num_tests", [1, 2, 3]) + def test_single_scope_subset_of_workers_utilized( + self, num_tests: int, pytester: pytest.Pytester + ) -> None: + """ + With single scope, there are no fence tests from another scope, so + this scheduler resorts to shutting down the workers in order to execute + the final tests in each worker. isoscope allocates at least two tests + per worker from the active scope, unless the scope has only one test. + """ + test_file = f""" + import pytest + @pytest.mark.parametrize('i', range({num_tests})) + def test(i): + pass + """ + pytester.makepyfile(test_a=test_file) + result = pytester.runpytest("-n2", "--dist=isoscope", "-v") + counts_by_worker = get_workers_and_test_count_by_prefix( + "test_a.py::test", result.outlines + ) + assert counts_by_worker.setdefault("gw0", 0) in (0, num_tests) + assert counts_by_worker.setdefault("gw1", 0) in (0, num_tests) + assert counts_by_worker["gw0"] + counts_by_worker["gw1"] == num_tests + + def test_multi_scope_with_insufficient_fence( + self, pytester: pytest.Pytester + ) -> None: + """ + When there are not enough fence tests from subsequent scope(s), + this scheduler resorts to shutting down the excess workers in order to + execute the final tests in each worker. isoscope allocates at least two + tests per worker from the active scope, unless the scope has only one + test. + """ + test_file1 = """ + import pytest + # 6 tests should distribute 2 per worker for 3 workers due to the + # min-2 scope tests per worker rule. + @pytest.mark.parametrize('i', range(6)) + def test(i): + pass + """ + test_file2 = """ + import pytest + class TestFenceA: + def test(self): + pass + + class TestFenceB: + # Two tests are only enough for one fence due to min-2 scope + # tests per worker rule + def test1(self): + pass + def test2(self): + pass + """ + pytester.makepyfile(test_a=test_file1, test_fence_scopes=test_file2) + result = pytester.runpytest("-n3", "--dist=isoscope", "-v") + + counts_by_worker_a = get_workers_and_test_count_by_prefix( + "test_a.py::test", result.outlines + ) + # 6 tests should distribute 2 per worker for 3 workers due to the + # min-2 scope tests per worker rule. + assert sum(counts_by_worker_a.values()) == 6 + for worker in ["gw0", "gw1", "gw2"]: + assert counts_by_worker_a[worker] == 2 + + counts_by_worker_fence_a = get_workers_and_test_count_by_prefix( + "test_fence_scopes.py::TestFenceA", result.outlines + ) + counts_by_worker_fence_b = get_workers_and_test_count_by_prefix( + "test_fence_scopes.py::TestFenceB", result.outlines + ) + + assert len(counts_by_worker_fence_a) == 1 + assert next(iter(counts_by_worker_fence_a.values())) == 1 + + assert len(counts_by_worker_fence_b) == 1 + assert next(iter(counts_by_worker_fence_b.values())) == 2 + + @pytest.mark.parametrize("num_tests", [1, 2, 3, 4, 5, 7]) + def test_two_tests_min_per_worker_rule( + self, num_tests: int, pytester: pytest.Pytester + ) -> None: + """ + isoscope allocates at least two tests per worker from the active scope, + unless the scope has only one test. 
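+
+        With 2 workers, the expected splits are therefore 1/0, 2/0, 3/0, 2/2,
+        3/2 and 4/3 for 1, 2, 3, 4, 5 and 7 tests respectively (in either
+        worker order), matching ``expected_worker_a_test_count`` below.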
+ """ + test_file1 = f""" + import pytest + @pytest.mark.parametrize('i', range({num_tests})) + def test(i): + pass + """ + pytester.makepyfile(test_a=test_file1) + result = pytester.runpytest("-n2", "--dist=isoscope", "-v") + + if num_tests == 1: + expected_worker_a_test_count = 1 + elif num_tests == 2: + expected_worker_a_test_count = 2 + elif num_tests == 3: + expected_worker_a_test_count = 3 + elif num_tests == 4: + expected_worker_a_test_count = 2 + elif num_tests == 5: + expected_worker_a_test_count = 3 + elif num_tests == 7: + expected_worker_a_test_count = 4 + else: + assert False, f"Unexpected {num_tests=}" + + counts_by_worker_a = get_workers_and_test_count_by_prefix( + "test_a.py::test", result.outlines + ) + + counts_by_worker_a.setdefault("gw0", 0) + counts_by_worker_a.setdefault("gw1", 0) + + assert set(counts_by_worker_a.values()) == { + expected_worker_a_test_count, + num_tests - expected_worker_a_test_count, + } + + class TestLoadScope: def test_by_module(self, pytester: pytest.Pytester) -> None: test_file = """ @@ -1513,7 +1839,8 @@ def test_c(self): """ + ((_test_content * 4) % ("A", "B", "C", "D")) @pytest.mark.parametrize( - "scope", ["each", "load", "loadscope", "loadfile", "worksteal", "no"] + "scope", + ["each", "isoscope", "load", "loadscope", "loadfile", "worksteal", "no"], ) def test_single_file(self, pytester: pytest.Pytester, scope: str) -> None: pytester.makepyfile(test_a=self.test_file1) @@ -1521,7 +1848,8 @@ def test_single_file(self, pytester: pytest.Pytester, scope: str) -> None: result.assert_outcomes(passed=(12 if scope != "each" else 12 * 2)) @pytest.mark.parametrize( - "scope", ["each", "load", "loadscope", "loadfile", "worksteal", "no"] + "scope", + ["each", "isoscope", "load", "loadscope", "loadfile", "worksteal", "no"], ) def test_multi_file(self, pytester: pytest.Pytester, scope: str) -> None: pytester.makepyfile(