
Commit 597456c

always use fsspec
1 parent 65109ef commit 597456c


src/nested_pandas/nestedframe/io.py

Lines changed: 26 additions & 54 deletions
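
The effect is easiest to see from the caller's side: after this commit, local paths, HTTP(S) URLs, and fsspec-backed object stores all funnel through the same reader. A minimal sketch, assuming `read_parquet` is exposed at the package top level (file names and bucket are placeholders, and s3:// needs the s3fs driver installed):

    import nested_pandas as npd

    # All three now go through the fsspec-backed _fs_read_table wrapper.
    nf_local = npd.read_parquet("data/example.parquet")
    nf_http = npd.read_parquet("https://example.com/example.parquet")
    nf_s3 = npd.read_parquet("s3://bucket/example.parquet")
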
@@ -1,26 +1,19 @@
 # typing.Self and "|" union syntax don't exist in Python 3.9
 from __future__ import annotations
-import os
-import io
 
+from urllib.parse import urlparse
+
+import fsspec
 import pandas as pd
-from pandas._libs import lib
+import pyarrow as pa
+import pyarrow.parquet as pq
 from pandas._typing import (
-    DtypeBackend,
     FilePath,
     ReadBuffer,
 )
-import pyarrow as pa
-import pyarrow.dataset as ds
-import fsspec
-import pyarrow.parquet as pq
-
-import requests
-import fsspec
-from urllib.parse import urlparse
 
-from .core import NestedFrame
 from ..series.dtype import NestedDtype
+from .core import NestedFrame
 
 
 def read_parquet(

@@ -31,7 +24,7 @@ def read_parquet(
     """
     Load a parquet object from a file path into a NestedFrame.
 
-    As a deviation from `pandas`, this function loads via 
+    As a deviation from `pandas`, this function loads via
     `pyarrow.parquet.read_table`, and then converts to a NestedFrame.
 
     Parameters

@@ -75,24 +68,9 @@ def read_parquet(
     elif isinstance(reject_nesting, str):
         reject_nesting = [reject_nesting]
 
-    # TODO: This potentially can't read remote files
-    # maybe load into a pyarrow.dataset first
-    """
-    if isinstance(data, str):
-        # Check if the file is a URL
-        if data.startswith("http://") or data.startswith("https://"):
-            # Use fsspec to open the file
-            fs = fsspec.filesystem("http")
-            with fs.open(data) as f:
-                table = pq.read_table(f, columns=columns)
-        else:
-            # Use pyarrow to read the file directly
-            table = pq.read_table(data, columns=columns)
-    """
-
     # First load through pyarrow
-    # This will handle local files, http(s) and s3
-    table = _fs_read_table(data, use_fsspec=True, columns=columns)
+    # This wrapper will handle local files, http(s) and s3
+    table = _fs_read_table(data, columns=columns)
 
     # Resolve partial loading of nested structures
     # Using pyarrow to avoid naming conflicts from partial loading ("flux" vs "lc.flux")
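
Since `columns` is forwarded to the wrapper unchanged, partial loading keeps working against remote files too: a dotted name selects a single subfield of a nested column. A hedged sketch, with a hypothetical file and schema:

    from nested_pandas import read_parquet

    # Read the plain "id" column plus only the "flux" and "t" subfields
    # of the nested "lc" column, rather than the whole struct.
    nf = read_parquet("tiny.parquet", columns=["id", "lc.flux", "lc.t"])
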
@@ -106,16 +84,15 @@
             # get the top-level column name
             nested_col = col_in.split(".")[0]
             if nested_col not in reject_nesting:
-                if nested_col not in nested_structures.keys():
+                if nested_col not in nested_structures:
                     nested_structures[nested_col] = [i]
                 else:
                     nested_structures[nested_col].append(i)
 
-    #print(nested_structures)
     # Check for full and partial load of the same column and error
     # Columns in the reject_nesting will not be checked
     for col in columns:
-        if col in nested_structures.keys():
+        if col in nested_structures:
             raise ValueError(
                 f"The provided column list contains both a full and partial "
                 f"load of the column '{col}'. This is not allowed as the partial "
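
The loop above is what turns a mixed request into an error. A sketch of the failing call, again with a hypothetical file and schema:

    from nested_pandas import read_parquet

    # "lc" requests the whole struct while "lc.flux" requests one subfield;
    # the check above rejects the ambiguity with the ValueError shown.
    read_parquet("tiny.parquet", columns=["lc", "lc.flux"])
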
@@ -133,7 +110,7 @@ def read_parquet(
             field_names = [table.column_names[i] for i in indices]
             structs[col] = pa.StructArray.from_arrays(
                 [table.column(i).chunk(0) for i in indices], # Child arrays
-                field_names # Field names
+                field_names, # Field names
             )
             indices_to_remove.extend(indices)

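For reference, `pa.StructArray.from_arrays` is stock pyarrow: it zips child arrays with field names into a single struct-typed array, which is how the flat, partially loaded columns get folded back into one nested column. A self-contained illustration:

    import pyarrow as pa

    flux = pa.array([9.0, 8.0])
    t = pa.array([1.0, 2.0])

    # Child arrays + field names -> one struct column.
    lc = pa.StructArray.from_arrays([flux, t], ["flux", "t"])
    print(lc.type)  # struct<flux: double, t: double>
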
@@ -153,7 +130,6 @@ def read_parquet(
     df = NestedFrame(table.to_pandas(types_mapper=lambda ty: pd.ArrowDtype(ty), self_destruct=True))
     del table
 
-    #print(df.dtypes)
     # Attempt to cast struct columns to NestedDTypes
     df = _cast_struct_cols_to_nested(df, reject_nesting)

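The `types_mapper` hook is the documented way to pick pandas dtypes during Arrow-to-pandas conversion: returning `pd.ArrowDtype(ty)` keeps every column Arrow-backed instead of converting to NumPy, and `self_destruct=True` lets pyarrow release its buffers as it converts (hence the `del table`). A standalone illustration:

    import pandas as pd
    import pyarrow as pa

    tab = pa.table({"x": [1, 2, 3]})

    # Passing pd.ArrowDtype directly is equivalent to the lambda above.
    df = tab.to_pandas(types_mapper=pd.ArrowDtype)
    print(df.dtypes)  # x    int64[pyarrow]
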
@@ -183,16 +159,14 @@ def _cast_struct_cols_to_nested(df, reject_nesting):
     return df
 
 
-def _fs_read_table(uri, use_fsspec=True, headers=None, **kwargs):
+def _fs_read_table(uri, headers=None, **kwargs):
     """
     A smart wrapper around `pq.read_table` that handles multiple filesystems.
 
     Parameters
     ----------
     uri (str):
         path or URI to a Parquet file
-    use_fsspec (bool):
-        whether to use fsspec for URI handling (e.g., for S3)
     headers (dict):
         headers for HTTP requests (optional)
     kwargs:

@@ -210,21 +184,19 @@ def _fs_read_table(uri, use_fsspec=True, headers=None, **kwargs):
 
     # --- HTTP/HTTPS via requests ---
     elif parsed.scheme in ("http", "https"):
-        if use_fsspec:
-            fs = fsspec.filesystem("http")
-            with fs.open(uri, mode="rb") as f:
-                return pq.read_table(f, **kwargs)
-        else:
-            response = requests.get(uri, headers=headers or {}, stream=True)
-            response.raise_for_status()
-            buf = pa.BufferReader(response.content)
-            return pq.read_table(buf, **kwargs)
-
-    # --- S3/GS/etc via fsspec ---
-    elif use_fsspec:
-        fs, path = fsspec.core.url_to_fs(uri)
-        with fs.open(path, mode="rb") as f:
+        fs = fsspec.filesystem("http")
+        with fs.open(uri, mode="rb") as f:
             return pq.read_table(f, **kwargs)
 
+    # --- S3/GS/etc via fsspec ---
+    # Try to use the url_to_fs function to get the filesystem (S3,GCS, etc)
     else:
-        raise ValueError(f"Unsupported URI scheme: {parsed.scheme}")
+        try:
+            fs, path = fsspec.core.url_to_fs(uri)
+            with fs.open(path, mode="rb") as f:
+                return pq.read_table(f, **kwargs)
+        except ValueError:
+            raise ValueError(
+                f"Unsupported URI scheme: {parsed.scheme}. Please use a local file, "
+                "HTTP/HTTPS URL, or a supported filesystem (e.g., S3, GCS) with fsspec."
+            )
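
A note on the new fallback: `fsspec.core.url_to_fs` raises ValueError for a scheme fsspec does not recognize, which the `except` clause rewraps with a friendlier message. A sketch, assuming the HTTP driver (aiohttp) is installed:

    import fsspec

    # Known scheme -> (filesystem, path); for HTTP the "path" is the full URL.
    fs, path = fsspec.core.url_to_fs("https://example.com/t.parquet")
    print(type(fs).__name__, path)

    # Unknown scheme -> ValueError, re-raised above with a clearer message.
    try:
        fsspec.core.url_to_fs("foo://bucket/t.parquet")
    except ValueError as exc:
        print(exc)

One caveat: a scheme fsspec knows but whose driver is missing (for example s3:// without s3fs) raises ImportError rather than ValueError, so it bypasses this except clause.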

0 commit comments