diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0632fbe..f37d281 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,10 +16,10 @@ jobs: pylint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: mpi4py/setup-mpi@v1 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: "3.10" - name: Install dependencies @@ -30,7 +30,7 @@ jobs: pip install -r *.egg-info/requires.txt - name: Analysing the code with pylint run: | - pylint --unsafe-load-any-extension=y --disable=fixme $(git ls-files '*.py') || true + pylint --unsafe-load-any-extension=y --disable=fixme $(git ls-files "pytest_parallel/*.py" "test/*.py") || true build: needs: [pylint] @@ -67,12 +67,19 @@ jobs: mpi: intelmpi - os: ubuntu-latest mpi: msmpi + # mpich seems broken on Ubuntu - os: ubuntu-latest py-version: 3.8 mpi: mpich - os: ubuntu-latest py-version: 3.9 mpi: mpich + - os: ubuntu-latest + py-version: 3.10 + mpi: mpich + - os: ubuntu-latest + py-version: 3.11 + mpi: mpich name: ${{ matrix.mpi }} - ${{matrix.py-version}} - ${{matrix.os}} steps: - name: Checkout diff --git a/.slurm_draft/worker.py b/.slurm_draft/worker.py index 3084a00..b69b272 100644 --- a/.slurm_draft/worker.py +++ b/.slurm_draft/worker.py @@ -11,13 +11,13 @@ test_idx = int(sys.argv[3]) comm = MPI.COMM_WORLD -print(f'start at {scheduler_ip}@{server_port} test {test_idx} at rank {comm.Get_rank()}/{comm.Get_size()} exec on {socket.gethostname()} - ',datetime.datetime.now()) +print(f'start at {scheduler_ip}@{server_port} test {test_idx} at rank {comm.rank}/{comm.size} exec on {socket.gethostname()} - ',datetime.datetime.now()) -if comm.Get_rank() == 0: +if comm.rank == 0: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.connect((scheduler_ip, server_port)) #time.sleep(10+5*test_idx) - #msg = f'Hello from test {test_idx} at rank {comm.Get_rank()}/{comm.Get_size()} exec on {socket.gethostname()}' + #msg = f'Hello from test {test_idx} at rank {comm.rank}/{comm.size} exec on {socket.gethostname()}' #socket_utils.send(s, msg) info = { 'test_idx': test_idx, diff --git a/CMakeLists.txt b/CMakeLists.txt index 9321db8..662d11c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ cmake_policy(SET CMP0074 NEW) # force find_package to take _ROOT va # Project # ---------------------------------------------------------------------- project( - pytest_parallel VERSION 1.2.0 + pytest_parallel VERSION 1.3.0 DESCRIPTION "pytest_parallel extends PyTest to support parallel testing using mpi4py" ) diff --git a/README.md b/README.md index e4296a9..468ff2d 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ pytest_parallel =============== -**pytest_parallel** extends [PyTest](http://pytest.org) to support parallel testing using mpi4py. +**pytest_parallel** extends [PyTest](http://pytest.org) to support parallel testing using [mpi4py](https://mpi4py.readthedocs.io/en/stable/). 
[![Python 3](https://img.shields.io/static/v1?label=Python&logo=Python&color=3776AB&message=3)](https://www.python.org/) ![CI workflow](https://github.com/onera/pytest_parallel/actions/workflows/test.yml/badge.svg) @@ -10,35 +10,45 @@ pytest_parallel [![Windows OK](https://img.shields.io/static/v1?label=Windows&logo=Windows&color=white&message=%E2%9C%93)](https://en.wikipedia.org/wiki/Windows) ![License-MPL](https://img.shields.io/badge/license-MPL%202.0-blue.svg) -## Introduction ## +## Quick start ## + +**pytest_parallel** automates the execution of MPI parallel tests. Let's say you want to test several algorithms, each with a specific number of processes: ```Python import pytest_parallel @pytest_parallel.mark.parallel(2) -def test_fail_one_rank(comm): - assert comm.Get_size()==2 - if comm.Get_rank()==0: assert True - if comm.Get_rank()==1: assert False +def test_A(comm): + if comm.rank == 1: assert False + +def test_B(): + assert True + +@pytest_parallel.mark.parallel(3) +def test_C(comm): + assert comm.size == 3 + +def test_D(): + assert False ``` -Here a test that should run on two processes is declared. When the test suite is run, the test will execute on two MPI processes. The `comm` fixture is an mpi4py communicator that is private to the test. +Tests decorated with `@pytest_parallel.mark.parallel(N)` are specified to run in parallel on `N` processes. The `comm` fixture is a communicator created specifically for the test, satisfying `comm.size == N`. Sequential tests do not need a communicator and will be run as usual on one process. -The test can be run on two processes, e.g. with: +You can run the tests with MPI, e.g. with: ```Bash -mpirun -np 2 pytest --color=yes test_pytest_parallel.py +mpirun -np 4 pytest --color=yes -vv path/to/tests ``` And the following output will be produced: ![example failing test](doc/images/test_fail.png) -If there is not enough MPI processes to run the test, it will be skipped. -For instance, the following launching command: +If there are not enough MPI processes to run some tests, they will be skipped. +For instance, the following command: ```Bash -mpirun -np 1 pytest --color=yes test_pytest_parallel.py +mpirun -np 1 pytest --color=yes path/to/tests ``` would lead to: @@ -47,11 +57,11 @@ would lead to: ## The `comm` fixture ## -The `comm` fixture that you get when decorating your test with `pytest_parallel.mark.parallel` is a sub-communicator of `MPI.COMM_WORLD` that is unique to each test. +The `comm` fixture you get when decorating your test with `@pytest_parallel.mark.parallel` is a sub-communicator of `MPI.COMM_WORLD` that is unique to each test. ## The `parallel` decorator ## -The `pytest_parallel.mark.parallel(n_procs)` decorator takes one argument, `n_procs`. +The `@pytest_parallel.mark.parallel(n_procs)` decorator takes one argument, `n_procs`. `n_procs` is generally an integer that specifies the size of the communicator that will be given to the test through the `comm` fixture. @@ -60,95 +70,308 @@ The `pytest_parallel.mark.parallel(n_procs)` decorator takes one argument, `n_pr ```Python @pytest_parallel.mark.parallel([2,4]) def test_param(comm): - print(comm.Get_size()) + print(comm.size) ``` will run two times: once with `comm` being a communicator of size 2, once with `comm` being a communicator of size 4.
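For instance, a hypothetical variant of `test_param` that makes both runs explicit (illustrative only, not part of this patch):

```Python
import pytest_parallel

@pytest_parallel.mark.parallel([2,4])
def test_param(comm):
    # executed twice: once with comm.size == 2, once with comm.size == 4
    assert comm.size in (2, 4)
    # ranks are numbered within the test's private sub-communicator
    assert 0 <= comm.rank < comm.size
```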
+## Installation ## + +### Through pip ### + +```Bash +pip install "git+https://github.com/onera/pytest_parallel.git" +``` +Note that you can pass the `--user` option to `pip` if you don't have root permissions or want to install `pytest_parallel` in your home directory. + +### Manually ### + +**pytest_parallel** is a pure PyTest plugin. It depends on mpi4py and Numpy. + +To install manually: +```Bash +(mkdir install_dir && cd install_dir && git clone https://github.com/onera/pytest_parallel.git) +export PYTHONPATH=install_dir/pytest_parallel:$PYTHONPATH +export PYTEST_PLUGINS=pytest_parallel.plugin +``` + ## Schedulers ## -**pytest_parallel** comes with three kind of schedulers. To understand how they work, let's take the following example: +The job of **pytest_parallel** is to run `Nt` tests using the `Np` processes that the user asked for. To do so, the tests have to be scheduled. **pytest_parallel** has 5 different schedulers that the user can select from. Schedulers are divided into two families: +- [process-reuse schedulers](#process-reuse-schedulers), efficient for many tests that are very fast (typically unit tests) +- [process-isolate schedulers](#process-isolate-schedulers), more robust and able to report test crashes, more scalable (able to run on multiple compute nodes), but also more heavyweight + +### Process-reuse schedulers ### + +Process-reuse schedulers are mostly useful when you have numerous tests that are very fast, typically unit tests. + +For these schedulers, **pytest_parallel** is always launched through MPI, e.g.: + +```Bash +mpirun -np 4 pytest path/to/tests +``` + +Here, 4 MPI processes have been spawned by `mpirun`, and **pytest_parallel** will use them to run all the tests. Since `test_A` uses 2 processes, `test_B` and `test_D` use 1 process each (they are sequential) and `test_C` uses 3 processes, we need `2+1+3+1 == 7` processes. This means some processes will be used by multiple tests. + +The main advantage is that the Python environment is loaded once and for all, hence if you have 1000 tests that take 1 millisecond each, and the loading of all your Python modules by the interpreter takes 1 second, then running PyTest will take approximately 2 seconds. + +However, the tests are not completely isolated, so if one test crashes (e.g. due to an unrecoverable error), the error message may not point you directly to the faulty test. Deadlocks will also be difficult to pinpoint. + +There are 3 kinds of process-reuse schedulers: +- the [sequential scheduler](#sequential-scheduler) +- the [static scheduler](#static-scheduler) +- the [dynamic scheduler](#dynamic-scheduler) (this is the default one) + +To understand how they work, let's look again at our previous example: ```Python import pytest_parallel @pytest_parallel.mark.parallel(2) def test_A(comm): - if comm.Get_rank()==1: assert False + if comm.rank == 1: assert False def test_B(): assert True @pytest_parallel.mark.parallel(3) def test_C(comm): - assert comm.Get_size() == 3 + assert comm.size == 3 def test_D(): assert False ``` -Let's also fix the number of workers that we will be using to 4. This means that the test is launched with: +and run it on 4 processes.
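As a side note, the slot accounting described above can be tallied in plain Python (illustrative only, not part of this patch):

```Python
# process slots required by the example suite above
n_procs = {'test_A': 2, 'test_B': 1, 'test_C': 3, 'test_D': 1}
total = sum(n_procs.values())
assert total == 7  # 7 slots in total for only 4 processes:
                   # some processes must be reused across tests
```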
+ +#### Sequential scheduler #### +The **sequential** scheduler can be selected with: ```Bash -mpirun -np 4 pytest test_pytest_parallel.py +mpirun -np 4 pytest --scheduler=sequential path/to/tests ``` -### Sequential scheduler ### - -The **sequential** scheduler just takes each test in order, one by one, and executes it on as many processes as it needs. The other processes are sleeping. On our example, this would result on the following sequence diagram: +This scheduler just takes each test in order, one by one, and executes it on as many processes as it needs. The other processes are sleeping. On our example, this would result in the following sequence diagram: ![sequential scheduler sequence diagram](doc/images/seq.png) -While it is not optimized for performance, the sequential scheduler is very useful when you get unrecoverable errors (e.g. segfault) because then your PyTest report may be incomplete. Running tests sequentially allows at least to find which test is at fault. +While it is not optimized for performance, the sequential scheduler is very useful when you get unrecoverable errors (e.g. segfaults), because then your PyTest report may be incomplete. Running tests sequentially at least allows you to find which test is at fault. -The sequential scheduler is the default one. To enable it explicitly, you can pass the `--scheduler=sequential` option to PyTest. +#### Static scheduler #### -### Static scheduler ### +The **static** scheduler can be selected with: +```Bash +mpirun -np 4 pytest --scheduler=static path/to/tests +``` The **static** scheduler tries to distribute tests to minimize the number of idle processes. The process is static, that is, after test collection, it determines which process will execute which test, and in which order. On our example, it will result in the following scheduling: ![static scheduler sequence diagram](doc/images/static.png) The scheduler works by steps. Each step has `n_worker` slots (`n_worker` being the number of processes that PyTest was launched with). Each test will try to find a step with enough slots and will consume `n_proc` slots on the step. If no step is found, a new one is created, until each test has a step. -While this scheduler is more optimized, since it gives an a priori scheduling, it is not optimal depending on the duration of the tests. Let's look again at our example, but let's say `test_B` takes much longer than the others. We will then have the following sequence: +While this scheduler is more optimized, the scheduling is computed *a priori*, hence it may be suboptimal depending on the duration of the tests. Let's look again at our example, but let's say `test_B` and `test_D` take longer than the others. We will then have the following sequence: ![static scheduler sequence diagram - bad case](doc/images/static_bad.png) -We see that processes 0,1 and 2 wait at the first step for process 3 to finish. +We see that processes 0, 1 and 2 wait for process 3 to finish the first step, whereas they could do meaningful work in the meantime. ### Dynamic scheduler ### -The **dynamic** scheduler spawns a new MPI process which acts as the master scheduler and sends work to the original processes. The scheduler tries to schedule tests requiring the most processes first. The scheduler tries to send work to idle process until all the processes are busy executing one test, or when not enough processes are ready to accept a test. It then waits for a signal that workers have finished their test to schedule further work.
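The dynamic scheduling loop can be sketched as follows (hypothetical code: `send_work` and `wait_for_completed_test` are assumed MPI helpers, and the real `DynamicScheduler` in `mpi_reporter.py` differs in its details):

```Python
def pop_biggest_admissible(pending, n_free):
    # `pending` is kept sorted by ascending n_proc: scan from the biggest test
    for i in reversed(range(len(pending))):
        if pending[i].n_proc <= n_free:
            return pending.pop(i)
    return None  # no test currently fits in the idle processes

def dynamic_master(pending, n_workers):
    # `pending`: tests with an `n_proc` attribute; tests with n_proc > n_workers
    # are assumed to have been skipped at collection time
    pending = sorted(pending, key=lambda test: test.n_proc)
    idle = list(range(n_workers))
    while pending or len(idle) < n_workers:
        test = pop_biggest_admissible(pending, n_free=len(idle))
        if test is not None:
            ranks = [idle.pop() for _ in range(test.n_proc)]
            send_work(ranks, test)                 # assumed helper
        else:
            idle += wait_for_completed_test()      # assumed helper, blocks
```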
+The **dynamic** scheduler is the default one. To enable it explicitly, you can pass the `--scheduler=dynamic` option to PyTest: + +```Bash +mpirun -np 4 pytest --scheduler=dynamic path/to/tests +``` + +The scheduler spawns a new MPI process which acts as the master scheduler and sends work to the original processes. The scheduler tries to schedule tests requiring the most processes first. It sends work to idle processes until all the processes are busy executing a test, or until not enough processes are ready to accept a test. It then waits for a signal that workers have finished their test to schedule further work. Example: -![static scheduler sequence diagram - bad case](doc/images/dyn_anim.png) +![dynamic scheduler sequence diagram](doc/images/dyn_anim.png) + +When tests have significantly different durations, the dynamic scheduler performs better than the static scheduler. It has however a slightly longer startup (because it needs to spawn the master process). + +### Process-isolate schedulers ### + +Process-isolate schedulers spawn new processes for each test. More precisely, for a test that is specified to use a communicator of size `N`, **pytest_parallel** will launch `N` MPI processes just for this test, and it will do so for each test of the test suite. + +Of course, these schedulers are more robust: even if a test crashes with an irrecoverable error, the other tests are not impacted and **pytest_parallel** will report errors correctly. But remember that these schedulers need to start a new Python interpreter and load the Python modules for each test: if you have a lot of fast tests, the start-up times add up. + +If you use a process-isolate scheduler, contrary to process-reuse schedulers, you don't launch PyTest through `mpirun -np N`. Rather, you launch PyTest directly and specify the `--n-workers` parameter. + +There are 2 kinds of process-isolate schedulers: the **shell** scheduler and the **SLURM** scheduler. + +#### Shell scheduler #### + +The **shell** scheduler can be selected with: +```Bash +pytest --n-workers=4 --scheduler=shell path/to/tests +``` +The scheduling algorithm is the same as the [static scheduler](#static-scheduler). + +#### SLURM scheduler #### +The **SLURM** scheduler can be selected with: +```Bash +pytest --n-workers=4 --scheduler=slurm path/to/tests +``` +SLURM takes care of the scheduling. This scheduler has specific options: +- `--slurm-options`: a list of arguments passed to [sbatch](https://slurm.schedmd.com/sbatch.html), for example `--slurm-options="--time=00:30:00 --qos=my_queue"`. Do **not** specify `--ntasks` here, since **pytest_parallel** will use the value given by `--n-workers`. +- `--slurm-srun-options`: a list of options for [`srun`](https://slurm.schedmd.com/srun.html). For example: `--slurm-srun-options="--mem-per-cpu=4GB"` +- `--slurm-export-env`: should the SLURM job use the same environment as the terminal that spawned it? Enabled by default. Use `--no-slurm-export-env` to disable. +- `--slurm-init-cmds`: commands to pass to the SLURM job that should be executed before the tests. Example: `--slurm-init-cmds="source my_env.sh"` + + +## FAQ ## + +### Which MPI implementation is supported? + +**pytest_parallel** has currently been tested and is used daily with OpenMPI and Intel MPI. MPICH is also regularly tested through the GitHub CI with the process-reuse schedulers. Other MPI implementations are supposed to work but have not been tested.
An exception is the `shell` process-isolate scheduler, which uses implementation-specific environment variables to pin the processes to cores. Feel free to give us feedback/patches. + +### Which OS is supported? -#### Design alternative #### +**pytest_parallel** is tested daily and used on Linux machines. However, on the GitHub CI, the `sequential`, `static` and `dynamic` schedulers work with macOS, and the `sequential` and `static` schedulers work with Windows. -Instead of spawning a new MPI **process**, it would have been possible to spawn a new **thread** on process 0. However, it would then require to use `MPI_thread_init` with a value of at least `MPI_THREAD_FUNNELED`, and in practice, `MPI_THREAD_MULTIPLE` to ease the implementation of self-communication on rank 0. Here, no thread level is required (i.e. `MPI_THREAD_SINGLE` is **fine**). +### Which job scheduler is available? +Currently SLURM is the only job scheduler available. Other job schedulers (PBS, LSF...) are not supported. If you don't use SLURM, the `shell` scheduler may be enough for your tests as long as you don't want to use more than one compute node. -## Prerequisites ## -**pytest_parallel** is a pure PyTest plugin. It depends on mpi4py and Numpy +### **pytest_parallel** gives me a new communicator for each test, but my project only uses `MPI.COMM_WORLD`, how can I use **pytest_parallel**? + +The [process-isolate schedulers](#process-isolate-schedulers) can be used with tests using different sizes of `MPI.COMM_WORLD`. The `comm` fixture can then be discarded: + +```Python +import pytest_parallel +from mpi4py import MPI + +@pytest_parallel.mark.parallel(3) +def test_using_comm_world(comm): + # `comm` is unused but you currently need to write it down anyway + # because it is a fixture that comes with `@pytest_parallel.mark.parallel` + my_algo_implicitly_using_MPI_COMM_WORLD() +``` + +If you select only one test (e.g. because you are debugging this one), then you can also use a process-reuse scheduler. + +It may be good practice to assert that the `comm` fixture has the correct size: + +```Python +@pytest_parallel.mark.parallel(3) +def test_using_comm_world(comm): + assert comm.size == MPI.COMM_WORLD.size # good practice: fails with a process-reuse scheduler if communicator sizes differ + my_algo_implicitly_using_MPI_COMM_WORLD() +``` +This way, you will get a meaningful error message if you accidentally run multiple incompatible tests with a process-reuse scheduler. Such a test suite can then be run with a process-isolate scheduler: + +```Bash +pytest --n-workers=4 --scheduler=shell path/to/tests +``` + +For unit tests, process-isolate schedulers are very slow, and **[process-reuse schedulers](#process-reuse-schedulers) will not work**. We really encourage you to generalize your functions with an additional `comm` argument that is used for communication, rather than forcing your users to use `MPI.COMM_WORLD`. +It would be possible to develop hybrid process-reuse schedulers where processes are re-used, but only among tests of the same communicator size (repeating the operation for as many communicator sizes as there are in the test suite). If you feel the need, write a feature request and maybe we will implement it. + + +### Can I write an MPI test with no fixed number of processes and let **pytest_parallel** use `MPI.COMM_WORLD`? + +Not currently. **pytest_parallel** is designed to dissociate the parallelism specified for each test from the resources given to execute them.
+If the need arises, we could however: +- implement a mode that would use the number of processes given by the command line instead of the one specified with each test +- add a `@pytest_parallel.mark.parallel_from_context` decorator that would mark the test to be run with the maximum parallelism specified (that is, the number of processes given by the command line) + +### My test suite deadlocks. How do I pinpoint the test at fault? + +There is no magic technique. Try to narrow it down by using the [sequential scheduler](#sequential-scheduler). + +A solution that we still need to implement is handling timeouts for the [process-isolate schedulers](#process-isolate-schedulers). Feel free to submit a feature request. + +### Why is the [shell scheduler](#shell-scheduler) using a static scheduling strategy? + +The [shell scheduler](#shell-scheduler) uses the same scheduling algorithm as the [static scheduler](#static-scheduler) because it is easier to implement. We hope to also implement a dynamic scheduling strategy if we feel the need for it. + +### I want to use the shell scheduler, but I get the error `MPI_INIT failed` + +On some systems, using `mpi4py` without `mpirun` does not work. For example, using: +```Bash +pytest --n-workers=4 --scheduler=shell path/to/tests +``` + +produces the following error: +``` +Error obtaining unique transport key from PMIX (OMPI_MCA_orte_precondition_transports not present in +the environment). + +It looks like MPI_INIT failed for some reason; your parallel process is +likely to abort. There are many reasons that a parallel process can +fail during MPI_INIT; some of which are due to configuration or environment +problems. This failure appears to be an internal failure; here's some +additional information (which may only be relevant to an Open MPI +developer): +``` + +In this case, try: +```Bash +mpirun -np 1 pytest --n-workers=4 --scheduler=shell path/to/tests +``` + +### Can I use **pytest_parallel** with MPI and OpenMP/pthreads/TBB? + +We do not use **pytest_parallel** with multithreading, so any feedback is welcome! Regarding the `shell` scheduler, we explicitly pin one MPI process per core, which is probably wrong with multiple threads per MPI process. ## Plugin compatibility ## **pytest_parallel** is known to work with the **pytest-html** plugin. It is also compatible with the xml built-in plugin. -No other plugin has been tested. +No other plugin has been tested; feedback is welcome. + ## Implementation notes ## +### Use of PyTest hooks ### We use PyTest hooks to schedule tests and gather report information from remote processes. That is, mainly: * either `pytest_collection_modifyitems` or `pytest_runtestloop` to schedule tests * `pytest_runtest_logreport` to gather test reports - * `pytest_pyfunc_call` to prevent the actual test code to be executed when it is actually executed on the other process + * `pytest_pyfunc_call` to prevent the test code from being executed locally when it is actually executed on another process. PyTest expects its complete "pipeline" to be executed for all messages to be reported correctly, so we have to trick it: - * make it think that every test was run on the master rank. - * de-activate logging on other ranks + * make it think that every test was run on the master rank + * de-activate logging on other ranks. + +### Implementation of process-isolate schedulers ### + +For both schedulers (shell and SLURM), we have a master process (the one launched by the user) that will spawn worker processes.
The master then waits to receive test reports from the workers. + +#### Information exchange #### + +The master and worker processes exchange information through sockets. The master creates a socket, then spawns workers and gives them the socket information (its IP address and port), so that they can connect to it to send their reports. + +Actually, in order to correctly report crashes, each test `t` using `Nt` processes is run and reported in two steps: +- First, `pytest --_worker` is launched `Nt` times by MPI. These processes run the test `t` and, for each of the test stages `s` (`setup`/`call`/`teardown`), rank 0 writes a report in a file called `.pytest_parallel/tmp/{t}_{s}`. +- Then, when the `pytest --_worker` processes are done for the test (either because they finished or because they crashed), the `pytest_parallel.send_report` module is run. It looks for the files that rank 0 of `pytest --_worker` has supposedly written. If the files are there, they are sent to the master through the socket. If one of the files is missing, it means that the worker processes have crashed. In this case, a crash report is created and sent to the master through the socket. Note that with MPI, errors are fatal, so if any process fails, all the processes fail; and since rank 0 needs to wait for all the processes to contribute their part of the report, if one of them fails during stage `s`, rank 0 will fail **before** writing anything to the `.pytest_parallel/tmp/{t}_{s}` file. + +#### Fake-running a test on master #### + +To trick PyTest into thinking the test is run by the master process, the master runs `pytest_runtest_protocol` for each test but with the following hooks: +- `pytest_pyfunc_call` does nothing (i.e. does not actually execute the test) +- `pytest_runtest_logreport` creates a report by using the one that was received from the worker + +#### Shell scheduler specifics #### + +File: `shell_static_scheduler.py` + +The master process gathers tests in "steps" according to the [static scheduling](#static-scheduler) algorithm. Then, for each step `i`: +- The master process writes a bash script in `.pytest_parallel/pytest_static_sched_{i}.sh`. +- It launches the script. All the tests of the step are run in parallel (through `&` and `wait`). The script pins exactly one core per MPI process. +- The master process waits to receive the test reports. +- When it has all the reports of the tests of the step, it reports them ([by fake-running the tests](#fake-running-a-test-on-master)). +- It moves to the next step. + +#### SLURM scheduler specifics #### + +File: `slurm_scheduler.py` + +The master process writes a SLURM job `.pytest_parallel/job.sh` that is submitted through `sbatch`. In the job, to each test corresponds a "job step" launched with `srun --exclusive [...] &`. The `--exclusive` and `&` enable SLURM to schedule the job steps as best as it can. The job then waits for all the tests to finish (this is the `wait` command at the end of the script). Note: contrary to `sbatch`, the `--exclusive` option of `srun` enables job **steps** to be run in parallel. It does not mean that we want the resource to be exclusive to the job (see [here](https://stackoverflow.com/a/45886983/1583122) and [here](https://stackoverflow.com/a/73970255/1583122)). + +Once submitted, the master process waits to receive test reports. Each time it receives a report, it treats it immediately ([by fake-running the test](#fake-running-a-test-on-master)).
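To make the job structure concrete, a generated job could look roughly like the sketch below (hypothetical content: the task counts, test names and `[...]` worker flags are placeholders; the actual `.pytest_parallel/job.sh` is generated by **pytest_parallel** and differs in its details):

```Bash
#!/bin/bash
#SBATCH --ntasks=4          # deduced from --n-workers
#SBATCH --time=00:30:00     # taken from --slurm-options

# one job step per test; `--exclusive` and `&` let SLURM co-schedule the steps
srun --exclusive --ntasks=2 python3 -m pytest --_worker [...] test_A &
srun --exclusive --ntasks=3 python3 -m pytest --_worker [...] test_C &
wait  # the job ends only when all job steps are done
```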
### Performance detail ### @@ -156,13 +379,24 @@ For the static and dynamic schedulers, on the master process, we must execute the PyTest protocol of the tests that are actually run on other processes, without executing their code. This can be done by hooking `pytest_runtest_setup/call/teardown`. However, we observed it was greatly degrading performance (x5 !), so instead we just copy-pasted the `_pytest/runner/pytest_runtest_protocol` and hard-coded the execution shortcut. +### Design alternative ### + +Regarding the dynamic scheduler, instead of spawning a new MPI **process**, it would have been possible to spawn a new **thread** on process 0. However, it would then require using `MPI_Init_thread` with a value of at least `MPI_THREAD_FUNNELED`, and in practice, `MPI_THREAD_MULTIPLE` to ease the implementation of self-communication on rank 0. Here, no thread level is required (i.e. `MPI_THREAD_SINGLE` is **fine**). + +Another possibility would have been to use sockets for communications between the scheduling process and the worker processes (instead of using MPI inter-communicators). We used `MPI_Comm_spawn` because at the time we had no experience with sockets. Re-implementing the dynamic scheduler using sockets may be useful to make it more robust. + + +## Contributing ## + +Any contributions are welcome: bug reports, feature requests, general feedback, pull requests. If you want to contribute a non-trivial pull request, please begin by opening an issue explaining what you want to do, in order to iterate on the best way to do it. -## TODO ## +## Future work ## -* Binding of MPI processes -* Schedule a test not only if enought procs are available, but also if the procs belong to some common NUMA domain (cluster, CPU, node...) +* More configuration options for the binding of MPI processes +* Dynamic scheduler: schedule a test not only if enough procs are available, but also if the procs belong to some common NUMA domain (cluster, CPU, node...) * Reserve more procs than declared in the test. Useful for loading and scaling performance tests. Possible API: ```Python @pytest_parallel.mark.parallel(4, exclusive_numa_domain='cpu') ``` * Dynamic scheduler: more asynchrony (send -> isend) +* Add the process-isolate schedulers to the CI \ No newline at end of file diff --git a/doc/images/test_fail.png b/doc/images/test_fail.png index 745c45f..ecd2490 100644 Binary files a/doc/images/test_fail.png and b/doc/images/test_fail.png differ diff --git a/doc/images/test_skip.png b/doc/images/test_skip.png index 5856f08..ce563d2 100644 Binary files a/doc/images/test_skip.png and b/doc/images/test_skip.png differ diff --git a/pyproject.toml b/pyproject.toml index bc0954c..d727668 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ authors = [ {name = "Berenger Berthoul", email = "berenger.berthoul@onera.fr"}, ] maintainers = [ - {name = "Bruno Maugars", email = "bruno.maugars@onera.fr"}, + {name = "Berenger Berthoul", email = "berenger.berthoul@onera.fr"}, ] license = {text = "Mozilla Public License 2.0"} keywords = [ @@ -52,7 +52,7 @@ dependencies = [ "mpi4py", "numpy", ] -version = "1.2.0" +version = "1.3.0" [project.urls] Homepage = "https://github.com/onera/pytest_parallel" diff --git a/pytest_parallel/__init__.py b/pytest_parallel/__init__.py index 69936b8..9eeaa6e 100644 --- a/pytest_parallel/__init__.py +++ b/pytest_parallel/__init__.py @@ -1,3 +1,3 @@ -__version__ = "1.2" +__version__ = "1.3" from . 
import mark diff --git a/pytest_parallel/gather_report.py b/pytest_parallel/gather_report.py index ad7a4e3..3a7e1dc 100644 --- a/pytest_parallel/gather_report.py +++ b/pytest_parallel/gather_report.py @@ -45,8 +45,8 @@ def gather_report_on_local_rank_0(report): del report.sub_comm # No need to keep it in the report # Furthermore we need to serialize the report # and mpi4py does not know how to serialize report.sub_comm - i_sub_rank = sub_comm.Get_rank() - n_sub_rank = sub_comm.Get_size() + i_sub_rank = sub_comm.rank + n_sub_rank = sub_comm.size if ( report.outcome != "skipped" diff --git a/pytest_parallel/mpi_reporter.py b/pytest_parallel/mpi_reporter.py index 0ead5f4..a3062d9 100644 --- a/pytest_parallel/mpi_reporter.py +++ b/pytest_parallel/mpi_reporter.py @@ -1,17 +1,18 @@ -import numpy as np +import sys + import pytest from mpi4py import MPI from .algo import partition, lower_bound -from .utils import get_n_proc_for_test, add_n_procs, run_item_test, mark_original_index -from .utils_mpi import number_of_working_processes, is_dyn_master_process +from .utils.items import get_n_proc_for_test, add_n_procs, run_item_test, mark_original_index +from .utils.mpi import number_of_working_processes, is_dyn_master_process from .gather_report import gather_report_on_local_rank_0 from .static_scheduler_utils import group_items_by_parallel_steps def mark_skip(item): comm = MPI.COMM_WORLD - n_rank = comm.Get_size() + n_rank = comm.size n_proc_test = get_n_proc_for_test(item) skip_msg = f"Not enough procs to execute: {n_proc_test} required but only {n_rank} available" item.add_marker(pytest.mark.skip(reason=skip_msg), append=False) @@ -28,7 +29,8 @@ def create_sub_comm_of_size(global_comm, n_proc, mpi_comm_creation_function): if mpi_comm_creation_function == 'MPI_Comm_create': return sub_comm_from_ranks(global_comm, range(0,n_proc)) elif mpi_comm_creation_function == 'MPI_Comm_split': - if i_rank < n_proc_test: + i_rank = global_comm.rank + if i_rank < n_proc: color = 1 else: color = MPI.UNDEFINED @@ -37,8 +39,7 @@ def create_sub_comm_of_size(global_comm, n_proc, mpi_comm_creation_function): assert 0, 'Unknown MPI communicator creation function. Available: `MPI_Comm_create`, `MPI_Comm_split`' def create_sub_comms_for_each_size(global_comm, mpi_comm_creation_function): - i_rank = global_comm.Get_rank() - n_rank = global_comm.Get_size() + n_rank = global_comm.size sub_comms = [None] * n_rank for i in range(0,n_rank): n_proc = i+1 @@ -47,8 +48,7 @@ def create_sub_comms_for_each_size(global_comm, mpi_comm_creation_function): def add_sub_comm(items, global_comm, test_comm_creation, mpi_comm_creation_function): - i_rank = global_comm.Get_rank() - n_rank = global_comm.Get_size() + n_rank = global_comm.size # Strategy 'by_rank': create one sub-communicator by size, from sequential (size=1) to n_rank if test_comm_creation == 'by_rank': @@ -71,12 +71,17 @@ def add_sub_comm(items, global_comm, test_comm_creation, mpi_comm_creation_funct assert 0, 'Unknown test MPI communicator creation strategy. 
Available: `by_rank`, `by_test`' class SequentialScheduler: - def __init__(self, global_comm, test_comm_creation='by_rank', mpi_comm_creation_function='MPI_Comm_create', barrier_at_test_start=True, barrier_at_test_end=True): + def __init__(self, global_comm): self.global_comm = global_comm.Dup() # ensure that all communications within the framework are private to the framework - self.test_comm_creation = test_comm_creation - self.mpi_comm_creation_function = mpi_comm_creation_function - self.barrier_at_test_start = barrier_at_test_start - self.barrier_at_test_end = barrier_at_test_end + + # These parameters are not accessible through the API, but are left here for tweaking and experimenting + self.test_comm_creation = 'by_rank' # possible values : 'by_rank' | 'by_test' + self.mpi_comm_creation_function = 'MPI_Comm_create' # possible values : 'MPI_Comm_create' | 'MPI_Comm_split' + self.barrier_at_test_start = True + self.barrier_at_test_end = True + if sys.platform == "win32": + self.mpi_comm_creation_function = 'MPI_Comm_split' # because 'MPI_Comm_create' uses `Create_group`, + # that is not implemented in mpi4py for Windows @pytest.hookimpl(trylast=True) def pytest_collection_modifyitems(self, config, items): @@ -86,20 +91,10 @@ def pytest_collection_modifyitems(self, config, items): def pytest_runtest_protocol(self, item, nextitem): if self.barrier_at_test_start: self.global_comm.barrier() - #print(f'pytest_runtest_protocol beg {MPI.COMM_WORLD.rank=}') _ = yield - #print(f'pytest_runtest_protocol end {MPI.COMM_WORLD.rank=}') if self.barrier_at_test_end: self.global_comm.barrier() - #@pytest.hookimpl(tryfirst=True) - #def pytest_runtest_protocol(self, item, nextitem): - # if self.barrier_at_test_start: - # self.global_comm.barrier() - # print(f'pytest_runtest_protocol beg {MPI.COMM_WORLD.rank=}') - # if item.sub_comm == MPI.COMM_NULL: - # return True # for this hook, `firstresult=True` so returning a non-None will stop other hooks to run - @pytest.hookimpl(tryfirst=True) def pytest_pyfunc_call(self, pyfuncitem): #print(f'pytest_pyfunc_call {MPI.COMM_WORLD.rank=}') @@ -113,7 +108,7 @@ def pytest_runtestloop(self, session) -> bool: _ = yield # prevent return value being non-zero (ExitCode.NO_TESTS_COLLECTED) # when no test run on non-master - if self.global_comm.Get_rank() != 0 and session.testscollected == 0: + if self.global_comm.rank != 0 and session.testscollected == 0: session.testscollected = 1 return True @@ -136,7 +131,7 @@ def pytest_runtest_logreport(self, report): def prepare_items_to_run(items, comm): - i_rank = comm.Get_rank() + i_rank = comm.rank items_to_run = [] @@ -168,7 +163,7 @@ def prepare_items_to_run(items, comm): def items_to_run_on_this_proc(items_by_steps, items_to_skip, comm): - i_rank = comm.Get_rank() + i_rank = comm.rank items = [] @@ -204,14 +199,13 @@ def pytest_runtestloop(self, session) -> bool: and not session.config.option.continue_on_collection_errors ): raise session.Interrupted( - "%d error%s during collection" - % (session.testsfailed, "s" if session.testsfailed != 1 else "") + f"{session.testsfailed} error{'s' if session.testsfailed != 1 else ''} during collection" ) if session.config.option.collectonly: return True - n_workers = self.global_comm.Get_size() + n_workers = self.global_comm.size add_n_procs(session.items) @@ -221,20 +215,12 @@ def pytest_runtestloop(self, session) -> bool: items_by_steps, items_to_skip, self.global_comm ) - for i, item in enumerate(items): - # nextitem = items[i + 1] if i + 1 < len(items) else None - # For optimization 
purposes, it would be nice to have the previous commented line - # (`nextitem` is only used internally by PyTest in _setupstate.teardown_exact) - # Here, it does not work: - # it seems that things are messed up on rank 0 - # because the nextitem might not be run (see pytest_runtest_setup/call/teardown hooks just above) - # In practice though, it seems that it is not the main thing that slows things down... - + for item in items: nextitem = None run_item_test(item, nextitem, session) # prevent return value being non-zero (ExitCode.NO_TESTS_COLLECTED) when no test run on non-master - if self.global_comm.Get_rank() != 0 and session.testscollected == 0: + if self.global_comm.rank != 0 and session.testscollected == 0: session.testscollected = 1 return True @@ -256,8 +242,8 @@ def pytest_runtest_logreport(self, report): gather_report_on_local_rank_0(report) # master ranks of each sub_comm must send their report to rank 0 - if sub_comm.Get_rank() == 0: # only master are concerned - if self.global_comm.Get_rank() != 0: # if master is not global master, send + if sub_comm.rank == 0: # only master are concerned + if self.global_comm.rank != 0: # if master is not global master, send self.global_comm.send(report, dest=0) elif report.master_running_proc != 0: # else, recv if test run remotely # In the line below, MPI.ANY_TAG will NOT clash with communications outside the framework because self.global_comm is private @@ -322,7 +308,7 @@ def schedule_test(item, available_procs, inter_comm): # mark the procs as busy for sub_rank in sub_ranks: - available_procs[sub_rank] = False + available_procs[sub_rank] = 0 # TODO isend would be slightly better (less waiting) for sub_rank in sub_ranks: @@ -354,11 +340,11 @@ def wait_test_to_complete(items_to_run, session, available_procs, inter_comm): for sub_rank in sub_ranks: if sub_rank != first_rank_done: rank_original_idx = inter_comm.recv(source=sub_rank, tag=WORK_DONE_TAG) - assert (rank_original_idx == original_idx) # sub_rank is supposed to have worked on the same test + assert rank_original_idx == original_idx # sub_rank is supposed to have worked on the same test # the procs are now available for sub_rank in sub_ranks: - available_procs[sub_rank] = True + available_procs[sub_rank] = 1 # "run" the test (i.e. 
trigger PyTest pipeline but do not really run the code) nextitem = None # not known at this point @@ -366,7 +352,7 @@ def wait_test_to_complete(items_to_run, session, available_procs, inter_comm): def wait_last_tests_to_complete(items_to_run, session, available_procs, inter_comm): - while np.sum(available_procs) < len(available_procs): + while sum(available_procs) < len(available_procs): wait_test_to_complete(items_to_run, session, available_procs, inter_comm) @@ -418,8 +404,7 @@ def pytest_runtestloop(self, session) -> bool: and not session.config.option.continue_on_collection_errors ): raise session.Interrupted( - "%d error%s during collection" - % (session.testsfailed, "s" if session.testsfailed != 1 else "") + f"{session.testsfailed} error{'s' if session.testsfailed != 1 else ''} during collection" ) if session.config.option.collectonly: @@ -451,10 +436,10 @@ def pytest_runtestloop(self, session) -> bool: # schedule tests to run items_left_to_run = sorted(items_to_run, key=lambda item: item.n_proc) - available_procs = np.ones(n_workers, dtype=np.int8) + available_procs = [1] * n_workers while len(items_left_to_run) > 0: - n_av_procs = np.sum(available_procs) + n_av_procs = sum(available_procs) item_idx = item_with_biggest_admissible_n_proc(items_left_to_run, n_av_procs) @@ -511,7 +496,7 @@ def pytest_runtest_logreport(self, report): sub_comm = report.sub_comm gather_report_on_local_rank_0(report) - if sub_comm.Get_rank() == 0: # if local master proc, send + if sub_comm.rank == 0: # if local master proc, send # The idea of the scheduler is the following: # The server schedules test over clients # A client executes the test then report to the server it is done diff --git a/pytest_parallel/plugin.py b/pytest_parallel/plugin.py index 3544bbc..2dca271 100644 --- a/pytest_parallel/plugin.py +++ b/pytest_parallel/plugin.py @@ -7,10 +7,13 @@ import tempfile from pathlib import Path import argparse -import resource + import pytest from _pytest.terminal import TerminalReporter +class PytestParallelError(ValueError): + pass + # -------------------------------------------------------------------------- def pytest_addoption(parser): parser.addoption( @@ -25,11 +28,10 @@ def pytest_addoption(parser): parser.addoption('--timeout', dest='timeout', type=int, default=7200, help='Timeout') - parser.addoption('--slurm-options', dest='slurm_options', type=str, help='list of SLURM options e.g. "--time=00:30:00 --qos=my_queue --n_tasks=4"') + parser.addoption('--slurm-options', dest='slurm_options', type=str, help='list of SLURM options e.g. "--time=00:30:00 --qos=my_queue"') parser.addoption('--slurm-srun-options', dest='slurm_srun_options', type=str, help='list of SLURM srun options e.g. "--mem-per-cpu=4GB"') - parser.addoption('--slurm-additional-cmds', dest='slurm_additional_cmds', type=str, help='list of commands to pass to SLURM job e.g. "source my_env.sh"') + parser.addoption('--slurm-init-cmds', dest='slurm_init_cmds', type=str, help='list of commands to pass to SLURM job e.g. 
"source my_env.sh"') parser.addoption('--slurm-file', dest='slurm_file', type=str, help='Path to file containing header of SLURM job') # TODO DEL - parser.addoption('--slurm-sub-command', dest='slurm_sub_command', type=str, help='SLURM submission command (defaults to `sbatch`)') # TODO DEL if sys.version_info >= (3,9): parser.addoption('--slurm-export-env', dest='slurm_export_env', action=argparse.BooleanOptionalAction, default=True) @@ -43,6 +45,7 @@ def pytest_addoption(parser): parser.addoption('--_worker', dest='_worker', action='store_true', help='Internal pytest_parallel option') parser.addoption('--_scheduler_ip_address', dest='_scheduler_ip_address', type=str, help='Internal pytest_parallel option') parser.addoption('--_scheduler_port', dest='_scheduler_port', type=int, help='Internal pytest_parallel option') + parser.addoption('--_session_folder', dest='_session_folder', type=str, help='Internal pytest_parallel option') parser.addoption('--_test_idx' , dest='_test_idx' , type=int, help='Internal pytest_parallel option') # Note: @@ -51,13 +54,21 @@ def pytest_addoption(parser): # because it can mess SLURM `srun` if "--scheduler=slurm" in sys.argv: assert 'mpi4py.MPI' not in sys.modules, 'Internal pytest_parallel error: mpi4py.MPI should not be imported' \ - ' when we are about to register and environment for SLURM' \ - ' (because importing mpi4py.MPI makes the current process look like and MPI process,' \ - ' and SLURM does not like that)' + ' when we are about to register an environment for SLURM' \ + ' (because importing mpi4py.MPI makes the current process look like and MPI process,' \ + ' and SLURM does not like that)' + if os.getenv('I_MPI_MPIRUN') is not None: + err_msg = 'Internal pytest_parallel error: the environment variable I_MPI_MPIRUN is set' \ + f' (it has value "{os.getenv("I_MPI_MPIRUN")}"),\n' \ + ' while pytest was invoked with "--scheduler=slurm".\n' \ + ' This indicates that pytest was run through MPI, and SLURM generally does not like that.\n' \ + ' With "--scheduler=slurm", just run `pytest` directly, not through `mpirun/mpiexec/srun`,\n' \ + ' because it will launch MPI itself (you may want to use --n-workers=).' 
+ raise PytestParallelError(err_msg) r = subprocess.run(['env','--null'], stdout=subprocess.PIPE) # `--null`: end each output line with NUL, required by `sbatch --export-file` - assert r.returncode==0, 'SLURM scheduler: error when writing `env` to `pytest_slurm/env_vars.sh`' + assert r.returncode==0, 'Internal pytest_parallel SLURM scheduler error: error when writing `env` to `pytest_slurm/env_vars.sh`' pytest._pytest_parallel_env_vars = r.stdout # -------------------------------------------------------------------------- @@ -70,44 +81,71 @@ def _invoke_params(args): quoted_invoke_params.append(arg) return ' '.join(quoted_invoke_params) +# -------------------------------------------------------------------------- +def _set_timeout(timeout): + if sys.platform != "win32": + import resource + resource.setrlimit(resource.RLIMIT_CPU, (timeout, timeout)) + # if windows, we don't know how to do that + # -------------------------------------------------------------------------- @pytest.hookimpl(trylast=True) def pytest_configure(config): # Set timeout timeout = config.getoption('timeout') - resource.setrlimit(resource.RLIMIT_CPU, (timeout, timeout)) + _set_timeout(timeout) # Get options and check dependent/incompatible options scheduler = config.getoption('scheduler') n_workers = config.getoption('n_workers') slurm_options = config.getoption('slurm_options') slurm_srun_options = config.getoption('slurm_srun_options') - slurm_additional_cmds = config.getoption('slurm_additional_cmds') + slurm_init_cmds = config.getoption('slurm_init_cmds') is_worker = config.getoption('_worker') slurm_file = config.getoption('slurm_file') slurm_export_env = config.getoption('slurm_export_env') - slurm_sub_command = config.getoption('slurm_sub_command') detach = config.getoption('detach') - if scheduler != 'slurm' and scheduler != 'shell': - assert not is_worker, 'Option `--slurm-worker` only available when `--scheduler=slurm` or `--scheduler=shell`' - if (scheduler == 'slurm' or scheduler == 'shell') and not is_worker: - assert n_workers, f'You need to specify `--n-workers` when `--scheduler={scheduler}`' + if not scheduler in ['slurm', 'shell']: + assert not is_worker, f'Internal pytest_parallel error: `--_worker` not available with `--scheduler={scheduler}`' + if scheduler in ['slurm', 'shell'] and not is_worker: + if n_workers is None: + raise PytestParallelError(f'You need to specify `--n-workers` when `--scheduler={scheduler}`') if scheduler != 'slurm': - assert not slurm_options, 'Option `--slurm-options` only available when `--scheduler=slurm`' - assert not slurm_srun_options, 'Option `--slurms-run-options` only available when `--scheduler=slurm`' - assert not slurm_additional_cmds, 'Option `--slurm-additional-cmds` only available when `--scheduler=slurm`' - assert not slurm_file, 'Option `--slurm-file` only available when `--scheduler=slurm`' + if slurm_options is not None: + raise PytestParallelError('Option `--slurm-options` only available when `--scheduler=slurm`') + if slurm_srun_options is not None: + raise PytestParallelError('Option `--slurm-srun-options` only available when `--scheduler=slurm`') + if slurm_init_cmds is not None: + raise PytestParallelError('Option `--slurm-init-cmds` only available when `--scheduler=slurm`') + if slurm_file is not None: + raise PytestParallelError('Option `--slurm-file` only available when `--scheduler=slurm`') + + if scheduler in ['shell', 'slurm'] and not is_worker: + from mpi4py import MPI + if MPI.COMM_WORLD.size != 1: + err_msg = 'Do not launch `pytest_parallel` 
on more than one process when `--scheduler=shell` or `--scheduler=slurm`.\n' \ + '`pytest_parallel` will spawn MPI processes itself.\n' \ + f'You may want to use --n-workers={MPI.COMM_WORLD.size}.' + raise PytestParallelError(err_msg) + if scheduler == 'slurm' and not is_worker: - assert slurm_options or slurm_file, 'You need to specify either `--slurm-options` or `--slurm-file` when `--scheduler=slurm`' + if slurm_options is None and slurm_file is None: + raise PytestParallelError('You need to specify either `--slurm-options` or `--slurm-file` when `--scheduler=slurm`') if slurm_options: - assert not slurm_file, 'You need to specify either `--slurm-options` or `--slurm-file`, but not both' + if slurm_file: + raise PytestParallelError('You need to specify either `--slurm-options` or `--slurm-file`, but not both') if slurm_file: - assert not slurm_options, 'You need to specify either `--slurm-options` or `--slurm-file`, but not both' - assert not slurm_additional_cmds, 'You cannot specify `--slurm-additional-cmds` together with `--slurm-file`' + if slurm_options: + raise PytestParallelError('You need to specify either `--slurm-options` or `--slurm-file`, but not both') + if slurm_init_cmds: + raise PytestParallelError('You cannot specify `--slurm-init-cmds` together with `--slurm-file`') + + if slurm_options and ('-n=' in slurm_options or '--ntasks=' in slurm_options): + raise PytestParallelError('Do not specify `-n/--ntasks` in `--slurm-options` (it is deduced from the `--n-workers` value).') - from .process_scheduler import ProcessScheduler + from .slurm_scheduler import SlurmScheduler enable_terminal_reporter = True @@ -117,17 +155,16 @@ def pytest_configure(config): ## pull apart `--slurm-options` for special treatement main_invoke_params = main_invoke_params.replace(f'--slurm-options={slurm_options}', '') for file_or_dir in config.option.file_or_dir: - main_invoke_params = main_invoke_params.replace(file_or_dir, '') + main_invoke_params = main_invoke_params.replace(file_or_dir, '') slurm_option_list = slurm_options.split() if slurm_options is not None else [] slurm_conf = { - 'options' : slurm_option_list, - 'srun_options' : slurm_srun_options, - 'additional_cmds': slurm_additional_cmds, - 'file' : slurm_file, - 'export_env' : slurm_export_env, - 'sub_command' : slurm_sub_command, + 'options' : slurm_option_list, + 'srun_options': slurm_srun_options, + 'init_cmds' : slurm_init_cmds, + 'file' : slurm_file, + 'export_env' : slurm_export_env, } - plugin = ProcessScheduler(main_invoke_params, n_workers, slurm_conf, detach) + plugin = SlurmScheduler(main_invoke_params, n_workers, slurm_conf, detach) elif scheduler == 'shell' and not is_worker: from .shell_static_scheduler import ShellStaticScheduler @@ -136,13 +173,13 @@ def pytest_configure(config): # reconstruct complete invoke string main_invoke_params = _invoke_params(config.invocation_params.args) for file_or_dir in config.option.file_or_dir: - main_invoke_params = main_invoke_params.replace(file_or_dir, '') + main_invoke_params = main_invoke_params.replace(file_or_dir, '') plugin = ShellStaticScheduler(main_invoke_params, n_workers, detach) else: from mpi4py import MPI from .mpi_reporter import SequentialScheduler, StaticScheduler, DynamicScheduler from .process_worker import ProcessWorker - from .utils_mpi import spawn_master_process, should_enable_terminal_reporter + from .utils.mpi import spawn_master_process, should_enable_terminal_reporter global_comm = MPI.COMM_WORLD enable_terminal_reporter = should_enable_terminal_reporter(global_comm, scheduler) 
@@ -154,11 +191,12 @@ def pytest_configure(config): elif scheduler == 'dynamic': inter_comm = spawn_master_process(global_comm) plugin = DynamicScheduler(global_comm, inter_comm) - elif (scheduler == 'slurm' or scheduler == 'shell') and is_worker: + elif scheduler in ['shell', 'slurm'] and is_worker: scheduler_ip_address = config.getoption('_scheduler_ip_address') scheduler_port = config.getoption('_scheduler_port') + session_folder = config.getoption('_session_folder') test_idx = config.getoption('_test_idx') - plugin = ProcessWorker(scheduler_ip_address, scheduler_port, test_idx, detach) + plugin = ProcessWorker(scheduler_ip_address, scheduler_port, session_folder, test_idx, detach) else: assert 0 @@ -172,7 +210,7 @@ def pytest_configure(config): # Pytest relies on having a terminal reporter to decide on how to create error messages, see #12 # Hence, register a terminal reporter that outputs to /dev/null - null_file = open(os.devnull,'w') + null_file = open(os.devnull,'w', encoding='utf-8') terminal_reporter = TerminalReporter(config, null_file) config.pluginmanager.register(terminal_reporter, "terminalreporter") @@ -201,16 +239,16 @@ def __init__(self, comm): def __enter__(self): from mpi4py import MPI if self.comm != MPI.COMM_NULL: # TODO DEL once non-participating rank do not participate in fixtures either - rank = self.comm.Get_rank() + rank = self.comm.rank self.tmp_dir = tempfile.TemporaryDirectory() if rank == 0 else None self.tmp_path = Path(self.tmp_dir.name) if rank == 0 else None return self.comm.bcast(self.tmp_path, root=0) - def __exit__(self, type, value, traceback): + def __exit__(self, ex_type, ex_value, traceback): from mpi4py import MPI if self.comm != MPI.COMM_NULL: # TODO DEL once non-participating rank do not participate in fixtures either self.comm.barrier() - if self.comm.Get_rank() == 0: + if self.comm.rank == 0: self.tmp_dir.cleanup() diff --git a/pytest_parallel/process_worker.py b/pytest_parallel/process_worker.py index a3c7f22..3c76c21 100644 --- a/pytest_parallel/process_worker.py +++ b/pytest_parallel/process_worker.py @@ -1,20 +1,24 @@ -import pytest +from pathlib import Path +import pickle +import pytest from mpi4py import MPI -import socket -import pickle -from . import socket_utils -from .utils import get_n_proc_for_test, run_item_test +from .utils.items import get_n_proc_for_test, run_item_test from .gather_report import gather_report_on_local_rank_0 class ProcessWorker: - def __init__(self, scheduler_ip_address, scheduler_port, test_idx, detach): + def __init__(self, scheduler_ip_address, scheduler_port, session_folder, test_idx, detach): self.scheduler_ip_address = scheduler_ip_address self.scheduler_port = scheduler_port + self.session_folder = session_folder self.test_idx = test_idx self.detach = detach + + def _file_path(self, when): + return Path(f'.pytest_parallel/{self.session_folder}/_partial/{self.test_idx}_{when}') + @pytest.hookimpl(tryfirst=True) def pytest_runtestloop(self, session) -> bool: comm = MPI.COMM_WORLD @@ -26,26 +30,32 @@ def pytest_runtestloop(self, session) -> bool: item.test_info = {'test_idx': self.test_idx, 'fatal_error': None} - if comm.Get_size() != test_comm_size: # fatal error, SLURM and MPI do not interoperate correctly - error_info = f'FATAL ERROR in pytest_parallel with slurm scheduling: test `{item.nodeid}`' \ - f' uses a `comm` of size {test_comm_size} but was launched with size {comm.Get_size()}.\n' \ - f' This generally indicates that `srun` does not interoperate correctly with MPI.' 
+ # check there is no file from a previous run + if comm.rank == 0: + for when in ['fatal_error', 'setup', 'call', 'teardown']: + path = self._file_path(when) + assert not path.exists(), f'INTERNAL FATAL ERROR in pytest_parallel: file "{path}" should not exist at this point' - item.test_info['fatal_error'] = error_info - else: # normal case: the test can be run - nextitem = None - run_item_test(item, nextitem, session) + # check the number of procs matches the one specified by the test + if comm.size != test_comm_size: # fatal error, SLURM and MPI do not interoperate correctly + if comm.rank == 0: + error_info = f'FATAL ERROR in pytest_parallel with slurm scheduling: test `{item.nodeid}`' \ + f' uses a `comm` of size {test_comm_size} but was launched with size {comm.size}.\n' \ + f' This generally indicates that `srun` does not interoperate correctly with MPI.' + file_path = self._file_path('fatal_error') + with open(file_path, 'w', encoding='utf-8') as f: + f.write(error_info) + return True - if not self.detach and comm.Get_rank() == 0: # not detached: proc 0 is expected to send results to scheduling process - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.connect((self.scheduler_ip_address, self.scheduler_port)) - socket_utils.send(s, pickle.dumps(item.test_info)) + # run the test + nextitem = None + run_item_test(item, nextitem, session) if item.test_info['fatal_error'] is not None: assert 0, f'{item.test_info["fatal_error"]}' return True - + @pytest.hookimpl(hookwrapper=True) def pytest_runtest_makereport(self, item): """ @@ -61,9 +71,11 @@ def pytest_runtest_makereport(self, item): @pytest.hookimpl(tryfirst=True) def pytest_runtest_logreport(self, report): assert report.when in ("setup", "call", "teardown") # only known tags + sub_comm = report.sub_comm # keep `sub_comm` because `gather_report_on_local_rank_0` removes it gather_report_on_local_rank_0(report) - report.test_info.update({report.when: {'outcome' : report.outcome, - 'longrepr': report.longrepr, - 'duration': report.duration, }}) - - + report_info = {'outcome' : report.outcome, + 'longrepr': report.longrepr, + 'duration': report.duration, } + if sub_comm.rank == 0: + with open(self._file_path(report.when), 'wb') as f: + f.write(pickle.dumps(report_info)) diff --git a/pytest_parallel/send_report.py b/pytest_parallel/send_report.py new file mode 100644 index 0000000..2a92b1a --- /dev/null +++ b/pytest_parallel/send_report.py @@ -0,0 +1,71 @@ +import argparse +import socket +import pickle +from pathlib import Path +from _pytest._code.code import ( + ExceptionChainRepr, + ReprTraceback, + ReprEntryNative, +) +from .utils.socket import send as socket_send + + +parser = argparse.ArgumentParser(description='Send the reports of the tests to the master pytest_parallel process') + +parser.add_argument('--_scheduler_ip_address', dest='_scheduler_ip_address', type=str) +parser.add_argument('--_scheduler_port', dest='_scheduler_port', type=int) +parser.add_argument('--_session_folder', dest='_session_folder', type=str) +parser.add_argument('--_test_idx', dest='_test_idx', type=int) +parser.add_argument('--_test_name', dest='_test_name', type=str) + +args = parser.parse_args() + +def _file_path(when): + return Path(f'.pytest_parallel/{args._session_folder}/_partial/{args._test_idx}_{when}') + +test_info = {'test_idx': args._test_idx, 'fatal_error': None} # TODO no fatal_error=None (absence means no error) + +# 'fatal_error' file +file_path = _file_path('fatal_error') +if file_path.exists(): + with open(file_path, 
'r', encoding='utf-8') as file: + fatal_error = file.read() + test_info['fatal_error'] = fatal_error + + +# 'setup/call/teardown' files +already_failed = False +for when in ('setup', 'call', 'teardown'): + file_path = _file_path(when) + if file_path.exists(): + try: + with open(file_path, 'rb') as file: + report_info = file.read() + report_info = pickle.loads(report_info) + test_info[when] = report_info + except pickle.PickleError: + test_info['fatal_error'] = f'FATAL ERROR in pytest_parallel : unable to decode {file_path}' + else: # Supposedly not found because the test crashed before writing the file + collect_longrepr = [] + msg = 'Error: the test crashed. ' + red = 31 + bold = 1 + msg = f'\x1b[{red}m' + f'\x1b[{bold}m' + msg+ '\x1b[0m' + msg += f'Log file: {args._test_name}\n' + trace_back = ReprTraceback([ReprEntryNative(msg)], None, None) + collect_longrepr.append( + (trace_back, None, None) + ) + longrepr = ExceptionChainRepr(collect_longrepr) + + outcome = 'passed' if already_failed else 'failed' # No need to report the error twice + test_info[when] = {'outcome' : outcome, + 'longrepr': longrepr, + 'duration': 0, } # unable to report accurately + + already_failed = True + + +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((args._scheduler_ip_address, args._scheduler_port)) + socket_send(s, pickle.dumps(test_info)) diff --git a/pytest_parallel/shell_static_scheduler.py b/pytest_parallel/shell_static_scheduler.py index 980c476..bc8b4a3 100644 --- a/pytest_parallel/shell_static_scheduler.py +++ b/pytest_parallel/shell_static_scheduler.py @@ -1,77 +1,17 @@ -import pytest import os import stat import subprocess import socket import pickle -from pathlib import Path -from . import socket_utils -from .utils import get_n_proc_for_test, add_n_procs, run_item_test, mark_original_index -from .algo import partition -from .static_scheduler_utils import group_items_by_parallel_steps + +import pytest from mpi4py import MPI -import numpy as np - -def mark_skip(item, ntasks): - n_proc_test = get_n_proc_for_test(item) - skip_msg = f"Not enough procs to execute: {n_proc_test} required but only {ntasks} available" - item.add_marker(pytest.mark.skip(reason=skip_msg), append=False) - item.marker_mpi_skip = True - -def replace_sub_strings(s, subs, replacement): - res = s - for sub in subs: - res = res.replace(sub,replacement) - return res - -def remove_exotic_chars(s): - return replace_sub_strings(str(s), ['[',']','/', ':'], '_') - -def parse_job_id_from_submission_output(s): - # At this point, we are trying to guess -_- - # Here we supposed that the command for submitting the job - # returned string with only one number, - # and that this number is the job id - import re - return int(re.search(r'\d+', str(s)).group()) - - -# https://stackoverflow.com/a/34177358 -def command_exists(cmd_name): - """Check whether `name` is on PATH and marked as executable.""" - from shutil import which - return which(cmd_name) is not None - -def _get_my_ip_address(): - hostname = socket.gethostname() - - assert command_exists('tracepath'), 'pytest_parallel SLURM scheduler: command `tracepath` is not available' - cmd = ['tracepath','-4','-n',hostname] - r = subprocess.run(cmd, stdout=subprocess.PIPE) - assert r.returncode==0, f'pytest_parallel SLURM scheduler: error running command `{" ".join(cmd)}`' - ips = r.stdout.decode("utf-8") - - try: - my_ip = ips.split('\n')[0].split(':')[1].split()[0] - except: - assert 0, f'pytest_parallel SLURM scheduler: error parsing result `{ips}` of command `{" 
".join(cmd)}`' - import ipaddress - try: - ipaddress.ip_address(my_ip) - except ValueError: - assert 0, f'pytest_parallel SLURM scheduler: error parsing result `{ips}` of command `{" ".join(cmd)}`' - - return my_ip - -def setup_socket(socket): - # Find IP our address - SCHEDULER_IP_ADDRESS = _get_my_ip_address() - - # setup master's socket - socket.bind((SCHEDULER_IP_ADDRESS, 0)) # 0: let the OS choose an available port - socket.listen() - port = socket.getsockname()[1] - return SCHEDULER_IP_ADDRESS, port + +from .utils.socket import recv as socket_recv +from .utils.socket import setup_socket +from .utils.items import add_n_procs, run_item_test, mark_original_index, mark_skip +from .utils.file import remove_exotic_chars, create_folders +from .static_scheduler_utils import group_items_by_parallel_steps def mpi_command(current_proc, n_proc): mpi_vendor = MPI.get_vendor()[0] @@ -85,57 +25,78 @@ def mpi_command(current_proc, n_proc): else: assert 0, f'Unknown MPI implementation "{mpi_vendor}"' -def submit_items(items_to_run, SCHEDULER_IP_ADDRESS, port, main_invoke_params, ntasks, i_step, n_step): +def submit_items(items_to_run, SCHEDULER_IP_ADDRESS, port, session_folder, main_invoke_params, i_step, n_step): # sort item by comm size to launch bigger first (Note: in case SLURM prioritize first-received items) items = sorted(items_to_run, key=lambda item: item.n_proc, reverse=True) # launch `mpiexec` for each item - worker_flags=f"--_worker --_scheduler_ip_address={SCHEDULER_IP_ADDRESS} --_scheduler_port={port}" + script_prolog = '' + script_prolog += '#!/bin/bash\n\n' + + socket_flags=f"--_scheduler_ip_address={SCHEDULER_IP_ADDRESS} --_scheduler_port={port} --_session_folder={session_folder}" cmds = [] current_proc = 0 for item in items: test_idx = item.original_index - test_out_file_base = f'.pytest_parallel/{remove_exotic_chars(item.nodeid)}' - cmd = mpi_command(current_proc, item.n_proc) - cmd += f' python3 -u -m pytest -s {worker_flags} {main_invoke_params} --_test_idx={test_idx} {item.config.rootpath}/{item.nodeid}' - cmd += f' > {test_out_file_base}' + test_out_file = f'.pytest_parallel/{session_folder}/{remove_exotic_chars(item.nodeid)}' + cmd = '(' + cmd += mpi_command(current_proc, item.n_proc) + cmd += f' python3 -u -m pytest -s --_worker {socket_flags} {main_invoke_params} --_test_idx={test_idx} {item.config.rootpath}/{item.nodeid}' + cmd += f' > {test_out_file} 2>&1' + cmd += f' ; python3 -m pytest_parallel.send_report {socket_flags} --_test_idx={test_idx} --_test_name={test_out_file}' + cmd += ')' cmds.append(cmd) current_proc += item.n_proc - script = " & \\\n".join(cmds) + '\n' - Path('.pytest_parallel').mkdir(exist_ok=True) - script_path = f'.pytest_parallel/pytest_static_sched_{i_step+1}.sh' - with open(script_path,'w') as f: - f.write(script) + # create the script + ## 1. prolog + script = script_prolog + + ## 2. join all the commands + ## '&' makes it work in parallel (the following commands does not wait for the previous ones to finish) + ## '\' (escaped with '\\') makes it possible to use multiple lines + script += ' & \\\n'.join(cmds) + '\n' + + ## 3. 
+    script += '\nwait\n'
+
+    script_path = f'.pytest_parallel/{session_folder}/pytest_static_sched_{i_step+1}.sh'
+    with open(script_path,'w', encoding='utf-8') as f:
+        f.write(script)
     current_permissions = stat.S_IMODE(os.lstat(script_path).st_mode)
     os.chmod(script_path, current_permissions | stat.S_IXUSR)

-    p = subprocess.Popen([script_path], shell=True, stdout=subprocess.PIPE)
+    #p = subprocess.Popen([script_path], shell=True, stdout=subprocess.PIPE)
+    p = subprocess.Popen([script_path], shell=True)
     print(f'\nLaunching tests (step {i_step+1}/{n_step})...')
     return p

 def receive_items(items, session, socket, n_item_to_recv):
     # > Precondition: Items must keep their original order to pick up the right item at the reception
-    original_indices = np.array([item.original_index for item in items])
-    assert (original_indices==np.arange(len(items))).all()
+    original_indices = [item.original_index for item in items]
+    assert original_indices==list(range(len(items)))
     while n_item_to_recv>0:
-        conn, addr = socket.accept()
+        conn, _ = socket.accept()
         with conn:
-            msg = socket_utils.recv(conn)
+            msg = socket_recv(conn)
             test_info = pickle.loads(msg) # the worker is supposed to have send a dict with the correct structured information
-            test_idx = test_info['test_idx']
-            if test_info['fatal_error'] is not None:
-                assert 0, f'{test_info["fatal_error"]}'
-            item = items[test_idx] # works because of precondition
-            item.sub_comm = None
-            item.info = test_info
-
-            # "run" the test (i.e. trigger PyTest pipeline but do not really run the code)
-            nextitem = None # not known at this point
-            run_item_test(item, nextitem, session)
-            n_item_to_recv -= 1
+            if 'signal_info' in test_info:
+                print('signal_info= ',test_info['signal_info'])
+                break
+            else:
+                test_idx = test_info['test_idx']
+                if test_info['fatal_error'] is not None:
+                    assert 0, f'{test_info["fatal_error"]}'
+                item = items[test_idx] # works because of precondition
+                item.sub_comm = None
+                item.info = test_info
+
+                # "run" the test (i.e. 
trigger PyTest pipeline but do not really run the code) + nextitem = None # not known at this point + run_item_test(item, nextitem, session) + n_item_to_recv -= 1 class ShellStaticScheduler: def __init__(self, main_invoke_params, ntasks, detach): @@ -161,8 +122,7 @@ def pytest_runtestloop(self, session) -> bool: and not session.config.option.continue_on_collection_errors ): raise session.Interrupted( - "%d error%s during collection" - % (session.testsfailed, "s" if session.testsfailed != 1 else "") + f"{session.testsfailed} error{'s' if session.testsfailed != 1 else ''} during collection" ) if session.config.option.collectonly: @@ -183,17 +143,18 @@ def pytest_runtestloop(self, session) -> bool: run_item_test(item, nextitem, session) # schedule tests to run - SCHEDULER_IP_ADDRESS,port = setup_socket(self.socket) + SCHEDULER_IP_ADDRESS, port = setup_socket(self.socket) + session_folder = create_folders() n_step = len(items_by_steps) for i_step,items in enumerate(items_by_steps): n_item_to_receive = len(items) - sub_process = submit_items(items, SCHEDULER_IP_ADDRESS, port, self.main_invoke_params, self.ntasks, i_step, n_step) + sub_process = submit_items(items, SCHEDULER_IP_ADDRESS, port, session_folder, self.main_invoke_params, i_step, n_step) if not self.detach: # The job steps are supposed to send their reports receive_items(session.items, session, self.socket, n_item_to_receive) returncode = sub_process.wait() # at this point, the sub-process should be done since items have been received # https://docs.pytest.org/en/stable/reference/exit-codes.html - # 0 means all passed, 1 means all executed, but some failed + # 0 means all passed, 1 means all executed, but some failed assert returncode==0 or returncode==1 , f'Pytest internal error during step {i_step} of shell scheduler (error code {returncode})' return True diff --git a/pytest_parallel/process_scheduler.py b/pytest_parallel/slurm_scheduler.py similarity index 53% rename from pytest_parallel/process_scheduler.py rename to pytest_parallel/slurm_scheduler.py index 8245395..73964e5 100644 --- a/pytest_parallel/process_scheduler.py +++ b/pytest_parallel/slurm_scheduler.py @@ -1,87 +1,31 @@ -import pytest import subprocess import socket import pickle -from pathlib import Path -from . 
import socket_utils -from .utils import get_n_proc_for_test, add_n_procs, run_item_test, mark_original_index -from .algo import partition - - -def mark_skip(item, ntasks): - n_proc_test = get_n_proc_for_test(item) - skip_msg = f"Not enough procs to execute: {n_proc_test} required but only {ntasks} available" - item.add_marker(pytest.mark.skip(reason=skip_msg), append=False) - item.marker_mpi_skip = True - -def replace_sub_strings(s, subs, replacement): - res = s - for sub in subs: - res = res.replace(sub,replacement) - return res - -def remove_exotic_chars(s): - return replace_sub_strings(str(s), ['[',']','/', ':'], '_') - -def parse_job_id_from_submission_output(s): - # At this point, we are trying to guess -_- - # Here we supposed that the command for submitting the job - # returned string with only one number, - # and that this number is the job id - import re - return int(re.search(r'\d+', str(s)).group()) - - -# https://stackoverflow.com/a/34177358 -def command_exists(cmd_name): - """Check whether `name` is on PATH and marked as executable.""" - from shutil import which - return which(cmd_name) is not None - -def _get_my_ip_address(): - hostname = socket.gethostname() - - assert command_exists('tracepath'), 'pytest_parallel SLURM scheduler: command `tracepath` is not available' - cmd = ['tracepath','-4','-n',hostname] - r = subprocess.run(cmd, stdout=subprocess.PIPE) - assert r.returncode==0, f'pytest_parallel SLURM scheduler: error running command `{" ".join(cmd)}`' - ips = r.stdout.decode("utf-8") - - try: - my_ip = ips.split('\n')[0].split(':')[1].split()[0] - except: - assert 0, f'pytest_parallel SLURM scheduler: error parsing result `{ips}` of command `{" ".join(cmd)}`' - import ipaddress - try: - ipaddress.ip_address(my_ip) - except ValueError: - assert 0, f'pytest_parallel SLURM scheduler: error parsing result `{ips}` of command `{" ".join(cmd)}`' - - return my_ip +import pytest -def submit_items(items_to_run, socket, main_invoke_params, ntasks, slurm_conf): - # Find IP our address - SCHEDULER_IP_ADDRESS = _get_my_ip_address() +from .utils.socket import recv as socket_recv +from .utils.socket import setup_socket +from .utils.items import add_n_procs, run_item_test, mark_original_index, mark_skip +from .utils.file import remove_exotic_chars, create_folders +from .algo import partition - # setup master's socket - socket.bind((SCHEDULER_IP_ADDRESS, 0)) # 0: let the OS choose an available port - socket.listen() - port = socket.getsockname()[1] +def submit_items(items_to_run, socket, session_folder, main_invoke_params, ntasks, slurm_conf): + SCHEDULER_IP_ADDRESS, port = setup_socket(socket) # generate SLURM header options if slurm_conf['file'] is not None: - with open(slurm_conf['file']) as f: + with open(slurm_conf['file'], encoding='utf-8') as f: slurm_header = f.read() - # Note: + # Note: # ntasks is supposed to be <= to the number of the ntasks submitted to slurm # but since the header file can be arbitrary, we have no way to check at this point else: slurm_header = '#!/bin/bash\n' slurm_header += '\n' slurm_header += '#SBATCH --job-name=pytest_parallel\n' - slurm_header += '#SBATCH --output=.pytest_parallel/slurm.%j.out\n' - slurm_header += '#SBATCH --error=.pytest_parallel/slurm.%j.err\n' + slurm_header += f'#SBATCH --output=.pytest_parallel/{session_folder}/slurm.out\n' + slurm_header += f'#SBATCH --error=.pytest_parallel/{session_folder}/slurm.err\n' for opt in slurm_conf['options']: slurm_header += f'#SBATCH {opt}\n' slurm_header += f'#SBATCH --ntasks={ntasks}' @@ -92,56 
+36,57 @@ def submit_items(items_to_run, socket, main_invoke_params, ntasks, slurm_conf): # launch srun for each item srun_options = slurm_conf['srun_options'] if srun_options is None: - srun_options = '' - worker_flags=f"--_worker --_scheduler_ip_address={SCHEDULER_IP_ADDRESS} --_scheduler_port={port}" + srun_options = '' + socket_flags = f"--_scheduler_ip_address={SCHEDULER_IP_ADDRESS} --_scheduler_port={port} --_session_folder={session_folder}" cmds = '' - if slurm_conf['additional_cmds'] is not None: - cmds += slurm_conf['additional_cmds'] + '\n' + if slurm_conf['init_cmds'] is not None: + cmds += slurm_conf['init_cmds'] + '\n' for item in items: test_idx = item.original_index - test_out_file_base = f'.pytest_parallel/{remove_exotic_chars(item.nodeid)}' - cmd = f'srun {srun_options} --exclusive --ntasks={item.n_proc} -l' - cmd += f' python3 -u -m pytest -s {worker_flags} {main_invoke_params} --_test_idx={test_idx} {item.config.rootpath}/{item.nodeid}' - cmd += f' > {test_out_file_base} 2>&1' - cmd += ' &\n' # launch everything in parallel + test_out_file = f'.pytest_parallel/{session_folder}/{remove_exotic_chars(item.nodeid)}' + cmd = '(' + cmd += f'srun {srun_options}' + cmd += ' --exclusive' + cmd += ' --kill-on-bad-exit=1' # make fatal errors (e.g. segfault) kill the whole srun step. Else, deadlock (at least with Intel MPI) + cmd += f' --ntasks={item.n_proc}' + cmd += ' --label' # Prepend task number to lines of stdout/err + cmd += f' python3 -u -m pytest -s --_worker {socket_flags} {main_invoke_params} --_test_idx={test_idx} {item.config.rootpath}/{item.nodeid}' + cmd += f' > {test_out_file} 2>&1' + cmd += f' ; python3 -m pytest_parallel.send_report {socket_flags} --_test_idx={test_idx} --_test_name={test_out_file}' + cmd += ')' + cmd += ' &\n' # launch everything in parallel cmds += cmd cmds += 'wait\n' job_cmds = f'{slurm_header}\n\n{cmds}' - Path('.pytest_parallel').mkdir(exist_ok=True) - with open('.pytest_parallel/job.sh','w') as f: - f.write(job_cmds) + + with open(f'.pytest_parallel/{session_folder}/job.sh','w', encoding='utf-8') as f: + f.write(job_cmds) # submit SLURM job - with open('.pytest_parallel/env_vars.sh','wb') as f: - f.write(pytest._pytest_parallel_env_vars) + with open(f'.pytest_parallel/{session_folder}/env_vars.sh','wb') as f: + f.write(pytest._pytest_parallel_env_vars) - if slurm_conf['sub_command'] is None: - if slurm_conf['export_env']: - sbatch_cmd = 'sbatch --parsable --export-file=.pytest_parallel/env_vars.sh .pytest_parallel/job.sh' - else: - sbatch_cmd = 'sbatch --parsable .pytest_parallel/job.sh' + if slurm_conf['export_env']: + sbatch_cmd = f'sbatch --parsable --export-file=.pytest_parallel/{session_folder}/env_vars.sh .pytest_parallel/{session_folder}/job.sh' else: - sbatch_cmd = slurm_conf['sub_command'] + ' .pytest_parallel/job.sh' + sbatch_cmd = f'sbatch --parsable .pytest_parallel/{session_folder}/job.sh' p = subprocess.Popen([sbatch_cmd], shell=True, stdout=subprocess.PIPE) print('\nSubmitting tests to SLURM...') returncode = p.wait() assert returncode==0, f'Error when submitting to SLURM with `{sbatch_cmd}`' - if slurm_conf['sub_command'] is None: - slurm_job_id = int(p.stdout.read()) - else: - slurm_job_id = parse_job_id_from_submission_output(p.stdout.read()) + slurm_job_id = int(p.stdout.read()) print(f'SLURM job {slurm_job_id} has been submitted') return slurm_job_id def receive_items(items, session, socket, n_item_to_recv): while n_item_to_recv>0: - conn, addr = socket.accept() + conn, _ = socket.accept() with conn: - msg = 
socket_utils.recv(conn) + msg = socket_recv(conn) test_info = pickle.loads(msg) # the worker is supposed to have send a dict with the correct structured information test_idx = test_info['test_idx'] if test_info['fatal_error'] is not None: @@ -155,7 +100,7 @@ def receive_items(items, session, socket, n_item_to_recv): run_item_test(items[test_idx], nextitem, session) n_item_to_recv -= 1 -class ProcessScheduler: +class SlurmScheduler: def __init__(self, main_invoke_params, ntasks, slurm_conf, detach): self.main_invoke_params = main_invoke_params self.ntasks = ntasks @@ -181,8 +126,7 @@ def pytest_runtestloop(self, session) -> bool: and not session.config.option.continue_on_collection_errors ): raise session.Interrupted( - "%d error%s during collection" - % (session.testsfailed, "s" if session.testsfailed != 1 else "") + f"{session.testsfailed} error{'s' if session.testsfailed != 1 else ''} during collection" ) if session.config.option.collectonly: @@ -207,17 +151,18 @@ def pytest_runtestloop(self, session) -> bool: # schedule tests to run n_item_to_receive = len(items_to_run) if n_item_to_receive > 0: - self.slurm_job_id = submit_items(items_to_run, self.socket, self.main_invoke_params, self.ntasks, self.slurm_conf) - if not self.detach: # The job steps are supposed to send their reports - receive_items(session.items, session, self.socket, n_item_to_receive) + session_folder = create_folders() + self.slurm_job_id = submit_items(items_to_run, self.socket, session_folder, self.main_invoke_params, self.ntasks, self.slurm_conf) + if not self.detach: # The job steps are supposed to send their reports + receive_items(session.items, session, self.socket, n_item_to_receive) return True @pytest.hookimpl() - def pytest_keyboard_interrupt(excinfo): - if excinfo.slurm_job_id is not None: - print(f'Calling `scancel {excinfo.slurm_job_id}`') - subprocess.run(['scancel',str(excinfo.slurm_job_id)]) + def pytest_keyboard_interrupt(self, excinfo): + if self.slurm_job_id is not None: + print(f'Calling `scancel {self.slurm_job_id}`') + subprocess.run(['scancel',str(self.slurm_job_id)]) @pytest.hookimpl(hookwrapper=True) def pytest_runtest_makereport(self, item): diff --git a/pytest_parallel/socket_utils.py b/pytest_parallel/socket_utils.py deleted file mode 100644 index f434270..0000000 --- a/pytest_parallel/socket_utils.py +++ /dev/null @@ -1,29 +0,0 @@ -def send(sock, msg_bytes): - msg_len = len(msg_bytes) - sent = sock.send(msg_len.to_bytes(8,'big')) # send int64 big endian - if sent == 0: - raise RuntimeError('Socket send broken: could not send message size') - - totalsent = 0 - while totalsent < msg_len: - sent = sock.send(msg_bytes[totalsent:]) - if sent == 0: - raise RuntimeError('Socket send broken: could not send message') - totalsent = totalsent + sent - -def recv(sock): - msg_len_bytes = sock.recv(8) - if msg_len_bytes == b'': - raise RuntimeError('Socket recv broken: message has no size') - msg_len = int.from_bytes(msg_len_bytes, 'big') - - chunks = [] - bytes_recv = 0 - while bytes_recv < msg_len: - chunk = sock.recv(min(msg_len-bytes_recv, 4096)) - if chunk == b'': - raise RuntimeError('Socket recv broken: could not receive message') - chunks.append(chunk) - bytes_recv += len(chunk) - msg_bytes = b''.join(chunks) - return msg_bytes diff --git a/pytest_parallel/static_scheduler_utils.py b/pytest_parallel/static_scheduler_utils.py index 9e33da9..6034d79 100644 --- a/pytest_parallel/static_scheduler_utils.py +++ b/pytest_parallel/static_scheduler_utils.py @@ -20,5 +20,3 @@ def 
group_items_by_parallel_steps(items, n_workers): remaining_n_procs_by_step += [n_workers - item.n_proc] return items_by_step, items_to_skip - - diff --git a/pytest_parallel/utils/__init__.py b/pytest_parallel/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pytest_parallel/utils/file.py b/pytest_parallel/utils/file.py new file mode 100644 index 0000000..cc87074 --- /dev/null +++ b/pytest_parallel/utils/file.py @@ -0,0 +1,19 @@ +from pathlib import Path +import tempfile + + +def replace_sub_strings(s, subs, replacement): + res = s + for sub in subs: + res = res.replace(sub,replacement) + return res + +def remove_exotic_chars(s): + return replace_sub_strings(str(s), ['[',']','/', ':'], '_') + + +def create_folders(): + Path('.pytest_parallel').mkdir(exist_ok=True) + session_folder_abs = Path(tempfile.mkdtemp(dir='.pytest_parallel')) # create a folder that did not already exist + Path(session_folder_abs/'_partial').mkdir() + return session_folder_abs.name diff --git a/pytest_parallel/utils.py b/pytest_parallel/utils/items.py similarity index 52% rename from pytest_parallel/utils.py rename to pytest_parallel/utils/items.py index 0869e8d..c403bd2 100644 --- a/pytest_parallel/utils.py +++ b/pytest_parallel/utils/items.py @@ -1,13 +1,13 @@ -import sys import pytest from _pytest.nodes import Item + def get_n_proc_for_test(item: Item) -> int : - if not hasattr(item, 'callspec'): return 1 # no callspec, so no `comm` => sequential test case - try: - return item.callspec.getparam('comm') - except ValueError: # no `comm` => sequential test case - return 1 + if not hasattr(item, 'callspec'): return 1 # no callspec, so no `comm` => sequential test case + try: + return item.callspec.getparam('comm') + except ValueError: # no `comm` => sequential test case + return 1 def add_n_procs(items): @@ -26,3 +26,10 @@ def run_item_test(item, nextitem, session): def mark_original_index(items): for i, item in enumerate(items): item.original_index = i + + +def mark_skip(item, ntasks): + n_proc_test = get_n_proc_for_test(item) + skip_msg = f"Not enough procs to execute: {n_proc_test} required but only {ntasks} available" + item.add_marker(pytest.mark.skip(reason=skip_msg), append=False) + item.marker_mpi_skip = True diff --git a/pytest_parallel/utils_mpi.py b/pytest_parallel/utils/mpi.py similarity index 94% rename from pytest_parallel/utils_mpi.py rename to pytest_parallel/utils/mpi.py index 3466235..4aac2bc 100644 --- a/pytest_parallel/utils_mpi.py +++ b/pytest_parallel/utils/mpi.py @@ -13,7 +13,7 @@ def should_enable_terminal_reporter(comm, scheduler): if scheduler == "dynamic": return is_dyn_master_process(comm) else: - return comm.Get_rank() == 0 + return comm.rank == 0 def spawn_master_process(global_comm): @@ -37,4 +37,4 @@ def spawn_master_process(global_comm): def number_of_working_processes(comm): if is_dyn_master_process(comm): return comm.Get_remote_size() - return comm.Get_size() + return comm.size diff --git a/pytest_parallel/utils/socket.py b/pytest_parallel/utils/socket.py new file mode 100644 index 0000000..0adda64 --- /dev/null +++ b/pytest_parallel/utils/socket.py @@ -0,0 +1,71 @@ +import shutil +import socket +import subprocess + + +def send(sock, msg_bytes): + msg_len = len(msg_bytes) + sent = sock.send(msg_len.to_bytes(8,'big')) # send int64 big endian + if sent == 0: + raise RuntimeError('Socket send broken: could not send message size') + + totalsent = 0 + while totalsent < msg_len: + sent = sock.send(msg_bytes[totalsent:]) + if sent == 0: + raise RuntimeError('Socket 
send broken: could not send message') + totalsent = totalsent + sent + +def recv(sock): + msg_len_bytes = sock.recv(8) + if msg_len_bytes == b'': + raise RuntimeError('Socket recv broken: message has no size') + msg_len = int.from_bytes(msg_len_bytes, 'big') + + chunks = [] + bytes_recv = 0 + while bytes_recv < msg_len: + chunk = sock.recv(min(msg_len-bytes_recv, 4096)) + if chunk == b'': + raise RuntimeError('Socket recv broken: could not receive message') + chunks.append(chunk) + bytes_recv += len(chunk) + msg_bytes = b''.join(chunks) + return msg_bytes + + +# https://stackoverflow.com/a/34177358 +def command_exists(cmd_name): + """Check whether `name` is on PATH and marked as executable.""" + return shutil.which(cmd_name) is not None + +def _get_my_ip_address(): + hostname = socket.gethostname() + + assert command_exists('tracepath'), 'pytest_parallel SLURM scheduler: command `tracepath` is not available' + cmd = ['tracepath','-n',hostname] + r = subprocess.run(cmd, stdout=subprocess.PIPE) + assert r.returncode==0, f'pytest_parallel SLURM scheduler: error running command `{" ".join(cmd)}`' + ips = r.stdout.decode("utf-8") + + try: + my_ip = ips.split('\n')[0].split(':')[1].split()[0] + except IndexError: + assert 0, f'pytest_parallel SLURM scheduler: error parsing result `{ips}` of command `{" ".join(cmd)}`' + import ipaddress + try: + ipaddress.ip_address(my_ip) + except ValueError: + assert 0, f'pytest_parallel SLURM scheduler: error parsing result `{ips}` of command `{" ".join(cmd)}`' + + return my_ip + +def setup_socket(socket): + # Find our IP address + SCHEDULER_IP_ADDRESS = _get_my_ip_address() + + # setup master's socket + socket.bind((SCHEDULER_IP_ADDRESS, 0)) # 0: let the OS choose an available port + socket.listen() + port = socket.getsockname()[1] + return SCHEDULER_IP_ADDRESS, port diff --git a/test/pytest_parallel_refs/terminal_fail_complex_assert_two_procs b/test/pytest_parallel_refs/terminal_fail_complex_assert_two_procs index ed73216..1e36693 100644 --- a/test/pytest_parallel_refs/terminal_fail_complex_assert_two_procs +++ b/test/pytest_parallel_refs/terminal_fail_complex_assert_two_procs @@ -19,7 +19,7 @@ comm = @pytest_parallel.mark.parallel\(2\) def test_fail_with_complex_assert_reporting\(comm\): - if comm.Get_rank\(\) == 0: + if comm.rank == 0: > assert 1 == 0 E assert 1 == 0 @@ -30,9 +30,9 @@ comm = @pytest_parallel.mark.parallel\(2\) def test_fail_with_complex_assert_reporting\(comm\): - if comm.Get_rank\(\) == 0: + if comm.rank == 0: assert 1 == 0 - if comm.Get_rank\(\) == 1: + if comm.rank == 1: > assert \(np.array\(\[0,1,2\]\) == np.array\(\[0,1,3\]\)\).all\(\) E assert (?:np.)?False_? E \+ where (?:np.)?False_? 
= \(\)
diff --git a/test/pytest_parallel_refs/terminal_success_0_fail_1 b/test/pytest_parallel_refs/terminal_success_0_fail_1
index eceba75..a4dbe61 100644
--- a/test/pytest_parallel_refs/terminal_success_0_fail_1
+++ b/test/pytest_parallel_refs/terminal_success_0_fail_1
@@ -19,7 +19,7 @@ comm = 
 
 @pytest_parallel.mark.parallel\(2\)
 def test_fail_one_rank\(comm\):
-    if comm.Get_rank\(\) == 0:
+    if comm.rank == 0:
 >       assert 0
 E       assert 0
diff --git a/test/pytest_parallel_tests/test_crash_reporting.py b/test/pytest_parallel_tests/test_crash_reporting.py
new file mode 100644
index 0000000..d0be3c1
--- /dev/null
+++ b/test/pytest_parallel_tests/test_crash_reporting.py
@@ -0,0 +1,49 @@
+# TODO This test file was used to develop crash reporting for scheduler=shell and scheduler=slurm,
+# but it is not currently integrated into the pytest_parallel test suite
+import signal
+import pytest_parallel
+
+def test_seq_pass():
+    assert 1
+
+def test_seq_fail():
+    assert 0
+
+def test_seq_crash():
+    signal.raise_signal(11) # SIGSEGV
+
+
+@pytest_parallel.mark.parallel(2)
+def test_par_pass(comm):
+    assert 1
+
+@pytest_parallel.mark.parallel(2)
+def test_par_fail(comm):
+    assert 0
+
+@pytest_parallel.mark.parallel(2)
+def test_par_pass_fail(comm):
+    if comm.rank==0:
+        assert 1
+    if comm.rank==1:
+        assert 0
+
+
+
+@pytest_parallel.mark.parallel(2)
+def test_par_crash(comm):
+    signal.raise_signal(11) # SIGSEGV
+
+@pytest_parallel.mark.parallel(2)
+def test_par_pass_crash(comm):
+    if comm.rank==0:
+        assert 1
+    if comm.rank==1:
+        signal.raise_signal(11) # SIGSEGV
+
+@pytest_parallel.mark.parallel(2)
+def test_par_crash_fail(comm):
+    if comm.rank==0:
+        signal.raise_signal(11) # SIGSEGV
+    if comm.rank==1:
+        assert 0
diff --git a/test/pytest_parallel_tests/test_doc_example.py b/test/pytest_parallel_tests/test_doc_example.py
index a1955b8..c895027 100644
--- a/test/pytest_parallel_tests/test_doc_example.py
+++ b/test/pytest_parallel_tests/test_doc_example.py
@@ -1,11 +1,11 @@
-import pytest_parallel
 import time
+import pytest_parallel
 
 
 @pytest_parallel.mark.parallel(2)
 def test_A(comm):
     time.sleep(0.1)
-    if comm.Get_rank() == 1:
+    if comm.rank == 1:
         assert False
 
 
@@ -17,7 +17,7 @@ def test_B():
 @pytest_parallel.mark.parallel(3)
 def test_C(comm):
     time.sleep(0.2)
-    assert comm.Get_size() == 3
+    assert comm.size == 3
 
 
 def test_D():
diff --git a/test/pytest_parallel_tests/test_fail_complex_assert_two_procs.py b/test/pytest_parallel_tests/test_fail_complex_assert_two_procs.py
index b2bfb22..be7003b 100644
--- a/test/pytest_parallel_tests/test_fail_complex_assert_two_procs.py
+++ b/test/pytest_parallel_tests/test_fail_complex_assert_two_procs.py
@@ -1,9 +1,9 @@
-import pytest_parallel
 import numpy as np
+import pytest_parallel
 
 @pytest_parallel.mark.parallel(2)
 def test_fail_with_complex_assert_reporting(comm):
-    if comm.Get_rank() == 0:
+    if comm.rank == 0:
         assert 1 == 0
-    if comm.Get_rank() == 1:
+    if comm.rank == 1:
         assert (np.array([0,1,2]) == np.array([0,1,3])).all()
diff --git a/test/pytest_parallel_tests/test_parametrize.py b/test/pytest_parallel_tests/test_parametrize.py
index 0355b24..a8487d0 100644
--- a/test/pytest_parallel_tests/test_parametrize.py
+++ b/test/pytest_parallel_tests/test_parametrize.py
@@ -1,6 +1,5 @@
 import pytest
 import pytest_parallel
-from mpi4py import MPI
 
 
 @pytest_parallel.mark.parallel([1, 2])
diff --git a/test/pytest_parallel_tests/test_scheduling.py b/test/pytest_parallel_tests/test_scheduling.py
index ab8bdb3..1c08c1b 100644
--- a/test/pytest_parallel_tests/test_scheduling.py
+++ 
b/test/pytest_parallel_tests/test_scheduling.py
@@ -1,5 +1,5 @@
-import pytest_parallel
 import time
+import pytest_parallel
 
 # time_base = 1.0
 time_base = 0.01
diff --git a/test/pytest_parallel_tests/test_success_0_fail_1.py b/test/pytest_parallel_tests/test_success_0_fail_1.py
index 5d6b81a..5e3ac15 100644
--- a/test/pytest_parallel_tests/test_success_0_fail_1.py
+++ b/test/pytest_parallel_tests/test_success_0_fail_1.py
@@ -3,7 +3,7 @@
 
 @pytest_parallel.mark.parallel(2)
 def test_fail_one_rank(comm):
-    if comm.Get_rank() == 0:
+    if comm.rank == 0:
         assert 0
-    if comm.Get_rank() == 1:
+    if comm.rank == 1:
         assert 1
diff --git a/test/test_pytest_parallel.py b/test/test_pytest_parallel.py
index fd5d2e8..24960b8 100644
--- a/test/test_pytest_parallel.py
+++ b/test/test_pytest_parallel.py
@@ -3,21 +3,19 @@
 by running it on a set of examples, then comparing it
 to template references
 """
-
-
-# pytest_parallel MUST NOT be plugged in its testing framework environement
-# it will be plugged by the framework when needed (see `run_pytest_parallel_test`)
-# (else we would use pytest_parallel to test pytest_parallel, which is logically wrong)
 import os
-pytest_plugins = os.getenv('PYTEST_PLUGINS')
-assert pytest_plugins is None or 'pytest_parallel.plugin' not in pytest_plugins
-
 import sys
 import re
 import subprocess
 from pathlib import Path
 import pytest
 
+# pytest_parallel MUST NOT be plugged in its testing framework environment
+# it will be plugged by the framework when needed (see `run_pytest_parallel_test`)
+# (else we would use pytest_parallel to test pytest_parallel, which is logically wrong)
+pytest_plugins = os.getenv('PYTEST_PLUGINS')
+assert pytest_plugins is None or 'pytest_parallel.plugin' not in pytest_plugins
+
 root_dir = Path(__file__).parent
 tests_dir = root_dir / "pytest_parallel_tests"
 
@@ -52,7 +50,7 @@ def run_pytest_parallel_test(test_name, n_workers, scheduler, capfd, suffix=""):
     stderr_file_path.unlink(missing_ok=True)
 
     test_env = os.environ.copy()
-    # To test pytest_parallel, we can need to launch pytest with it
+    # To test pytest_parallel, we need to launch pytest with pytest_parallel as a plugin:
     if "PYTEST_DISABLE_PLUGIN_AUTOLOAD" not in test_env:
         test_env["PYTEST_DISABLE_PLUGIN_AUTOLOAD"] = "1"
     cmd = f"mpiexec -n {n_workers} pytest -p pytest_parallel.plugin -s -ra -vv --color=no --scheduler={scheduler} {test_file_path}"
@@ -68,10 +66,10 @@
 
 param_scheduler = ["sequential", "static", "dynamic"]
-# TODO "slurm" scheduler
+# TODO "slurm", "shell" scheduler
 #param_scheduler = ["slurm"]
 if sys.platform == "win32":
-    param_scheduler = ["sequential", "static"]
+    param_scheduler = ["sequential", "static"] 
 
# fmt: off
@pytest.mark.parametrize("scheduler", param_scheduler)
@@ -81,28 +79,28 @@ def test_00(self, scheduler, capfd): run_pytest_parallel_test('seq'
     def test_01(self, scheduler, capfd): run_pytest_parallel_test('two_success_tests_one_proc'     , 1, scheduler, capfd) # need at least 1 proc
     def test_02(self, scheduler, capfd): run_pytest_parallel_test('two_success_tests_one_proc'     , 2, scheduler, capfd) # 2 tests executing concurrently
     def test_04(self, scheduler, capfd): run_pytest_parallel_test('two_success_tests_one_proc'     , 4, scheduler, capfd) # 2 tests executing concurrently, 2 procs do nothing
-
+
     def test_05(self, scheduler, capfd): run_pytest_parallel_test('two_fail_tests_one_proc'        , 1, scheduler, capfd) # same but failing
     def test_06(self, scheduler, capfd): 
run_pytest_parallel_test('two_fail_tests_one_proc' , 2, scheduler, capfd) def test_07(self, scheduler, capfd): run_pytest_parallel_test('two_fail_tests_one_proc' , 4, scheduler, capfd) - + def test_08(self, scheduler, capfd): run_pytest_parallel_test('two_success_tests_two_procs' , 2, scheduler, capfd) # need at least 2 procs def test_09(self, scheduler, capfd): run_pytest_parallel_test('two_success_tests_two_procs' , 4, scheduler, capfd) # 4 tests (needing 2 procs each) executing concurrently def test_10(self, scheduler, capfd): run_pytest_parallel_test('two_success_tests_two_procs' , 1, scheduler, capfd, suffix='_skip') # the two test will be skipped (not enough procs) - + def test_11(self, scheduler, capfd): run_pytest_parallel_test('two_fail_tests_two_procs' , 2, scheduler, capfd) # same but failing def test_12(self, scheduler, capfd): run_pytest_parallel_test('two_fail_tests_two_procs' , 4, scheduler, capfd) def test_13(self, scheduler, capfd): run_pytest_parallel_test('two_fail_tests_two_procs' , 1, scheduler, capfd, suffix='_skip') - + def test_14(self, scheduler, capfd): run_pytest_parallel_test('success_0_fail_1' , 2, scheduler, capfd) # one test failing (succeed one rank 0, fail on rank 1) - + def test_15(self, scheduler, capfd): run_pytest_parallel_test('two_success_fail_tests_two_procs', 2, scheduler, capfd) # one test succeeds, one test fails def test_16(self, scheduler, capfd): run_pytest_parallel_test('two_success_fail_tests_two_procs', 4, scheduler, capfd) # same, more procs - + def test_17(self, scheduler, capfd): run_pytest_parallel_test('fixture_error' , 1, scheduler, capfd) # check that fixture errors are correctly reported - - def test_18(self, scheduler, capfd): run_pytest_parallel_test('parametrize' , 2, scheduler, capfd) # check the parametrize API - + + def test_18(self, scheduler, capfd): run_pytest_parallel_test('parametrize' , 2, scheduler, capfd) # check the parametrize API + def test_19(self, scheduler, capfd): run_pytest_parallel_test('scheduling' , 4, scheduler, capfd) # check 'real' case def test_20(self, scheduler, capfd): run_pytest_parallel_test('fail_complex_assert_two_procs' , 2, scheduler, capfd) # check 'complex' error message # fmt: on
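
---

Note on the `send`/`recv` helpers moved to `pytest_parallel/utils/socket.py` in this patch: `send` prefixes every message with its size as an 8-byte big-endian integer, and `recv` first reads that prefix, then loops until the full payload has arrived. The sketch below is an illustration only, not part of the patch; it assumes `pytest_parallel` is importable, and the "scheduler"/"worker" roles are mimicked in a single process with a thread:

```Python
import pickle
import socket
import threading

from pytest_parallel.utils.socket import send, recv

# "scheduler" side: bind to an OS-chosen port, as setup_socket does
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))  # port 0: let the OS choose an available port
server.listen()
host, port = server.getsockname()

def fake_worker():
    # "worker" side: roughly what send_report does once the partial
    # report files have been read: connect and send a pickled dict
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        send(s, pickle.dumps({'test_idx': 0, 'fatal_error': None}))

t = threading.Thread(target=fake_worker)
t.start()
conn, _ = server.accept()
with conn:
    test_info = pickle.loads(recv(conn))
assert test_info == {'test_idx': 0, 'fatal_error': None}
t.join()
server.close()
```

The size prefix is what makes the protocol work over TCP, which only guarantees a byte stream: without it, the receiving scheduler could not tell where one pickled report ends and the next begins.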