diff --git a/src/pyuvdata/analytic_beam.py b/src/pyuvdata/analytic_beam.py index d1b788bd6..770990a00 100644 --- a/src/pyuvdata/analytic_beam.py +++ b/src/pyuvdata/analytic_beam.py @@ -388,18 +388,24 @@ def _check_eval_inputs( raise ValueError("az_array and za_array must have the same shape.") def _get_empty_data_array( - self, grid_shape: tuple[int, int], beam_type: str = "efield" + self, + grid_shape: tuple[int, int], + beam_type: Literal[ + "efield", "power", "feed_iresponse", "feed_projection" + ] = "efield", ) -> FloatArray: """Get the empty data to fill in the eval methods.""" - if beam_type == "efield": + if beam_type in ["efield", "feed_projection"]: return np.zeros( (self.Naxes_vec, self.Nfeeds, *grid_shape), dtype=np.complex128 ) - else: + elif beam_type == "power": return np.zeros( (1, self.Npols, *grid_shape), dtype=np.complex128 if self.Npols > self.Nfeeds else np.float64, ) + elif beam_type == "feed_iresponse": + return np.zeros((1, self.Nfeeds, *grid_shape), dtype=np.complex128) def efield_eval( self, *, az_array: FloatArray, za_array: FloatArray, freq_array: FloatArray @@ -445,9 +451,9 @@ def efield_eval( data_array = self._get_empty_data_array(az_grid.shape) - for fn in np.arange(self.Nfeeds): - data_array[0, fn, :, :] = np.sqrt(power_vals[fn] / 2.0) - data_array[1, fn, :, :] = np.sqrt(power_vals[fn] / 2.0) + for feed_i in np.arange(self.Nfeeds): + data_array[0, feed_i, :, :] = np.sqrt(power_vals[feed_i] / 2.0) + data_array[1, feed_i, :, :] = np.sqrt(power_vals[feed_i] / 2.0) return data_array @@ -510,11 +516,123 @@ def power_eval( return data_array + def feed_iresponse_eval( + self, *, az_array: FloatArray, za_array: FloatArray, freq_array: FloatArray + ) -> FloatArray: + """ + Evaluate the feed I response at the given coordinates. + + Parameters + ---------- + az_array : array_like of floats, optional + Azimuth values to evaluate the beam at in radians. Must be the same shape + as za_array. + za_array : array_like of floats, optional + Zenith values to evaluate the beam at in radians. Must be the same shape + as az_array. + freq_array : array_like of floats, optional + Frequency values to evaluate the beam at in Hertz. + + Returns + ------- + array_like of complex + An array of beam values. The shape of the evaluated data will be: + (1, Nfeeds, freq_array.size, az_array.size) + + """ + self._check_eval_inputs( + az_array=az_array, za_array=za_array, freq_array=freq_array + ) + + if hasattr(self, "_feed_iresponse_eval"): + za_grid, _ = np.meshgrid(za_array, freq_array) + az_grid, f_grid = np.meshgrid(az_array, freq_array) + + return self._feed_iresponse_eval( + az_grid=az_grid, za_grid=za_grid, f_grid=f_grid + ).astype(complex) + else: + efield_vals = self.efield_eval( + za_array=za_array, az_array=az_array, freq_array=freq_array + ) + + unpolarized = True + for feed_i in np.arange(self.Nfeeds): + if not np.allclose(efield_vals[0, feed_i], efield_vals[1, feed_i]): + unpolarized = False + + if unpolarized: + # for unpolarized beams, where the response to all vector direction + # is identical, f is the sum over that direction divided by sqrt(2) + # this works better than what is below because it handles negatives + # in e.g. AiryBeams + data_array = np.sum(efield_vals, axis=0) / np.sqrt(2) + else: + # if polarized default f to the magnitude, assume zero time delay + data_array = np.sqrt(np.sum(np.abs(efield_vals) ** 2, axis=0)) + data_array = data_array[np.newaxis] + + return data_array.astype(complex) + + def feed_projection_eval( + self, *, az_array: FloatArray, za_array: FloatArray, freq_array: FloatArray + ) -> FloatArray: + """ + Evaluate the feed projection at the given coordinates. + + Parameters + ---------- + az_array : array_like of floats, optional + Azimuth values to evaluate the beam at in radians. Must be the same shape + as za_array. + za_array : array_like of floats, optional + Zenith values to evaluate the beam at in radians. Must be the same shape + as az_array. + freq_array : array_like of floats, optional + Frequency values to evaluate the beam at in Hertz. + + Returns + ------- + array_like of complex + An array of beam values. The shape of the evaluated data will be: + (Naxes_vec, Nfeeds, freq_array.size, az_array.size) + + """ + self._check_eval_inputs( + az_array=az_array, za_array=za_array, freq_array=freq_array + ) + + za_grid, _ = np.meshgrid(za_array, freq_array) + az_grid, f_grid = np.meshgrid(az_array, freq_array) + + if hasattr(self, "_feed_projection_eval"): + return self._feed_projection_eval( + az_grid=az_grid, za_grid=za_grid, f_grid=f_grid + ).astype(complex) + else: + efield_vals = self.efield_eval( + az_array=az_array, za_array=za_array, freq_array=freq_array + ) + + # set f to the magnitude of the I response, assume no time delays + f_vals = self.feed_iresponse_eval( + az_array=az_array, za_array=za_array, freq_array=freq_array + ) + data_array = self._get_empty_data_array(az_grid.shape) + + for fn in np.arange(self.Nfeeds): + data_array[0, fn] = efield_vals[0, fn] / f_vals[0, fn] + data_array[1, fn] = efield_vals[1, fn] / f_vals[0, fn] + + return data_array + @combine_docstrings(UVBeam.new) def to_uvbeam( self, freq_array: FloatArray, - beam_type: Literal["efield", "power"] = "efield", + beam_type: Literal[ + "efield", "power", "feed_iresponse", "feed_projection" + ] = "efield", pixel_coordinate_system: ( Literal["az_za", "orthoslant_zenith", "healpix"] | None ) = None, @@ -535,7 +653,7 @@ def to_uvbeam( freq_array : ndarray of float Array of frequencies in Hz to evaluate the beam at. beam_type : str - Beam type, either "efield" or "power". + Beam type, one of "efield", "power", "feed_iresponse" or "feed_projection". pixel_coordinate_system : str Pixel coordinate system, options are "az_za", "orthoslant_zenith" and "healpix". Forced to be "healpix" if ``nside`` is given and by @@ -558,13 +676,14 @@ def to_uvbeam( Healpix ordering parameter, defaults to "ring" if nside is provided. """ - if beam_type not in ["efield", "power"]: - raise ValueError("Beam type must be 'efield' or 'power'") + allowed_beam_types = ["efield", "power", "feed_iresponse", "feed_projection"] + if beam_type not in allowed_beam_types: + raise ValueError(f"Beam type must be one of {allowed_beam_types}") - if beam_type == "efield": - polarization_array = None - else: + if beam_type == "power": polarization_array = self.polarization_array + else: + polarization_array = None mount_type = self.mount_type if hasattr(self, "mount_type") else None @@ -584,6 +703,7 @@ def to_uvbeam( uvb = UVBeam.new( telescope_name="Analytic Beam", + beam_type=beam_type, data_normalization="physical", feed_name=self.__repr__(), feed_version="1.0", @@ -623,10 +743,7 @@ def to_uvbeam( az_array = az_array.flatten() za_array = za_array.flatten() - if beam_type == "efield": - eval_function = "efield_eval" - else: - eval_function = "power_eval" + eval_function = f"{beam_type}_eval" data_array = getattr(self, eval_function)( az_array=az_array, za_array=za_array, freq_array=freq_array @@ -643,7 +760,9 @@ def to_uvbeam( def plot( self, *, - beam_type: str, + beam_type: Literal[ + "efield", "power", "feed_iresponse", "feed_projection" + ] = "efield", freq: float, complex_type: str = "real", colormap: str | None = None, @@ -661,6 +780,8 @@ def plot( Parameters ---------- + beam_type : str + Beam type, one of "efield", "power", "feed_iresponse" or "feed_projection". freq : int The frequency index to plot. complex_type : str @@ -1232,10 +1353,13 @@ def _efield_eval( # The first dimension is for [azimuth, zenith angle] in that order # the second dimension is for feed [e, n] in that order - data_array[0, self.east_ind] = -np.sin(az_grid) - data_array[0, self.north_ind] = np.cos(az_grid) - data_array[1, self.east_ind] = np.cos(za_grid) * np.cos(az_grid) - data_array[1, self.north_ind] = np.cos(za_grid) * np.sin(az_grid) + cos_za = np.cos(za_grid) + sin_az = np.sin(az_grid) + cos_az = np.cos(az_grid) + data_array[0, self.east_ind] = -sin_az + data_array[0, self.north_ind] = cos_az + data_array[1, self.east_ind] = cos_za * cos_az + data_array[1, self.north_ind] = cos_za * sin_az return data_array @@ -1247,12 +1371,13 @@ def _power_eval( # these are just the sum in quadrature of the efield components. # some trig work is done to reduce the number of cos/sin evaluations - data_array[0, 0] = 1 - (np.sin(za_grid) * np.cos(az_grid)) ** 2 - data_array[0, 1] = 1 - (np.sin(za_grid) * np.sin(az_grid)) ** 2 + sin_za = np.sin(za_grid) + data_array[0, 0] = 1 - (sin_za * np.cos(az_grid)) ** 2 + data_array[0, 1] = 1 - (sin_za * np.sin(az_grid)) ** 2 if self.Npols > self.Nfeeds: # cross pols are included - data_array[0, 2] = -(np.sin(za_grid) ** 2) * np.sin(2.0 * az_grid) / 2.0 + data_array[0, 2] = -(sin_za**2) * np.sin(2.0 * az_grid) / 2.0 data_array[0, 3] = data_array[0, 2] return data_array diff --git a/src/pyuvdata/beam_interface.py b/src/pyuvdata/beam_interface.py index e7775772c..5e15c2ead 100644 --- a/src/pyuvdata/beam_interface.py +++ b/src/pyuvdata/beam_interface.py @@ -38,7 +38,7 @@ class BeamInterface: Beam object to use for computations. If a BeamInterface is passed, a new view of the same object is created. beam_type : str - The beam type, either "efield" or "power". + The beam type, one of "efield", "power", "feed_iresponse" or "feed_projection". include_cross_pols : bool Option to include the cross polarized beams (e.g. xy and yx or en and ne). Used if beam is a UVBeam and and the input UVBeam is an Efield beam but @@ -48,7 +48,9 @@ class BeamInterface: """ beam: AnalyticBeam | UVBeam - beam_type: Literal["efield", "power"] | None = None + beam_type: ( + Literal["efield", "power", "feed_iresponse", "feed_projection"] | None + ) = None include_cross_pols: InitVar[bool] = True def __post_init__(self, include_cross_pols: bool): @@ -77,17 +79,26 @@ def __post_init__(self, include_cross_pols: bool): if isinstance(self.beam, UVBeam): if self.beam_type is None or self.beam_type == self.beam.beam_type: self.beam_type = self.beam.beam_type - elif self.beam_type == "power": + elif self.beam.beam_type == "efield": + conv_func = { + "power": "efield_to_power", + "feed_iresponse": "efield_to_feed_iresponse", + "feed_projection": "efield_to_feed_projection", + } warnings.warn( "Input beam is an efield UVBeam but beam_type is specified as " - "'power'. Converting efield beam to power." + f"{self.beam_type}. Converting efield beam to {self.beam_type}." ) - self.beam.efield_to_power(calc_cross_pols=include_cross_pols) + kwargs = {} + if self.beam_type == "power": + kwargs["calc_cross_pols"] = include_cross_pols + getattr(self.beam, conv_func[self.beam_type])(**kwargs) else: raise ValueError( - "Input beam is a power UVBeam but beam_type is specified as " - "'efield'. It's not possible to convert a power beam to an " - "efield beam, either provide an efield UVBeam or do not " + f"Input beam is a {self.beam.beam_type} UVBeam but beam_type " + f"is specified as {self.beam_type}. It's not possible to " + f"convert a {self.beam.beam_type} beam to {self.beam_type}, " + f"either provide a UVBeam with a compatible type or do not " "specify `beam_type`." ) elif self.beam_type is None: @@ -317,13 +328,10 @@ def compute_response( az_array_use = copy.copy(az_array) za_array_use = copy.copy(za_array) - if self.beam_type == "efield": - interp_data = self.beam.efield_eval( - az_array=az_array_use, za_array=za_array_use, freq_array=freq_array - ) - else: - interp_data = self.beam.power_eval( - az_array=az_array_use, za_array=za_array_use, freq_array=freq_array - ) + eval_function = f"{self.beam_type}_eval" + + interp_data = getattr(self.beam, eval_function)( + az_array=az_array_use, za_array=za_array_use, freq_array=freq_array + ) return interp_data diff --git a/src/pyuvdata/utils/plotting.py b/src/pyuvdata/utils/plotting.py index 53c246896..079e08c7d 100644 --- a/src/pyuvdata/utils/plotting.py +++ b/src/pyuvdata/utils/plotting.py @@ -347,7 +347,7 @@ def beam_plot( feed_labels[np.isclose(beam_obj.feed_angle, 0)] = "N/S" feed_labels[np.isclose(beam_obj.feed_angle, np.pi / 2)] = "E/W" - if beam_type == "efield": + if beam_type != "power": nfeedpol = beam_obj.Nfeeds feedpol_label = feed_labels if issubclass(type(beam_obj), UnpolarizedAnalyticBeam): @@ -410,7 +410,7 @@ def beam_plot( freq_title = freq naxes_vec = beam_obj.Naxes_vec - if beam_type == "power": + if beam_type in ["power", "feed_iresponse"]: naxes_vec = 1 reg_grid = True @@ -433,6 +433,10 @@ def beam_plot( beam_type_label = "power" elif beam_type == "efield": beam_type_label = "E-field" + elif beam_type == "feed_iresponse": + beam_type_label = "Feed I response" + elif beam_type == "feed_projection": + beam_type_label = "Feed projection" plot_beam_arrays( beam_vals, diff --git a/src/pyuvdata/uvbeam/initializers.py b/src/pyuvdata/uvbeam/initializers.py index 3a36fcbe4..e29231756 100644 --- a/src/pyuvdata/uvbeam/initializers.py +++ b/src/pyuvdata/uvbeam/initializers.py @@ -24,6 +24,7 @@ def new_uvbeam( feed_version: str = "0.0", model_name: str = "default", model_version: str = "0.0", + beam_type: Literal["efield", "power", "feed_iresponse", "feed_projection"] = None, feed_array: StrArray | None = None, feed_angle: FloatArray | None = None, mount_type: str | None = "fixed", @@ -78,6 +79,9 @@ def new_uvbeam( Name of the beam model. model_version: str Version of the beam model. + beam_type : str + Beam type, one of "efield", "power", "feed_iresponse" or "feed_projection". + Defaults to "power" if polarization_array is provided and "efield" otherwise. feed_array : ndarray of str Array of feed orientations. Options are: n/e or x/y or r/l. feed_angle : ndarray of float @@ -177,11 +181,13 @@ def new_uvbeam( uvb = UVBeam() - if polarization_array is None: - uvb.beam_type = "efield" - uvb._set_efield() - else: - uvb.beam_type = "power" + if beam_type is None: + if polarization_array is None: + beam_type = "efield" + else: + beam_type = "power" + + if beam_type == "power": polarization_array = np.asarray(polarization_array) if polarization_array.dtype.kind != "i": polarization_array = np.asarray(utils.polstr2num(polarization_array)) @@ -189,6 +195,9 @@ def new_uvbeam( uvb.Npols = uvb.polarization_array.size uvb._set_power() + else: + set_function = f"_set_{beam_type}" + getattr(uvb, set_function)() if feed_array is not None: uvb.feed_array = np.asarray(feed_array) @@ -259,7 +268,7 @@ def new_uvbeam( "Either nside or both axis1_array and axis2_array must be provided." ) - if uvb.beam_type == "power": + if uvb.beam_type in ["power", "feed_iresponse"]: uvb.Naxes_vec = 1 uvb._set_cs_params() @@ -296,7 +305,7 @@ def new_uvbeam( f"expected shape {bv_shape}." ) uvb.basis_vector_array = basis_vector_array - elif uvb.beam_type == "efield": + elif uvb.beam_type in ["efield", "feed_projection"]: if uvb.pixel_coordinate_system == "healpix": basis_vector_array = np.zeros( (uvb.Naxes_vec, uvb.Ncomponents_vec, uvb.Npixels), dtype=float @@ -384,15 +393,15 @@ def new_uvbeam( uvb._set_simple() # Set data parameters - if uvb.beam_type == "efield": - data_type = complex - polax = uvb.Nfeeds - else: + if uvb.beam_type == "power": data_type = float for pol in uvb.polarization_array: if pol in [-3, -4, -7, -8]: data_type = complex polax = uvb.Npols + else: + data_type = complex + polax = uvb.Nfeeds if uvb.pixel_coordinate_system == "healpix": pixax = (uvb.Npixels,) diff --git a/src/pyuvdata/uvbeam/uvbeam.py b/src/pyuvdata/uvbeam/uvbeam.py index b9488ecd0..759593119 100644 --- a/src/pyuvdata/uvbeam/uvbeam.py +++ b/src/pyuvdata/uvbeam/uvbeam.py @@ -230,18 +230,17 @@ def __init__(self): description=desc, form="str", expected_type=str, - acceptable_vals=["efield", "power"], + acceptable_vals=["efield", "power", "feed_iresponse", "feed_projection"], ) desc = ( "Beam basis vector components, essentially the mapping between the " "directions that the electrical field values are recorded in to the " "directions aligned with the pixel coordinate system (or azimuth/zenith " - "angle for HEALPix beams)." - 'Not required if beam_type is "power". The shape depends on the ' - 'pixel_coordinate_system, if it is "healpix", the shape is: ' - "(Naxes_vec, Ncomponents_vec, Npixels), otherwise it is " - "(Naxes_vec, Ncomponents_vec, Naxes2, Naxes1)" + "angle for HEALPix beams). Not required if Naxes_vec is 1 (typically " + "true of power beams). The shape depends on the pixel_coordinate_system," + "if it is 'healpix', the shape is: (Naxes_vec, Ncomponents_vec, Npixels), " + "otherwise it is (Naxes_vec, Ncomponents_vec, Naxes2, Naxes1)" ) self._basis_vector_array = uvp.UVParameter( "basis_vector_array", @@ -818,12 +817,18 @@ def _set_efield(self): # call set_cs_params to fix data_array form self._set_cs_params() - def _set_power(self): + def _set_power(self, keep_basis_vector=False): """Set beam_type to 'power' and adjust required parameters.""" self.beam_type = "power" - self._Naxes_vec.acceptable_vals = [1, 2, 3] - self._basis_vector_array.required = False - self._Ncomponents_vec.required = False + if keep_basis_vector: + self._Naxes_vec.acceptable_vals = [2, 3] + self._basis_vector_array.required = True + self._Ncomponents_vec.required = True + else: + # this is the typical case + self._Naxes_vec.acceptable_vals = [1] + self._basis_vector_array.required = False + self._Ncomponents_vec.required = False self._Npols.required = True self._polarization_array.required = True @@ -836,6 +841,28 @@ def _set_power(self): # call set_cs_params to fix data_array form self._set_cs_params() + def _set_feed_iresponse(self): + self.beam_type = "feed_iresponse" + self._Naxes_vec.acceptable_vals = [1] + self._basis_vector_array.required = False + self._Ncomponents_vec.required = False + self._Npols.required = False + self._polarization_array.required = False + self._data_array.expected_type = uvp._get_generic_type(complex) + # call set_cs_params to fix data_array form + self._set_cs_params() + + def _set_feed_projection(self): + self.beam_type = "feed_projection" + self._Naxes_vec.acceptable_vals = [2, 3] + self._Ncomponents_vec.required = True + self._basis_vector_array.required = True + self._Npols.required = False + self._polarization_array.required = False + self._data_array.expected_type = uvp._get_generic_type(complex) + # call set_cs_params to fix data_array form + self._set_cs_params() + def _set_simple(self): """Set antenna_type to 'simple' and adjust required parameters.""" self.antenna_type = "simple" @@ -971,7 +998,11 @@ def check( if self.beam_type == "efield": self._set_efield() elif self.beam_type == "power": - self._set_power() + if self.Naxes_vec > 1: + keep_basis_vector = True + else: + keep_basis_vector = False + self._set_power(keep_basis_vector=keep_basis_vector) if self.antenna_type == "simple": self._set_simple() @@ -1059,6 +1090,12 @@ def efield_to_power( check_extra : bool Option to check optional parameters as well as required ones. + Returns + ------- + UVBeam or None + If inplace is False, returns the power beam object. If inplace is + True, returns None. + """ # Do a quick compatibility check w/ the old feed types. self._fix_feeds() @@ -1096,7 +1133,7 @@ def efield_to_power( beam_object.Naxes_vec = 1 # adjust requirements, fix data_array form - beam_object._set_power() + beam_object._set_power(keep_basis_vector=keep_basis_vector) power_data = np.zeros( beam_object._data_array.expected_shape(beam_object), dtype=np.complex128 ) @@ -1280,6 +1317,12 @@ def efield_to_pstokes( check_extra : bool Option to check optional parameters as well as required ones. + Returns + ------- + UVBeam or None + If inplace is False, returns the pstokes beam object. If inplace is + True, returns None. + """ # Do a quick compatibility check w/ the old feed types. self._fix_feeds() @@ -1348,7 +1391,7 @@ def efield_to_pstokes( ] ) beam_object.Naxes_vec = 1 - beam_object._set_power() + beam_object._set_power(keep_basis_vector=False) history_update_string = ( " Converted from efield to pseudo-stokes power using pyuvdata." @@ -1365,6 +1408,256 @@ def efield_to_pstokes( if not inplace: return beam_object + def efield_to_feed_iresponse( + self, + *, + inplace=True, + run_check=True, + check_extra=True, + run_check_acceptability=True, + ): + """ + Convert E-field beam feed I response. + + The feed I response is the the complex response of each instrumental + feed to unpolarized emission (so it only has an feed index, no vector + component axis). The phase is related to time delays which can vary + spatially but do not differ between incident polarizations (so are + unpolarized). + + Parameters + ---------- + inplace : bool + Option to apply conversion directly on self or to return a new + UVBeam object. + run_check : bool + Option to check for the existence and proper shapes of the required + parameters after converting to power. + run_check_acceptability : bool + Option to check acceptable range of the values of required parameters + after converting to power. + check_extra : bool + Option to check optional parameters as well as required ones. + + Returns + ------- + UVBeam or None + If inplace is False, returns the feed I response beam object. If + inplace is True, returns None. + + """ + if inplace: + beam_object = self + else: + beam_object = self.copy() + + if beam_object.beam_type != "efield": + raise ValueError("beam_type must be efield.") + + if beam_object.pixel_coordinate_system != "az_za": + raise ValueError("pixel_coordinate_system must be az_za.") + + beam_object._set_feed_iresponse() + # feed I response (also called F) is composed of two parts: the magnitude + # part and the complex phase part which is from time delays. + # The magnitude is the sqrt of the sum of the squares of the response to + # the two vector orientations (azimuthally aligned & zenith angle aligned) + f_mag = np.sqrt(np.sum(np.abs(beam_object.data_array) ** 2, axis=0)) + + # to calculate the complex delay part we first need to account for phase + # jumps that are just due to the coordinate system. Because vector fields + # in a sphere have poles in any coordinate system, this flip has to be + # accounted for in any coordinate system. + + # we can use the set of points at zenith (with different azimuth values) + # to identify where these phase flips happen. + zenith_ind = np.nonzero( + beam_object.axis2_array == beam_object.axis2_array.min() + ) + + flip_array = np.copy(beam_object.data_array) + for va_i in range(beam_object.Naxes_vec): + for f_i in range(beam_object.Nfeeds): + az_angles = np.angle(np.squeeze(flip_array[va_i, f_i, :, zenith_ind])) + az_angles = az_angles.reshape((beam_object.Nfreqs, beam_object.Naxes1)) + az_angles[az_angles < 0] += 2 * np.pi + az_angle_diff = abs(az_angles - np.unwrap(az_angles, period=np.pi)) + az_flip = np.logical_and(az_angle_diff > 1, az_angle_diff < 4) + # az_flip is shape (Nfreqs, Naxis1), need to reorder to get + # those axes adjacent + flip_array = np.transpose(flip_array, axes=(0, 1, 2, 4, 3)) + flip_array[va_i, f_i, az_flip] *= -1 + # now switch back + flip_array = np.transpose(flip_array, axes=(0, 1, 2, 4, 3)) + # check if zenith response is mostly negative now, if so make it + # positive + # use mean instead of max because can occasionally get max + # values just above zero + # this is frequency dependent because the earlier flips are + # frequency dependent so keep the freq axis + mean_zen_val = np.mean( + np.squeeze(flip_array[va_i, f_i, :, zenith_ind].real), axis=-1 + ) + flip_array[va_i, f_i, mean_zen_val < 0] *= -1 + + # to compute the time delay, first sum the flipped array across the vector + # direction axis -- this gives us what is common to each instrumental + # pol across incident polarizations. Then normalize by the abs to remove + # the magnitude which we have accounted for in f_mag. + f_delay = np.sum(flip_array, axis=0) + f_delay = f_delay / np.abs(f_delay) + + beam_object.data_array = f_mag * f_delay + beam_object.data_array = beam_object.data_array[np.newaxis] + + beam_object.Naxes_vec = 1 + history_update_string = ( + " Converted from efield to feed I response using pyuvdata." + ) + beam_object.history = beam_object.history + history_update_string + beam_object.basis_vector_array = None + beam_object.Ncomponents_vec = None + + if run_check: + beam_object.check( + check_extra=check_extra, run_check_acceptability=run_check_acceptability + ) + if not inplace: + return beam_object + + def efield_to_feed_projection( + self, + *, + inplace=True, + return_feed_iresponse=False, + run_check=True, + check_extra=True, + run_check_acceptability=True, + ): + """ + Convert E-field beam feed projection. + + The feed projection is the projection from celestial polarization + vector components (orthogonal on the sky) to instrumental feed vector + components (often non-orthogonal on the sky). This is similar to a + rotation matrix in that it just converts between coordinate systems + (the magnitude of the response is removed), but is not unitary because + the projection of the feeds are not orthogonal in all directions on the sky. + + Parameters + ---------- + inplace : bool + Option to apply conversion directly on self or to return a new + UVBeam object. + return_feed_iresponse : bool + Option to return the feed I response, which is necessarily calculated. + run_check : bool + Option to check for the existence and proper shapes of the required + parameters after converting to power. + run_check_acceptability : bool + Option to check acceptable range of the values of required parameters + after converting to power. + check_extra : bool + Option to check optional parameters as well as required ones. + + Returns + ------- + UVBeam or None + If inplace is False, returns the feed I response beam object. If + inplace is True, returns None. + + """ + if inplace: + beam_object = self + else: + beam_object = self.copy() + + if beam_object.beam_type != "efield": + raise ValueError("beam_type must be efield.") + + if beam_object.pixel_coordinate_system != "az_za": + raise ValueError("pixel_coordinate_system must be az_za.") + + # first get the feed I response, then divide it out to get the feed + # projection + feed_resp = beam_object.efield_to_feed_iresponse( + inplace=False, + run_check=run_check, + check_extra=check_extra, + run_check_acceptability=run_check_acceptability, + ) + + beam_object._set_feed_projection() + + for va_i in range(beam_object.Naxes_vec): + for f_i in range(beam_object.Nfeeds): + beam_object.data_array[va_i, f_i] /= feed_resp.data_array[0, f_i] + + history_update_string = ( + " Converted from efield to feed projection using pyuvdata." + ) + beam_object.history = beam_object.history + history_update_string + + if run_check: + beam_object.check( + check_extra=check_extra, run_check_acceptability=run_check_acceptability + ) + + if return_feed_iresponse: + if inplace: + return feed_resp + else: + return beam_object, feed_resp + elif not inplace: + return beam_object + + def decompose_feed_iresponse_projection( + self, run_check=True, check_extra=True, run_check_acceptability=True + ): + """ + Decompose an E-field beam into I response and feed projection objects. + + The feed I response is the the complex response of each instrumental + feed to unpolarized emission (so it only has an feed index, no vector + component axis). The phase is related to time delays which can vary + spatially but do not differ between incident polarizations (so are + unpolarized). + + The feed projection is the projection from celestial polarization + vector components (orthogonal on the sky) to instrumental feed vector + components (often non-orthogonal on the sky). This is similar to a + rotation matrix in that it just converts between coordinate systems + (the magnitude of the response is removed), but is not unitary because + the projection of the feeds are not orthogonal in all directions on the sky. + + Parameters + ---------- + run_check : bool + Option to check for the existence and proper shapes of the required + parameters after converting to power. + run_check_acceptability : bool + Option to check acceptable range of the values of required parameters + after converting to power. + check_extra : bool + Option to check optional parameters as well as required ones. + + Returns + ------- + UVBeam + feed_iresponse object. + UVBeam + feed_projection object. + + """ + fproj, firesp = self.efield_to_feed_projection( + inplace=False, + return_feed_iresponse=True, + run_check=run_check, + check_extra=check_extra, + run_check_acceptability=run_check_acceptability, + ) + return firesp, fproj + def _interp_freq(self, freq_array, *, kind="linear", tol=1.0): """ Interpolate function along frequency axis. diff --git a/tests/conftest.py b/tests/conftest.py index 062345215..96f96e38b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -172,6 +172,14 @@ def az_za_coords(): return az_array, za_array +@pytest.fixture() +def az_za_coords_fine(): + az_array = np.deg2rad(np.linspace(0, 359, 360)) + za_array = np.deg2rad(np.linspace(0, 90, 91)) + + return az_array, za_array + + @pytest.fixture() def az_za_deg_grid(az_za_coords): az_array, za_array = az_za_coords diff --git a/tests/test_analytic_beam.py b/tests/test_analytic_beam.py index dc9549413..88ebbcf5d 100644 --- a/tests/test_analytic_beam.py +++ b/tests/test_analytic_beam.py @@ -2,6 +2,7 @@ # Licensed under the 2-clause BSD License import re +from dataclasses import dataclass import numpy as np import pytest @@ -12,6 +13,7 @@ from pyuvdata import AiryBeam, GaussianBeam, ShortDipoleBeam, UniformBeam, UVBeam from pyuvdata.analytic_beam import AnalyticBeam, UnpolarizedAnalyticBeam from pyuvdata.testing import check_warnings +from pyuvdata.utils.types import FloatArray def test_airy_beam_values(az_za_deg_grid): @@ -190,30 +192,131 @@ def test_short_dipole_beam(az_za_deg_grid): efield_vals = beam.efield_eval(az_array=az_vals, za_array=za_vals, freq_array=freqs) - expected_data = np.zeros((2, 2, n_freqs, nsrcs), dtype=float) + expected_efield = np.zeros((2, 2, n_freqs, nsrcs), dtype=float) - expected_data[0, 0] = -np.sin(az_vals) - expected_data[0, 1] = np.cos(az_vals) - expected_data[1, 0] = np.cos(za_vals) * np.cos(az_vals) - expected_data[1, 1] = np.cos(za_vals) * np.sin(az_vals) + expected_efield[0, 0] = -np.sin(az_vals) + expected_efield[0, 1] = np.cos(az_vals) + expected_efield[1, 0] = np.cos(za_vals) * np.cos(az_vals) + expected_efield[1, 1] = np.cos(za_vals) * np.sin(az_vals) - np.testing.assert_allclose(efield_vals, expected_data, atol=1e-15, rtol=0) + np.testing.assert_allclose(efield_vals, expected_efield, atol=1e-15, rtol=0) power_vals = beam.power_eval(az_array=az_vals, za_array=za_vals, freq_array=freqs) - expected_data = np.zeros((1, 4, n_freqs, nsrcs), dtype=float) + expected_power = np.zeros((1, 4, n_freqs, nsrcs), dtype=float) - expected_data[0, 0] = 1 - np.sin(za_vals) ** 2 * np.cos(az_vals) ** 2 - expected_data[0, 1] = 1 - np.sin(za_vals) ** 2 * np.sin(az_vals) ** 2 - expected_data[0, 2] = -(np.sin(za_vals) ** 2) * np.sin(2.0 * az_vals) / 2.0 - expected_data[0, 3] = -(np.sin(za_vals) ** 2) * np.sin(2.0 * az_vals) / 2.0 + expected_power[0, 0] = 1 - np.sin(za_vals) ** 2 * np.cos(az_vals) ** 2 + expected_power[0, 1] = 1 - np.sin(za_vals) ** 2 * np.sin(az_vals) ** 2 + expected_power[0, 2] = -(np.sin(za_vals) ** 2) * np.sin(2.0 * az_vals) / 2.0 + expected_power[0, 3] = -(np.sin(za_vals) ** 2) * np.sin(2.0 * az_vals) / 2.0 - np.testing.assert_allclose(power_vals, expected_data, atol=1e-15, rtol=0) + np.testing.assert_allclose(power_vals, expected_power, atol=1e-15, rtol=0) assert ( beam.__repr__() == "ShortDipoleBeam(feed_array=array(['x', 'y'], dtype=' FloatArray: + """Evaluate the efield at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape) + + for feed_i in np.arange(self.Nfeeds): + # For power beams the first axis is shallow + data_array[0, feed_i, :, :] = np.cos(self.width * za_grid) / np.sqrt(2) + data_array[1, feed_i, :, :] = np.cos(self.width * za_grid) / np.sqrt(2) + + return data_array + + def _feed_iresponse_eval( + self, *, az_grid: FloatArray, za_grid: FloatArray, f_grid: FloatArray + ) -> FloatArray: + data_array = self._get_empty_data_array( + az_grid.shape, beam_type="feed_iresponse" + ) + + for feed_i in range(self.Nfeeds): + data_array[0, feed_i] = np.cos(self.width * za_grid) + + return data_array + + def _feed_projection_eval( + self, *, az_grid: FloatArray, za_grid: FloatArray, f_grid: FloatArray + ) -> FloatArray: + data_array = self._get_empty_data_array( + az_grid.shape, beam_type="feed_projection" + ) + + data_array.fill(1 / np.sqrt(2)) + + return data_array + + az_vals, za_vals, freqs = az_za_deg_grid + + this_cosbeam = CosEfield(width=3.0) + this_iresponse = this_cosbeam.feed_iresponse_eval( + az_array=az_vals, za_array=za_vals, freq_array=freqs + ) + + from pyuvdata.data.test_analytic_beam import CosEfieldTest + + test_cosbeam = CosEfieldTest(width=3.0) + test_iresponse = test_cosbeam.feed_iresponse_eval( + az_array=az_vals, za_array=za_vals, freq_array=freqs + ) + + np.testing.assert_allclose(this_iresponse, test_iresponse, atol=1e-14, rtol=0) + + this_feed_proj = this_cosbeam.feed_projection_eval( + az_array=az_vals, za_array=za_vals, freq_array=freqs + ) + test_feed_proj = test_cosbeam.feed_projection_eval( + az_array=az_vals, za_array=za_vals, freq_array=freqs + ) + + np.testing.assert_allclose(this_feed_proj, test_feed_proj, atol=1e-14, rtol=0) + def test_shortdipole_feed_error(): with pytest.raises( @@ -460,7 +563,13 @@ def test_beam_equality(beam1, beam2, equal): def test_to_uvbeam_errors(): beam = GaussianBeam(sigma=0.02) - with pytest.raises(ValueError, match="Beam type must be 'efield' or 'power'"): + with pytest.raises( + ValueError, + match=re.escape( + "Beam type must be one of ['efield', 'power', 'feed_iresponse', " + "'feed_projection']" + ), + ): beam.to_uvbeam( freq_array=np.linspace(100, 200, 5), beam_type="foo", @@ -681,6 +790,8 @@ def test_set_x_orientation_deprecation(): None, ), (ShortDipoleBeam(), "efield", "real", None, 90.0, None, None), + (ShortDipoleBeam(), "feed_iresponse", "phase", None, 90.0, None, None), + (ShortDipoleBeam(), "feed_projection", "real", None, 90.0, None, None), (ShortDipoleBeam(), "power", "real", None, 90.0, None, None), (UniformBeam(), "power", "real", None, 90.0, None, None), ], diff --git a/tests/test_beam_interface.py b/tests/test_beam_interface.py index 52393f22a..60b3ff4b9 100644 --- a/tests/test_beam_interface.py +++ b/tests/test_beam_interface.py @@ -62,17 +62,32 @@ def gaussian_uv(gaussian, az_za_coords) -> UVBeam: [ShortDipoleBeam, {}], ], ) -@pytest.mark.parametrize("init_beam_type", ["efield", "power"]) -@pytest.mark.parametrize("final_beam_type", ["efield", "power"]) -@pytest.mark.parametrize("coord_sys", ["az_za", "healpix"]) +@pytest.mark.parametrize( + ["init_beam_type", "final_beam_type", "coord_sys"], + [ + ("efield", "efield", "az_za"), + ("efield", "efield", "healpix"), + ("efield", "power", "az_za"), + ("efield", "power", "healpix"), + ("power", "power", "az_za"), + ("power", "power", "healpix"), + ("power", "efield", "az_za"), + ("power", "feed_iresponse", "az_za"), + ("feed_projection", "power", "az_za"), + ("efield", "feed_iresponse", "az_za"), + ("feed_iresponse", "feed_iresponse", "az_za"), + ("efield", "feed_projection", "az_za"), + ("feed_projection", "feed_projection", "az_za"), + ], +) def test_beam_interface( beam_obj, kwargs, init_beam_type, final_beam_type, - az_za_coords, - xy_grid_coarse, coord_sys, + az_za_coords_fine, + xy_grid_coarse, ): analytic = beam_obj(**kwargs) @@ -121,7 +136,7 @@ def test_beam_interface( az_array = az_array[::10] za_array = za_array[::10] else: - az_array, za_array = az_za_coords + az_array, za_array = az_za_coords_fine to_uvbeam_kwargs = {"axis1_array": az_array, "axis2_array": za_array} include_cross_pols = kwargs.get("include_cross_pols", True) @@ -133,21 +148,21 @@ def test_beam_interface( bi_analytic = BeamInterface(analytic, final_beam_type) if final_beam_type != init_beam_type: - if final_beam_type == "efield": + if init_beam_type != "efield": with pytest.raises( ValueError, - match="Input beam is a power UVBeam but beam_type is specified as " - "'efield'. It's not possible to convert a power beam to an " - "efield beam, either provide an efield UVBeam or do not " - "specify `beam_type`.", + match=f"Input beam is a {init_beam_type} UVBeam but beam_type " + f"is specified as {final_beam_type}. It's not possible to convert a " + f"{init_beam_type} beam to {final_beam_type}, either provide a " + "UVBeam with a compatible type or do not specify `beam_type`", ): BeamInterface(uvb, final_beam_type) return warn_type = UserWarning msg = ( - "Input beam is an efield UVBeam but beam_type is specified as " - "'power'. Converting efield beam to power." + f"Input beam is an {init_beam_type} UVBeam but beam_type is specified " + f"as {final_beam_type}. Converting efield beam to {final_beam_type}." ) else: warn_type = None diff --git a/tests/uvbeam/test_mwa_beam.py b/tests/uvbeam/test_mwa_beam.py index 1e3d46c80..ec393e959 100644 --- a/tests/uvbeam/test_mwa_beam.py +++ b/tests/uvbeam/test_mwa_beam.py @@ -4,7 +4,7 @@ import numpy as np import pytest -from pyuvdata import UVBeam, utils +from pyuvdata import ShortDipoleBeam, UVBeam, utils from pyuvdata.datasets import fetch_data from pyuvdata.testing import check_warnings from pyuvdata.uvbeam.mwa_beam import P1sin, P1sin_array @@ -48,10 +48,10 @@ def test_read_write_mwa(mwa_beam_1ppd, tmp_path): def test_mwa_orientation(mwa_beam_1ppd): power_beam = mwa_beam_1ppd.efield_to_power(inplace=False) - za_val = np.nonzero(np.isclose(power_beam.axis2_array, 15.0 * np.pi / 180)) + small_za_ind = np.nonzero(np.isclose(power_beam.axis2_array, 2.0 * np.pi / 180)) - east_az = np.nonzero(np.isclose(power_beam.axis1_array, 0)) - north_az = np.nonzero(np.isclose(power_beam.axis1_array, np.pi / 2)) + east_az_ind = np.nonzero(np.isclose(power_beam.axis1_array, 0)) + north_az_ind = np.nonzero(np.isclose(power_beam.axis1_array, np.pi / 2)) east_ind = np.nonzero( power_beam.polarization_array @@ -68,14 +68,14 @@ def test_mwa_orientation(mwa_beam_1ppd): # check that the e/w dipole is more sensitive n/s assert ( - power_beam.data_array[0, east_ind, 0, za_val, east_az] - < power_beam.data_array[0, east_ind, 0, za_val, north_az] + power_beam.data_array[0, east_ind, 0, small_za_ind, east_az_ind] + < power_beam.data_array[0, east_ind, 0, small_za_ind, north_az_ind] ) # check that the n/s dipole is more sensitive e/w assert ( - power_beam.data_array[0, north_ind, 0, za_val, north_az] - < power_beam.data_array[0, north_ind, 0, za_val, east_az] + power_beam.data_array[0, north_ind, 0, small_za_ind, north_az_ind] + < power_beam.data_array[0, north_ind, 0, small_za_ind, east_az_ind] ) # check that for a single dipole (all others turned off) there is higher @@ -89,27 +89,34 @@ def test_mwa_orientation(mwa_beam_1ppd): fetch_data("mwa_full_EE"), pixels_per_deg=1, delays=delays ) - za_val = np.nonzero(np.isclose(efield_beam.axis2_array, 80.0 * np.pi / 180)) + large_za_val = np.nonzero(np.isclose(efield_beam.axis2_array, 80.0 * np.pi / 180)) - max_az_response = np.max(np.abs(efield_beam.data_array[0, east_ind, 0, za_val, :])) - max_za_response = np.max(np.abs(efield_beam.data_array[1, east_ind, 0, za_val, :])) + max_az_response = np.max( + np.abs(efield_beam.data_array[0, east_ind, 0, large_za_val, :]) + ) + max_za_response = np.max( + np.abs(efield_beam.data_array[1, east_ind, 0, large_za_val, :]) + ) assert max_az_response > max_za_response - max_az_response = np.max(np.abs(efield_beam.data_array[0, north_ind, 0, za_val, :])) - max_za_response = np.max(np.abs(efield_beam.data_array[1, north_ind, 0, za_val, :])) + max_az_response = np.max( + np.abs(efield_beam.data_array[0, north_ind, 0, large_za_val, :]) + ) + max_za_response = np.max( + np.abs(efield_beam.data_array[1, north_ind, 0, large_za_val, :]) + ) assert max_az_response > max_za_response # check the sign of the responses are as expected close to zenith efield_beam = mwa_beam_1ppd - za_val = np.nonzero(np.isclose(power_beam.axis2_array, 2.0 * np.pi / 180)) # first check zenith angle aligned response - assert efield_beam.data_array[1, east_ind, 0, za_val, east_az] > 0 - assert efield_beam.data_array[1, north_ind, 0, za_val, north_az] > 0 + assert efield_beam.data_array[1, east_ind, 0, small_za_ind, east_az_ind] > 0 + assert efield_beam.data_array[1, north_ind, 0, small_za_ind, north_az_ind] > 0 # then check azimuthal aligned response - assert efield_beam.data_array[0, north_ind, 0, za_val, east_az] > 0 - assert efield_beam.data_array[0, east_ind, 0, za_val, north_az] < 0 + assert efield_beam.data_array[0, north_ind, 0, small_za_ind, east_az_ind] > 0 + assert efield_beam.data_array[0, east_ind, 0, small_za_ind, north_az_ind] < 0 @pytest.mark.parametrize( @@ -189,6 +196,74 @@ def test_mwa_pointing(delay_set, az_val, za_range): assert np.rad2deg(za_array[max_nn_loc]) < za_range[1] +@pytest.mark.parametrize( + ["approach", "inplace"], + [ + ("decompose", None), + ("separate", True), + ("separate", False), + ("joint", True), + ("joint", False), + ], +) +def test_mwa_fhd_decompose(mwa_beam_1ppd, approach, inplace): + mwa_beam = mwa_beam_1ppd + # select to above the horizon + mwa_beam.select(axis2_inds=np.nonzero(mwa_beam.axis2_array <= np.pi / 2)) + + small_za_ind = np.nonzero(np.isclose(mwa_beam.axis2_array, 2.0 * np.pi / 180)) + + if approach == "decompose": + firesp, fproj = mwa_beam.decompose_feed_iresponse_projection() + elif approach == "joint": + rets = mwa_beam.efield_to_feed_projection( + return_feed_iresponse=True, inplace=inplace + ) + if inplace: + firesp = rets + fproj = mwa_beam + else: + firesp = rets[1] + fproj = rets[0] + else: + mwa_beam2 = mwa_beam.copy() + ret0 = mwa_beam.efield_to_feed_iresponse(inplace=inplace) + ret1 = mwa_beam2.efield_to_feed_projection( + return_feed_iresponse=False, inplace=inplace + ) + if inplace: + firesp = mwa_beam + fproj = mwa_beam2 + else: + firesp = ret0 + fproj = ret1 + + # feed I response should have a real component that is positive near zenith + assert np.all(firesp.data_array[0, :, :, small_za_ind].real > 0) + + az_array, za_array = np.meshgrid(mwa_beam.axis1_array, mwa_beam.axis2_array) + + # MWA fproj is pretty similar to dipole_fproj up to mutual coupling effects + dipole_beam = ShortDipoleBeam() + + dipole_fproj = dipole_beam.feed_projection_eval( + az_array=az_array.flatten(), + za_array=za_array.flatten(), + freq_array=np.asarray(np.asarray([mwa_beam.freq_array[-1]])), + ) + dipole_fproj = dipole_fproj.reshape(2, 2, mwa_beam.Naxes2, mwa_beam.Naxes1) + + # they are very similar near zenith + fproj_diff = fproj.data_array[:, :, 0] - dipole_fproj + + np.testing.assert_allclose( + fproj_diff[:, :, small_za_ind].real, 0, rtol=0, atol=2e-4 + ) + np.testing.assert_allclose( + fproj_diff[:, :, small_za_ind].imag, 0, rtol=0, atol=2e-4 + ) + + def test_freq_range(mwa_beam_1ppd): beam1 = mwa_beam_1ppd beam2 = UVBeam() diff --git a/tests/uvbeam/test_uvbeam.py b/tests/uvbeam/test_uvbeam.py index b2228f321..bd3672522 100644 --- a/tests/uvbeam/test_uvbeam.py +++ b/tests/uvbeam/test_uvbeam.py @@ -2554,10 +2554,17 @@ def test_add_errors(power_beam_for_adding, efield_beam_for_adding): if param == "beam_type": beam2_copy = efield_beam.select(freq_chans=1, inplace=False) elif param == "Naxes_vec": - beam2_copy = beam2.copy() + beam1_copy = efield_beam.select(freq_chans=0, inplace=False) + beam2_copy = efield_beam.select(freq_chans=1, inplace=False) beam2_copy.Naxes_vec = value beam2_copy.data_array = np.concatenate( - (beam2_copy.data_array, beam2_copy.data_array, beam2_copy.data_array) + (beam2_copy.data_array, (beam2_copy.data_array[0])[np.newaxis]) + ) + beam2_copy.basis_vector_array = np.concatenate( + ( + beam2_copy.basis_vector_array, + (beam2_copy.basis_vector_array[0])[np.newaxis], + ) ) else: beam2_copy = beam2.copy() @@ -3592,3 +3599,14 @@ def test_plotting_errors(mwa_beam_1ppd): "to plot.", ): beam.plot(freq_ind=5.5) + + +@pytest.mark.parametrize( + "method", ["decompose_feed_iresponse_projection", "efield_to_feed_iresponse"] +) +def test_decompose_errors(cst_power_1freq, cst_efield_2freq_cut_healpix, method): + with pytest.raises(ValueError, match="beam_type must be efield."): + getattr(cst_power_1freq, method)() + + with pytest.raises(ValueError, match="pixel_coordinate_system must be az_za."): + getattr(cst_efield_2freq_cut_healpix, method)()