1 change: 1 addition & 0 deletions src/toast/CMakeLists.txt
@@ -141,6 +141,7 @@ install(FILES
observation_dist.py
observation_data.py
observation_view.py
observation_cache.py
pointing_utils.py
vis.py
rng.py
74 changes: 71 additions & 3 deletions src/toast/data.py
@@ -13,6 +13,9 @@
from .utils import Logger


obs_loader_name = "loader"


class Data(MutableMapping):
"""Class which represents distributed data

@@ -100,14 +103,16 @@ def all_local_detectors(self, selection=None, flagmask=0):
all_dets[d] = None
return list(all_dets.keys())

def detector_units(self, det_data, ob_key="detector_data_units"):
"""Get the detector data units for a given field.

This verifies that the specified detector data field has the same
units in all observations where it occurs, and returns that unit.

Args:
det_data (str): The detector data field.
ob_key (str): If the detector data is not currently loaded, this
observation key names an optional dictionary that maps each
field to its units.

Returns:
(Unit): The unit used across all observations.
@@ -117,8 +122,12 @@ def detector_units(self, det_data):
local_units = None
for ob in self.obs:
    if det_data not in ob.detdata:
        if ob_key in ob:
            ob_units = ob[ob_key][det_data]
        else:
            continue
    else:
        ob_units = ob.detdata[det_data].units
    if local_units is None:
        local_units = ob_units
    else:
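A hedged usage sketch of the fallback above (the Data instance `data`, the observation `ob`, and the "signal" field are illustrative assumptions, not part of this change):

    import astropy.units as u

    # While detector data is loaded, units come from ob.detdata["signal"].
    units = data.detector_units("signal")

    # After unloading, a stashed per-observation mapping such as
    #     ob["detector_data_units"] = {"signal": u.K}
    # lets the same query succeed without triggering any loader.
    units = data.detector_units("signal", ob_key="detector_data_units")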
@@ -442,6 +451,65 @@ def select(
continue
return new_data

def using_loaders(self):
Pass through the Data object and determine whether loaders are in use.

This is useful when an observation loader may exist, but an operator wants
to perform other operations on an observation by calling load_exec() without
triggering the loading.

Returns:
(bool): True if any observation is using a loader.

"""
result = False
for ob in self.obs:
    if hasattr(ob, obs_loader_name):
        result = True
        break
return result

def save_loaders(self):
"""Pass through Data and save the loader instances.

This is useful when an observation loader exists, but an operator wants to
perform other operations on an observation by calling load_exec() without
triggering the loading. The returned dictionary can later be passed to
`restore_loaders()`.

Returns:
(dict): The dictionary of loaders, indexed on the observation UID.

"""
loaders = dict()
for ob in self.obs:
    if hasattr(ob, obs_loader_name):
        # Use the attribute name constant consistently, so this stays
        # correct if the loader attribute is ever renamed.
        loaders[ob.uid] = getattr(ob, obs_loader_name)
        delattr(ob, obs_loader_name)
    else:
        loaders[ob.uid] = None
return loaders

def restore_loaders(self, saved):
"""Pass through Data and restore loader instances.

This is useful when an observation loader exists, but an operator wants to
perform other operations on an observation by calling load_exec() without
triggering the loading. The input dictionary should be generated with
`save_loaders()`.

Args:
saved (dict): The dictionary of saved loader instances.

Returns:
None

"""
for ob in self.obs:
    if saved[ob.uid] is not None:
        setattr(ob, obs_loader_name, saved[ob.uid])
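A minimal sketch of the pattern these three methods support (`data` is assumed to be a populated Data instance, and `some_operator` is a hypothetical operator whose load_exec() call should not trigger loading):

    # Detach loaders so that load_exec() performs no on-demand loading,
    # then re-attach them afterward.
    if data.using_loaders():
        saved = data.save_loaders()
        try:
            some_operator.load_exec(data)
        finally:
            data.restore_loaders(saved)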

# Accelerator use

def accel_create(self, names):
98 changes: 98 additions & 0 deletions src/toast/observation_cache.py
@@ -0,0 +1,98 @@
# Copyright (c) 2025-2025 by the parties listed in the AUTHORS file.
# All rights reserved. Use of this source code is governed by
# a BSD-style license that can be found in the LICENSE file.


import os
import hashlib
import glob

from .utils import Logger


def observation_hash(obs):
"""Create a unique hash of an observation.

The goal of this function is to ensure that if two observations are equal then
they have the same unique hash, and if they differ then they have different
hashes.

Args:
obs (Observation): The observation to hash

Returns:
(str): The unique hash string.

"""
# FIXME: Currently this function uses only information available on all
# processes. Instead, we should gather statistics per-detector across
# processes and include that in the hashing.

dhash = hashlib.md5()

# Update the digest with each of the identifying fields.
for item in (
    obs.name,
    str(obs.uid),
    obs.telescope.name,
    str(obs.session),
    str(obs.all_detectors),
):
    dhash.update(item.encode("utf-8"))

return dhash.hexdigest()
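A small sketch of the guarantee above (assuming `obs` is an existing Observation):

    # The hash is deterministic:  repeated calls on the same observation
    # return the same string, suitable for use in directory names.
    h = observation_hash(obs)
    assert h == observation_hash(obs)
    assert len(h) == 32  # an MD5 hex digest is 32 characters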


def observation_cache(root_dir, obs, duplicates=None):
"""Get the cache directory for an observation.

This builds the path to an observation cache directory, and also checks if the
same observation exists with a different hash in the same root directory. If
`duplicates` is "warn", then a warning will be printed if any other observation
cache directories exist for the same observation. If `duplicates` is "fail",
then the existence of duplicates will raise an exception.

Args:
root_dir (str): The top level cache directory.
obs (Observation): The observation.
duplicates (str): The action to take when duplicates are found:
"warn", "fail", or None to skip the check entirely.

Returns:
(str): The observation cache directory.

"""
log = Logger.get()
hsh = observation_hash(obs)

obs_dir = f"{obs.name}_{hsh}"
cache_dir = os.path.join(root_dir, obs_dir)

if duplicates is not None:
    # We care about duplicate observations.  Check for those now.
    check = None
    if obs.comm.group_rank == 0:
        check_str = os.path.join(root_dir, f"{obs.name}_*")
        # Exclude the directory for the current hash; only directories
        # for the same observation with a different hash are duplicates.
        check = [
            x for x in glob.glob(check_str)
            if os.path.basename(x) != obs_dir
        ]
    if obs.comm.comm_group is not None:
        check = obs.comm.comm_group.bcast(check, root=0)
    if len(check) != 0:
        # There are existing observation directories...
        check_dirs = ", ".join([f"'{os.path.basename(x)}'" for x in check])
        msg = f"{obs_dir}: found existing cache dirs {check_dirs}"
        if duplicates == "warn":
            log.warning_rank(msg, comm=obs.comm.comm_group)
        elif duplicates == "fail":
            log.error_rank(msg, comm=obs.comm.comm_group)
            raise RuntimeError(msg)
        else:
            log.debug_rank(msg, comm=obs.comm.comm_group)

return cache_dir
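A short usage sketch (the cache root path is a placeholder, and creating the directory is an assumption about how callers use the returned path):

    import os

    # Resolve the cache directory, warning about stale directories that
    # were written for this observation under a different hash.
    cache_dir = observation_cache("/path/to/cache_root", obs, duplicates="warn")
    if obs.comm.group_rank == 0:
        os.makedirs(cache_dir, exist_ok=True)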

2 changes: 2 additions & 0 deletions src/toast/ops/CMakeLists.txt
@@ -7,6 +7,7 @@ install(FILES
pipeline.py
delete.py
copy.py
create.py
reset.py
fill_gaps.py
arithmetic.py
@@ -64,6 +65,7 @@ install(FILES
pixels_wcs.py
filterbin.py
obsmat.py
accum_obs.py
save_hdf5.py
load_hdf5.py
noise_estimation.py
4 changes: 3 additions & 1 deletion src/toast/ops/__init__.py
@@ -4,13 +4,15 @@

# Import Operators into our public API

from .accum_obs import AccumulateObservation
from .arithmetic import Combine
from .azimuth_intervals import AzimuthIntervals, AzimuthRanges
from .cadence_map import CadenceMap
from .calibrate import CalibrateDetectors
from .common_mode_noise import CommonModeNoise
from .conviqt import SimConviqt, SimTEBConviqt, SimWeightedConviqt
from .copy import Copy
from .create import Create
from .crosslinking import CrossLinking
from .delete import Delete
from .demodulation import Demodulate, StokesWeightsDemod
@@ -44,7 +46,7 @@
from .noise_weight import NoiseWeight
from .obsmat import ObsMat, coadd_observation_matrix
from .operator import Operator
from .pipeline import Pipeline, PipelineLoader
from .pixels_healpix import PixelsHealpix
from .pixels_wcs import PixelsWCS
from .pointing import BuildPixelDistribution