Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bmsdna/lakeapi/context/df_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def get_pyarrow_dataset(
ab_uri, ab_opts = uri.get_uri_options(flavor="fsspec")
pd = pandas.read_json(
ab_uri,
storage_options=ab_opts,
storage_options=dict(ab_opts) if ab_opts else None,
orient="records",
lines=file_type == "ndjson",
)
Expand Down
2 changes: 1 addition & 1 deletion bmsdna/lakeapi/context/df_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ def register_datasource(
self.con,
ab_uri,
target_name,
storage_options=uri_opts,
storage_options=dict(uri_opts) if uri_opts else None,
conditions=filters,
use_fsspec=os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1",
limit=limit,
Expand Down
14 changes: 10 additions & 4 deletions bmsdna/lakeapi/context/df_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,14 @@ def register_datasource(
from deltalake2db import polars_scan_delta, get_polars_schema

if meta_only or limit == 0:
schema = get_polars_schema(db_uri, storage_options=db_opts)
schema = get_polars_schema(
db_uri, storage_options=dict(db_opts) if db_opts else None
)
df = pl.DataFrame(schema=schema)
else:
df = polars_scan_delta(
db_uri,
storage_options=db_opts,
storage_options=dict(db_opts) if db_opts else None,
conditions=filters,
limit=limit,
)
Expand All @@ -261,9 +263,13 @@ def register_datasource(
f"Delta table version {ab_uri} not supported"
) from de
case "parquet":
df = pl.scan_parquet(ab_uri, storage_options=uri_opts)
df = pl.scan_parquet(
ab_uri, storage_options=dict(uri_opts) if uri_opts else None
)
case "arrow":
df = pl.scan_ipc(ab_uri, storage_options=uri_opts)
df = pl.scan_ipc(
ab_uri, storage_options=dict(uri_opts) if uri_opts else None
)
case "csv" if uri_opts is None:
df = pl.scan_csv(ab_uri)
case "csv" if uri_opts is not None:
Expand Down
4 changes: 2 additions & 2 deletions bmsdna/lakeapi/context/source_uri.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Callable, Literal, TYPE_CHECKING, Union
from typing import Any, Callable, Literal, TYPE_CHECKING, Mapping, Union
import fsspec
import adlfs
import os
Expand Down Expand Up @@ -91,7 +91,7 @@ def get_fs_spec(self) -> tuple[fsspec.AbstractFileSystem, str]:

def get_uri_options(
self, *, flavor: Literal["fsspec", "object_store", "original"]
) -> tuple[str, dict | None]:
) -> tuple[str, Mapping[str, Any] | None]:
return _convert_options(
self.real_uri,
self.accounts.get(self.account) if self.account else None,
Expand Down
28 changes: 7 additions & 21 deletions bmsdna/lakeapi/utils/meta_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,27 @@
_cached_meta: dict[SourceUri, DeltaTableMeta] = {}

_global_duck_con: Optional[duckdb.DuckDBPyConnection] = None
_global_duck_meta_engine: Optional[DuckDBMetaEngine] = None


def get_deltalake_meta(use_polars: bool, uri: SourceUri):
global _global_duck_con
global _global_duck_meta_engine
if use_polars:
ab_uri, ab_opts = uri.get_uri_options(flavor="object_store")

meta_engine = PolarsMetaEngine(ab_opts)
meta_engine = PolarsMetaEngine()
else:
if _global_duck_con is None:
_global_duck_con = duckdb.connect(":memory:")
_global_duck_meta_engine = DuckDBMetaEngine(_global_duck_con)
ab_uri, ab_opts = uri.get_uri_options(flavor="original")

if not uri.is_local():
if os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1" and "://" in ab_uri:
account_name_path = get_account_name_from_path(ab_uri)
fake_protocol = apply_storage_options_fsspec(
_global_duck_con,
ab_uri,
ab_opts or {},
account_name_path=account_name_path,
)
ab_uri = fake_protocol + "://" + ab_uri.split("://")[1]
else:
duckdb_apply_storage_options(
_global_duck_con,
ab_uri,
ab_opts,
use_fsspec=os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1",
)
meta_engine = DuckDBMetaEngine(_global_duck_con)
assert _global_duck_meta_engine is not None
meta_engine = _global_duck_meta_engine

if mt := _cached_meta.get(uri):
mt.update_incremental(meta_engine)
return mt
mt = _get_deltalake_meta(meta_engine, ab_uri)
mt = _get_deltalake_meta(meta_engine, ab_uri, ab_opts)
_cached_meta[uri] = mt
return mt
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "bmsdna-lakeapi"
version = "0.27.1"
version = "0.27.2"
description = ""
authors = [{ name = "DWH Team", email = "[email protected]" }]
dependencies = [
Expand All @@ -15,9 +15,9 @@ dependencies = [
"expandvars >=0.12.0",
"fsspec >=2024.2.0,<2025",
"adlfs >=2024.2.0,<2025",
"deltalake2db >=1.1.4",
"deltalake2db >=1.2.0",
]
requires-python = "~=3.10"
requires-python = "~=3.11"

[project.scripts]
validate_lakeapi_schema = "bmsdna.lakeapi.tools.validateschema:validate_schema_cli"
Expand Down
Loading
Loading