Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 150 additions & 25 deletions src/pyuvdata/analytic_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,18 +388,24 @@ def _check_eval_inputs(
raise ValueError("az_array and za_array must have the same shape.")

def _get_empty_data_array(
self, grid_shape: tuple[int, int], beam_type: str = "efield"
self,
grid_shape: tuple[int, int],
beam_type: Literal[
"efield", "power", "feed_iresponse", "feed_projection"
] = "efield",
) -> FloatArray:
"""Get the empty data to fill in the eval methods."""
if beam_type == "efield":
if beam_type in ["efield", "feed_projection"]:
return np.zeros(
(self.Naxes_vec, self.Nfeeds, *grid_shape), dtype=np.complex128
)
else:
elif beam_type == "power":
return np.zeros(
(1, self.Npols, *grid_shape),
dtype=np.complex128 if self.Npols > self.Nfeeds else np.float64,
)
elif beam_type == "feed_iresponse":
return np.zeros((1, self.Nfeeds, *grid_shape), dtype=np.complex128)

def efield_eval(
self, *, az_array: FloatArray, za_array: FloatArray, freq_array: FloatArray
Expand Down Expand Up @@ -445,9 +451,9 @@ def efield_eval(

data_array = self._get_empty_data_array(az_grid.shape)

for fn in np.arange(self.Nfeeds):
data_array[0, fn, :, :] = np.sqrt(power_vals[fn] / 2.0)
data_array[1, fn, :, :] = np.sqrt(power_vals[fn] / 2.0)
for feed_i in np.arange(self.Nfeeds):
data_array[0, feed_i, :, :] = np.sqrt(power_vals[feed_i] / 2.0)
data_array[1, feed_i, :, :] = np.sqrt(power_vals[feed_i] / 2.0)

return data_array

Expand Down Expand Up @@ -510,11 +516,123 @@ def power_eval(

return data_array

def feed_iresponse_eval(
self, *, az_array: FloatArray, za_array: FloatArray, freq_array: FloatArray
) -> FloatArray:
"""
Evaluate the feed I response at the given coordinates.

Parameters
----------
az_array : array_like of floats, optional
Azimuth values to evaluate the beam at in radians. Must be the same shape
as za_array.
za_array : array_like of floats, optional
Zenith values to evaluate the beam at in radians. Must be the same shape
as az_array.
freq_array : array_like of floats, optional
Frequency values to evaluate the beam at in Hertz.

Returns
-------
array_like of complex
An array of beam values. The shape of the evaluated data will be:
(1, Nfeeds, freq_array.size, az_array.size)

"""
self._check_eval_inputs(
az_array=az_array, za_array=za_array, freq_array=freq_array
)

if hasattr(self, "_feed_iresponse_eval"):
za_grid, _ = np.meshgrid(za_array, freq_array)
az_grid, f_grid = np.meshgrid(az_array, freq_array)

return self._feed_iresponse_eval(
az_grid=az_grid, za_grid=za_grid, f_grid=f_grid
).astype(complex)
else:
efield_vals = self.efield_eval(
za_array=za_array, az_array=az_array, freq_array=freq_array
)

unpolarized = True
for feed_i in np.arange(self.Nfeeds):
if not np.allclose(efield_vals[0, feed_i], efield_vals[1, feed_i]):
unpolarized = False

if unpolarized:
# for unpolarized beams, where the response to all vector direction
# is identical, f is the sum over that direction divided by sqrt(2)
# this works better than what is below because it handles negatives
# in e.g. AiryBeams
data_array = np.sum(efield_vals, axis=0) / np.sqrt(2)
else:
# if polarized default f to the magnitude, assume zero time delay
data_array = np.sqrt(np.sum(np.abs(efield_vals) ** 2, axis=0))
data_array = data_array[np.newaxis]

return data_array.astype(complex)

def feed_projection_eval(
self, *, az_array: FloatArray, za_array: FloatArray, freq_array: FloatArray
) -> FloatArray:
"""
Evaluate the feed projection at the given coordinates.

Parameters
----------
az_array : array_like of floats, optional
Azimuth values to evaluate the beam at in radians. Must be the same shape
as za_array.
za_array : array_like of floats, optional
Zenith values to evaluate the beam at in radians. Must be the same shape
as az_array.
freq_array : array_like of floats, optional
Frequency values to evaluate the beam at in Hertz.

Returns
-------
array_like of complex
An array of beam values. The shape of the evaluated data will be:
(Naxes_vec, Nfeeds, freq_array.size, az_array.size)

"""
self._check_eval_inputs(
az_array=az_array, za_array=za_array, freq_array=freq_array
)

za_grid, _ = np.meshgrid(za_array, freq_array)
az_grid, f_grid = np.meshgrid(az_array, freq_array)

if hasattr(self, "_feed_projection_eval"):
return self._feed_projection_eval(
az_grid=az_grid, za_grid=za_grid, f_grid=f_grid
).astype(complex)
else:
efield_vals = self.efield_eval(
az_array=az_array, za_array=za_array, freq_array=freq_array
)

# set f to the magnitude of the I response, assume no time delays
f_vals = self.feed_iresponse_eval(
az_array=az_array, za_array=za_array, freq_array=freq_array
)
data_array = self._get_empty_data_array(az_grid.shape)

for fn in np.arange(self.Nfeeds):
data_array[0, fn] = efield_vals[0, fn] / f_vals[0, fn]
data_array[1, fn] = efield_vals[1, fn] / f_vals[0, fn]

return data_array

@combine_docstrings(UVBeam.new)
def to_uvbeam(
self,
freq_array: FloatArray,
beam_type: Literal["efield", "power"] = "efield",
beam_type: Literal[
"efield", "power", "feed_iresponse", "feed_projection"
] = "efield",
pixel_coordinate_system: (
Literal["az_za", "orthoslant_zenith", "healpix"] | None
) = None,
Expand All @@ -535,7 +653,7 @@ def to_uvbeam(
freq_array : ndarray of float
Array of frequencies in Hz to evaluate the beam at.
beam_type : str
Beam type, either "efield" or "power".
Beam type, one of "efield", "power", "feed_iresponse" or "feed_projection".
pixel_coordinate_system : str
Pixel coordinate system, options are "az_za", "orthoslant_zenith" and
"healpix". Forced to be "healpix" if ``nside`` is given and by
Expand All @@ -558,13 +676,14 @@ def to_uvbeam(
Healpix ordering parameter, defaults to "ring" if nside is provided.

"""
if beam_type not in ["efield", "power"]:
raise ValueError("Beam type must be 'efield' or 'power'")
allowed_beam_types = ["efield", "power", "feed_iresponse", "feed_projection"]
if beam_type not in allowed_beam_types:
raise ValueError(f"Beam type must be one of {allowed_beam_types}")

if beam_type == "efield":
polarization_array = None
else:
if beam_type == "power":
polarization_array = self.polarization_array
else:
polarization_array = None

mount_type = self.mount_type if hasattr(self, "mount_type") else None

Expand All @@ -584,6 +703,7 @@ def to_uvbeam(

uvb = UVBeam.new(
telescope_name="Analytic Beam",
beam_type=beam_type,
data_normalization="physical",
feed_name=self.__repr__(),
feed_version="1.0",
Expand Down Expand Up @@ -623,10 +743,7 @@ def to_uvbeam(
az_array = az_array.flatten()
za_array = za_array.flatten()

if beam_type == "efield":
eval_function = "efield_eval"
else:
eval_function = "power_eval"
eval_function = f"{beam_type}_eval"

data_array = getattr(self, eval_function)(
az_array=az_array, za_array=za_array, freq_array=freq_array
Expand All @@ -643,7 +760,9 @@ def to_uvbeam(
def plot(
self,
*,
beam_type: str,
beam_type: Literal[
"efield", "power", "feed_iresponse", "feed_projection"
] = "efield",
freq: float,
complex_type: str = "real",
colormap: str | None = None,
Expand All @@ -661,6 +780,8 @@ def plot(

Parameters
----------
beam_type : str
Beam type, one of "efield", "power", "feed_iresponse" or "feed_projection".
freq : int
The frequency index to plot.
complex_type : str
Expand Down Expand Up @@ -1232,10 +1353,13 @@ def _efield_eval(

# The first dimension is for [azimuth, zenith angle] in that order
# the second dimension is for feed [e, n] in that order
data_array[0, self.east_ind] = -np.sin(az_grid)
data_array[0, self.north_ind] = np.cos(az_grid)
data_array[1, self.east_ind] = np.cos(za_grid) * np.cos(az_grid)
data_array[1, self.north_ind] = np.cos(za_grid) * np.sin(az_grid)
cos_za = np.cos(za_grid)
sin_az = np.sin(az_grid)
cos_az = np.cos(az_grid)
data_array[0, self.east_ind] = -sin_az
data_array[0, self.north_ind] = cos_az
data_array[1, self.east_ind] = cos_za * cos_az
data_array[1, self.north_ind] = cos_za * sin_az

return data_array

Expand All @@ -1247,12 +1371,13 @@ def _power_eval(

# these are just the sum in quadrature of the efield components.
# some trig work is done to reduce the number of cos/sin evaluations
data_array[0, 0] = 1 - (np.sin(za_grid) * np.cos(az_grid)) ** 2
data_array[0, 1] = 1 - (np.sin(za_grid) * np.sin(az_grid)) ** 2
sin_za = np.sin(za_grid)
data_array[0, 0] = 1 - (sin_za * np.cos(az_grid)) ** 2
data_array[0, 1] = 1 - (sin_za * np.sin(az_grid)) ** 2

if self.Npols > self.Nfeeds:
# cross pols are included
data_array[0, 2] = -(np.sin(za_grid) ** 2) * np.sin(2.0 * az_grid) / 2.0
data_array[0, 2] = -(sin_za**2) * np.sin(2.0 * az_grid) / 2.0
data_array[0, 3] = data_array[0, 2]

return data_array
Expand Down
40 changes: 24 additions & 16 deletions src/pyuvdata/beam_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class BeamInterface:
Beam object to use for computations. If a BeamInterface is passed, a new
view of the same object is created.
beam_type : str
The beam type, either "efield" or "power".
The beam type, one of "efield", "power", "feed_iresponse" or "feed_projection".
include_cross_pols : bool
Option to include the cross polarized beams (e.g. xy and yx or en and ne).
Used if beam is a UVBeam and and the input UVBeam is an Efield beam but
Expand All @@ -48,7 +48,9 @@ class BeamInterface:
"""

beam: AnalyticBeam | UVBeam
beam_type: Literal["efield", "power"] | None = None
beam_type: (
Literal["efield", "power", "feed_iresponse", "feed_projection"] | None
) = None
include_cross_pols: InitVar[bool] = True

def __post_init__(self, include_cross_pols: bool):
Expand Down Expand Up @@ -77,17 +79,26 @@ def __post_init__(self, include_cross_pols: bool):
if isinstance(self.beam, UVBeam):
if self.beam_type is None or self.beam_type == self.beam.beam_type:
self.beam_type = self.beam.beam_type
elif self.beam_type == "power":
elif self.beam.beam_type == "efield":
conv_func = {
"power": "efield_to_power",
"feed_iresponse": "efield_to_feed_iresponse",
"feed_projection": "efield_to_feed_projection",
}
warnings.warn(
"Input beam is an efield UVBeam but beam_type is specified as "
"'power'. Converting efield beam to power."
f"{self.beam_type}. Converting efield beam to {self.beam_type}."
)
self.beam.efield_to_power(calc_cross_pols=include_cross_pols)
kwargs = {}
if self.beam_type == "power":
kwargs["calc_cross_pols"] = include_cross_pols
getattr(self.beam, conv_func[self.beam_type])(**kwargs)
else:
raise ValueError(
"Input beam is a power UVBeam but beam_type is specified as "
"'efield'. It's not possible to convert a power beam to an "
"efield beam, either provide an efield UVBeam or do not "
f"Input beam is a {self.beam.beam_type} UVBeam but beam_type "
f"is specified as {self.beam_type}. It's not possible to "
f"convert a {self.beam.beam_type} beam to {self.beam_type}, "
f"either provide a UVBeam with a compatible type or do not "
"specify `beam_type`."
)
elif self.beam_type is None:
Expand Down Expand Up @@ -317,13 +328,10 @@ def compute_response(
az_array_use = copy.copy(az_array)
za_array_use = copy.copy(za_array)

if self.beam_type == "efield":
interp_data = self.beam.efield_eval(
az_array=az_array_use, za_array=za_array_use, freq_array=freq_array
)
else:
interp_data = self.beam.power_eval(
az_array=az_array_use, za_array=za_array_use, freq_array=freq_array
)
eval_function = f"{self.beam_type}_eval"

interp_data = getattr(self.beam, eval_function)(
az_array=az_array_use, za_array=za_array_use, freq_array=freq_array
)

return interp_data
8 changes: 6 additions & 2 deletions src/pyuvdata/utils/plotting.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def beam_plot(
feed_labels[np.isclose(beam_obj.feed_angle, 0)] = "N/S"
feed_labels[np.isclose(beam_obj.feed_angle, np.pi / 2)] = "E/W"

if beam_type == "efield":
if beam_type != "power":
nfeedpol = beam_obj.Nfeeds
feedpol_label = feed_labels
if issubclass(type(beam_obj), UnpolarizedAnalyticBeam):
Expand Down Expand Up @@ -410,7 +410,7 @@ def beam_plot(
freq_title = freq

naxes_vec = beam_obj.Naxes_vec
if beam_type == "power":
if beam_type in ["power", "feed_iresponse"]:
naxes_vec = 1
reg_grid = True

Expand All @@ -433,6 +433,10 @@ def beam_plot(
beam_type_label = "power"
elif beam_type == "efield":
beam_type_label = "E-field"
elif beam_type == "feed_iresponse":
beam_type_label = "Feed I response"
elif beam_type == "feed_projection":
beam_type_label = "Feed projection"

plot_beam_arrays(
beam_vals,
Expand Down
Loading
Loading