Skip to content

Conversation

@iguinn
Copy link
Contributor

@iguinn iguinn commented Oct 4, 2025

Added functions to LH5Iterator:
map takes an input function with arguments of type Table and LH5Iterator and applies it to each block of data, returning a list of the results (similar to the python builtin map). In addition, the user can provide:

  • An aggregator function that will in some way combine the results of this function (e.g., can join tables column wise or add together results).
  • Begin and terminate functions to do things before loops begin/end
  • Either a concurrent.futures.Executor or a number of processes to run (using ProcessPoolExecutor). This will divide the iterator into equal(ish) sized chunks of files/groups and submit each chunk to the Executor. If this is used, results will be returned as an iterator over futures objects (see concurrent.futures.Executor.map)

query takes an input function with arguments of type Table and LH5Iterator and returns a Table, pandas dataframe, awkward array, or numpy array with some sort of processing (including down-selection) applied. This function is mapped over the full dataset pointed to by the iterator, and the results are merged into a single Table/dataframe/awkward array. This can be used with multiprocessing.

  • In addition, a pandas query can be provided as a string; this will return a pandas dataframe with the query used to select which entries to include. This feature could be expanded for awkward and other datatypes in the future (see

    Secure Table.eval() #135 for why this is not done yet)

hist takes a list of Hist.axis specifications, a query function/str, and a list of keys, and builds a histogram out of the queried data (filling it iteratively so that we never have to hold the full table in memory at once). This can be used with multiprocessing.

In addition, some other functions were implemented to support these:

  • deepcopy is defined; this deep-copies everything except LH5Store, for which a new Store is constructed (since h5py objects are not all deepcopy-able)
  • getstate and setstate are defined to enable pickling of an LH5Iterator. Pickling is used by multiprocessing to communicate data from the parent process to child processes. This requires skipping the LH5Store when writing, and constructing a new one when reading.
  • _select_groups is a helper function that reduces the files and groups iterated over to some slice. This is used by...
  • _generate_workers splits the iterator into n equal(ish) iterators that are passed to each of the processes when using multi-processing
  • Additional helper functions used by map, query, and hist

Added tests for map, query and hist.

@codecov
Copy link

codecov bot commented Oct 4, 2025

Codecov Report

❌ Patch coverage is 80.47337% with 33 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.86%. Comparing base (7428202) to head (fd94d58).

Files with missing lines Patch % Lines
src/lgdo/lh5/iterator.py 80.35% 33 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #192      +/-   ##
==========================================
- Coverage   80.91%   80.86%   -0.05%     
==========================================
  Files          47       47              
  Lines        3747     3910     +163     
==========================================
+ Hits         3032     3162     +130     
- Misses        715      748      +33     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@gipert
Copy link
Member

gipert commented Oct 5, 2025

Very cool functionality! i think we should merge it as you are not changing existing code. But maybe I would suggest writing a documentation page (or notebook) about the iterator? at this point there is quite some functionality and i think it's worth having a new section in the docs

@iguinn
Copy link
Contributor Author

iguinn commented Oct 5, 2025

Hi Luigi, yes, I would like to merge this soon, but I agree there's a few things to do first. (this is the same changes as another pull request that was getting messy due to cross-dependencies on other PRs, so I resubmitted)

@gipert
Copy link
Member

gipert commented Nov 28, 2025

hi @iguinn, I'm trying this out for the ssc (big) data and it works nice! can we add a little more docs and merge?

Parameters
----------
fun:
function with signature fun(lh5_obj: Table, it: LH5Iterator) -> Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
function with signature fun(lh5_obj: Table, it: LH5Iterator) -> Any
function with signature ``fun(lh5_obj: Table, it: LH5Iterator) -> Any``

Comment on lines +800 to +805
"""Map function over iterator blocks and return order-preserving list
of outputs. Can be multi-threaded provided there are no attempts to
modify existing objects. Multi-threading splits the iterator into
multiple independent streams with an approximately equal number of
files/groups, concurrently processed under a single program multiple
data model. Results will be returned asynchronously for each process.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Map function over iterator blocks and return order-preserving list
of outputs. Can be multi-threaded provided there are no attempts to
modify existing objects. Multi-threading splits the iterator into
multiple independent streams with an approximately equal number of
files/groups, concurrently processed under a single program multiple
data model. Results will be returned asynchronously for each process.
"""Map function over iterator blocks.
Returns order-preserving list of outputs. Can be multi-threaded
provided there are no attempts to modify existing objects.
Multi-threading splits the iterator into multiple independent
streams with an approximately equal number of files/groups,
concurrently processed under a single program multiple data
model. Results will be returned asynchronously for each process.

multiple independent streams with an approximately equal number of
files/groups, concurrently processed under a single program multiple
data model. Results will be returned asynchronously for each process.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a simple example here?

Examples
--------
>>> it = LH5Interator(...)
>>>
>>> def fun():

etc

number of processes. If ``None``, use number equal to threads available
to ``executor`` (if provided), or else do not parallelize
executor:
`concurrent.futures.Executor <https://docs.python.org/3/library/concurrent.futures.html>`_
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`concurrent.futures.Executor <https://docs.python.org/3/library/concurrent.futures.html>`_
:class:`concurrent.futures.Executor`

does this work?

Comment on lines +871 to +872
Query the data files in the iterator and return the selected data
as a single table in one of several formats.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Query the data files in the iterator and return the selected data
as a single table in one of several formats.
Query the data files in the iterator.
Returns the selected data as a single table in one of several formats.

docstrings should always be: one-line description and then full description

- ``pandas.DataFrame``: pandas dataframe. Treat as mapping from column
name to values
- A string expression. This will call `pd.DataFrame.query <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html>`_ and return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- A string expression. This will call `pd.DataFrame.query <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html>`_ and return
- A string expression. This will call :meth:`pd.DataFrame.query` and return

this should also work, to be tested

number of processes. If ``None``, use number equal to threads available
to ``executor`` (if provided), or else do not parallelize
executor:
`concurrent.futures.Executor <https://docs.python.org/3/library/concurrent.futures.html>`_
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use intersphinx mapping here too

Parameters
----------
ax:
Axis object(s) used to construct the histogram. Can provide a ``Hist``
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Axis object(s) used to construct the histogram. Can provide a ``Hist``
Axis object(s) used to construct the histogram. Can provide a :class:``.types.hist.Hist`

or whatever is the right path

- ``Mapping[str, ArrayLike]``: mapping from axis name to values
- ``pandas.DataFrame``: pandas dataframe. Treat as mapping from column
name to values
- A string expression. This will call `pd.DataFrame.query <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html>`_ and return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intersphinx reference

number of processes. If ``None``, use number equal to threads available
to ``executor`` (if provided), or else do not parallelize
executor:
`concurrent.futures.Executor <https://docs.python.org/3/library/concurrent.futures.html>`_
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intersphinx

object for managing parallelism. If ``None``, create a ``ProcessPoolExecutor`` with
number of processes equal to ``processes``.
hist_kwargs:
additional keyword arguments for constructing Hist. See `hist.Hist`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
additional keyword arguments for constructing Hist. See `hist.Hist`.
additional keyword arguments for constructing Hist. See :class:`.types.hist.Hist`.

elif isinstance(data, Collection):
hist.fill(*data)
else:
msg = "data returned by where is not compatible with hist. Must be a 1d or 2d numpy array, a list of arrays, or a mapping from str to array"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
msg = "data returned by where is not compatible with hist. Must be a 1d or 2d numpy array, a list of arrays, or a mapping from str to array"
msg = (
"data returned by where is not compatible with hist. Must "
"be a 1d or 2d numpy array, a list of arrays, or a mapping from str to array"
)

Copy link
Member

@gipert gipert left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks Ian! I have some suggestions on how to improve the docstrings

@gipert
Copy link
Member

gipert commented Dec 2, 2025

could you also update the docs for the buffer_len arg of LH5Iterator? there's no mention to inputting it in units of bytes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants