From 802690d33a9e2bfd569e931ea9fc9f58d919c7de Mon Sep 17 00:00:00 2001 From: Matthew Kolopanis Date: Thu, 21 Mar 2024 13:59:27 -0700 Subject: [PATCH 1/3] Adds second backend for send/receive w/ load balancing. progbar plugin move inner progsteps to own function add in tqdm progressbar, check diffuse analytic, check for npus > 1 add tqdm to 'all' dependency, try to test importerror in min deps added a two helper functions for chunking iterators added the send_recv backend which combines with tqdm to distribute tasks propagate the backend keyword up to the run script allow all backends to use the profiling option make analytic diffuse test also run against all backends import from pyuvsim.uvsim in the import tqdm error test add tqdm to full test suite, make tests that run with only progsteps add test to run profiling for all backends. only make profiling output if uvdata_indices is not empty remove debug prints, remove thread_multiple as a parameter to run fix number of arguments to run_uvsim test define a proper enum for the tags added tqdm to option list in readme fixed typo in run_param_pyuvsim argument parser force ntasks tot to be an int, ceil to be conservative add a context manger to progsteps disentangle the progressbar from the backend; test all combinations revamp rma to not use rank 0, remove messaging equiv to s/r update tests to run in paralell. do gymnastics for warnings have newer tests account for parallel and backend differences propagate errors on worker nodes to rank0 in send_recv. always clean up the window in rma process. [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci restore mpich test yaml lost to rebase overlords update progsteps only ci yaml match versioning on progsteps_only yaml to other ci yamls break some check from run_uvdata_uvsim into a separate function --- .github/workflows/testsuite.yaml | 48 ++++ README.md | 2 +- ci/only_progsteps.yaml | 26 ++ pyproject.toml | 5 +- src/pyuvsim/cli.py | 23 +- src/pyuvsim/mpi.py | 35 ++- src/pyuvsim/utils.py | 55 ++++ src/pyuvsim/uvsim.py | 468 ++++++++++++++++++++++++------- tests/test_profiler.py | 23 +- tests/test_run.py | 61 +++- tests/test_uvsim.py | 147 ++++++++-- 11 files changed, 753 insertions(+), 140 deletions(-) create mode 100644 ci/only_progsteps.yaml diff --git a/.github/workflows/testsuite.yaml b/.github/workflows/testsuite.yaml index 39af32f7..bcd64c4a 100644 --- a/.github/workflows/testsuite.yaml +++ b/.github/workflows/testsuite.yaml @@ -177,6 +177,54 @@ jobs: env_vars: OS,PYTHON fail_ci_if_error: true + only_progsteps: + env: + ENV_NAME: only_progsteps + PYTHON: 3.7 + name: No Tqdm Tests + defaults: + run: + # Adding -l {0} helps ensure conda can be found properly. + shell: bash -l {0} + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@master + with: + fetch-depth: 1 + - name: Setup Miniconda + uses: conda-incubator/setup-miniconda@v2.0.0 + with: + auto-update-conda: true + miniconda-version: "latest" + python-version: ${{ env.PYTHON }} + environment-file: ci/${{ env.ENV_NAME }}.yaml + activate-environment: ${{ env.ENV_NAME }} + + - name: Conda Info + run: | + conda info -a + conda list + PYVER=`python -c "import sys; print('{:d}.{:d}'.format(sys.version_info.major, sys.version_info.minor))"` + if [[ $PYVER != ${{ env.PYTHON }} ]]; then + exit 1; + fi + + - name: Install + run: | + pip install . + + - name: Run Tests + run: | + python -m pytest --cov=pyuvsim --cov-config=.coveragerc --cov-report xml:./coverage.xml --junitxml=test-reports/xunit.xml + + - uses: codecov/codecov-action@v1.5.2 + if: success() + with: + token: ${{secrets.CODECOV_TOKEN}} #required + file: ./coverage.xml #optional + env_vars: OS,PYTHON + fail_ci_if_error: true + warning_test: env: ENV_NAME: pyuvsim_tests_mpich diff --git a/README.md b/README.md index 8d17c978..6214a8ac 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ Optional: * lunarsky>=0.2.5 (for simulating telescopes on the moon) * python-casacore>=3.5.2 (for writing CASA measurement sets, not available on Windows) * matplotlib>=3.6 (for plotting functions) - +* tqdm ### Developer Installation If you are developing `pyuvsim`, you will need to download and install the diff --git a/ci/only_progsteps.yaml b/ci/only_progsteps.yaml new file mode 100644 index 00000000..8d79be66 --- /dev/null +++ b/ci/only_progsteps.yaml @@ -0,0 +1,26 @@ +name: only_progsteps +channels: + - conda-forge + - defaults +dependencies: + - astropy>=5.2 + - astropy-healpix>=0.6 + - coverage + - line_profiler + - mpi4py>=3.0.0 + - numpy>=1.20 + - openmpi + - pip + - psutil + - python-casacore>=3.3 + - pytest + - pytest-cov + - pytest-xdist + - pyuvdata>=2.2.10 + - pyyaml>=5.1 + - scipy>=1.3 + - setuptools_scm>=7.0.3 + - pip: + - pyradiosky>=0.2 + - lunarsky>=0.2.1 + - git+https://github.com/aelanman/analytic_diffuse diff --git a/pyproject.toml b/pyproject.toml index 78ccbd02..d6a390ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,8 @@ plot = ["matplotlib>=3.6"] sim = ["mpi4py>=3.1.3", "psutil"] sim-test = ["pyuvsim[sim,test]", "mpi-pytest>=2025.7.0"] all = ["pyuvsim[casa,healpix,moon,plot,sim]"] -test = ["coverage", "pooch>=1.8", "pre-commit", "pytest", "pytest-cov>=5.0"] +tqdm = [ "tqdm" ] +test = ["coverage", "pooch>=1.8", "pre-commit", "pytest", "pytest-cov>=5.0", "pyuvsim[tqdm]"] doc = ["matplotlib", "pypandoc", "sphinx"] profiler = ["line-profiler"] dev = ["pyuvsim[all,test,sim-test,doc,profiler]"] @@ -78,7 +79,7 @@ plot_csv_antpos = "pyuvsim.cli:plot_csv_antpos" [tool.setuptools_scm] [tool.pytest.ini_options] -addopts = "--ignore=scripts" + addopts = "--ignore=scripts" [tool.ruff.lint] select = [ diff --git a/src/pyuvsim/cli.py b/src/pyuvsim/cli.py index 87f983cc..4f3f5432 100644 --- a/src/pyuvsim/cli.py +++ b/src/pyuvsim/cli.py @@ -64,6 +64,21 @@ def run_pyuvsim(argv=None): help="Also save pickled LineStats data for line profiling.", action="store_true", ) + parser.add_argument( + "--backend", + type=str, + help="Backend task collection for simulation", + choices=["rma", "send_recv"], + default="rma", + ) + + parser.add_argument( + "--progbar", + type=str, + help="Monitor and reporting module for simulation progress", + choices=["progsteps", "tqdm"], + default="progsteps", + ) args = parser.parse_args(argv) @@ -98,7 +113,11 @@ def run_pyuvsim(argv=None): if args.param is not None: uvsim.run_uvsim( - args.param, quiet=args.quiet, block_nonroot_stdout=block_nonroot_stdout + args.param, + quiet=args.quiet, + block_nonroot_stdout=block_nonroot_stdout, + backend=args.backend, + progbar=args.progbar, ) else: uvd = UVData.from_file(args.uvdata) @@ -113,6 +132,8 @@ def run_pyuvsim(argv=None): catalog=skymodel, quiet=args.quiet, block_nonroot_stdout=block_nonroot_stdout, + backend=args.backend, + progbar=args.progbar, ) pobj = Path(args.outfile) utils.write_uvdata( diff --git a/src/pyuvsim/mpi.py b/src/pyuvsim/mpi.py index 182e30f9..8de32918 100644 --- a/src/pyuvsim/mpi.py +++ b/src/pyuvsim/mpi.py @@ -3,6 +3,7 @@ """MPI setup.""" import atexit +import enum import struct as _struct import sys from array import array as _array @@ -19,7 +20,7 @@ world_comm = None node_comm = None rank_comm = None - +status = None # Split serialized objects into chunks of 2 GiB INT_MAX = 2**31 - 1 @@ -27,6 +28,16 @@ shared_window_list = [] +class Tags(enum.IntEnum): + """Tags for MPI computations where worker nodes communicate with a main distribution node.""" + + READY = enum.auto() + START = enum.auto() + DONE = enum.auto() + EXIT = enum.auto() + ERROR = enum.auto() + + def set_mpi_excepthook(mpi_comm): """Kill the whole job on an uncaught python exception.""" @@ -38,7 +49,7 @@ def mpi_excepthook(exctype, value, traceback): # pragma: no cover sys.excepthook = mpi_excepthook -def start_mpi(block_nonroot_stdout=True): +def start_mpi(block_nonroot_stdout=True, thread_multiple=False): """ Initialize MPI if not already initialized and do setup. @@ -51,20 +62,27 @@ def start_mpi(block_nonroot_stdout=True): Redirect stdout on nonzero ranks to /dev/null, for cleaner output. """ + global world_comm, node_comm, rank_comm, rank, Npus, status + do_once = False - global world_comm, node_comm, rank_comm, rank, Npus if not MPI.Is_initialized(): # pragma: no cover - MPI.Init_thread( - MPI.THREAD_SERIALIZED - ) # RMA is incompatible with THREAD_MULTIPLE. do_once = True + + if not thread_multiple: + MPI.Init_thread( + MPI.THREAD_SERIALIZED + ) # RMA is incompatible with THREAD_MULTIPLE. + else: + MPI.Init_thread(MPI.THREAD_MULTIPLE) atexit.register(MPI.Finalize) + world_comm = MPI.COMM_WORLD node_comm = world_comm.Split_type(MPI.COMM_TYPE_SHARED) rank_comm = world_comm.Split(color=node_comm.rank) Npus = world_comm.Get_size() rank = world_comm.Get_rank() + status = MPI.Status() set_mpi_excepthook(world_comm) world_comm.Barrier() @@ -487,3 +505,8 @@ def get_comm(): def get_node_comm(): """Get node_comm, the Communicator for all PUs on current node.""" return node_comm + + +def get_status(): + """status, the status of an mpi message.""" + return status diff --git a/src/pyuvsim/utils.py b/src/pyuvsim/utils.py index b5d6f363..09b42d1d 100644 --- a/src/pyuvsim/utils.py +++ b/src/pyuvsim/utils.py @@ -2,6 +2,7 @@ # Licensed under the 3-clause BSD License """Define various utility functions.""" +import itertools import os import sys import time as pytime @@ -42,6 +43,14 @@ def __init__(self, maxval=None): self.curval = 0 self.remain = None + def __enter__(self): + """Enter a context manager.""" + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Exit a context manager.""" + self.finish() + def update(self, count): """ Update the progress bar. @@ -370,3 +379,49 @@ class in simulation, accounting for its attributes as well as [sys.getsizeof(v) * Ncomponents * Nfreqs for k, v in Ncomp_Nfreq_attrs.items()] ) return mem_est + + +def _grouper_it(iterable, chunksize=1): + """Chunk an iterator and return an iterator. + + Parameters + ---------- + iterable : Iterable + The iterable object to chunk + chunksize : int + size of chunks desired + + Returns + ------- + iterable chunked into sizes + """ + it = iter(iterable) + while True: + chunk = list(itertools.islice(it, chunksize)) + if not chunk: + return + yield chunk + + +def _chunked_iterator_product(iter1, iter2, chunksize1, chunksize2): + """Iterate over the product of two chunked iterators. + + Parameters + ---------- + iter1 : Iterable + One iterator to chunk through. + iter2 : Iterable + The second iterator to chunk through. + chunksize1 : int + Chunk size for iter1 + chunksize2 : int + Chunk size for iter2 + + Returns + ------- + An iterator over the chunked product of all combinations of iter1 and iter2 + + """ + for i1 in _grouper_it(iter1, chunksize1): + for i2 in _grouper_it(iter2, chunksize2): + yield i1, i2 diff --git a/src/pyuvsim/uvsim.py b/src/pyuvsim/uvsim.py index 020fce93..05612acd 100644 --- a/src/pyuvsim/uvsim.py +++ b/src/pyuvsim/uvsim.py @@ -10,6 +10,7 @@ import contextlib import warnings +from contextlib import nullcontext import astropy.units as units import numpy as np @@ -625,6 +626,222 @@ def _check_ntasks_valid(Ntasks_tot): ) +def _run_uvsim_rma( + uv_container, + local_task_iter, + Ntasks_tot, + Nsky_parts, + Nbls, + Nblts, + Nfreqs, + Nsrcs, + pbar, + quiet, +): + mpi.start_mpi() + rank = mpi.get_rank() + comm = mpi.get_comm() + if rank == 0: + if "world" in uv_container.extra_keywords: + uv_container.extra_keywords["world"] = uv_container.extra_keywords["world"] + vis_data = mpi.MPI.Win.Create( + uv_container._data_array.value, comm=mpi.world_comm + ) + else: + vis_data = mpi.MPI.Win.Create(None, comm=mpi.world_comm) + + engine = UVEngine() + count = mpi.Counter() + size_complex = np.ones(1, dtype=complex).nbytes + data_array_shape = (Nblts, Nfreqs, 4) + uvdata_indices = [] + + with pbar as pbar: + try: + for task in local_task_iter: + engine.set_task(task) + vis = engine.make_visibility() + + blti, freq_ind = task.uvdata_index + + uvdata_indices.append(task.uvdata_index) + + flat_ind = np.ravel_multi_index((blti, freq_ind, 0), data_array_shape) + offset = flat_ind * size_complex + + vis_data.Lock(0) + vis_data.Accumulate(vis, 0, target=offset, op=mpi.MPI.SUM) + vis_data.Unlock(0) + + cval = count.next() + if rank == 0 and not quiet: + if not isinstance(pbar, simutils.progsteps): + # need a delta if this is a tqdm progress bar + cval -= pbar.n + pbar.update(cval) + finally: + request = comm.Ibarrier() + + while not request.Test(): + if rank == 0 and not quiet: + cval = count.current_value() + if not isinstance(pbar, simutils.progsteps): + # need a delta if this is a tqdm progress bar + cval -= pbar.n + pbar.update(cval) + + count.free() + vis_data.Free() + + if rank == 0 and not quiet: + print("\nCalculations Complete.", flush=True) + + if rank == 0: + return uv_container, uvdata_indices + return None, uvdata_indices + + +def _run_uvsim_send_recv( + uv_container, + Ntasks_tot, + Nsky_parts, + Nbls, + Nblts, + Nfreqs, + Nsrcs, + beam_list, + beam_dict, + catalog, + pbar, + quiet, +): + mpi.start_mpi() + rank = mpi.get_rank() + comm = mpi.get_comm() + Npus = mpi.get_Npus() + status = mpi.get_status() + + uvdata_indices = [] + + if rank == 0: + completed_workers = 0 + n_workers = Npus - 1 + completed_tasks = 0 + task_inds = range(int(Nblts * Nfreqs)) + src_inds = range(int(Nsrcs)) + error_occurred = None + + chunksize1 = int(np.ceil(0.1 * Nblts * Nfreqs / n_workers)) + chunksize2 = int(np.ceil(Nsrcs / Nsky_parts)) + all_iter = simutils._chunked_iterator_product( + task_inds, src_inds, chunksize1, chunksize2 + ) + with pbar as pbar: + while completed_workers < n_workers: + msg = comm.recv( + source=mpi.MPI.ANY_SOURCE, tag=mpi.MPI.ANY_TAG, status=status + ) + source = status.Get_source() + tag = status.Get_tag() + + # an error occurred on a worker. + # tell all PUs to stop working + if error_occurred is not None: + comm.send(None, dest=source, tag=mpi.Tags.EXIT) + + if tag == mpi.Tags.READY: + try: + comm.send(next(all_iter), dest=source, tag=mpi.Tags.START) + except StopIteration: + comm.send(None, dest=source, tag=mpi.Tags.EXIT) + + elif tag == mpi.Tags.DONE: + uv_inds, vis = msg[0], msg[1] + uv_container.data_array[uv_inds] += vis + if isinstance(pbar, simutils.progsteps): + completed_tasks += 1 + pbar.update(completed_tasks) + else: + pbar.update(1) + + elif tag == mpi.Tags.EXIT: + completed_workers += 1 + elif tag == mpi.Tags.ERROR: + error_occurred = msg + else: + raise ValueError(f"{msg} {tag}") + + if error_occurred is not None: + raise ValueError( + f"Error occurred on worker node {error_occurred[0]}: {error_occurred[1]}" + ) + + else: + engine = UVEngine() + while True: + comm.send(None, dest=0, tag=mpi.Tags.READY) + msg = comm.recv(source=0, tag=mpi.MPI.ANY_TAG, status=status) + tag = status.Get_tag() + if tag == mpi.Tags.START: + msg_task_inds, msg_src_inds = msg + + try: + # get message with task_inds and src_inds + local_task_iter = uvdata_to_task_iter( + msg_task_inds, + uv_container, + catalog.subselect(msg_src_inds), + beam_list, + beam_dict, + Nsky_parts=Nsky_parts, + ) + for task in local_task_iter: + engine.set_task(task) + + uvdata_indices.append(task.uvdata_index) + + comm.send( + [task.uvdata_index, engine.make_visibility()], + dest=0, + tag=mpi.Tags.DONE, + ) + except Exception as err: + comm.send([rank, err], dest=0, tag=mpi.Tags.ERROR) + + elif tag == mpi.Tags.EXIT: + break + + comm.send(None, dest=0, tag=mpi.Tags.EXIT) + + if rank == 0: + return uv_container, uvdata_indices + return None, uvdata_indices + + +def _get_pbar(progbar, Ntasks_tot, rank): + if progbar not in ["tqdm", "progsteps"]: + raise ValueError( + "The progbar keyword must be one of " + "'progsteps' or 'tqdm' " + f"but received {progbar}." + ) + if progbar == "tqdm": + try: + import tqdm + except ImportError as err: + raise ImportError( + "The tqdm module must be installed to use a tqdm progress bar." + ) from err + if rank == 0: + if progbar == "tqdm": + pbar = tqdm.tqdm(total=Ntasks_tot, unit="UVTask", leave=True) + else: + pbar = simutils.progsteps(maxval=Ntasks_tot) + else: + pbar = nullcontext() + return pbar + + def _set_nsky_parts(Nsrcs, cat_nfreqs, Nsky_parts): """Set the Nsky_parts.""" # Estimating required memory to decide how to split source array. @@ -810,6 +1027,8 @@ def run_uvdata_uvsim( catalog: SkyModel | SkyModelData, beam_interp_check: bool | None = None, quiet: bool = False, + backend: str = "rma", + progbar: str = "progsteps", block_nonroot_stdout: bool = True, Nsky_parts: int | None = None, ) -> UVData: @@ -838,6 +1057,12 @@ def run_uvdata_uvsim( zenith angle. quiet : bool Do not print anything. + backend : str + Specifies the backend for simulation. + Must be one of ['rma', 'send_recv']. (Default: 'rma') + progbar : str + Specifies the progress bar class used to output simulation progress. + Must be one of ['progsteps', 'tqdm']. Default 'progsteps' block_nonroot_stdout : bool Redirect stdout on nonzero ranks to /dev/null, for cleaner output. Nsky_parts : int, optional @@ -860,11 +1085,34 @@ def run_uvdata_uvsim( "line_profiler installed." ) - mpi.start_mpi(block_nonroot_stdout=block_nonroot_stdout) + if backend not in ["rma", "send_recv"]: + raise ValueError( + "The backend keyword must be one of " + "'rma', 'send_recv' " + f"but received value {backend}." + ) + + if progbar not in ["tqdm", "progsteps"]: + raise ValueError( + "The progbar keyword must be one of " + "'progsteps' or 'tqdm' " + f"but received {progbar}." + ) + + if backend == "rma": + thread_multiple = False + else: + thread_multiple = True + + mpi.start_mpi( + block_nonroot_stdout=block_nonroot_stdout, thread_multiple=thread_multiple + ) rank = mpi.get_rank() comm = mpi.get_comm() Npus = mpi.get_Npus() + if Npus < 2: + raise ValueError("At least 2 PUs are required to run pyuvsim.") # ensure that we have full or flat spectral type and that we have points not # healpix if rank == 0: @@ -886,13 +1134,8 @@ def run_uvdata_uvsim( print("Nfreqs:", input_uv.Nfreqs, flush=True) print("Nsrcs:", catalog.Ncomponents, flush=True) uv_container = simsetup._complete_uvdata(input_uv, inplace=False) - if "world" in input_uv.extra_keywords: - uv_container.extra_keywords["world"] = input_uv.extra_keywords["world"] - vis_data = mpi.MPI.Win.Create( - uv_container._data_array.value, comm=mpi.world_comm - ) else: - vis_data = mpi.MPI.Win.Create(None, comm=mpi.world_comm) + uv_container = None Nbls = input_uv.Nbls Nblts = input_uv.Nblts @@ -900,19 +1143,20 @@ def run_uvdata_uvsim( Npols = input_uv.Npols Nsrcs = catalog.Ncomponents - task_inds, src_inds, Ntasks_local, Nsrcs_local = _make_task_inds( - Nblts, Nfreqs, Nsrcs, rank, Npus - ) - - count = mpi.Counter() + if beam_interp_check is None: + # check that all the beams cover the full sky + beam_interp_check = not beam_list.check_all_azza_beams_full_sky() - # wrap this in a try/finally (no exception handling) to ensure resources are freed - try: - if beam_interp_check is None: - # check that all the beams cover the full sky - beam_interp_check = not beam_list.check_all_azza_beams_full_sky() + Nsky_parts = _set_nsky_parts(Nsrcs, catalog.Nfreqs, Nsky_parts) - Nsky_parts = _set_nsky_parts(Nsrcs, catalog.Nfreqs, Nsky_parts) + if backend == "rma": + # rank 0 gets a blank iter to just skip the computation step + local_task_iter = iter(()) + Ntasks_local = 0 + if rank != 0: + task_inds, src_inds, Ntasks_local, Nsrcs_local = _make_task_inds( + Nblts, Nfreqs, Nsrcs, rank - 1, Npus - 1 + ) local_task_iter = uvdata_to_task_iter( task_inds, @@ -922,85 +1166,86 @@ def run_uvdata_uvsim( beam_dict, Nsky_parts=Nsky_parts, ) + elif backend == "send_recv": + # hack this a little bit, we want to have Nbls * Nfreqs * Ntimes + # total tasks we are distributing over all PUs but then we + # sum up over all PUs here in the next few lines + Ntasks_local = Nblts * Nfreqs / Npus - Ntasks_tot = Ntasks_local * Nsky_parts - # Sum all the tasks across each node - Nsky_parts = comm.reduce(Nsky_parts, op=mpi.MPI.MAX, root=0) - Ntasks_tot = comm.reduce(Ntasks_tot, op=mpi.MPI.SUM, root=0) - if rank == 0 and not quiet: - if Nsky_parts > 1: - print( - "The source list has been split into Nsky_parts" - f" <= {Nsky_parts} chunks on some or all nodes" - " due to memory limitations.", - flush=True, - ) - print("Tasks: ", Ntasks_tot, flush=True) - pbar = simutils.progsteps(maxval=Ntasks_tot) - - engine = UVEngine() - size_complex = np.ones(1, dtype=complex).nbytes - data_array_shape = (Nblts, Nfreqs, Npols) - uvdata_indices = [] - - for task in local_task_iter: - engine.set_task(task) - vis = engine.make_visibility() - - blti, freq_ind = task.uvdata_index - - uvdata_indices.append(task.uvdata_index) - - flat_ind = np.ravel_multi_index((blti, freq_ind, 0), data_array_shape) - offset = flat_ind * size_complex + Ntasks_tot = Ntasks_local * Nsky_parts + # Sum all the tasks across each node + Nsky_parts = comm.allreduce(Nsky_parts, op=mpi.MPI.MAX) + Ntasks_tot = int(np.ceil(comm.allreduce(Ntasks_tot, op=mpi.MPI.SUM))) - vis_data.Lock(0) - vis_data.Accumulate(vis, 0, target=offset, op=mpi.MPI.SUM) - vis_data.Unlock(0) - - cval = count.next() - if rank == 0 and not quiet: - pbar.update(cval) - - request = comm.Ibarrier() + if rank == 0 and not quiet: + if Nsky_parts > 1: + print( + "The source list has been split into Nsky_parts" + f" <= {Nsky_parts} chunks on some or all nodes" + " due to memory limitations.", + flush=True, + ) + print("Tasks: ", Ntasks_tot, flush=True) + + pbar = _get_pbar(progbar, Ntasks_tot, rank) + if backend == "rma": + uv_container, uvdata_indices = _run_uvsim_rma( + uv_container, + local_task_iter, + Ntasks_tot, + Nsky_parts, + Nbls, + Nblts, + Nfreqs, + Nsrcs, + pbar, + quiet, + ) + elif backend == "send_recv": + # the compute nodes need to have the input_uv for this mode + # they make the task list on the fly + if rank != 0: + uv_container = input_uv + uv_container, uvdata_indices = _run_uvsim_send_recv( + uv_container, + Ntasks_tot, + Nsky_parts, + Nbls, + Nblts, + Nfreqs, + Nsrcs, + beam_list, + beam_dict, + catalog, + pbar, + quiet, + ) - while not request.Test(): - if rank == 0 and not quiet: - cval = count.current_value() - pbar.update(cval) - if rank == 0 and not quiet: - pbar.finish() + # If profiling is active, save meta data: + from .profiling import prof # noqa + + if hasattr(prof, "meta_file") and len(uvdata_indices) > 0: # pragma: nocover + # Saving axis sizes on current rank (local) and for the whole job (global). + # These lines are affected by issue 179 of line_profiler, so the nocover + # above will need to stay until this issue is resolved (see profiling.py). + task_inds = np.array(uvdata_indices) + + bl_inds = task_inds[:, 0] % Nbls + time_inds = (task_inds[:, 0] - bl_inds) // Nbls + Ntimes_loc = np.unique(time_inds).size + Nbls_loc = np.unique(bl_inds).size + Nfreqs_loc = np.unique(task_inds[:, 1]).size + axes_dict = { + "Ntimes_loc": Ntimes_loc, + "Nbls_loc": Nbls_loc, + "Nfreqs_loc": Nfreqs_loc, + "Nsrcs_loc": Nsky_parts, + "prof_rank": prof.rank, + } - if rank == 0 and not quiet: - print("Calculations Complete.", flush=True) - - # If profiling is active, save meta data: - from .profiling import prof # noqa - - if hasattr(prof, "meta_file"): # pragma: nocover - # Saving axis sizes on current rank (local) and for the whole job (global). - # These lines are affected by issue 179 of line_profiler, so the nocover - # above will need to stay until this issue is resolved (see profiling.py). - task_inds = np.array(uvdata_indices) - bl_inds = task_inds[:, 0] % Nbls - time_inds = (task_inds[:, 0] - bl_inds) // Nbls - Ntimes_loc = np.unique(time_inds).size - Nbls_loc = np.unique(bl_inds).size - Nfreqs_loc = np.unique(task_inds[:, 1]).size - axes_dict = { - "Ntimes_loc": Ntimes_loc, - "Nbls_loc": Nbls_loc, - "Nfreqs_loc": Nfreqs_loc, - "Nsrcs_loc": Nsky_parts, - "prof_rank": prof.rank, - } - - with open(prof.meta_file, "w") as afile: - for k, v in axes_dict.items(): - afile.write(f"{k} \t {int(v):d}\n") - finally: - count.free() - vis_data.Free() + with open(prof.meta_file, "w") as afile: + for k, v in axes_dict.items(): + afile.write(f"{k} \t {int(v):d}\n") if rank == 0: # This needs to be done after the simulation completes because it @@ -1012,12 +1257,16 @@ def run_uvdata_uvsim( return uv_container + return + def run_uvsim( params, return_uv=False, beam_interp_check=None, quiet=False, + backend="rma", + progbar="progsteps", block_nonroot_stdout=True, ): """ @@ -1039,6 +1288,12 @@ def run_uvsim( zenith angle. quiet : bool If True, do not print anything to stdout. (Default False) + backend : str + Specifies the backend for simulation. + Must be one of ['rma', 'send_recv']. (Default: 'rma') + progbar : str + Specifies the progress bar class used to output simulation progress. + Must be one of ['progsteps', 'tqdm']. Default 'progsteps' block_nonroot_stdout : bool Redirect stdout on nonzero ranks to /dev/null, for cleaner output. @@ -1057,7 +1312,28 @@ def run_uvsim( "line_profiler installed." ) - mpi.start_mpi(block_nonroot_stdout=block_nonroot_stdout) + if backend not in ["rma", "send_recv"]: + raise ValueError( + "The backend keyword must be one of " + "'rma', 'send_recv' " + f"but received value {backend}." + ) + + if progbar not in ["tqdm", "progsteps"]: + raise ValueError( + "The progbar keyword must be one of " + "'progsteps' or 'tqdm' " + f"but received {progbar}." + ) + + if backend == "rma": + thread_multiple = False + else: + thread_multiple = True + + mpi.start_mpi( + thread_multiple=thread_multiple, block_nonroot_stdout=block_nonroot_stdout + ) rank = mpi.get_rank() input_uv = UVData() @@ -1087,11 +1363,13 @@ def run_uvsim( beam_dict=beam_dict, catalog=skydata, quiet=quiet, + backend=backend, + progbar=progbar, beam_interp_check=beam_interp_check, ) if rank == 0 and not quiet: - print(f"Run uvdata uvsim took {(Time.now() - start).to('minute'):.3f}") + print(f"\nRun uvdata uvsim took {(Time.now() - start).to('minute'):.3f}") if rank == 0: if isinstance(params, str): diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 38ca5249..5d0da5ad 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -25,7 +25,12 @@ def profdata_dir_setup(tmpdir): @pytest.mark.filterwarnings("ignore:The mount_type parameter must be set for UVBeam") -def test_profiler(tmpdir): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +@pytest.mark.parametrize("progbar", ["progsteps", "tqdm"]) +@pytest.mark.parallel(2) +def test_profiler(tmpdir, backend, progbar): + if progbar == "tqdm": + pytest.importorskip("tqdm") line_profiler = pytest.importorskip("line_profiler") outpath = profdata_dir_setup(tmpdir) testprof_fname = str(outpath.join("time_profile.out")) @@ -37,7 +42,9 @@ def test_profiler(tmpdir): param_filename = os.path.join( SIM_DATA_PATH, "test_config", "param_1time_1src_testcat.yaml" ) - pyuvsim.uvsim.run_uvsim(param_filename, return_uv=True) + pyuvsim.uvsim.run_uvsim( + param_filename, return_uv=True, backend=backend, progbar=progbar + ) time_profiler = pyuvsim.profiling.get_profiler() time_profiler.disable_by_count() assert isinstance(time_profiler, line_profiler.LineProfiler) @@ -51,6 +58,18 @@ def test_profiler(tmpdir): assert unique(func_names).tolist() == sorted( pyuvsim.profiling.default_profile_funcs ) + if pyuvsim.mpi.rank == 0: + time_profiler = pyuvsim.profiling.get_profiler() + time_profiler.disable_by_count() + assert isinstance(time_profiler, line_profiler.LineProfiler) + assert hasattr(time_profiler, "rank") + assert hasattr(time_profiler, "meta_file") + lstats = time_profiler.get_stats() + assert len(lstats.timings) != 0 + func_names = [k[2] for k in lstats.timings.keys()] + assert unique(func_names).tolist() == sorted( + pyuvsim.profiling.default_profile_funcs + ) def test_profiler_mock_import(tmpdir): diff --git a/tests/test_run.py b/tests/test_run.py index 2c167d0f..37d5a751 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -26,6 +26,8 @@ c_ms = speed_of_light.to("m/s").value pytest.importorskip("mpi4py") # noqa +# everything in this file requires 2 PUs +pytestmark = pytest.mark.parallel(2) @pytest.mark.filterwarnings("ignore:Fixing auto-correlations to be be real-only") @@ -76,6 +78,9 @@ def test_run_paramfile_uvsim(goto_tempdir, paramfile): uv_new.extra_keywords = uv_ref.extra_keywords uv_ref.telescope.mount_type = uv_new.telescope.mount_type + # remove filename attribute to ensure equality + uv_new.filename = None + uv_ref.filename = None assert uv_new == uv_ref @@ -94,12 +99,15 @@ def test_run_paramfile_uvsim(goto_tempdir, paramfile): ("monopole-nonflat", 3e-4), ], ) -def test_analytic_diffuse(model, tol, tmpdir): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +@pytest.mark.parametrize("progbar", ["progsteps", "tqdm"]) +def test_analytic_diffuse(model, tol, tmpdir, backend, progbar): # Generate the given model and simulate for a few baselines. # Import from analytic_diffuse (consider moving to rasg_affiliates?) - pytest.importorskip("analytic_diffuse") + analytic_diffuse = pytest.importorskip("analytic_diffuse") pytest.importorskip("astropy_healpix") - import analytic_diffuse + if progbar == "tqdm": + pytest.importorskip("tqdm") modname = model use_w = False @@ -152,14 +160,17 @@ def test_analytic_diffuse(model, tol, tmpdir): with open(obspar_path, "w") as ofile: yaml.dump(obspar, ofile, default_flow_style=False) - uv_out = pyuvsim.run_uvsim(obspar_path, return_uv=True) - # Convert from Jy to K sr - dat = uv_out.data_array[:, 0, 0] * jy_to_ksr(uv_out.freq_array[0]).value - # Evaluate the solution and compare to visibilities. - soln = analytic_diffuse.get_solution(modname) - uvw_lam = uv_out.uvw_array * uv_out.freq_array[0] / c_ms - ana = soln(uvw_lam, **params) - np.testing.assert_allclose(ana / 2, dat, atol=tol, rtol=0) + uv_out = pyuvsim.run_uvsim( + obspar_path, return_uv=True, backend=backend, progbar=progbar + ) + if pyuvsim.mpi.rank == 0: + # Convert from Jy to K sr + dat = uv_out.data_array[:, 0, 0] * jy_to_ksr(uv_out.freq_array[0]).value + # Evaluate the solution and compare to visibilities. + soln = analytic_diffuse.get_solution(modname) + uvw_lam = uv_out.uvw_array * uv_out.freq_array[0] / c_ms + ana = soln(uvw_lam, **params) + np.testing.assert_allclose(ana / 2, dat, atol=tol, rtol=0) @pytest.mark.filterwarnings("ignore:antenna_diameters are not set") @@ -354,9 +365,15 @@ def test_run_gleam_uvsim(spectral_type, nfeeds): ["spectral_index", False, True], ), ) -def test_zenith_spectral_sim(spectral_type, tmpdir, use_cli, use_uvdata): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +@pytest.mark.parametrize("progbar", ["progsteps", "tqdm"]) +def test_zenith_spectral_sim( + spectral_type, tmpdir, use_cli, use_uvdata, backend, progbar +): # Make a power law source at zenith in three ways. # Confirm that simulated visibilities match expectation. + if progbar == "tqdm": + pytest.importorskip("tqdm") params = pyuvsim.simsetup._config_str_to_dict( os.path.join(SIM_DATA_PATH, "test_config", "param_1time_1src_testcat.yaml") @@ -464,6 +481,10 @@ def test_zenith_spectral_sim(spectral_type, tmpdir, use_cli, use_uvdata): cat_in, "--outfile", uvd_out, + "--backend", + backend, + "--progbar", + progbar, ] ) else: @@ -478,6 +499,10 @@ def test_zenith_spectral_sim(spectral_type, tmpdir, use_cli, use_uvdata): "--quiet", "--keep_nonroot_stdout", "--raw_profile", + "--backend", + backend, + "--progbar", + progbar, ] ) assert os.path.exists(profile_file) @@ -486,10 +511,17 @@ def test_zenith_spectral_sim(spectral_type, tmpdir, use_cli, use_uvdata): else: if use_uvdata: uv_out = pyuvsim.run_uvdata_uvsim( - input_uv=uvd, beam_list=beam_list, beam_dict=beam_dict, catalog=catalog + input_uv=uvd, + beam_list=beam_list, + beam_dict=beam_dict, + catalog=catalog, + backend=backend, + progbar=progbar, ) else: - uv_out = pyuvsim.run_uvsim(param_file, return_uv=True) + uv_out = pyuvsim.run_uvsim( + param_file, return_uv=True, backend=backend, progbar=progbar + ) if use_cli and use_uvdata: # this is different because we are writing out the analytic beam to a @@ -603,6 +635,7 @@ def test_sim_on_moon(goto_tempdir, selenoid): # Lunar Frame Roundtripping param_dict["filing"]["outdir"] = str(tmpdir) + uv_filename = pyuvsim.utils.write_uvdata( uv_out, param_dict, return_filename=True, quiet=True ) diff --git a/tests/test_uvsim.py b/tests/test_uvsim.py index 43c7120c..6eac6e19 100644 --- a/tests/test_uvsim.py +++ b/tests/test_uvsim.py @@ -767,9 +767,11 @@ def test_local_task_gen(): @pytest.mark.filterwarnings("ignore:The parameter `blt_order` could not be identified") +@pytest.mark.parallel(2) def test_nsky_parts_large(capsys): """Check that we get the same visibilities no matter what Nsky_parts is set to.""" pytest.importorskip("mpi4py") + pyuvsim.mpi.start_mpi() hera_uv = UVData.from_file(EW_uvfits_10time10chan) hera_uv.select(times=np.unique(hera_uv.time_array)[0:3], freq_chans=range(3)) time = Time(hera_uv.time_array[0], scale="utc", format="jd") @@ -796,9 +798,10 @@ def test_nsky_parts_large(capsys): Nsky_parts=5, ) - captured = capsys.readouterr() - assert "The source list has been split into Nsky_parts" in captured.out - assert out_uv_single_nsky == out_uv_multi_nsky + if pyuvsim.mpi.get_rank() == 0: + captured = capsys.readouterr() + assert "The source list has been split into Nsky_parts" in captured.out + assert out_uv_single_nsky == out_uv_multi_nsky def test_set_nsky_parts_errors(): @@ -1158,6 +1161,8 @@ def test_run_mpierr(hera_loc, cst_beam): beam_list.share() +@pytest.mark.parallel(2) +@pytest.mark.filterwarnings("ignore:Cannot check consistency of a string-mode BeamList") @pytest.mark.parametrize("order", [("bda",), ("baseline", "time"), ("ant2", "time")]) def test_ordering(uvdata_two_redundant_bls_triangle_sources, order): pytest.importorskip("mpi4py") @@ -1177,6 +1182,11 @@ def test_ordering(uvdata_two_redundant_bls_triangle_sources, order): beam_dict=beam_dict, catalog=sky_model, ) + rank = pyuvsim.mpi.get_rank() + # rank 0 is the only one with the full uvdata object + if rank != 0: + return + assert out_uv.blt_order == order assert out_uv.blt_order == uvdata_linear.blt_order @@ -1194,9 +1204,15 @@ def test_ordering(uvdata_two_redundant_bls_triangle_sources, order): ) +@pytest.mark.filterwarnings("ignore:Cannot check consistency of a string-mode BeamList") +@pytest.mark.parallel(2) @pytest.mark.parametrize("order", [("bda",), ("baseline", "time"), ("ant2", "time")]) def test_order_warning(uvdata_two_redundant_bls_triangle_sources, order): pytest.importorskip("mpi4py") + # need to get the mpi initialized + # now that simulations require at least 2 PUs + pyuvsim.mpi.start_mpi() + rank = pyuvsim.mpi.get_rank() uvdata_linear, beam_list, beam_dict, sky_model = ( uvdata_two_redundant_bls_triangle_sources ) @@ -1208,22 +1224,36 @@ def test_order_warning(uvdata_two_redundant_bls_triangle_sources, order): uvdata_linear.reorder_blts(order=order[0], minor_order=minor_order) # delete the order like we forgot to set it uvdata_linear.blt_order = None - with check_warnings( - UserWarning, match="The parameter `blt_order` could not be identified." - ): + if rank == 0: + with check_warnings( + UserWarning, match="The parameter `blt_order` could not be identified." + ): + out_uv = pyuvsim.uvsim.run_uvdata_uvsim( + input_uv=uvdata_linear.copy(), + beam_list=beam_list, + beam_dict=beam_dict, + catalog=sky_model, + ) + + assert out_uv.blt_order == ("time", "baseline") + else: out_uv = pyuvsim.uvsim.run_uvdata_uvsim( input_uv=uvdata_linear.copy(), beam_list=beam_list, beam_dict=beam_dict, catalog=sky_model, ) - assert out_uv.blt_order == ("time", "baseline") +@pytest.mark.parallel(2, timeout=10) @pytest.mark.filterwarnings("ignore:key beam_path in extra_keywords is longer than 8") @pytest.mark.parametrize("cut_beam", [10, 85, 90]) -def test_nblts_not_square(uvdata_two_redundant_bls_triangle_sources, cut_beam): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +def test_nblts_not_square(uvdata_two_redundant_bls_triangle_sources, cut_beam, backend): pytest.importorskip("mpi4py") + pyuvsim.mpi.start_mpi(block_nonroot_stdout=False) + rank = pyuvsim.mpi.get_rank() + uvdata_linear, beam_list, beam_dict, sky_model = ( uvdata_two_redundant_bls_triangle_sources ) @@ -1247,27 +1277,63 @@ def test_nblts_not_square(uvdata_two_redundant_bls_triangle_sources, cut_beam): indices = indices[::2] blt_inds = np.delete(np.arange(uvdata_linear.Nblts), indices) uvdata_linear.select(blt_inds=blt_inds) - assert uvdata_linear.Nblts != uvdata_linear.Nbls * uvdata_linear.Ntimes if cut_beam < 85: # There's a source out at ~80 degrees - with pytest.raises( - ValueError, - match="at least one interpolation location is outside of the UVBeam", - ): - out_uv = pyuvsim.uvsim.run_uvdata_uvsim( - input_uv=uvdata_linear.copy(), - beam_list=beam_list, - beam_dict=beam_dict, - catalog=sky_model, - ) + + # for send_recv the error is sent to node 0 and not raised on worker PUs + if backend == "send_recv": + if rank == 0: + with pytest.raises( + ValueError, + match=r".*at least one interpolation location is outside of the UVBeam", + ): + out_uv = pyuvsim.uvsim.run_uvdata_uvsim( + input_uv=uvdata_linear.copy(), + beam_list=beam_list, + beam_dict=beam_dict, + catalog=sky_model, + backend=backend, + ) + else: + pyuvsim.uvsim.run_uvdata_uvsim( + input_uv=uvdata_linear.copy(), + beam_list=beam_list, + beam_dict=beam_dict, + catalog=sky_model, + backend=backend, + ) + + # work only occurs on non rank 0 nodes. check for the error there. + else: + if rank != 0: + with pytest.raises( + ValueError, + match="at least one interpolation location is outside of the UVBeam", + ): + out_uv = pyuvsim.uvsim.run_uvdata_uvsim( + input_uv=uvdata_linear.copy(), + beam_list=beam_list, + beam_dict=beam_dict, + catalog=sky_model, + backend=backend, + ) + else: + pyuvsim.uvsim.run_uvdata_uvsim( + input_uv=uvdata_linear.copy(), + beam_list=beam_list, + beam_dict=beam_dict, + catalog=sky_model, + backend=backend, + ) else: out_uv = pyuvsim.uvsim.run_uvdata_uvsim( input_uv=uvdata_linear.copy(), beam_list=beam_list, beam_dict=beam_dict, catalog=sky_model, + backend=backend, ) np.testing.assert_allclose(out_uv.get_data((0, 1)), out_uv.get_data((1, 2))) @@ -1363,3 +1429,46 @@ def test_run_uvdata_uvsim_errors(kwargs, msg): pytest.importorskip("mpi4py") with pytest.raises(TypeError, match=msg): pyuvsim.run_uvdata_uvsim(**kwargs) + + +def test_tqdm_import_error(): + pytest.importorskip("mpi4py") + try: + import tqdm # noqa + + return + except ImportError: + with pytest.raises(ImportError, match="The tqdm module must be"): + pyuvsim.uvsim._get_pbar("tqdm", None, None) + + +def test_progbar_error_get_pbar(): + pytest.importorskip("mpi4py") + with pytest.raises(ValueError, match="The progbar keyword must be one of "): + pyuvsim.uvsim._get_pbar("foo", None, None) + + +def test_progbar_error_uvdata_uvsim(): + pytest.importorskip("mpi4py") + with pytest.raises(ValueError, match="The progbar keyword must be one of "): + pyuvsim.run_uvdata_uvsim( + None, None, None, None, False, backend="rma", progbar="Foo" + ) + + +def test_progbar_error_run_uvsim(): + pytest.importorskip("mpi4py") + with pytest.raises(ValueError, match="The progbar keyword must be one of "): + pyuvsim.run_uvsim(None, False, None, backend="rma", progbar="Foo") + + +def test_backend_error_uvdata_uvsim(): + pytest.importorskip("mpi4py") + with pytest.raises(ValueError, match="The backend keyword must be one of "): + pyuvsim.run_uvdata_uvsim(None, None, None, None, False, backend="Test") + + +def test_backend_error_run_uvsim(): + pytest.importorskip("mpi4py") + with pytest.raises(ValueError, match="The backend keyword must be one of "): + pyuvsim.run_uvsim(None, False, False, backend="Test") From 6945faa16b4fe4172980224cc3a660386dee2d86 Mon Sep 17 00:00:00 2001 From: Matthew Kolopanis Date: Thu, 21 Mar 2024 15:26:35 -0700 Subject: [PATCH 2/3] plumb mpi-pytest changes through rest of tests, update only progstreps workflow file small clean ups in mpi --- .github/workflows/testsuite.yaml | 2 +- src/pyuvsim/mpi.py | 22 ++++++----- src/pyuvsim/uvsim.py | 2 - tests/test_profiler.py | 2 + tests/test_run.py | 66 +++++++++++++++++++++++++++----- tests/test_uvsim.py | 20 +++++++++- 6 files changed, 89 insertions(+), 25 deletions(-) diff --git a/.github/workflows/testsuite.yaml b/.github/workflows/testsuite.yaml index bcd64c4a..a0ccab54 100644 --- a/.github/workflows/testsuite.yaml +++ b/.github/workflows/testsuite.yaml @@ -180,7 +180,7 @@ jobs: only_progsteps: env: ENV_NAME: only_progsteps - PYTHON: 3.7 + PYTHON: "3.12" name: No Tqdm Tests defaults: run: diff --git a/src/pyuvsim/mpi.py b/src/pyuvsim/mpi.py index 8de32918..de62d731 100644 --- a/src/pyuvsim/mpi.py +++ b/src/pyuvsim/mpi.py @@ -87,16 +87,18 @@ def start_mpi(block_nonroot_stdout=True, thread_multiple=False): world_comm.Barrier() - if do_once and (rank != 0) and block_nonroot_stdout: # pragma: no cover - # For non-root ranks, do not print to stdout. - if sys.platform.startswith("win"): - with open("NUL", "w") as devnull: - sys.stdout = devnull - else: - with open("/dev/null", "w") as devnull: - sys.stdout = devnull - atexit.register(sys.stdout.close) - atexit.register(free_shared) + if do_once: # pragma: no cover + atexit.register(free_shared) + + if (rank != 0) and block_nonroot_stdout: + # For non-root ranks, do not print to stdout. + if sys.platform.startswith("win"): + with open("NUL", "w") as devnull: + sys.stdout = devnull + else: + with open("/dev/null", "w") as devnull: + sys.stdout = devnull + atexit.register(sys.stdout.close) def shared_mem_bcast(arr, root=0): diff --git a/src/pyuvsim/uvsim.py b/src/pyuvsim/uvsim.py index 05612acd..a6e38e36 100644 --- a/src/pyuvsim/uvsim.py +++ b/src/pyuvsim/uvsim.py @@ -642,8 +642,6 @@ def _run_uvsim_rma( rank = mpi.get_rank() comm = mpi.get_comm() if rank == 0: - if "world" in uv_container.extra_keywords: - uv_container.extra_keywords["world"] = uv_container.extra_keywords["world"] vis_data = mpi.MPI.Win.Create( uv_container._data_array.value, comm=mpi.world_comm ) diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 5d0da5ad..0e587c6d 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -67,6 +67,8 @@ def test_profiler(tmpdir, backend, progbar): lstats = time_profiler.get_stats() assert len(lstats.timings) != 0 func_names = [k[2] for k in lstats.timings.keys()] + # just get the function names (not objects) + func_names = [name.split(".")[-1] for name in func_names] assert unique(func_names).tolist() == sorted( pyuvsim.profiling.default_profile_funcs ) diff --git a/tests/test_run.py b/tests/test_run.py index 37d5a751..0c1b8882 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -38,8 +38,8 @@ not pytest.pyuvsim_can_parallel, reason="mpi-pytest is not installed. Cannot run parallel tests.", ) -@pytest.mark.parallel(2) -def test_run_paramfile_uvsim(goto_tempdir, paramfile): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +def test_run_paramfile_uvsim(goto_tempdir, paramfile, backend): # Test vot and txt catalogs for parameter simulation # Compare to reference files. ref_file = os.path.join(SIM_DATA_PATH, "testfile_singlesource.uvh5") @@ -47,7 +47,7 @@ def test_run_paramfile_uvsim(goto_tempdir, paramfile): param_filename = os.path.join(SIM_DATA_PATH, "test_config", paramfile) # This test obsparam file has "single_source.txt" as its catalog. - pyuvsim.uvsim.run_uvsim(param_filename) + pyuvsim.uvsim.run_uvsim(param_filename, backend=backend) # Loading the file and comparing is only done on rank 0. if pyuvsim.mpi.rank != 0: @@ -101,6 +101,10 @@ def test_run_paramfile_uvsim(goto_tempdir, paramfile): ) @pytest.mark.parametrize("backend", ["rma", "send_recv"]) @pytest.mark.parametrize("progbar", ["progsteps", "tqdm"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) def test_analytic_diffuse(model, tol, tmpdir, backend, progbar): # Generate the given model and simulate for a few baselines. # Import from analytic_diffuse (consider moving to rasg_affiliates?) @@ -174,7 +178,12 @@ def test_analytic_diffuse(model, tol, tmpdir, backend, progbar): @pytest.mark.filterwarnings("ignore:antenna_diameters are not set") -def test_diffuse_units(tmpdir): +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +def test_diffuse_units(tmpdir, backend): pytest.importorskip("analytic_diffuse") pytest.importorskip("astropy_healpix") @@ -237,14 +246,14 @@ def test_diffuse_units(tmpdir): with open(obspar_path, "w") as ofile: yaml.dump(obspar, ofile, default_flow_style=False) - uv_out_k = pyuvsim.run_uvsim(obspar_path, return_uv=True) + uv_out_k = pyuvsim.run_uvsim(obspar_path, backend=backend, return_uv=True) obspar["sources"] = {"catalog": catpath_jy} obspar["filing"]["outfile_name"] = "diffuse_units_jy_sim.uvh5" with open(obspar_path, "w") as ofile: yaml.dump(obspar, ofile, default_flow_style=False) - uv_out_jy = pyuvsim.run_uvsim(obspar_path, return_uv=True) + uv_out_jy = pyuvsim.run_uvsim(obspar_path, backend=backend, return_uv=True) # make the histories the same for comparison uv_out_jy.history = uv_out_k.history @@ -255,7 +264,12 @@ def test_diffuse_units(tmpdir): @pytest.mark.parametrize("rename_beamfits", [True, False]) -def test_run_paramdict_uvsim(rename_beamfits, tmp_path): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +def test_run_paramdict_uvsim(rename_beamfits, backend, tmp_path): # Running a simulation from parameter dictionary. param_file = os.path.join( SIM_DATA_PATH, "test_config", "param_1time_1src_testcat.yaml" @@ -296,12 +310,17 @@ def test_run_paramdict_uvsim(rename_beamfits, tmp_path): else: params = pyuvsim.simsetup._config_str_to_dict(param_file) - pyuvsim.run_uvsim(params, return_uv=True) + pyuvsim.run_uvsim(params, backend=backend, return_uv=True) @pytest.mark.parametrize("spectral_type", ["flat", "subband", "spectral_index"]) @pytest.mark.parametrize("nfeeds", [1, 2]) -def test_run_gleam_uvsim(spectral_type, nfeeds): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +def test_run_gleam_uvsim(spectral_type, nfeeds, backend): params = pyuvsim.simsetup._config_str_to_dict( os.path.join(SIM_DATA_PATH, "test_config", "param_1time_testgleam.yaml") ) @@ -327,7 +346,11 @@ def test_run_gleam_uvsim(spectral_type, nfeeds): for beam_inter in beam_list: assert beam_inter.Nfeeds == nfeeds - uv_out = pyuvsim.run_uvsim(params, return_uv=True) + uv_out = pyuvsim.run_uvsim(params, backend=backend, return_uv=True) + + if pyuvsim.mpi.rank != 0: + return + assert uv_out.telescope.name == "Triangle" file_name = f"gleam_triangle_{spectral_type}.uvh5" @@ -367,6 +390,10 @@ def test_run_gleam_uvsim(spectral_type, nfeeds): ) @pytest.mark.parametrize("backend", ["rma", "send_recv"]) @pytest.mark.parametrize("progbar", ["progsteps", "tqdm"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) def test_zenith_spectral_sim( spectral_type, tmpdir, use_cli, use_uvdata, backend, progbar ): @@ -533,6 +560,10 @@ def test_zenith_spectral_sim( np.testing.assert_allclose(uv_out.data_array[ii, :, 0], exp_spectrum) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) def test_pol_error(): # Check that running with a uvdata object without the proper polarizations will fail. hera_uv = UVData() @@ -553,6 +584,10 @@ def test_pol_error(): @pytest.mark.filterwarnings("ignore:Setting the location attribute post initialization") @pytest.mark.parametrize("selenoid", ["SPHERE", "GSFC", "GRAIL23", "CE-1-LAM-GEO"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) def test_sim_on_moon(goto_tempdir, selenoid): pytest.importorskip("lunarsky") from spiceypy.utils.exceptions import SpiceUNKNOWNFRAME @@ -623,6 +658,9 @@ def test_sim_on_moon(goto_tempdir, selenoid): except SpiceUNKNOWNFRAME as err: pytest.skip("SpiceUNKNOWNFRAME error: " + str(err)) + if pyuvsim.mpi.rank != 0: + return + assert history_utils._check_history_version(uv_out.history, pyradiosky.__version__) assert history_utils._check_history_version(uv_out.history, pyuvdata.__version__) assert history_utils._check_history_version(uv_out.history, pyuvsim.__version__) @@ -645,6 +683,10 @@ def test_sim_on_moon(goto_tempdir, selenoid): @pytest.mark.parametrize("selenoid", ["SPHERE", "GSFC", "GRAIL23", "CE-1-LAM-GEO"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) def test_lunar_gauss(goto_tempdir, selenoid): pytest.importorskip("lunarsky") from lunarsky import MoonLocation @@ -693,8 +735,12 @@ def test_lunar_gauss(goto_tempdir, selenoid): except SpiceUNKNOWNFRAME as err: pytest.skip("SpiceUNKNOWNFRAME error: " + str(err)) + if pyuvsim.mpi.rank != 0: + return + assert uv_out.telescope.location.ellipsoid == selenoid + assert uv_out._telescope_location.ellipsoid == selenoid # Skymodel and update positions # Init sky model diff --git a/tests/test_uvsim.py b/tests/test_uvsim.py index 6eac6e19..f1b6d09e 100644 --- a/tests/test_uvsim.py +++ b/tests/test_uvsim.py @@ -767,6 +767,10 @@ def test_local_task_gen(): @pytest.mark.filterwarnings("ignore:The parameter `blt_order` could not be identified") +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) @pytest.mark.parallel(2) def test_nsky_parts_large(capsys): """Check that we get the same visibilities no matter what Nsky_parts is set to.""" @@ -1161,9 +1165,13 @@ def test_run_mpierr(hera_loc, cst_beam): beam_list.share() -@pytest.mark.parallel(2) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) @pytest.mark.filterwarnings("ignore:Cannot check consistency of a string-mode BeamList") @pytest.mark.parametrize("order", [("bda",), ("baseline", "time"), ("ant2", "time")]) +@pytest.mark.parallel(2) def test_ordering(uvdata_two_redundant_bls_triangle_sources, order): pytest.importorskip("mpi4py") uvdata_linear, beam_list, beam_dict, sky_model = ( @@ -1205,6 +1213,10 @@ def test_ordering(uvdata_two_redundant_bls_triangle_sources, order): @pytest.mark.filterwarnings("ignore:Cannot check consistency of a string-mode BeamList") +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) @pytest.mark.parallel(2) @pytest.mark.parametrize("order", [("bda",), ("baseline", "time"), ("ant2", "time")]) def test_order_warning(uvdata_two_redundant_bls_triangle_sources, order): @@ -1245,7 +1257,11 @@ def test_order_warning(uvdata_two_redundant_bls_triangle_sources, order): ) -@pytest.mark.parallel(2, timeout=10) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parallel(2) @pytest.mark.filterwarnings("ignore:key beam_path in extra_keywords is longer than 8") @pytest.mark.parametrize("cut_beam", [10, 85, 90]) @pytest.mark.parametrize("backend", ["rma", "send_recv"]) From 6944d9d0789567723002d63acf61186c9e47a947 Mon Sep 17 00:00:00 2001 From: Matthew Kolopanis Date: Tue, 26 Aug 2025 13:17:56 -0700 Subject: [PATCH 3/3] some tweak to get tests flowing --- src/pyuvsim/cli.py | 13 ++++++++----- src/pyuvsim/mpi.py | 23 +++++++++++------------ src/pyuvsim/uvsim.py | 26 ++++++++++++++++---------- tests/test_profiler.py | 21 +++++++-------------- tests/test_run.py | 8 ++++++-- tests/test_uvsim.py | 18 ++++++++++++++++++ 6 files changed, 66 insertions(+), 43 deletions(-) diff --git a/src/pyuvsim/cli.py b/src/pyuvsim/cli.py index 4f3f5432..28777abb 100644 --- a/src/pyuvsim/cli.py +++ b/src/pyuvsim/cli.py @@ -135,11 +135,14 @@ def run_pyuvsim(argv=None): backend=args.backend, progbar=args.progbar, ) - pobj = Path(args.outfile) - utils.write_uvdata( - uvd_out, - param_dict={"outdir": str(pobj.parent), "outfile_name": str(pobj.name)}, - ) + + if mpi.rank != 0: + return + + pobj = Path(args.outfile) + utils.write_uvdata( + uvd_out, param_dict={"outdir": str(pobj.parent), "outfile_name": str(pobj.name)} + ) if args.profile: dt = pytime.time() - t0 diff --git a/src/pyuvsim/mpi.py b/src/pyuvsim/mpi.py index de62d731..2b45f7a3 100644 --- a/src/pyuvsim/mpi.py +++ b/src/pyuvsim/mpi.py @@ -87,18 +87,17 @@ def start_mpi(block_nonroot_stdout=True, thread_multiple=False): world_comm.Barrier() - if do_once: # pragma: no cover - atexit.register(free_shared) - - if (rank != 0) and block_nonroot_stdout: - # For non-root ranks, do not print to stdout. - if sys.platform.startswith("win"): - with open("NUL", "w") as devnull: - sys.stdout = devnull - else: - with open("/dev/null", "w") as devnull: - sys.stdout = devnull - atexit.register(sys.stdout.close) + if do_once and (rank != 0) and block_nonroot_stdout: # pragma: no cover + # For non-root ranks, do not print to stdout. + if sys.platform.startswith("win"): + with open("NUL", "w") as devnull: + sys.stdout = devnull + else: + with open("/dev/null", "w") as devnull: + sys.stdout = devnull + atexit.register(sys.stdout.close) + + atexit.register(free_shared) def shared_mem_bcast(arr, root=0): diff --git a/src/pyuvsim/uvsim.py b/src/pyuvsim/uvsim.py index a6e38e36..facbca14 100644 --- a/src/pyuvsim/uvsim.py +++ b/src/pyuvsim/uvsim.py @@ -1109,14 +1109,20 @@ def run_uvdata_uvsim( comm = mpi.get_comm() Npus = mpi.get_Npus() - if Npus < 2: - raise ValueError("At least 2 PUs are required to run pyuvsim.") # ensure that we have full or flat spectral type and that we have points not # healpix if rank == 0: input_uv, beam_list, catalog, input_order = _run_uvdata_input_check( input_uv=input_uv, beam_list=beam_list, catalog=catalog ) + else: + input_uv = UVData() + beam_list = BeamList([]) + beam_dict = None + catalog = SkyModelData() + + if Npus < 2: + raise ValueError("At least 2 PUs are required to run pyuvsim.") input_uv = comm.bcast(input_uv, root=0) beam_dict = comm.bcast(beam_dict, root=0) @@ -1156,14 +1162,14 @@ def run_uvdata_uvsim( Nblts, Nfreqs, Nsrcs, rank - 1, Npus - 1 ) - local_task_iter = uvdata_to_task_iter( - task_inds, - input_uv, - catalog.subselect(src_inds), - beam_list, - beam_dict, - Nsky_parts=Nsky_parts, - ) + local_task_iter = uvdata_to_task_iter( + task_inds, + input_uv, + catalog.subselect(src_inds), + beam_list, + beam_dict, + Nsky_parts=Nsky_parts, + ) elif backend == "send_recv": # hack this a little bit, we want to have Nbls * Nfreqs * Ntimes # total tasks we are distributing over all PUs but then we diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 0e587c6d..eae489c5 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -32,19 +32,26 @@ def test_profiler(tmpdir, backend, progbar): if progbar == "tqdm": pytest.importorskip("tqdm") line_profiler = pytest.importorskip("line_profiler") + outpath = profdata_dir_setup(tmpdir) testprof_fname = str(outpath.join("time_profile.out")) pyuvsim.profiling.set_profiler(outfile_prefix=testprof_fname, dump_raw=True) + with check_warnings(UserWarning, match="Profiler already set"): pyuvsim.profiling.set_profiler( outfile_prefix=testprof_fname[:-4], dump_raw=True ) + param_filename = os.path.join( SIM_DATA_PATH, "test_config", "param_1time_1src_testcat.yaml" ) pyuvsim.uvsim.run_uvsim( param_filename, return_uv=True, backend=backend, progbar=progbar ) + + if pyuvsim.mpi.rank != 0: + return + time_profiler = pyuvsim.profiling.get_profiler() time_profiler.disable_by_count() assert isinstance(time_profiler, line_profiler.LineProfiler) @@ -58,20 +65,6 @@ def test_profiler(tmpdir, backend, progbar): assert unique(func_names).tolist() == sorted( pyuvsim.profiling.default_profile_funcs ) - if pyuvsim.mpi.rank == 0: - time_profiler = pyuvsim.profiling.get_profiler() - time_profiler.disable_by_count() - assert isinstance(time_profiler, line_profiler.LineProfiler) - assert hasattr(time_profiler, "rank") - assert hasattr(time_profiler, "meta_file") - lstats = time_profiler.get_stats() - assert len(lstats.timings) != 0 - func_names = [k[2] for k in lstats.timings.keys()] - # just get the function names (not objects) - func_names = [name.split(".")[-1] for name in func_names] - assert unique(func_names).tolist() == sorted( - pyuvsim.profiling.default_profile_funcs - ) def test_profiler_mock_import(tmpdir): diff --git a/tests/test_run.py b/tests/test_run.py index 0c1b8882..f2e16037 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -315,7 +315,7 @@ def test_run_paramdict_uvsim(rename_beamfits, backend, tmp_path): @pytest.mark.parametrize("spectral_type", ["flat", "subband", "spectral_index"]) @pytest.mark.parametrize("nfeeds", [1, 2]) -@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +@pytest.mark.parametrize("backend", ["send_recv"]) @pytest.mark.skipif( not pytest.pyuvsim_can_parallel, reason="mpi-pytest is not installed. Cannot run parallel tests.", @@ -375,6 +375,8 @@ def test_run_gleam_uvsim(spectral_type, nfeeds, backend): uv_out.history = uv_in.history uv_in.extra_keywords = uv_out.extra_keywords uv_in.telescope.mount_type = uv_out.telescope.mount_type + uv_in.filename = uv_out.filename + assert uv_in.telescope._location == uv_out.telescope._location assert uv_in == uv_out @@ -388,7 +390,7 @@ def test_run_gleam_uvsim(spectral_type, nfeeds, backend): ["spectral_index", False, True], ), ) -@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +@pytest.mark.parametrize("backend", ["send_recv"]) @pytest.mark.parametrize("progbar", ["progsteps", "tqdm"]) @pytest.mark.skipif( not pytest.pyuvsim_can_parallel, @@ -549,6 +551,8 @@ def test_zenith_spectral_sim( uv_out = pyuvsim.run_uvsim( param_file, return_uv=True, backend=backend, progbar=progbar ) + if pyuvsim.mpi.rank != 0: + return if use_cli and use_uvdata: # this is different because we are writing out the analytic beam to a diff --git a/tests/test_uvsim.py b/tests/test_uvsim.py index f1b6d09e..91992863 100644 --- a/tests/test_uvsim.py +++ b/tests/test_uvsim.py @@ -463,6 +463,11 @@ def test_single_offzenith_source(beam, hera_loc): @pytest.mark.filterwarnings("ignore:No julian date given for mock catalog.") @pytest.mark.filterwarnings("ignore:No out format specified for uvdata file.") @pytest.mark.filterwarnings("ignore:The mount_type parameter must be set for UVBeam") +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parallel(2) def test_select_antennas(): pytest.importorskip("mpi4py") @@ -1117,6 +1122,11 @@ def test_overflow_check(): pyuvsim.uvsim._check_ntasks_valid(should_fail) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parallel(2) def test_fullfreq_check(uvdata_two_redundant_bls_triangle_sources): # Check that run_uvdata_uvsim will error if 'spectral_type' is 'full' # and the frequencies on the catalog do not match the simulation's. @@ -1362,6 +1372,11 @@ def test_nblts_not_square(uvdata_two_redundant_bls_triangle_sources, cut_beam, b @pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected") +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parallel(2) def test_uvdata_uvsim_uvw_setting(uvdata_two_redundant_bls_triangle_sources): pytest.importorskip("mpi4py") uvd, beam_list, beam_dict, sky_model = uvdata_two_redundant_bls_triangle_sources @@ -1376,6 +1391,9 @@ def test_uvdata_uvsim_uvw_setting(uvdata_two_redundant_bls_triangle_sources): input_uv=uvd2, beam_list=beam_list, beam_dict=beam_dict, catalog=sky_model ) + if pyuvsim.mpi.rank != 0: + return + assert out_uv1 == out_uv2