39 changes: 26 additions & 13 deletions src/hats_import/catalog/resume_plan.py
@@ -39,8 +39,11 @@ class ResumePlan(PipelineResumePlan):
     SPLITTING_STAGE = "splitting"
     REDUCING_STAGE = "reducing"
 
-    HISTOGRAM_BINARY_FILE = "mapping_histogram.npz"
-    HISTOGRAMS_DIR = "histograms"
+    LEGACY_HISTOGRAM_BINARY_FILE = "mapping_histogram.npz"
+    LEGACY_HISTOGRAMS_DIR = "histograms"
+    ROW_COUNT_HISTOGRAM_BINARY_FILE = "row_count_mapping_histogram.npz"
+    ROW_COUNT_HISTOGRAMS_DIR = "row_count_histograms"
 
     ALIGNMENT_FILE = "alignment.pickle"
 
     # pylint: disable=too-many-arguments
@@ -114,7 +117,7 @@ def gather_plan(self, run_stages: list[str] | None = None):
         if self.should_run_mapping:
             self.map_files = self.get_remaining_map_keys()
             file_io.make_directory(
-                file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
+                file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR),
                 exist_ok=True,
             )
         if self.should_run_splitting:
@@ -144,7 +147,7 @@ def get_remaining_map_keys(self):
         Returns:
             list of mapping keys *not* found in files like /resume/path/mapping_key.npz
         """
-        prefix = file_io.get_upath(self.tmp_path) / self.HISTOGRAMS_DIR
+        prefix = file_io.get_upath(self.tmp_path) / self.ROW_COUNT_HISTOGRAMS_DIR
         map_file_pattern = re.compile(r"map_(\d+).npz")
         done_indexes = [int(map_file_pattern.match(path.name).group(1)) for path in prefix.glob("*.npz")]
         remaining_indexes = list(set(range(0, len(self.input_paths))) - (set(done_indexes)))
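For context on this hunk: the resume machinery derives the remaining work by diffing the indexes of already-written partials against the full input range. A minimal standalone sketch of that pattern, using pathlib in place of the project's file_io/UPath helpers (the function and parameter names here are hypothetical):

```python
import re
from pathlib import Path


def remaining_map_keys(tmp_path: Path, histograms_dir: str, num_inputs: int) -> list[str]:
    """Return mapping keys with no partial histogram written yet.

    Hypothetical standalone version of the resume check above: a task
    counts as done once its map_<i>.npz partial exists on disk.
    """
    prefix = tmp_path / histograms_dir
    pattern = re.compile(r"map_(\d+)\.npz")
    done = {
        int(match.group(1))
        for path in prefix.glob("*.npz")
        if (match := pattern.match(path.name))
    }
    return [f"map_{index}" for index in sorted(set(range(num_inputs)) - done)]
```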
@@ -157,24 +160,34 @@ def read_histogram(self, healpix_order):
         - Otherwise, combine histograms from partials
         - Otherwise, return an empty histogram
         """
-        file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
-        if not file_io.does_file_or_directory_exist(file_name):
-            # Read the histogram from partial histograms and combine.
+        file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAM_BINARY_FILE)
+        legacy_file_name = file_io.append_paths_to_pointer(self.tmp_path, self.LEGACY_HISTOGRAM_BINARY_FILE)
+
+        # Fall back to the legacy histogram file name, if needed.
+        if not file_io.does_file_or_directory_exist(file_name) and file_io.does_file_or_directory_exist(
+            legacy_file_name
+        ):
+            file_name = legacy_file_name
+
+        # Otherwise, read the histogram from partial histograms and combine.
+        elif not file_io.does_file_or_directory_exist(file_name):
             remaining_map_files = self.get_remaining_map_keys()
             if len(remaining_map_files) > 0:
                 raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
-            histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.npz")
+            histogram_files = file_io.find_files_matching_path(
+                self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR, "*.npz"
+            )
             aggregate_histogram = HistogramAggregator(healpix_order)
             for partial_file_name in histogram_files:
                 partial = SparseHistogram.from_file(partial_file_name)
                 aggregate_histogram.add(partial)
 
-            file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
+            file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAM_BINARY_FILE)
             with open(file_name, "wb+") as file_handle:
                 file_handle.write(aggregate_histogram.full_histogram)
             if self.delete_resume_log_files:
                 file_io.remove_directory(
-                    file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
+                    file_io.append_paths_to_pointer(self.tmp_path, self.ROW_COUNT_HISTOGRAMS_DIR),
                     ignore_errors=True,
                 )
 
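The new branching above amounts to a three-way precedence: prefer the new row-count file, fall back to the legacy name, and only then rebuild from partials. A minimal sketch of just the file-resolution step, with pathlib standing in for the file_io helpers (the function name and error handling are hypothetical):

```python
from pathlib import Path


def resolve_histogram_file(tmp_path: Path) -> Path:
    """Pick the histogram file to read, mirroring the fallback order above."""
    new_file = tmp_path / "row_count_mapping_histogram.npz"
    legacy_file = tmp_path / "mapping_histogram.npz"
    if new_file.exists():
        return new_file
    if legacy_file.exists():
        # A catalog produced by an older version of the pipeline.
        return legacy_file
    # Neither full histogram exists: the partials must be combined first.
    raise FileNotFoundError(
        f"no full histogram yet; combine the partials under "
        f"{tmp_path / 'row_count_histograms'} first"
    )
```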
@@ -200,10 +213,10 @@ def partial_histogram_file(cls, tmp_path, mapping_key: str):
             mapping_key (str): unique string for each mapping task (e.g. "map_57")
         """
         file_io.make_directory(
-            file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR),
+            file_io.append_paths_to_pointer(tmp_path, cls.ROW_COUNT_HISTOGRAMS_DIR),
             exist_ok=True,
         )
-        return file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.npz")
+        return file_io.append_paths_to_pointer(tmp_path, cls.ROW_COUNT_HISTOGRAMS_DIR, f"{mapping_key}.npz")
 
     def get_remaining_split_keys(self):
         """Gather remaining keys, dropping successful split tasks from done file names.
@@ -266,7 +279,7 @@ def get_alignment_file(
             highest_healpix_order (int): the highest healpix order (e.g. 5-10)
             lowest_healpix_order (int): the lowest healpix order (e.g. 1-5). specifying a lowest order
                 constrains the partitioning to prevent spatially large pixels.
-            threshold (int): the maximum number of objects allowed in a single pixel
+            pixel_threshold (int): the maximum number of objects allowed in a single pixel
             drop_empty_siblings (bool): if 3 of 4 pixels are empty, keep only the non-empty pixel
             expected_total_rows (int): number of expected rows found in the dataset.
 
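The combine-and-clean-up branch in read_histogram follows a common aggregate-then-delete pattern. A rough standalone sketch using plain numpy in place of the project's SparseHistogram/HistogramAggregator classes; the 'indexes'/'counts' layout of each partial .npz, and the use of np.savez for the combined file, are assumptions for illustration only, not the project's actual on-disk format:

```python
import shutil
from pathlib import Path

import numpy as np


def combine_partial_histograms(
    tmp_path: Path, num_bins: int, delete_partials: bool = False
) -> np.ndarray:
    """Sum per-task partial histograms into one full histogram.

    Stand-in for the HistogramAggregator loop: each partial .npz is
    assumed (for this sketch) to hold 'indexes' and 'counts' arrays.
    """
    partials_dir = tmp_path / "row_count_histograms"
    full = np.zeros(num_bins, dtype=np.int64)
    for partial_file in sorted(partials_dir.glob("*.npz")):
        with np.load(partial_file) as data:
            # Indexes are assumed unique within each partial file.
            full[data["indexes"]] += data["counts"]
    # Persist the combined histogram, then optionally drop the partials,
    # matching the delete_resume_log_files behavior above.
    np.savez(tmp_path / "row_count_mapping_histogram.npz", histogram=full)
    if delete_partials:
        shutil.rmtree(partials_dir, ignore_errors=True)
    return full
```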
4 changes: 2 additions & 2 deletions tests/hats_import/catalog/test_map_reduce.py
@@ -87,14 +87,14 @@ def test_read_bad_fileformat(blank_data_file, capsys, tmp_path):
 
 def read_partial_histogram(tmp_path, mapping_key):
     """Helper to read in the former result of a map operation."""
-    histogram_file = tmp_path / "histograms" / f"{mapping_key}.npz"
+    histogram_file = tmp_path / "row_count_histograms" / f"{mapping_key}.npz"
     hist = SparseHistogram.from_file(histogram_file)
     return hist.to_array()
 
 
 def test_read_single_fits(tmp_path, formats_fits):
     """Success case - fits file that exists being read as fits"""
-    (tmp_path / "histograms").mkdir(parents=True)
+    (tmp_path / "row_count_histograms").mkdir(parents=True)
     mr.map_to_pixels(
         input_file=formats_fits,
         pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("fits")),
6 changes: 3 additions & 3 deletions tests/hats_import/catalog/test_run_round_trip.py
@@ -551,20 +551,20 @@ def assert_stage_level_files_exist(base_intermediate_dir):
     # `small_sky_object_catalog` at order 0.
     expected_contents = [
         "alignment.pickle",
-        "histograms", # directory containing sub-histograms
         "input_paths.txt", # original input paths for subsequent comparison
         "mapping_done", # stage-level done file
-        "mapping_histogram.npz", # concatenated histogram file
         "order_0", # all intermediate parquet files
         "reader.pickle", # pickled InputReader
         "reducing", # directory containing task-level done files
         "reducing_done", # stage-level done file
+        "row_count_histograms", # directory containing sub-histograms
+        "row_count_mapping_histogram.npz", # concatenated histogram file
         "splitting", # directory containing task-level done files
         "splitting_done", # stage-level done file
     ]
     assert_directory_contains(base_intermediate_dir, expected_contents)
 
-    checking_dir = base_intermediate_dir / "histograms"
+    checking_dir = base_intermediate_dir / "row_count_histograms"
     assert_directory_contains(
         checking_dir, ["map_0.npz", "map_1.npz", "map_2.npz", "map_3.npz", "map_4.npz", "map_5.npz"]
     )
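For reference, a helper like assert_directory_contains only needs to compare a sorted directory listing against the expected names. A minimal sketch (the test suite's real implementation may differ):

```python
from pathlib import Path


def assert_directory_contains(dir_path: Path, expected_contents: list[str]) -> None:
    """Assert a directory holds exactly the expected entries, order-insensitively."""
    actual = sorted(entry.name for entry in Path(dir_path).iterdir())
    expected = sorted(expected_contents)
    assert actual == expected, f"{actual} != {expected}"
```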