diff --git a/.github/workflows/testsuite.yaml b/.github/workflows/testsuite.yaml index 39af32f7..a0ccab54 100644 --- a/.github/workflows/testsuite.yaml +++ b/.github/workflows/testsuite.yaml @@ -177,6 +177,54 @@ jobs: env_vars: OS,PYTHON fail_ci_if_error: true + only_progsteps: + env: + ENV_NAME: only_progsteps + PYTHON: "3.12" + name: No Tqdm Tests + defaults: + run: + # Adding -l {0} helps ensure conda can be found properly. + shell: bash -l {0} + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@master + with: + fetch-depth: 1 + - name: Setup Miniconda + uses: conda-incubator/setup-miniconda@v2.0.0 + with: + auto-update-conda: true + miniconda-version: "latest" + python-version: ${{ env.PYTHON }} + environment-file: ci/${{ env.ENV_NAME }}.yaml + activate-environment: ${{ env.ENV_NAME }} + + - name: Conda Info + run: | + conda info -a + conda list + PYVER=`python -c "import sys; print('{:d}.{:d}'.format(sys.version_info.major, sys.version_info.minor))"` + if [[ $PYVER != ${{ env.PYTHON }} ]]; then + exit 1; + fi + + - name: Install + run: | + pip install . + + - name: Run Tests + run: | + python -m pytest --cov=pyuvsim --cov-config=.coveragerc --cov-report xml:./coverage.xml --junitxml=test-reports/xunit.xml + + - uses: codecov/codecov-action@v1.5.2 + if: success() + with: + token: ${{secrets.CODECOV_TOKEN}} #required + file: ./coverage.xml #optional + env_vars: OS,PYTHON + fail_ci_if_error: true + warning_test: env: ENV_NAME: pyuvsim_tests_mpich diff --git a/README.md b/README.md index 8d17c978..6214a8ac 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ Optional: * lunarsky>=0.2.5 (for simulating telescopes on the moon) * python-casacore>=3.5.2 (for writing CASA measurement sets, not available on Windows) * matplotlib>=3.6 (for plotting functions) - +* tqdm ### Developer Installation If you are developing `pyuvsim`, you will need to download and install the diff --git a/ci/only_progsteps.yaml b/ci/only_progsteps.yaml new file mode 100644 index 00000000..8d79be66 --- /dev/null +++ b/ci/only_progsteps.yaml @@ -0,0 +1,26 @@ +name: only_progsteps +channels: + - conda-forge + - defaults +dependencies: + - astropy>=5.2 + - astropy-healpix>=0.6 + - coverage + - line_profiler + - mpi4py>=3.0.0 + - numpy>=1.20 + - openmpi + - pip + - psutil + - python-casacore>=3.3 + - pytest + - pytest-cov + - pytest-xdist + - pyuvdata>=2.2.10 + - pyyaml>=5.1 + - scipy>=1.3 + - setuptools_scm>=7.0.3 + - pip: + - pyradiosky>=0.2 + - lunarsky>=0.2.1 + - git+https://github.com/aelanman/analytic_diffuse diff --git a/pyproject.toml b/pyproject.toml index 78ccbd02..d6a390ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,8 @@ plot = ["matplotlib>=3.6"] sim = ["mpi4py>=3.1.3", "psutil"] sim-test = ["pyuvsim[sim,test]", "mpi-pytest>=2025.7.0"] all = ["pyuvsim[casa,healpix,moon,plot,sim]"] -test = ["coverage", "pooch>=1.8", "pre-commit", "pytest", "pytest-cov>=5.0"] +tqdm = [ "tqdm" ] +test = ["coverage", "pooch>=1.8", "pre-commit", "pytest", "pytest-cov>=5.0", "pyuvsim[tqdm]"] doc = ["matplotlib", "pypandoc", "sphinx"] profiler = ["line-profiler"] dev = ["pyuvsim[all,test,sim-test,doc,profiler]"] @@ -78,7 +79,7 @@ plot_csv_antpos = "pyuvsim.cli:plot_csv_antpos" [tool.setuptools_scm] [tool.pytest.ini_options] -addopts = "--ignore=scripts" + addopts = "--ignore=scripts" [tool.ruff.lint] select = [ diff --git a/src/pyuvsim/cli.py b/src/pyuvsim/cli.py index 87f983cc..28777abb 100644 --- a/src/pyuvsim/cli.py +++ b/src/pyuvsim/cli.py @@ -64,6 +64,21 @@ def run_pyuvsim(argv=None): help="Also save pickled LineStats data for line profiling.", action="store_true", ) + parser.add_argument( + "--backend", + type=str, + help="Backend task collection for simulation", + choices=["rma", "send_recv"], + default="rma", + ) + + parser.add_argument( + "--progbar", + type=str, + help="Monitor and reporting module for simulation progress", + choices=["progsteps", "tqdm"], + default="progsteps", + ) args = parser.parse_args(argv) @@ -98,7 +113,11 @@ def run_pyuvsim(argv=None): if args.param is not None: uvsim.run_uvsim( - args.param, quiet=args.quiet, block_nonroot_stdout=block_nonroot_stdout + args.param, + quiet=args.quiet, + block_nonroot_stdout=block_nonroot_stdout, + backend=args.backend, + progbar=args.progbar, ) else: uvd = UVData.from_file(args.uvdata) @@ -113,12 +132,17 @@ def run_pyuvsim(argv=None): catalog=skymodel, quiet=args.quiet, block_nonroot_stdout=block_nonroot_stdout, + backend=args.backend, + progbar=args.progbar, ) - pobj = Path(args.outfile) - utils.write_uvdata( - uvd_out, - param_dict={"outdir": str(pobj.parent), "outfile_name": str(pobj.name)}, - ) + + if mpi.rank != 0: + return + + pobj = Path(args.outfile) + utils.write_uvdata( + uvd_out, param_dict={"outdir": str(pobj.parent), "outfile_name": str(pobj.name)} + ) if args.profile: dt = pytime.time() - t0 diff --git a/src/pyuvsim/mpi.py b/src/pyuvsim/mpi.py index 182e30f9..2b45f7a3 100644 --- a/src/pyuvsim/mpi.py +++ b/src/pyuvsim/mpi.py @@ -3,6 +3,7 @@ """MPI setup.""" import atexit +import enum import struct as _struct import sys from array import array as _array @@ -19,7 +20,7 @@ world_comm = None node_comm = None rank_comm = None - +status = None # Split serialized objects into chunks of 2 GiB INT_MAX = 2**31 - 1 @@ -27,6 +28,16 @@ shared_window_list = [] +class Tags(enum.IntEnum): + """Tags for MPI computations where worker nodes communicate with a main distribution node.""" + + READY = enum.auto() + START = enum.auto() + DONE = enum.auto() + EXIT = enum.auto() + ERROR = enum.auto() + + def set_mpi_excepthook(mpi_comm): """Kill the whole job on an uncaught python exception.""" @@ -38,7 +49,7 @@ def mpi_excepthook(exctype, value, traceback): # pragma: no cover sys.excepthook = mpi_excepthook -def start_mpi(block_nonroot_stdout=True): +def start_mpi(block_nonroot_stdout=True, thread_multiple=False): """ Initialize MPI if not already initialized and do setup. @@ -51,20 +62,27 @@ def start_mpi(block_nonroot_stdout=True): Redirect stdout on nonzero ranks to /dev/null, for cleaner output. """ + global world_comm, node_comm, rank_comm, rank, Npus, status + do_once = False - global world_comm, node_comm, rank_comm, rank, Npus if not MPI.Is_initialized(): # pragma: no cover - MPI.Init_thread( - MPI.THREAD_SERIALIZED - ) # RMA is incompatible with THREAD_MULTIPLE. do_once = True + + if not thread_multiple: + MPI.Init_thread( + MPI.THREAD_SERIALIZED + ) # RMA is incompatible with THREAD_MULTIPLE. + else: + MPI.Init_thread(MPI.THREAD_MULTIPLE) atexit.register(MPI.Finalize) + world_comm = MPI.COMM_WORLD node_comm = world_comm.Split_type(MPI.COMM_TYPE_SHARED) rank_comm = world_comm.Split(color=node_comm.rank) Npus = world_comm.Get_size() rank = world_comm.Get_rank() + status = MPI.Status() set_mpi_excepthook(world_comm) world_comm.Barrier() @@ -78,6 +96,7 @@ def start_mpi(block_nonroot_stdout=True): with open("/dev/null", "w") as devnull: sys.stdout = devnull atexit.register(sys.stdout.close) + atexit.register(free_shared) @@ -487,3 +506,8 @@ def get_comm(): def get_node_comm(): """Get node_comm, the Communicator for all PUs on current node.""" return node_comm + + +def get_status(): + """status, the status of an mpi message.""" + return status diff --git a/src/pyuvsim/utils.py b/src/pyuvsim/utils.py index b5d6f363..09b42d1d 100644 --- a/src/pyuvsim/utils.py +++ b/src/pyuvsim/utils.py @@ -2,6 +2,7 @@ # Licensed under the 3-clause BSD License """Define various utility functions.""" +import itertools import os import sys import time as pytime @@ -42,6 +43,14 @@ def __init__(self, maxval=None): self.curval = 0 self.remain = None + def __enter__(self): + """Enter a context manager.""" + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Exit a context manager.""" + self.finish() + def update(self, count): """ Update the progress bar. @@ -370,3 +379,49 @@ class in simulation, accounting for its attributes as well as [sys.getsizeof(v) * Ncomponents * Nfreqs for k, v in Ncomp_Nfreq_attrs.items()] ) return mem_est + + +def _grouper_it(iterable, chunksize=1): + """Chunk an iterator and return an iterator. + + Parameters + ---------- + iterable : Iterable + The iterable object to chunk + chunksize : int + size of chunks desired + + Returns + ------- + iterable chunked into sizes + """ + it = iter(iterable) + while True: + chunk = list(itertools.islice(it, chunksize)) + if not chunk: + return + yield chunk + + +def _chunked_iterator_product(iter1, iter2, chunksize1, chunksize2): + """Iterate over the product of two chunked iterators. + + Parameters + ---------- + iter1 : Iterable + One iterator to chunk through. + iter2 : Iterable + The second iterator to chunk through. + chunksize1 : int + Chunk size for iter1 + chunksize2 : int + Chunk size for iter2 + + Returns + ------- + An iterator over the chunked product of all combinations of iter1 and iter2 + + """ + for i1 in _grouper_it(iter1, chunksize1): + for i2 in _grouper_it(iter2, chunksize2): + yield i1, i2 diff --git a/src/pyuvsim/uvsim.py b/src/pyuvsim/uvsim.py index 020fce93..facbca14 100644 --- a/src/pyuvsim/uvsim.py +++ b/src/pyuvsim/uvsim.py @@ -10,6 +10,7 @@ import contextlib import warnings +from contextlib import nullcontext import astropy.units as units import numpy as np @@ -625,6 +626,220 @@ def _check_ntasks_valid(Ntasks_tot): ) +def _run_uvsim_rma( + uv_container, + local_task_iter, + Ntasks_tot, + Nsky_parts, + Nbls, + Nblts, + Nfreqs, + Nsrcs, + pbar, + quiet, +): + mpi.start_mpi() + rank = mpi.get_rank() + comm = mpi.get_comm() + if rank == 0: + vis_data = mpi.MPI.Win.Create( + uv_container._data_array.value, comm=mpi.world_comm + ) + else: + vis_data = mpi.MPI.Win.Create(None, comm=mpi.world_comm) + + engine = UVEngine() + count = mpi.Counter() + size_complex = np.ones(1, dtype=complex).nbytes + data_array_shape = (Nblts, Nfreqs, 4) + uvdata_indices = [] + + with pbar as pbar: + try: + for task in local_task_iter: + engine.set_task(task) + vis = engine.make_visibility() + + blti, freq_ind = task.uvdata_index + + uvdata_indices.append(task.uvdata_index) + + flat_ind = np.ravel_multi_index((blti, freq_ind, 0), data_array_shape) + offset = flat_ind * size_complex + + vis_data.Lock(0) + vis_data.Accumulate(vis, 0, target=offset, op=mpi.MPI.SUM) + vis_data.Unlock(0) + + cval = count.next() + if rank == 0 and not quiet: + if not isinstance(pbar, simutils.progsteps): + # need a delta if this is a tqdm progress bar + cval -= pbar.n + pbar.update(cval) + finally: + request = comm.Ibarrier() + + while not request.Test(): + if rank == 0 and not quiet: + cval = count.current_value() + if not isinstance(pbar, simutils.progsteps): + # need a delta if this is a tqdm progress bar + cval -= pbar.n + pbar.update(cval) + + count.free() + vis_data.Free() + + if rank == 0 and not quiet: + print("\nCalculations Complete.", flush=True) + + if rank == 0: + return uv_container, uvdata_indices + return None, uvdata_indices + + +def _run_uvsim_send_recv( + uv_container, + Ntasks_tot, + Nsky_parts, + Nbls, + Nblts, + Nfreqs, + Nsrcs, + beam_list, + beam_dict, + catalog, + pbar, + quiet, +): + mpi.start_mpi() + rank = mpi.get_rank() + comm = mpi.get_comm() + Npus = mpi.get_Npus() + status = mpi.get_status() + + uvdata_indices = [] + + if rank == 0: + completed_workers = 0 + n_workers = Npus - 1 + completed_tasks = 0 + task_inds = range(int(Nblts * Nfreqs)) + src_inds = range(int(Nsrcs)) + error_occurred = None + + chunksize1 = int(np.ceil(0.1 * Nblts * Nfreqs / n_workers)) + chunksize2 = int(np.ceil(Nsrcs / Nsky_parts)) + all_iter = simutils._chunked_iterator_product( + task_inds, src_inds, chunksize1, chunksize2 + ) + with pbar as pbar: + while completed_workers < n_workers: + msg = comm.recv( + source=mpi.MPI.ANY_SOURCE, tag=mpi.MPI.ANY_TAG, status=status + ) + source = status.Get_source() + tag = status.Get_tag() + + # an error occurred on a worker. + # tell all PUs to stop working + if error_occurred is not None: + comm.send(None, dest=source, tag=mpi.Tags.EXIT) + + if tag == mpi.Tags.READY: + try: + comm.send(next(all_iter), dest=source, tag=mpi.Tags.START) + except StopIteration: + comm.send(None, dest=source, tag=mpi.Tags.EXIT) + + elif tag == mpi.Tags.DONE: + uv_inds, vis = msg[0], msg[1] + uv_container.data_array[uv_inds] += vis + if isinstance(pbar, simutils.progsteps): + completed_tasks += 1 + pbar.update(completed_tasks) + else: + pbar.update(1) + + elif tag == mpi.Tags.EXIT: + completed_workers += 1 + elif tag == mpi.Tags.ERROR: + error_occurred = msg + else: + raise ValueError(f"{msg} {tag}") + + if error_occurred is not None: + raise ValueError( + f"Error occurred on worker node {error_occurred[0]}: {error_occurred[1]}" + ) + + else: + engine = UVEngine() + while True: + comm.send(None, dest=0, tag=mpi.Tags.READY) + msg = comm.recv(source=0, tag=mpi.MPI.ANY_TAG, status=status) + tag = status.Get_tag() + if tag == mpi.Tags.START: + msg_task_inds, msg_src_inds = msg + + try: + # get message with task_inds and src_inds + local_task_iter = uvdata_to_task_iter( + msg_task_inds, + uv_container, + catalog.subselect(msg_src_inds), + beam_list, + beam_dict, + Nsky_parts=Nsky_parts, + ) + for task in local_task_iter: + engine.set_task(task) + + uvdata_indices.append(task.uvdata_index) + + comm.send( + [task.uvdata_index, engine.make_visibility()], + dest=0, + tag=mpi.Tags.DONE, + ) + except Exception as err: + comm.send([rank, err], dest=0, tag=mpi.Tags.ERROR) + + elif tag == mpi.Tags.EXIT: + break + + comm.send(None, dest=0, tag=mpi.Tags.EXIT) + + if rank == 0: + return uv_container, uvdata_indices + return None, uvdata_indices + + +def _get_pbar(progbar, Ntasks_tot, rank): + if progbar not in ["tqdm", "progsteps"]: + raise ValueError( + "The progbar keyword must be one of " + "'progsteps' or 'tqdm' " + f"but received {progbar}." + ) + if progbar == "tqdm": + try: + import tqdm + except ImportError as err: + raise ImportError( + "The tqdm module must be installed to use a tqdm progress bar." + ) from err + if rank == 0: + if progbar == "tqdm": + pbar = tqdm.tqdm(total=Ntasks_tot, unit="UVTask", leave=True) + else: + pbar = simutils.progsteps(maxval=Ntasks_tot) + else: + pbar = nullcontext() + return pbar + + def _set_nsky_parts(Nsrcs, cat_nfreqs, Nsky_parts): """Set the Nsky_parts.""" # Estimating required memory to decide how to split source array. @@ -810,6 +1025,8 @@ def run_uvdata_uvsim( catalog: SkyModel | SkyModelData, beam_interp_check: bool | None = None, quiet: bool = False, + backend: str = "rma", + progbar: str = "progsteps", block_nonroot_stdout: bool = True, Nsky_parts: int | None = None, ) -> UVData: @@ -838,6 +1055,12 @@ def run_uvdata_uvsim( zenith angle. quiet : bool Do not print anything. + backend : str + Specifies the backend for simulation. + Must be one of ['rma', 'send_recv']. (Default: 'rma') + progbar : str + Specifies the progress bar class used to output simulation progress. + Must be one of ['progsteps', 'tqdm']. Default 'progsteps' block_nonroot_stdout : bool Redirect stdout on nonzero ranks to /dev/null, for cleaner output. Nsky_parts : int, optional @@ -860,7 +1083,28 @@ def run_uvdata_uvsim( "line_profiler installed." ) - mpi.start_mpi(block_nonroot_stdout=block_nonroot_stdout) + if backend not in ["rma", "send_recv"]: + raise ValueError( + "The backend keyword must be one of " + "'rma', 'send_recv' " + f"but received value {backend}." + ) + + if progbar not in ["tqdm", "progsteps"]: + raise ValueError( + "The progbar keyword must be one of " + "'progsteps' or 'tqdm' " + f"but received {progbar}." + ) + + if backend == "rma": + thread_multiple = False + else: + thread_multiple = True + + mpi.start_mpi( + block_nonroot_stdout=block_nonroot_stdout, thread_multiple=thread_multiple + ) rank = mpi.get_rank() comm = mpi.get_comm() Npus = mpi.get_Npus() @@ -871,6 +1115,14 @@ def run_uvdata_uvsim( input_uv, beam_list, catalog, input_order = _run_uvdata_input_check( input_uv=input_uv, beam_list=beam_list, catalog=catalog ) + else: + input_uv = UVData() + beam_list = BeamList([]) + beam_dict = None + catalog = SkyModelData() + + if Npus < 2: + raise ValueError("At least 2 PUs are required to run pyuvsim.") input_uv = comm.bcast(input_uv, root=0) beam_dict = comm.bcast(beam_dict, root=0) @@ -886,13 +1138,8 @@ def run_uvdata_uvsim( print("Nfreqs:", input_uv.Nfreqs, flush=True) print("Nsrcs:", catalog.Ncomponents, flush=True) uv_container = simsetup._complete_uvdata(input_uv, inplace=False) - if "world" in input_uv.extra_keywords: - uv_container.extra_keywords["world"] = input_uv.extra_keywords["world"] - vis_data = mpi.MPI.Win.Create( - uv_container._data_array.value, comm=mpi.world_comm - ) else: - vis_data = mpi.MPI.Win.Create(None, comm=mpi.world_comm) + uv_container = None Nbls = input_uv.Nbls Nblts = input_uv.Nblts @@ -900,107 +1147,109 @@ def run_uvdata_uvsim( Npols = input_uv.Npols Nsrcs = catalog.Ncomponents - task_inds, src_inds, Ntasks_local, Nsrcs_local = _make_task_inds( - Nblts, Nfreqs, Nsrcs, rank, Npus - ) + if beam_interp_check is None: + # check that all the beams cover the full sky + beam_interp_check = not beam_list.check_all_azza_beams_full_sky() - count = mpi.Counter() + Nsky_parts = _set_nsky_parts(Nsrcs, catalog.Nfreqs, Nsky_parts) + + if backend == "rma": + # rank 0 gets a blank iter to just skip the computation step + local_task_iter = iter(()) + Ntasks_local = 0 + if rank != 0: + task_inds, src_inds, Ntasks_local, Nsrcs_local = _make_task_inds( + Nblts, Nfreqs, Nsrcs, rank - 1, Npus - 1 + ) - # wrap this in a try/finally (no exception handling) to ensure resources are freed - try: - if beam_interp_check is None: - # check that all the beams cover the full sky - beam_interp_check = not beam_list.check_all_azza_beams_full_sky() + local_task_iter = uvdata_to_task_iter( + task_inds, + input_uv, + catalog.subselect(src_inds), + beam_list, + beam_dict, + Nsky_parts=Nsky_parts, + ) + elif backend == "send_recv": + # hack this a little bit, we want to have Nbls * Nfreqs * Ntimes + # total tasks we are distributing over all PUs but then we + # sum up over all PUs here in the next few lines + Ntasks_local = Nblts * Nfreqs / Npus - Nsky_parts = _set_nsky_parts(Nsrcs, catalog.Nfreqs, Nsky_parts) + Ntasks_tot = Ntasks_local * Nsky_parts + # Sum all the tasks across each node + Nsky_parts = comm.allreduce(Nsky_parts, op=mpi.MPI.MAX) + Ntasks_tot = int(np.ceil(comm.allreduce(Ntasks_tot, op=mpi.MPI.SUM))) - local_task_iter = uvdata_to_task_iter( - task_inds, - input_uv, - catalog.subselect(src_inds), + if rank == 0 and not quiet: + if Nsky_parts > 1: + print( + "The source list has been split into Nsky_parts" + f" <= {Nsky_parts} chunks on some or all nodes" + " due to memory limitations.", + flush=True, + ) + print("Tasks: ", Ntasks_tot, flush=True) + + pbar = _get_pbar(progbar, Ntasks_tot, rank) + if backend == "rma": + uv_container, uvdata_indices = _run_uvsim_rma( + uv_container, + local_task_iter, + Ntasks_tot, + Nsky_parts, + Nbls, + Nblts, + Nfreqs, + Nsrcs, + pbar, + quiet, + ) + elif backend == "send_recv": + # the compute nodes need to have the input_uv for this mode + # they make the task list on the fly + if rank != 0: + uv_container = input_uv + uv_container, uvdata_indices = _run_uvsim_send_recv( + uv_container, + Ntasks_tot, + Nsky_parts, + Nbls, + Nblts, + Nfreqs, + Nsrcs, beam_list, beam_dict, - Nsky_parts=Nsky_parts, + catalog, + pbar, + quiet, ) - Ntasks_tot = Ntasks_local * Nsky_parts - # Sum all the tasks across each node - Nsky_parts = comm.reduce(Nsky_parts, op=mpi.MPI.MAX, root=0) - Ntasks_tot = comm.reduce(Ntasks_tot, op=mpi.MPI.SUM, root=0) - if rank == 0 and not quiet: - if Nsky_parts > 1: - print( - "The source list has been split into Nsky_parts" - f" <= {Nsky_parts} chunks on some or all nodes" - " due to memory limitations.", - flush=True, - ) - print("Tasks: ", Ntasks_tot, flush=True) - pbar = simutils.progsteps(maxval=Ntasks_tot) - - engine = UVEngine() - size_complex = np.ones(1, dtype=complex).nbytes - data_array_shape = (Nblts, Nfreqs, Npols) - uvdata_indices = [] - - for task in local_task_iter: - engine.set_task(task) - vis = engine.make_visibility() - - blti, freq_ind = task.uvdata_index - - uvdata_indices.append(task.uvdata_index) - - flat_ind = np.ravel_multi_index((blti, freq_ind, 0), data_array_shape) - offset = flat_ind * size_complex - - vis_data.Lock(0) - vis_data.Accumulate(vis, 0, target=offset, op=mpi.MPI.SUM) - vis_data.Unlock(0) - - cval = count.next() - if rank == 0 and not quiet: - pbar.update(cval) - - request = comm.Ibarrier() - - while not request.Test(): - if rank == 0 and not quiet: - cval = count.current_value() - pbar.update(cval) - if rank == 0 and not quiet: - pbar.finish() + # If profiling is active, save meta data: + from .profiling import prof # noqa + + if hasattr(prof, "meta_file") and len(uvdata_indices) > 0: # pragma: nocover + # Saving axis sizes on current rank (local) and for the whole job (global). + # These lines are affected by issue 179 of line_profiler, so the nocover + # above will need to stay until this issue is resolved (see profiling.py). + task_inds = np.array(uvdata_indices) + + bl_inds = task_inds[:, 0] % Nbls + time_inds = (task_inds[:, 0] - bl_inds) // Nbls + Ntimes_loc = np.unique(time_inds).size + Nbls_loc = np.unique(bl_inds).size + Nfreqs_loc = np.unique(task_inds[:, 1]).size + axes_dict = { + "Ntimes_loc": Ntimes_loc, + "Nbls_loc": Nbls_loc, + "Nfreqs_loc": Nfreqs_loc, + "Nsrcs_loc": Nsky_parts, + "prof_rank": prof.rank, + } - if rank == 0 and not quiet: - print("Calculations Complete.", flush=True) - - # If profiling is active, save meta data: - from .profiling import prof # noqa - - if hasattr(prof, "meta_file"): # pragma: nocover - # Saving axis sizes on current rank (local) and for the whole job (global). - # These lines are affected by issue 179 of line_profiler, so the nocover - # above will need to stay until this issue is resolved (see profiling.py). - task_inds = np.array(uvdata_indices) - bl_inds = task_inds[:, 0] % Nbls - time_inds = (task_inds[:, 0] - bl_inds) // Nbls - Ntimes_loc = np.unique(time_inds).size - Nbls_loc = np.unique(bl_inds).size - Nfreqs_loc = np.unique(task_inds[:, 1]).size - axes_dict = { - "Ntimes_loc": Ntimes_loc, - "Nbls_loc": Nbls_loc, - "Nfreqs_loc": Nfreqs_loc, - "Nsrcs_loc": Nsky_parts, - "prof_rank": prof.rank, - } - - with open(prof.meta_file, "w") as afile: - for k, v in axes_dict.items(): - afile.write(f"{k} \t {int(v):d}\n") - finally: - count.free() - vis_data.Free() + with open(prof.meta_file, "w") as afile: + for k, v in axes_dict.items(): + afile.write(f"{k} \t {int(v):d}\n") if rank == 0: # This needs to be done after the simulation completes because it @@ -1012,12 +1261,16 @@ def run_uvdata_uvsim( return uv_container + return + def run_uvsim( params, return_uv=False, beam_interp_check=None, quiet=False, + backend="rma", + progbar="progsteps", block_nonroot_stdout=True, ): """ @@ -1039,6 +1292,12 @@ def run_uvsim( zenith angle. quiet : bool If True, do not print anything to stdout. (Default False) + backend : str + Specifies the backend for simulation. + Must be one of ['rma', 'send_recv']. (Default: 'rma') + progbar : str + Specifies the progress bar class used to output simulation progress. + Must be one of ['progsteps', 'tqdm']. Default 'progsteps' block_nonroot_stdout : bool Redirect stdout on nonzero ranks to /dev/null, for cleaner output. @@ -1057,7 +1316,28 @@ def run_uvsim( "line_profiler installed." ) - mpi.start_mpi(block_nonroot_stdout=block_nonroot_stdout) + if backend not in ["rma", "send_recv"]: + raise ValueError( + "The backend keyword must be one of " + "'rma', 'send_recv' " + f"but received value {backend}." + ) + + if progbar not in ["tqdm", "progsteps"]: + raise ValueError( + "The progbar keyword must be one of " + "'progsteps' or 'tqdm' " + f"but received {progbar}." + ) + + if backend == "rma": + thread_multiple = False + else: + thread_multiple = True + + mpi.start_mpi( + thread_multiple=thread_multiple, block_nonroot_stdout=block_nonroot_stdout + ) rank = mpi.get_rank() input_uv = UVData() @@ -1087,11 +1367,13 @@ def run_uvsim( beam_dict=beam_dict, catalog=skydata, quiet=quiet, + backend=backend, + progbar=progbar, beam_interp_check=beam_interp_check, ) if rank == 0 and not quiet: - print(f"Run uvdata uvsim took {(Time.now() - start).to('minute'):.3f}") + print(f"\nRun uvdata uvsim took {(Time.now() - start).to('minute'):.3f}") if rank == 0: if isinstance(params, str): diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 38ca5249..eae489c5 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -25,19 +25,33 @@ def profdata_dir_setup(tmpdir): @pytest.mark.filterwarnings("ignore:The mount_type parameter must be set for UVBeam") -def test_profiler(tmpdir): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +@pytest.mark.parametrize("progbar", ["progsteps", "tqdm"]) +@pytest.mark.parallel(2) +def test_profiler(tmpdir, backend, progbar): + if progbar == "tqdm": + pytest.importorskip("tqdm") line_profiler = pytest.importorskip("line_profiler") + outpath = profdata_dir_setup(tmpdir) testprof_fname = str(outpath.join("time_profile.out")) pyuvsim.profiling.set_profiler(outfile_prefix=testprof_fname, dump_raw=True) + with check_warnings(UserWarning, match="Profiler already set"): pyuvsim.profiling.set_profiler( outfile_prefix=testprof_fname[:-4], dump_raw=True ) + param_filename = os.path.join( SIM_DATA_PATH, "test_config", "param_1time_1src_testcat.yaml" ) - pyuvsim.uvsim.run_uvsim(param_filename, return_uv=True) + pyuvsim.uvsim.run_uvsim( + param_filename, return_uv=True, backend=backend, progbar=progbar + ) + + if pyuvsim.mpi.rank != 0: + return + time_profiler = pyuvsim.profiling.get_profiler() time_profiler.disable_by_count() assert isinstance(time_profiler, line_profiler.LineProfiler) diff --git a/tests/test_run.py b/tests/test_run.py index 2c167d0f..f2e16037 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -26,6 +26,8 @@ c_ms = speed_of_light.to("m/s").value pytest.importorskip("mpi4py") # noqa +# everything in this file requires 2 PUs +pytestmark = pytest.mark.parallel(2) @pytest.mark.filterwarnings("ignore:Fixing auto-correlations to be be real-only") @@ -36,8 +38,8 @@ not pytest.pyuvsim_can_parallel, reason="mpi-pytest is not installed. Cannot run parallel tests.", ) -@pytest.mark.parallel(2) -def test_run_paramfile_uvsim(goto_tempdir, paramfile): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +def test_run_paramfile_uvsim(goto_tempdir, paramfile, backend): # Test vot and txt catalogs for parameter simulation # Compare to reference files. ref_file = os.path.join(SIM_DATA_PATH, "testfile_singlesource.uvh5") @@ -45,7 +47,7 @@ def test_run_paramfile_uvsim(goto_tempdir, paramfile): param_filename = os.path.join(SIM_DATA_PATH, "test_config", paramfile) # This test obsparam file has "single_source.txt" as its catalog. - pyuvsim.uvsim.run_uvsim(param_filename) + pyuvsim.uvsim.run_uvsim(param_filename, backend=backend) # Loading the file and comparing is only done on rank 0. if pyuvsim.mpi.rank != 0: @@ -76,6 +78,9 @@ def test_run_paramfile_uvsim(goto_tempdir, paramfile): uv_new.extra_keywords = uv_ref.extra_keywords uv_ref.telescope.mount_type = uv_new.telescope.mount_type + # remove filename attribute to ensure equality + uv_new.filename = None + uv_ref.filename = None assert uv_new == uv_ref @@ -94,12 +99,19 @@ def test_run_paramfile_uvsim(goto_tempdir, paramfile): ("monopole-nonflat", 3e-4), ], ) -def test_analytic_diffuse(model, tol, tmpdir): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +@pytest.mark.parametrize("progbar", ["progsteps", "tqdm"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +def test_analytic_diffuse(model, tol, tmpdir, backend, progbar): # Generate the given model and simulate for a few baselines. # Import from analytic_diffuse (consider moving to rasg_affiliates?) - pytest.importorskip("analytic_diffuse") + analytic_diffuse = pytest.importorskip("analytic_diffuse") pytest.importorskip("astropy_healpix") - import analytic_diffuse + if progbar == "tqdm": + pytest.importorskip("tqdm") modname = model use_w = False @@ -152,18 +164,26 @@ def test_analytic_diffuse(model, tol, tmpdir): with open(obspar_path, "w") as ofile: yaml.dump(obspar, ofile, default_flow_style=False) - uv_out = pyuvsim.run_uvsim(obspar_path, return_uv=True) - # Convert from Jy to K sr - dat = uv_out.data_array[:, 0, 0] * jy_to_ksr(uv_out.freq_array[0]).value - # Evaluate the solution and compare to visibilities. - soln = analytic_diffuse.get_solution(modname) - uvw_lam = uv_out.uvw_array * uv_out.freq_array[0] / c_ms - ana = soln(uvw_lam, **params) - np.testing.assert_allclose(ana / 2, dat, atol=tol, rtol=0) + uv_out = pyuvsim.run_uvsim( + obspar_path, return_uv=True, backend=backend, progbar=progbar + ) + if pyuvsim.mpi.rank == 0: + # Convert from Jy to K sr + dat = uv_out.data_array[:, 0, 0] * jy_to_ksr(uv_out.freq_array[0]).value + # Evaluate the solution and compare to visibilities. + soln = analytic_diffuse.get_solution(modname) + uvw_lam = uv_out.uvw_array * uv_out.freq_array[0] / c_ms + ana = soln(uvw_lam, **params) + np.testing.assert_allclose(ana / 2, dat, atol=tol, rtol=0) @pytest.mark.filterwarnings("ignore:antenna_diameters are not set") -def test_diffuse_units(tmpdir): +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +def test_diffuse_units(tmpdir, backend): pytest.importorskip("analytic_diffuse") pytest.importorskip("astropy_healpix") @@ -226,14 +246,14 @@ def test_diffuse_units(tmpdir): with open(obspar_path, "w") as ofile: yaml.dump(obspar, ofile, default_flow_style=False) - uv_out_k = pyuvsim.run_uvsim(obspar_path, return_uv=True) + uv_out_k = pyuvsim.run_uvsim(obspar_path, backend=backend, return_uv=True) obspar["sources"] = {"catalog": catpath_jy} obspar["filing"]["outfile_name"] = "diffuse_units_jy_sim.uvh5" with open(obspar_path, "w") as ofile: yaml.dump(obspar, ofile, default_flow_style=False) - uv_out_jy = pyuvsim.run_uvsim(obspar_path, return_uv=True) + uv_out_jy = pyuvsim.run_uvsim(obspar_path, backend=backend, return_uv=True) # make the histories the same for comparison uv_out_jy.history = uv_out_k.history @@ -244,7 +264,12 @@ def test_diffuse_units(tmpdir): @pytest.mark.parametrize("rename_beamfits", [True, False]) -def test_run_paramdict_uvsim(rename_beamfits, tmp_path): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +def test_run_paramdict_uvsim(rename_beamfits, backend, tmp_path): # Running a simulation from parameter dictionary. param_file = os.path.join( SIM_DATA_PATH, "test_config", "param_1time_1src_testcat.yaml" @@ -285,12 +310,17 @@ def test_run_paramdict_uvsim(rename_beamfits, tmp_path): else: params = pyuvsim.simsetup._config_str_to_dict(param_file) - pyuvsim.run_uvsim(params, return_uv=True) + pyuvsim.run_uvsim(params, backend=backend, return_uv=True) @pytest.mark.parametrize("spectral_type", ["flat", "subband", "spectral_index"]) @pytest.mark.parametrize("nfeeds", [1, 2]) -def test_run_gleam_uvsim(spectral_type, nfeeds): +@pytest.mark.parametrize("backend", ["send_recv"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +def test_run_gleam_uvsim(spectral_type, nfeeds, backend): params = pyuvsim.simsetup._config_str_to_dict( os.path.join(SIM_DATA_PATH, "test_config", "param_1time_testgleam.yaml") ) @@ -316,7 +346,11 @@ def test_run_gleam_uvsim(spectral_type, nfeeds): for beam_inter in beam_list: assert beam_inter.Nfeeds == nfeeds - uv_out = pyuvsim.run_uvsim(params, return_uv=True) + uv_out = pyuvsim.run_uvsim(params, backend=backend, return_uv=True) + + if pyuvsim.mpi.rank != 0: + return + assert uv_out.telescope.name == "Triangle" file_name = f"gleam_triangle_{spectral_type}.uvh5" @@ -341,6 +375,8 @@ def test_run_gleam_uvsim(spectral_type, nfeeds): uv_out.history = uv_in.history uv_in.extra_keywords = uv_out.extra_keywords uv_in.telescope.mount_type = uv_out.telescope.mount_type + uv_in.filename = uv_out.filename + assert uv_in.telescope._location == uv_out.telescope._location assert uv_in == uv_out @@ -354,9 +390,19 @@ def test_run_gleam_uvsim(spectral_type, nfeeds): ["spectral_index", False, True], ), ) -def test_zenith_spectral_sim(spectral_type, tmpdir, use_cli, use_uvdata): +@pytest.mark.parametrize("backend", ["send_recv"]) +@pytest.mark.parametrize("progbar", ["progsteps", "tqdm"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +def test_zenith_spectral_sim( + spectral_type, tmpdir, use_cli, use_uvdata, backend, progbar +): # Make a power law source at zenith in three ways. # Confirm that simulated visibilities match expectation. + if progbar == "tqdm": + pytest.importorskip("tqdm") params = pyuvsim.simsetup._config_str_to_dict( os.path.join(SIM_DATA_PATH, "test_config", "param_1time_1src_testcat.yaml") @@ -464,6 +510,10 @@ def test_zenith_spectral_sim(spectral_type, tmpdir, use_cli, use_uvdata): cat_in, "--outfile", uvd_out, + "--backend", + backend, + "--progbar", + progbar, ] ) else: @@ -478,6 +528,10 @@ def test_zenith_spectral_sim(spectral_type, tmpdir, use_cli, use_uvdata): "--quiet", "--keep_nonroot_stdout", "--raw_profile", + "--backend", + backend, + "--progbar", + progbar, ] ) assert os.path.exists(profile_file) @@ -486,10 +540,19 @@ def test_zenith_spectral_sim(spectral_type, tmpdir, use_cli, use_uvdata): else: if use_uvdata: uv_out = pyuvsim.run_uvdata_uvsim( - input_uv=uvd, beam_list=beam_list, beam_dict=beam_dict, catalog=catalog + input_uv=uvd, + beam_list=beam_list, + beam_dict=beam_dict, + catalog=catalog, + backend=backend, + progbar=progbar, ) else: - uv_out = pyuvsim.run_uvsim(param_file, return_uv=True) + uv_out = pyuvsim.run_uvsim( + param_file, return_uv=True, backend=backend, progbar=progbar + ) + if pyuvsim.mpi.rank != 0: + return if use_cli and use_uvdata: # this is different because we are writing out the analytic beam to a @@ -501,6 +564,10 @@ def test_zenith_spectral_sim(spectral_type, tmpdir, use_cli, use_uvdata): np.testing.assert_allclose(uv_out.data_array[ii, :, 0], exp_spectrum) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) def test_pol_error(): # Check that running with a uvdata object without the proper polarizations will fail. hera_uv = UVData() @@ -521,6 +588,10 @@ def test_pol_error(): @pytest.mark.filterwarnings("ignore:Setting the location attribute post initialization") @pytest.mark.parametrize("selenoid", ["SPHERE", "GSFC", "GRAIL23", "CE-1-LAM-GEO"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) def test_sim_on_moon(goto_tempdir, selenoid): pytest.importorskip("lunarsky") from spiceypy.utils.exceptions import SpiceUNKNOWNFRAME @@ -591,6 +662,9 @@ def test_sim_on_moon(goto_tempdir, selenoid): except SpiceUNKNOWNFRAME as err: pytest.skip("SpiceUNKNOWNFRAME error: " + str(err)) + if pyuvsim.mpi.rank != 0: + return + assert history_utils._check_history_version(uv_out.history, pyradiosky.__version__) assert history_utils._check_history_version(uv_out.history, pyuvdata.__version__) assert history_utils._check_history_version(uv_out.history, pyuvsim.__version__) @@ -603,6 +677,7 @@ def test_sim_on_moon(goto_tempdir, selenoid): # Lunar Frame Roundtripping param_dict["filing"]["outdir"] = str(tmpdir) + uv_filename = pyuvsim.utils.write_uvdata( uv_out, param_dict, return_filename=True, quiet=True ) @@ -612,6 +687,10 @@ def test_sim_on_moon(goto_tempdir, selenoid): @pytest.mark.parametrize("selenoid", ["SPHERE", "GSFC", "GRAIL23", "CE-1-LAM-GEO"]) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) def test_lunar_gauss(goto_tempdir, selenoid): pytest.importorskip("lunarsky") from lunarsky import MoonLocation @@ -660,8 +739,12 @@ def test_lunar_gauss(goto_tempdir, selenoid): except SpiceUNKNOWNFRAME as err: pytest.skip("SpiceUNKNOWNFRAME error: " + str(err)) + if pyuvsim.mpi.rank != 0: + return + assert uv_out.telescope.location.ellipsoid == selenoid + assert uv_out._telescope_location.ellipsoid == selenoid # Skymodel and update positions # Init sky model diff --git a/tests/test_uvsim.py b/tests/test_uvsim.py index 43c7120c..91992863 100644 --- a/tests/test_uvsim.py +++ b/tests/test_uvsim.py @@ -463,6 +463,11 @@ def test_single_offzenith_source(beam, hera_loc): @pytest.mark.filterwarnings("ignore:No julian date given for mock catalog.") @pytest.mark.filterwarnings("ignore:No out format specified for uvdata file.") @pytest.mark.filterwarnings("ignore:The mount_type parameter must be set for UVBeam") +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parallel(2) def test_select_antennas(): pytest.importorskip("mpi4py") @@ -767,9 +772,15 @@ def test_local_task_gen(): @pytest.mark.filterwarnings("ignore:The parameter `blt_order` could not be identified") +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parallel(2) def test_nsky_parts_large(capsys): """Check that we get the same visibilities no matter what Nsky_parts is set to.""" pytest.importorskip("mpi4py") + pyuvsim.mpi.start_mpi() hera_uv = UVData.from_file(EW_uvfits_10time10chan) hera_uv.select(times=np.unique(hera_uv.time_array)[0:3], freq_chans=range(3)) time = Time(hera_uv.time_array[0], scale="utc", format="jd") @@ -796,9 +807,10 @@ def test_nsky_parts_large(capsys): Nsky_parts=5, ) - captured = capsys.readouterr() - assert "The source list has been split into Nsky_parts" in captured.out - assert out_uv_single_nsky == out_uv_multi_nsky + if pyuvsim.mpi.get_rank() == 0: + captured = capsys.readouterr() + assert "The source list has been split into Nsky_parts" in captured.out + assert out_uv_single_nsky == out_uv_multi_nsky def test_set_nsky_parts_errors(): @@ -1110,6 +1122,11 @@ def test_overflow_check(): pyuvsim.uvsim._check_ntasks_valid(should_fail) +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parallel(2) def test_fullfreq_check(uvdata_two_redundant_bls_triangle_sources): # Check that run_uvdata_uvsim will error if 'spectral_type' is 'full' # and the frequencies on the catalog do not match the simulation's. @@ -1158,7 +1175,13 @@ def test_run_mpierr(hera_loc, cst_beam): beam_list.share() +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.filterwarnings("ignore:Cannot check consistency of a string-mode BeamList") @pytest.mark.parametrize("order", [("bda",), ("baseline", "time"), ("ant2", "time")]) +@pytest.mark.parallel(2) def test_ordering(uvdata_two_redundant_bls_triangle_sources, order): pytest.importorskip("mpi4py") uvdata_linear, beam_list, beam_dict, sky_model = ( @@ -1177,6 +1200,11 @@ def test_ordering(uvdata_two_redundant_bls_triangle_sources, order): beam_dict=beam_dict, catalog=sky_model, ) + rank = pyuvsim.mpi.get_rank() + # rank 0 is the only one with the full uvdata object + if rank != 0: + return + assert out_uv.blt_order == order assert out_uv.blt_order == uvdata_linear.blt_order @@ -1194,9 +1222,19 @@ def test_ordering(uvdata_two_redundant_bls_triangle_sources, order): ) +@pytest.mark.filterwarnings("ignore:Cannot check consistency of a string-mode BeamList") +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parallel(2) @pytest.mark.parametrize("order", [("bda",), ("baseline", "time"), ("ant2", "time")]) def test_order_warning(uvdata_two_redundant_bls_triangle_sources, order): pytest.importorskip("mpi4py") + # need to get the mpi initialized + # now that simulations require at least 2 PUs + pyuvsim.mpi.start_mpi() + rank = pyuvsim.mpi.get_rank() uvdata_linear, beam_list, beam_dict, sky_model = ( uvdata_two_redundant_bls_triangle_sources ) @@ -1208,22 +1246,40 @@ def test_order_warning(uvdata_two_redundant_bls_triangle_sources, order): uvdata_linear.reorder_blts(order=order[0], minor_order=minor_order) # delete the order like we forgot to set it uvdata_linear.blt_order = None - with check_warnings( - UserWarning, match="The parameter `blt_order` could not be identified." - ): + if rank == 0: + with check_warnings( + UserWarning, match="The parameter `blt_order` could not be identified." + ): + out_uv = pyuvsim.uvsim.run_uvdata_uvsim( + input_uv=uvdata_linear.copy(), + beam_list=beam_list, + beam_dict=beam_dict, + catalog=sky_model, + ) + + assert out_uv.blt_order == ("time", "baseline") + else: out_uv = pyuvsim.uvsim.run_uvdata_uvsim( input_uv=uvdata_linear.copy(), beam_list=beam_list, beam_dict=beam_dict, catalog=sky_model, ) - assert out_uv.blt_order == ("time", "baseline") +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parallel(2) @pytest.mark.filterwarnings("ignore:key beam_path in extra_keywords is longer than 8") @pytest.mark.parametrize("cut_beam", [10, 85, 90]) -def test_nblts_not_square(uvdata_two_redundant_bls_triangle_sources, cut_beam): +@pytest.mark.parametrize("backend", ["rma", "send_recv"]) +def test_nblts_not_square(uvdata_two_redundant_bls_triangle_sources, cut_beam, backend): pytest.importorskip("mpi4py") + pyuvsim.mpi.start_mpi(block_nonroot_stdout=False) + rank = pyuvsim.mpi.get_rank() + uvdata_linear, beam_list, beam_dict, sky_model = ( uvdata_two_redundant_bls_triangle_sources ) @@ -1247,27 +1303,63 @@ def test_nblts_not_square(uvdata_two_redundant_bls_triangle_sources, cut_beam): indices = indices[::2] blt_inds = np.delete(np.arange(uvdata_linear.Nblts), indices) uvdata_linear.select(blt_inds=blt_inds) - assert uvdata_linear.Nblts != uvdata_linear.Nbls * uvdata_linear.Ntimes if cut_beam < 85: # There's a source out at ~80 degrees - with pytest.raises( - ValueError, - match="at least one interpolation location is outside of the UVBeam", - ): - out_uv = pyuvsim.uvsim.run_uvdata_uvsim( - input_uv=uvdata_linear.copy(), - beam_list=beam_list, - beam_dict=beam_dict, - catalog=sky_model, - ) + + # for send_recv the error is sent to node 0 and not raised on worker PUs + if backend == "send_recv": + if rank == 0: + with pytest.raises( + ValueError, + match=r".*at least one interpolation location is outside of the UVBeam", + ): + out_uv = pyuvsim.uvsim.run_uvdata_uvsim( + input_uv=uvdata_linear.copy(), + beam_list=beam_list, + beam_dict=beam_dict, + catalog=sky_model, + backend=backend, + ) + else: + pyuvsim.uvsim.run_uvdata_uvsim( + input_uv=uvdata_linear.copy(), + beam_list=beam_list, + beam_dict=beam_dict, + catalog=sky_model, + backend=backend, + ) + + # work only occurs on non rank 0 nodes. check for the error there. + else: + if rank != 0: + with pytest.raises( + ValueError, + match="at least one interpolation location is outside of the UVBeam", + ): + out_uv = pyuvsim.uvsim.run_uvdata_uvsim( + input_uv=uvdata_linear.copy(), + beam_list=beam_list, + beam_dict=beam_dict, + catalog=sky_model, + backend=backend, + ) + else: + pyuvsim.uvsim.run_uvdata_uvsim( + input_uv=uvdata_linear.copy(), + beam_list=beam_list, + beam_dict=beam_dict, + catalog=sky_model, + backend=backend, + ) else: out_uv = pyuvsim.uvsim.run_uvdata_uvsim( input_uv=uvdata_linear.copy(), beam_list=beam_list, beam_dict=beam_dict, catalog=sky_model, + backend=backend, ) np.testing.assert_allclose(out_uv.get_data((0, 1)), out_uv.get_data((1, 2))) @@ -1280,6 +1372,11 @@ def test_nblts_not_square(uvdata_two_redundant_bls_triangle_sources, cut_beam): @pytest.mark.filterwarnings("ignore:The uvw_array does not match the expected") +@pytest.mark.skipif( + not pytest.pyuvsim_can_parallel, + reason="mpi-pytest is not installed. Cannot run parallel tests.", +) +@pytest.mark.parallel(2) def test_uvdata_uvsim_uvw_setting(uvdata_two_redundant_bls_triangle_sources): pytest.importorskip("mpi4py") uvd, beam_list, beam_dict, sky_model = uvdata_two_redundant_bls_triangle_sources @@ -1294,6 +1391,9 @@ def test_uvdata_uvsim_uvw_setting(uvdata_two_redundant_bls_triangle_sources): input_uv=uvd2, beam_list=beam_list, beam_dict=beam_dict, catalog=sky_model ) + if pyuvsim.mpi.rank != 0: + return + assert out_uv1 == out_uv2 @@ -1363,3 +1463,46 @@ def test_run_uvdata_uvsim_errors(kwargs, msg): pytest.importorskip("mpi4py") with pytest.raises(TypeError, match=msg): pyuvsim.run_uvdata_uvsim(**kwargs) + + +def test_tqdm_import_error(): + pytest.importorskip("mpi4py") + try: + import tqdm # noqa + + return + except ImportError: + with pytest.raises(ImportError, match="The tqdm module must be"): + pyuvsim.uvsim._get_pbar("tqdm", None, None) + + +def test_progbar_error_get_pbar(): + pytest.importorskip("mpi4py") + with pytest.raises(ValueError, match="The progbar keyword must be one of "): + pyuvsim.uvsim._get_pbar("foo", None, None) + + +def test_progbar_error_uvdata_uvsim(): + pytest.importorskip("mpi4py") + with pytest.raises(ValueError, match="The progbar keyword must be one of "): + pyuvsim.run_uvdata_uvsim( + None, None, None, None, False, backend="rma", progbar="Foo" + ) + + +def test_progbar_error_run_uvsim(): + pytest.importorskip("mpi4py") + with pytest.raises(ValueError, match="The progbar keyword must be one of "): + pyuvsim.run_uvsim(None, False, None, backend="rma", progbar="Foo") + + +def test_backend_error_uvdata_uvsim(): + pytest.importorskip("mpi4py") + with pytest.raises(ValueError, match="The backend keyword must be one of "): + pyuvsim.run_uvdata_uvsim(None, None, None, None, False, backend="Test") + + +def test_backend_error_run_uvsim(): + pytest.importorskip("mpi4py") + with pytest.raises(ValueError, match="The backend keyword must be one of "): + pyuvsim.run_uvsim(None, False, False, backend="Test")