Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions bmsdna/lakeapi/context/df_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
# Module-wide logger for the DuckDB execution context.
logger = get_logger(__name__)


def _env_flag(name: str) -> bool:
    """Return True only when environment variable *name* is exactly "1".

    An unset variable is treated as "0", i.e. the flag is off.
    """
    return os.getenv(name, "0") == "1"


# Feature switches, read once when this module is imported.
ENABLE_COPY_TO = _env_flag("ENABLE_COPY_TO")
# Forwarded as ``use_delta_ext`` when a view is created for a delta source.
USE_DELTA_EXT = _env_flag("DUCKDB_DELTA_USE_EXT")
# Forwarded as ``use_fsspec`` when remote / delta sources are registered.
USE_FSSPEC = _env_flag("DUCKDB_DELTA_USE_FSSPEC")

# NOTE(review): presumably extra DuckDB configuration supplied by the
# application; its consumer is not visible in this part of the file.
DUCK_CONFIG = {}
# SQL statements executed on every new cursor created by the context.
DUCK_INIT_SCRIPTS: list[str] = []
Expand Down Expand Up @@ -237,6 +239,7 @@ def __init__(
):
super().__init__(chunk_size=chunk_size, engine_name="duckdb")
self.con = con.cursor()
self.con.execute("set azure_transport_option_type='curl'")
for ins in DUCK_INIT_SCRIPTS:
self.con.execute(ins)
self.res_con = None
Expand Down Expand Up @@ -402,7 +405,7 @@ def register_datasource(
self.con,
remote_uri,
remote_opts,
use_fsspec=os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1",
use_fsspec=USE_FSSPEC,
)

if file_type == "json":
Expand Down Expand Up @@ -436,12 +439,13 @@ def register_datasource(
ab_uri, uri_opts = uri.get_uri_options(flavor="original")
duckdb_create_view_for_delta(
self.con,
get_deltalake_meta(False, uri),
get_deltalake_meta(False, uri) if not USE_DELTA_EXT else ab_uri,
target_name,
storage_options=uri_opts, # type: ignore
conditions=filters,
use_fsspec=os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1",
use_fsspec=USE_FSSPEC,
limit=limit,
use_delta_ext=USE_DELTA_EXT,
)
return
if file_type == "duckdb":
Expand Down
9 changes: 9 additions & 0 deletions bmsdna/lakeapi/context/df_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import json
from .source_uri import SourceUri
from sqlglot.dialects.postgres import Postgres
import os

# Opt-in switch, read once at import time: enabled only when the
# USE_NATIVE_POLARS environment variable is exactly "1" (unset means off).
# When enabled, delta sources are scanned with ``pl.scan_delta``.
USE_NATIVE_POLARS = os.environ.get("USE_NATIVE_POLARS", "0") == "1"


class Polars(Postgres):
Expand Down Expand Up @@ -254,6 +257,12 @@ def register_datasource(
storage_options=db_opts if db_opts else None,
)
df = pl.DataFrame(schema=schema)
elif USE_NATIVE_POLARS:
db_uri, db_ots = uri.get_uri_options(flavor="object_store")

df = pl.scan_delta(
ab_uri, storage_options=dict(db_ots) if db_ots else None
)
else:
df = polars_scan_delta(
db_uri,
Expand Down
15 changes: 15 additions & 0 deletions bmsdna/lakeapi/utils/meta_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
)
from typing import Optional

from bmsdna.lakeapi.core.config import BasicConfig


# Module-level cache of Delta table metadata, keyed by source URI.
# Entries are stored after a metadata lookup (``_cached_meta[uri] = mt``)
# and are dropped in bulk by ``clear_deltalake_meta_cache()``.
_cached_meta: dict[SourceUri, DeltaTableMeta] = {}

Expand All @@ -26,6 +28,7 @@ def duck_meta_engine():
global _global_duck_meta_engine
if _global_duck_con is None:
_global_duck_con = duckdb.connect(":memory:")
_global_duck_con.execute("set azure_transport_option_type='curl'")
_global_duck_meta_engine = DuckDBMetaEngine(
_global_duck_con,
use_fsspec=os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1",
Expand Down Expand Up @@ -53,3 +56,15 @@ def duck_meta_engine():
mt = _get_deltalake_meta(meta_engine, ab_uri, ab_opts)
_cached_meta[uri] = mt
return mt


def clear_deltalake_meta_cache() -> None:
    """Reset all module-level Delta metadata state.

    Empties the metadata cache, discards the shared DuckDB meta engine and
    closes the shared in-memory DuckDB connection, if one was created.

    The connection reference is dropped *before* ``close()`` is called.
    ``duck_meta_engine()`` only builds a new connection and engine when
    ``_global_duck_con`` is ``None``, so if ``close()`` raised while the
    reference was still set, the module would be left with a connection
    but no engine. Any exception from ``close()`` still propagates to the
    caller.

    Calling this function when nothing has been initialised is a no-op.
    """
    global _global_duck_meta_engine
    global _global_duck_con

    # Mutated in place, so no ``global`` declaration is required for it.
    _cached_meta.clear()
    _global_duck_meta_engine = None

    # Detach first, close second: module state stays consistent even if
    # closing the connection fails.
    con = _global_duck_con
    _global_duck_con = None
    if con is not None:
        con.close()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "bmsdna-lakeapi"
version = "0.27.5"
version = "0.27.6"
description = ""
authors = [{ name = "DWH Team", email = "[email protected]" }]
dependencies = [
Expand Down
10 changes: 5 additions & 5 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading