Skip to content
2 changes: 1 addition & 1 deletion src/hats_import/catalog/file_readers/input_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def read(self, input_file, read_columns=None):
read_columns(List[str]): subset of columns to read.
if None, all columns are read
Yields:
DataFrame containing chunk of file info.
Pandas DataFrame or columnar batch (e.g., pyarrow.Table) containing chunk of file info.
"""

def regular_file_exists(self, input_file, **_kwargs):
Expand Down
90 changes: 68 additions & 22 deletions src/hats_import/catalog/map_reduce.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Import a set of non-hats files using dask for parallelization"""

import pickle
import warnings

import cloudpickle
import hats.pixel_math.healpix_shim as hp
Expand Down Expand Up @@ -37,6 +38,59 @@ def _has_named_index(dataframe):
return True


def _warn_if_not_double_precision_columns(data, columns):
"""Warn when coordinate-like columns are not double-precision.

Handles cases where we have designated a schema_file=... parameter in our
file reader, such that we could have a pandas DataFrame with Arrow dtypes.
"""

def _check_column_is_double_precision(column):
"""Check if the given column is double-precision float, accounting for
both pandas and pyarrow types.

Returns:
bool: True if the column is double-precision float, False otherwise.
"""
if isinstance(data, pd.DataFrame):
dtype = data[column].dtype
if isinstance(dtype, pd.ArrowDtype) and pa.types.is_float64(dtype.pyarrow_dtype):
# Accept Arrow double even within a pandas DataFrame (eg, from schema_file=...)
return True
return dtype == np.float64
if isinstance(data, pa.Table):
field_type = data.schema.field(column).type
return pa.types.is_float64(field_type)
raise TypeError(f"Unsupported data type: {type(data)}")

for column in columns:
if not _check_column_is_double_precision(column):
warnings.warn(
f"Column '{column}' is not double-precision float. "
"This may lead to inaccurate pixel mapping. "
"Consider converting this column to float64 for better precision.",
UserWarning,
)


def _map_chunk_to_pixels(data, highest_order, ra_column, dec_column, use_healpix_29):
    """Compute the healpix pixel assignment for one chunk of catalog data.

    Returns:
        np.ndarray: healpix pixel numbers corresponding to each row in the input data."""
    if use_healpix_29:
        # Prefer the frame's index when it already carries the spatial index.
        if isinstance(data, pd.DataFrame) and data.index.name == SPATIAL_INDEX_COLUMN:
            spatial_values = data.index
        else:
            spatial_values = data[SPATIAL_INDEX_COLUMN]
        return spatial_index_to_healpix(spatial_values, target_order=highest_order)
    if isinstance(data, pd.DataFrame):
        # dtype=float coerces to float64; copy=False avoids a copy when possible.
        ra_values = data[ra_column].to_numpy(copy=False, dtype=float)
        dec_values = data[dec_column].to_numpy(copy=False, dtype=float)
    else:
        ra_values = data[ra_column].to_numpy()
        dec_values = data[dec_column].to_numpy()
    return hp.radec2pix(highest_order, ra_values, dec_values)


def _iterate_input_file(
input_file: UPath,
pickled_reader_file: str,
Expand All @@ -46,34 +100,25 @@ def _iterate_input_file(
use_healpix_29=False,
read_columns=None,
):
"""Helper function to handle input file reading and healpix pixel calculation"""
"""Helper function to handle input file reading and healpix pixel calculation.

Yields:
Tuple[int, pd.DataFrame | pa.Table, np.ndarray]:
Chunk index, chunk data, and mapped pixel numbers for the current batch.
"""
with open(pickled_reader_file, "rb") as pickle_file:
file_reader = cloudpickle.load(pickle_file)
if not file_reader:
raise NotImplementedError("No file reader implemented")

first_iteration = True
for chunk_number, data in enumerate(file_reader.read(input_file, read_columns=read_columns)):
if use_healpix_29:
if isinstance(data, pd.DataFrame) and data.index.name == SPATIAL_INDEX_COLUMN:
mapped_pixels = spatial_index_to_healpix(data.index, target_order=highest_order)
else:
mapped_pixels = spatial_index_to_healpix(
data[SPATIAL_INDEX_COLUMN], target_order=highest_order
)
else:
# Set up the pixel data
if isinstance(data, pd.DataFrame):
mapped_pixels = hp.radec2pix(
highest_order,
data[ra_column].to_numpy(copy=False, dtype=float),
data[dec_column].to_numpy(copy=False, dtype=float),
)
else:
mapped_pixels = hp.radec2pix(
highest_order,
data[ra_column].to_numpy(),
data[dec_column].to_numpy(),
)
if first_iteration and not use_healpix_29:
# Only check for single-precision warning on the first chunk to
# avoid redundant warnings in large files.
_warn_if_not_double_precision_columns(data, [ra_column, dec_column])
first_iteration = False
mapped_pixels = _map_chunk_to_pixels(data, highest_order, ra_column, dec_column, use_healpix_29)
yield chunk_number, data, mapped_pixels


Expand Down Expand Up @@ -193,6 +238,7 @@ def split_pixels(
try:
with open(alignment_file, "rb") as pickle_file:
alignment = pickle.load(pickle_file)

for chunk_number, data, mapped_pixels in _iterate_input_file(
input_file, pickled_reader_file, highest_order, ra_column, dec_column, use_healpix_29
):
Expand Down
100 changes: 100 additions & 0 deletions tests/hats_import/catalog/test_map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,82 @@ def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parque
assert (result == expected).all()


def test_map_raises_single_precision_warning(tmp_path, small_sky_single_file):
    """Test that a warning is raised when single-precision floats are used for ra/dec columns"""
    # Force single-precision ra/dec columns through the reader's type map.
    reader = get_file_reader("csv", type_map={"ra": np.float32, "dec": np.float32})
    pickled_reader = pickle_file_reader(tmp_path, reader)

    # Mapping with single-precision coordinates should emit the precision warning.
    with pytest.warns(UserWarning, match="is not double-precision"):
        mr.map_to_pixels(
            input_file=small_sky_single_file,
            pickled_reader_file=pickled_reader,
            highest_order=0,
            ra_column="ra",
            dec_column="dec",
            resume_path=tmp_path,
            mapping_key="map_0",
        )


def test_map_unsupported_data_type(tmp_path, small_sky_single_file):
    """Test that a TypeError is raised when file reader returns an unsupported data type"""

    class UnsupportedDataTypeReader:  # pylint: disable=too-few-public-methods
        """Mock file reader that returns an unsupported data type (dict instead of DataFrame/Table)"""

        def read(self, _input_file, read_columns=None):  # pylint: disable=unused-argument
            # Yield only the chunk payload: the consumer enumerates read()
            # itself, so yielding a (number, dict) tuple would make the chunk a
            # *tuple*, exercising the wrong unsupported type instead of the
            # dict this test is meant to cover.
            yield {"ra": [1.0, 2.0, 3.0], "dec": [4.0, 5.0, 6.0]}

    (tmp_path / "row_count_histograms").mkdir(parents=True)
    with pytest.raises(TypeError, match="Unsupported data type"):
        mr.map_to_pixels(
            input_file=small_sky_single_file,
            pickled_reader_file=pickle_file_reader(tmp_path, UnsupportedDataTypeReader()),
            highest_order=0,
            ra_column="ra",
            dec_column="dec",
            resume_path=tmp_path,
            mapping_key="map_0",
        )


def test_map_with_healpix_29_arrow_table(tmp_path, formats_dir):
    """Test mapping with pre-existing healpix_29 column as PyArrow Table"""

    class ArrowTableReader:  # pylint: disable=too-few-public-methods
        """Mock file reader that returns a PyArrow Table with _healpix_29 column"""

        def read(self, input_file, read_columns=None):  # pylint: disable=unused-argument
            # Load the CSV (which carries the spatial index column) and hand it
            # back as a pyarrow Table rather than a pandas DataFrame.
            yield pa.Table.from_pandas(pd.read_csv(input_file))

    (tmp_path / "row_count_histograms").mkdir(parents=True)

    # ra/dec are deliberately bogus -- with use_healpix_29=True they go unused.
    mr.map_to_pixels(
        input_file=formats_dir / "spatial_index.csv",
        pickled_reader_file=pickle_file_reader(tmp_path, ArrowTableReader()),
        highest_order=0,
        ra_column="NOPE",
        dec_column="NOPE",
        use_healpix_29=True,
        resume_path=tmp_path,
        mapping_key="map_0",
    )

    result = read_partial_histogram(tmp_path, "map_0")
    assert len(result) == 12

    expected = hist.empty_histogram(0)
    expected[11] = 131
    npt.assert_array_equal(result, expected)


def test_map_small_sky_order0(tmp_path, small_sky_single_file):
"""Test loading the small sky catalog and partitioning each object into the same large bucket"""
(tmp_path / "histograms").mkdir(parents=True)
Expand Down Expand Up @@ -330,6 +406,30 @@ def test_split_pixels_headers(formats_headers_csv, assert_parquet_file_ids, tmp_
assert not os.path.exists(file_name)


def test_split_pixels_single_precision_warning(small_sky_single_file, tmp_path):
    """Test that a warning is raised when single-precision floats are used for ra/dec columns"""
    plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"])
    histogram = np.full(12, 0)
    histogram[11] = 131
    alignment_file = plan.get_alignment_file(histogram, -1, 0, 0, 1_000, False, 131)

    # Reader configured to deliver single-precision ra/dec columns.
    single_precision_reader = get_file_reader("csv", type_map={"ra": np.float32, "dec": np.float32})

    # Splitting with single-precision coordinates should emit the precision warning.
    with pytest.warns(UserWarning, match="is not double-precision"):
        mr.split_pixels(
            input_file=small_sky_single_file,
            pickled_reader_file=pickle_file_reader(tmp_path, single_precision_reader),
            highest_order=0,
            ra_column="ra",
            dec_column="dec",
            splitting_key="0",
            cache_shard_path=tmp_path,
            resume_path=tmp_path,
            alignment_file=alignment_file,
        )


def test_reduce_idempotent(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
"""Reduce the shards, then reduce them again - make sure no error is thrown the second time."""
shutil.copytree(parquet_shards_dir, tmp_path / "shards")
Expand Down