13 changes: 10 additions & 3 deletions .github/workflows/test.yml
@@ -16,10 +16,10 @@ jobs:
pylint:
runs-on: ubuntu-latest
steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
- uses: mpi4py/setup-mpi@v1
- name: Set up Python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
with:
python-version: "3.10"
- name: Install dependencies
@@ -30,7 +30,7 @@ jobs:
pip install -r *.egg-info/requires.txt
- name: Analysing the code with pylint
run: |
-          pylint --unsafe-load-any-extension=y --disable=fixme $(git ls-files '*.py') || true
+          pylint --unsafe-load-any-extension=y --disable=fixme $(git ls-files "pytest_parallel/*.py" "test/*.py") || true

build:
needs: [pylint]
@@ -67,12 +67,19 @@ jobs:
mpi: intelmpi
- os: ubuntu-latest
mpi: msmpi
# mpich seems broken on Ubuntu
- os: ubuntu-latest
py-version: 3.8
mpi: mpich
- os: ubuntu-latest
py-version: 3.9
mpi: mpich
- os: ubuntu-latest
py-version: 3.10
mpi: mpich
- os: ubuntu-latest
py-version: 3.11
mpi: mpich
name: ${{ matrix.mpi }} - ${{matrix.py-version}} - ${{matrix.os}}
steps:
- name: Checkout
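Note: the lint step now restricts `git ls-files` to the package and test sources instead of every '*.py' in the repository. A minimal sketch (not part of the diff) of reproducing the same selection locally, assuming git and pylint are on PATH:

import subprocess

# Lint only the package and test sources, mirroring the new pathspecs
# passed to `git ls-files` in the workflow above.
files = subprocess.run(
    ["git", "ls-files", "pytest_parallel/*.py", "test/*.py"],
    capture_output=True, text=True, check=True,
).stdout.split()

subprocess.run(["pylint", "--unsafe-load-any-extension=y", "--disable=fixme", *files])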
6 changes: 3 additions & 3 deletions .slurm_draft/worker.py
@@ -11,13 +11,13 @@
test_idx = int(sys.argv[3])

comm = MPI.COMM_WORLD
-print(f'start at {scheduler_ip}@{server_port} test {test_idx} at rank {comm.Get_rank()}/{comm.Get_size()} exec on {socket.gethostname()} - ',datetime.datetime.now())
+print(f'start at {scheduler_ip}@{server_port} test {test_idx} at rank {comm.rank}/{comm.size} exec on {socket.gethostname()} - ',datetime.datetime.now())

-if comm.Get_rank() == 0:
+if comm.rank == 0:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((scheduler_ip, server_port))
#time.sleep(10+5*test_idx)
-        #msg = f'Hello from test {test_idx} at rank {comm.Get_rank()}/{comm.Get_size()} exec on {socket.gethostname()}'
+        #msg = f'Hello from test {test_idx} at rank {comm.rank}/{comm.size} exec on {socket.gethostname()}'
#socket_utils.send(s, msg)
info = {
'test_idx': test_idx,
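Throughout the PR, mpi4py getter calls such as comm.Get_rank() are replaced by the equivalent comm.rank / comm.size properties. The two spellings are interchangeable; a minimal illustration:

from mpi4py import MPI

comm = MPI.COMM_WORLD
# mpi4py exposes rank and size both as methods and as properties;
# the PR standardizes on the shorter property spelling.
assert comm.rank == comm.Get_rank()
assert comm.size == comm.Get_size()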
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -9,7 +9,7 @@ cmake_policy(SET CMP0074 NEW) # force find_package to take <PackageName>_ROOT variables
# Project
# ----------------------------------------------------------------------
project(
-  pytest_parallel VERSION 1.2.0
+  pytest_parallel VERSION 1.3.0
DESCRIPTION "pytest_parallel extends PyTest to support parallel testing using mpi4py"
)

316 changes: 275 additions & 41 deletions README.md

Large diffs are not rendered by default.

Binary file modified doc/images/test_fail.png
Binary file modified doc/images/test_skip.png
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -18,7 +18,7 @@ authors = [
{name = "Berenger Berthoul", email = "[email protected]"},
]
maintainers = [
{name = "Bruno Maugars", email = "[email protected]"},
{name = "Berenger Berthoul", email = "[email protected]"},
]
license = {text = "Mozilla Public License 2.0"}
keywords = [
@@ -52,7 +52,7 @@ dependencies = [
"mpi4py",
"numpy",
]
version = "1.2.0"
version = "1.3.0"

[project.urls]
Homepage = "https://github.com/onera/pytest_parallel"
2 changes: 1 addition & 1 deletion pytest_parallel/__init__.py
@@ -1,3 +1,3 @@
__version__ = "1.2"
__version__ = "1.3"

from . import mark
4 changes: 2 additions & 2 deletions pytest_parallel/gather_report.py
@@ -45,8 +45,8 @@ def gather_report_on_local_rank_0(report):
del report.sub_comm # No need to keep it in the report
# Furthermore we need to serialize the report
# and mpi4py does not know how to serialize report.sub_comm
-    i_sub_rank = sub_comm.Get_rank()
-    n_sub_rank = sub_comm.Get_size()
+    i_sub_rank = sub_comm.rank
+    n_sub_rank = sub_comm.size

if (
report.outcome != "skipped"
87 changes: 36 additions & 51 deletions pytest_parallel/mpi_reporter.py
@@ -1,17 +1,18 @@
-import numpy as np
+import sys

import pytest
from mpi4py import MPI

from .algo import partition, lower_bound
-from .utils import get_n_proc_for_test, add_n_procs, run_item_test, mark_original_index
-from .utils_mpi import number_of_working_processes, is_dyn_master_process
+from .utils.items import get_n_proc_for_test, add_n_procs, run_item_test, mark_original_index
+from .utils.mpi import number_of_working_processes, is_dyn_master_process
from .gather_report import gather_report_on_local_rank_0
from .static_scheduler_utils import group_items_by_parallel_steps


def mark_skip(item):
comm = MPI.COMM_WORLD
-    n_rank = comm.Get_size()
+    n_rank = comm.size
n_proc_test = get_n_proc_for_test(item)
skip_msg = f"Not enough procs to execute: {n_proc_test} required but only {n_rank} available"
item.add_marker(pytest.mark.skip(reason=skip_msg), append=False)
@@ -28,7 +29,8 @@ def create_sub_comm_of_size(global_comm, n_proc, mpi_comm_creation_function):
if mpi_comm_creation_function == 'MPI_Comm_create':
return sub_comm_from_ranks(global_comm, range(0,n_proc))
elif mpi_comm_creation_function == 'MPI_Comm_split':
-        if i_rank < n_proc_test:
+        i_rank = global_comm.rank
+        if i_rank < n_proc:
color = 1
else:
color = MPI.UNDEFINED
@@ -37,8 +39,7 @@ def create_sub_comm_of_size(global_comm, n_proc, mpi_comm_creation_function):
assert 0, 'Unknown MPI communicator creation function. Available: `MPI_Comm_create`, `MPI_Comm_split`'

def create_sub_comms_for_each_size(global_comm, mpi_comm_creation_function):
-    i_rank = global_comm.Get_rank()
-    n_rank = global_comm.Get_size()
+    n_rank = global_comm.size
sub_comms = [None] * n_rank
for i in range(0,n_rank):
n_proc = i+1
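For context, a minimal mpi4py sketch (not from this PR) of the two creation paths dispatched on in create_sub_comm_of_size; this sub_comm_from_ranks is an assumed reimplementation of the helper imported by the real code:

from mpi4py import MPI

def sub_comm_from_ranks(global_comm, ranks):
    # 'MPI_Comm_create' path: group-based creation (MPI-3 Create_group),
    # which mpi4py does not provide on Windows, hence the Split fallback.
    group = global_comm.Get_group().Incl(list(ranks))
    return global_comm.Create_group(group)

def sub_comm_by_split(global_comm, n_proc):
    # 'MPI_Comm_split' path: the first n_proc ranks share color 1;
    # the others pass MPI.UNDEFINED and receive MPI.COMM_NULL.
    color = 1 if global_comm.rank < n_proc else MPI.UNDEFINED
    return global_comm.Split(color, key=global_comm.rank)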
@@ -47,8 +48,7 @@ def create_sub_comms_for_each_size(global_comm, mpi_comm_creation_function):


def add_sub_comm(items, global_comm, test_comm_creation, mpi_comm_creation_function):
-    i_rank = global_comm.Get_rank()
-    n_rank = global_comm.Get_size()
+    n_rank = global_comm.size

# Strategy 'by_rank': create one sub-communicator by size, from sequential (size=1) to n_rank
if test_comm_creation == 'by_rank':
@@ -71,12 +71,17 @@ def add_sub_comm(items, global_comm, test_comm_creation, mpi_comm_creation_function):
assert 0, 'Unknown test MPI communicator creation strategy. Available: `by_rank`, `by_test`'

class SequentialScheduler:
-    def __init__(self, global_comm, test_comm_creation='by_rank', mpi_comm_creation_function='MPI_Comm_create', barrier_at_test_start=True, barrier_at_test_end=True):
+    def __init__(self, global_comm):
         self.global_comm = global_comm.Dup() # ensure that all communications within the framework are private to the framework
-        self.test_comm_creation = test_comm_creation
-        self.mpi_comm_creation_function = mpi_comm_creation_function
-        self.barrier_at_test_start = barrier_at_test_start
-        self.barrier_at_test_end = barrier_at_test_end
+
+        # These parameters are not accessible through the API, but are left here for tweaking and experimenting
+        self.test_comm_creation = 'by_rank' # possible values : 'by_rank' | 'by_test'
+        self.mpi_comm_creation_function = 'MPI_Comm_create' # possible values : 'MPI_Comm_create' | 'MPI_Comm_split'
+        self.barrier_at_test_start = True
+        self.barrier_at_test_end = True
+        if sys.platform == "win32":
+            self.mpi_comm_creation_function = 'MPI_Comm_split' # because 'MPI_Comm_create' uses `Create_group`,
+                                                               # that is not implemented in mpi4py for Windows

@pytest.hookimpl(trylast=True)
def pytest_collection_modifyitems(self, config, items):
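Since these options left the constructor signature, experimenting with them now means overwriting the attributes after construction; a hypothetical sketch (not a public API):

from mpi4py import MPI
from pytest_parallel.mpi_reporter import SequentialScheduler

# Hypothetical tweak of the private knobs kept "for tweaking and experimenting".
scheduler = SequentialScheduler(MPI.COMM_WORLD)
scheduler.mpi_comm_creation_function = 'MPI_Comm_split'
scheduler.barrier_at_test_end = False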
@@ -86,20 +91,10 @@ def pytest_collection_modifyitems(self, config, items):
def pytest_runtest_protocol(self, item, nextitem):
if self.barrier_at_test_start:
self.global_comm.barrier()
-        #print(f'pytest_runtest_protocol beg {MPI.COMM_WORLD.rank=}')
_ = yield
-        #print(f'pytest_runtest_protocol end {MPI.COMM_WORLD.rank=}')
if self.barrier_at_test_end:
self.global_comm.barrier()

-    #@pytest.hookimpl(tryfirst=True)
-    #def pytest_runtest_protocol(self, item, nextitem):
-    #    if self.barrier_at_test_start:
-    #        self.global_comm.barrier()
-    #    print(f'pytest_runtest_protocol beg {MPI.COMM_WORLD.rank=}')
-    #    if item.sub_comm == MPI.COMM_NULL:
-    #        return True # for this hook, `firstresult=True` so returning a non-None will stop other hooks to run

@pytest.hookimpl(tryfirst=True)
def pytest_pyfunc_call(self, pyfuncitem):
#print(f'pytest_pyfunc_call {MPI.COMM_WORLD.rank=}')
@@ -113,7 +108,7 @@ def pytest_runtestloop(self, session) -> bool:
_ = yield
# prevent return value being non-zero (ExitCode.NO_TESTS_COLLECTED)
# when no test run on non-master
-        if self.global_comm.Get_rank() != 0 and session.testscollected == 0:
+        if self.global_comm.rank != 0 and session.testscollected == 0:
session.testscollected = 1
return True

@@ -136,7 +131,7 @@ def pytest_runtest_logreport(self, report):


def prepare_items_to_run(items, comm):
-    i_rank = comm.Get_rank()
+    i_rank = comm.rank

items_to_run = []

@@ -168,7 +163,7 @@ def prepare_items_to_run(items, comm):


def items_to_run_on_this_proc(items_by_steps, items_to_skip, comm):
-    i_rank = comm.Get_rank()
+    i_rank = comm.rank

items = []

@@ -204,14 +199,13 @@ def pytest_runtestloop(self, session) -> bool:
and not session.config.option.continue_on_collection_errors
):
raise session.Interrupted(
"%d error%s during collection"
% (session.testsfailed, "s" if session.testsfailed != 1 else "")
f"{session.testsfailed} error{'s' if session.testsfailed != 1 else ''} during collection"
)

if session.config.option.collectonly:
return True

-        n_workers = self.global_comm.Get_size()
+        n_workers = self.global_comm.size

add_n_procs(session.items)

@@ -221,20 +215,12 @@ def pytest_runtestloop(self, session) -> bool:
items_by_steps, items_to_skip, self.global_comm
)

-        for i, item in enumerate(items):
-            # nextitem = items[i + 1] if i + 1 < len(items) else None
-            # For optimization purposes, it would be nice to have the previous commented line
-            # (`nextitem` is only used internally by PyTest in _setupstate.teardown_exact)
-            # Here, it does not work:
-            # it seems that things are messed up on rank 0
-            # because the nextitem might not be run (see pytest_runtest_setup/call/teardown hooks just above)
-            # In practice though, it seems that it is not the main thing that slows things down...
-
+        for item in items:
nextitem = None
run_item_test(item, nextitem, session)

# prevent return value being non-zero (ExitCode.NO_TESTS_COLLECTED) when no test run on non-master
-        if self.global_comm.Get_rank() != 0 and session.testscollected == 0:
+        if self.global_comm.rank != 0 and session.testscollected == 0:
session.testscollected = 1
return True

@@ -256,8 +242,8 @@ def pytest_runtest_logreport(self, report):
gather_report_on_local_rank_0(report)

# master ranks of each sub_comm must send their report to rank 0
-        if sub_comm.Get_rank() == 0: # only master are concerned
-            if self.global_comm.Get_rank() != 0: # if master is not global master, send
+        if sub_comm.rank == 0: # only master are concerned
+            if self.global_comm.rank != 0: # if master is not global master, send
self.global_comm.send(report, dest=0)
elif report.master_running_proc != 0: # else, recv if test run remotely
# In the line below, MPI.ANY_TAG will NOT clash with communications outside the framework because self.global_comm is private
@@ -322,7 +308,7 @@ def schedule_test(item, available_procs, inter_comm):

# mark the procs as busy
for sub_rank in sub_ranks:
-        available_procs[sub_rank] = False
+        available_procs[sub_rank] = 0

# TODO isend would be slightly better (less waiting)
for sub_rank in sub_ranks:
@@ -354,19 +340,19 @@ def wait_test_to_complete(items_to_run, session, available_procs, inter_comm):
for sub_rank in sub_ranks:
if sub_rank != first_rank_done:
rank_original_idx = inter_comm.recv(source=sub_rank, tag=WORK_DONE_TAG)
-            assert (rank_original_idx == original_idx) # sub_rank is supposed to have worked on the same test
+            assert rank_original_idx == original_idx # sub_rank is supposed to have worked on the same test

# the procs are now available
for sub_rank in sub_ranks:
-        available_procs[sub_rank] = True
+        available_procs[sub_rank] = 1

# "run" the test (i.e. trigger PyTest pipeline but do not really run the code)
nextitem = None # not known at this point
run_item_test(item, nextitem, session)


def wait_last_tests_to_complete(items_to_run, session, available_procs, inter_comm):
-    while np.sum(available_procs) < len(available_procs):
+    while sum(available_procs) < len(available_procs):
wait_test_to_complete(items_to_run, session, available_procs, inter_comm)


@@ -418,8 +404,7 @@ def pytest_runtestloop(self, session) -> bool:
and not session.config.option.continue_on_collection_errors
):
raise session.Interrupted(
"%d error%s during collection"
% (session.testsfailed, "s" if session.testsfailed != 1 else "")
f"{session.testsfailed} error{'s' if session.testsfailed != 1 else ''} during collection"
)

if session.config.option.collectonly:
@@ -451,10 +436,10 @@ def pytest_runtestloop(self, session) -> bool:

# schedule tests to run
items_left_to_run = sorted(items_to_run, key=lambda item: item.n_proc)
-        available_procs = np.ones(n_workers, dtype=np.int8)
+        available_procs = [1] * n_workers

while len(items_left_to_run) > 0:
-            n_av_procs = np.sum(available_procs)
+            n_av_procs = sum(available_procs)

item_idx = item_with_biggest_admissible_n_proc(items_left_to_run, n_av_procs)

@@ -511,7 +496,7 @@ def pytest_runtest_logreport(self, report):
sub_comm = report.sub_comm
gather_report_on_local_rank_0(report)

-        if sub_comm.Get_rank() == 0: # if local master proc, send
+        if sub_comm.rank == 0: # if local master proc, send
# The idea of the scheduler is the following:
# The server schedules test over clients
# A client executes the test then report to the server it is done
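Both logreport hooks follow the same pattern: after gathering within the sub-communicator, each local master forwards its report to the global master over the framework-private communicator. A simplified, self-contained sketch of that pattern (payload invented; run under mpirun):

from mpi4py import MPI

# Dup() keeps framework traffic separate from MPI calls made by the tests.
global_comm = MPI.COMM_WORLD.Dup()

report = {"rank": global_comm.rank, "outcome": "passed"}  # stand-in payload

if global_comm.rank != 0:
    global_comm.send(report, dest=0)  # local master -> global master
else:
    reports = [report]
    for _ in range(global_comm.size - 1):
        # ANY_SOURCE/ANY_TAG cannot clash with test traffic: the comm is private
        reports.append(global_comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG))
    print(f"collected {len(reports)} reports")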