Skip to content

Commit 8917d07

Browse files
authored
Restore the ability to read remote directories (#393)
* Restore the support of reading remote dirs
* Fix bugs pointed out by the review
* Do not support directories for HTTP
* Fix a bug when reading a directory twice
* Address review comments
1 parent 960c0ec commit 8917d07

File tree

1 file changed

+104
-42
lines changed
  • src/nested_pandas/nestedframe

1 file changed

+104
-42
lines changed

src/nested_pandas/nestedframe/io.py

Lines changed: 104 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@
2020
FSSPEC_FILESYSTEMS = ("http", "https")
2121
FSSPEC_BLOCK_SIZE = 32 * 1024
2222

23+
# Filesystems for which calling .is_dir() may be very slow and/or .iterdir()
24+
# may yield non-parquet paths. See details in _read_parquet_into_table()
25+
# docstring.
26+
NO_ITERDIR_FILESYSTEMS = (
27+
"http",
28+
"https",
29+
)
30+
2331

2432
def read_parquet(
2533
data: str | UPath | bytes,
@@ -42,13 +50,16 @@ def read_parquet(
4250
it can be a single file name, directory name, or a remote path
4351
(e.g., HTTP/HTTPS or S3). If a file-like object is passed, it
4452
must support the ``read`` method. You can also pass a
45-
``filesystem`` keyword argument with a ``pyarrow.fs`` object, which will
46-
be passed along to the underlying file-reading method.
47-
A file URL can also be a path to a directory that contains multiple
53+
``filesystem`` keyword argument with a ``pyarrow.fs`` object, which
54+
will be passed along to the underlying file-reading method.
55+
A file URL can also be a path to a directory that contains multiple
4856
partitioned parquet files. Both pyarrow and fastparquet support
4957
paths to directories as well as file URLs. A directory path could be:
50-
``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
51-
If the path is to a single Parquet file, it will be loaded using
58+
``file://localhost/path/to/tables/`` or ``s3://bucket/partition_dir/``
59+
(note trailing slash for web locations, since it may be expensive
60+
to test a path for being a directory). Directory reading is not
61+
supported for HTTP(S). If the path is to a single Parquet file, it will
62+
be loaded using
5263
``fsspec.parquet.open_parquet_file``, which has optimized handling
5364
for remote Parquet files.
5465
columns : list, default=None
@@ -112,41 +123,7 @@ def read_parquet(
112123
elif isinstance(reject_nesting, str):
113124
reject_nesting = [reject_nesting]
114125

115-
# For single Parquet file paths, we want to use
116-
# `fsspec.parquet.open_parquet_file`. But for any other usage
117-
# (which includes file-like objects, directories and lists
118-
# thereof), we want to defer to `pq.read_table`.
119-
120-
# At the end of this block, `table` will contain the data.
121-
122-
# NOTE: the test for _is_local_dir is sufficient, because we're
123-
# preserving a path to pq.read_table, which can read local
124-
# directories, but not remote directories. Remote directories
125-
# cannot be read by either of these methods.
126-
if isinstance(data, str | Path | UPath) and not _is_local_dir(path_to_data := UPath(data)):
127-
storage_options = _get_storage_options(path_to_data)
128-
filesystem = kwargs.get("filesystem")
129-
if not filesystem:
130-
_, filesystem = _transform_read_parquet_data_arg(path_to_data)
131-
with fsspec.parquet.open_parquet_file(
132-
str(path_to_data),
133-
columns=columns,
134-
storage_options=storage_options,
135-
fs=filesystem,
136-
engine="pyarrow",
137-
) as parquet_file:
138-
table = pq.read_table(parquet_file, columns=columns, **kwargs)
139-
else:
140-
# All other cases, including file-like objects, directories, and
141-
# even lists of the foregoing.
142-
143-
# If `filesystem` is specified - use it, passing it as part of **kwargs
144-
if kwargs.get("filesystem") is not None:
145-
table = pq.read_table(data, columns=columns, **kwargs)
146-
else:
147-
# Otherwise convert with a special function
148-
data, filesystem = _transform_read_parquet_data_arg(data)
149-
table = pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)
126+
table = _read_parquet_into_table(data, columns, **kwargs)
150127

151128
# Resolve partial loading of nested structures
152129
# Using pyarrow to avoid naming conflicts from partial loading ("flux" vs "lc.flux")
@@ -206,14 +183,99 @@ def read_parquet(
206183
return from_pyarrow(table, reject_nesting=reject_nesting, autocast_list=autocast_list)
207184

208185

209-
def _is_local_dir(path_to_data: UPath):
186+
def _read_parquet_into_table(
187+
data: str | UPath | bytes,
188+
columns: list[str] | None,
189+
**kwargs,
190+
) -> pa.Table:
191+
"""Reads parquet file(s) from path and returns a pyarrow table.
192+
193+
For single Parquet file paths, we want to use
194+
`fsspec.parquet.open_parquet_file`. But for any other usage
195+
(which includes file-like objects, local directories and lists
196+
thereof), we want to defer to `pq.read_table`.
197+
198+
NOTE: the test for _is_local_dir is sufficient, because we're
199+
preserving a path to pq.read_table, which can read local
200+
directories, but not remote directories. Remote directories
201+
are supported separately via _read_parquet_directory.
202+
We don't support HTTP "directories", because 1) calling .is_dir()
203+
may be very expensive, because it downloads content first,
204+
2) because .iter_dir() is likely to return a lot of "junk"
205+
besides of the actual parquet files.
206+
"""
207+
if isinstance(data, str | Path | UPath) and not _is_local_dir(path_to_data := UPath(data)):
208+
storage_options = _get_storage_options(path_to_data)
209+
filesystem = kwargs.get("filesystem")
210+
if not filesystem:
211+
_, filesystem = _transform_read_parquet_data_arg(path_to_data)
212+
# Will not detect HTTP(S) directories.
213+
if _is_remote_dir(data, path_to_data):
214+
return _read_remote_parquet_directory(
215+
path_to_data, filesystem, storage_options, columns, **kwargs
216+
)
217+
with fsspec.parquet.open_parquet_file(
218+
path_to_data.path,
219+
columns=columns,
220+
storage_options=storage_options,
221+
fs=filesystem,
222+
engine="pyarrow",
223+
) as parquet_file:
224+
return pq.read_table(parquet_file, columns=columns, **kwargs)
225+
226+
# All other cases, including file-like objects, directories, and
227+
# even lists of the foregoing.
228+
229+
# If `filesystem` is specified - use it, passing it as part of **kwargs
230+
if kwargs.get("filesystem") is not None:
231+
return pq.read_table(data, columns=columns, **kwargs)
232+
233+
# Otherwise convert with a special function
234+
data, filesystem = _transform_read_parquet_data_arg(data)
235+
return pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)
236+
237+
238+
def _is_local_dir(upath: UPath) -> bool:
210239
"""Returns True if the given path refers to a local directory.
211240
212241
It's necessary to have this function, rather than simply checking
213242
``UPath(p).is_dir()``, because ``UPath.is_dir`` can be quite
214243
expensive in the case of a remote file path that isn't a directory.
215244
"""
216-
return path_to_data.protocol in ("", "file") and path_to_data.is_dir()
245+
return upath.protocol in ("", "file") and upath.is_dir()
246+
247+
248+
def _is_remote_dir(orig_data: str | Path | UPath, upath: UPath) -> bool:
249+
# Iterating over HTTP(S) directories is very difficult, let's just not do that.
250+
# See details in _read_parquet_into_table docstring.
251+
if upath.protocol in NO_ITERDIR_FILESYSTEMS:
252+
return False
253+
if str(orig_data).endswith("/"):
254+
return True
255+
return upath.is_dir()
256+
257+
258+
def _read_remote_parquet_directory(
259+
dir_upath: UPath, filesystem, storage_options, columns: list[str] | None, **kwargs
260+
) -> pa.Table:
261+
"""Read files one-by-one with fsspec.open_parquet_file and concat the result"""
262+
tables = []
263+
for upath in dir_upath.iterdir():
264+
# Go recursively for filesystems which support file/directory identification with fsspec file
265+
# handlers. This would work for e.g. S3, but not for HTTP(S).
266+
if upath.is_dir():
267+
table = _read_remote_parquet_directory(upath, filesystem, storage_options, columns, **kwargs)
268+
else:
269+
with fsspec.parquet.open_parquet_file(
270+
upath.path,
271+
columns=columns,
272+
storage_options=storage_options,
273+
fs=filesystem,
274+
engine="pyarrow",
275+
) as parquet_file:
276+
table = pq.read_table(parquet_file, columns=columns, **kwargs)
277+
tables.append(table)
278+
return pa.concat_tables(tables)
217279

218280

219281
def _get_storage_options(path_to_data: UPath):

0 commit comments

Comments
 (0)