Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added src/rail/core/.stage.py.swp
Binary file not shown.
29 changes: 20 additions & 9 deletions src/rail/core/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ def input_iterator(self, tag: str, **kwargs: Any) -> Iterable:
groupname = kwargs.get("groupname", self.config.hdf5_groupname)
except Exception:
groupname = None
if groupname == "None":
groupname = None

chunk_size = kwargs.get("chunk_size", self.config.chunk_size)

Expand All @@ -518,9 +520,12 @@ def input_iterator(self, tag: str, **kwargs: Any) -> Iterable:
on_disk = True
if handle.data is not None:
in_memory = True

print("groupname", groupname is None, groupname)
if on_disk:
self._input_length = handle.size(groupname=groupname)
if groupname is None:
self._input_length = handle.size()
else:
self._input_length = handle.size(groupname=groupname)

total_chunks_needed = ceil(self._input_length / chunk_size)
# If the number of process is larger than we need, we reduce chunk_size
Expand All @@ -533,13 +538,19 @@ def input_iterator(self, tag: str, **kwargs: Any) -> Iterable:
chunk_size,
"to use all of the processes",
)

kwcopy = dict(
groupname=groupname,
chunk_size=chunk_size,
rank=self.rank,
parallel_size=self.size,
)
if groupname is None:
kwcopy = dict(
chunk_size=chunk_size,
rank=self.rank,
parallel_size=self.size,
)
else:
kwcopy = dict(
groupname=groupname,
chunk_size=chunk_size,
rank=self.rank,
parallel_size=self.size,
)
kwcopy.update(**kwargs)
return handle.iterator(**kwcopy)

Expand Down
Loading