diff --git a/bmsdna/lakeapi/context/df_duckdb.py b/bmsdna/lakeapi/context/df_duckdb.py index 83bfffb..74af650 100644 --- a/bmsdna/lakeapi/context/df_duckdb.py +++ b/bmsdna/lakeapi/context/df_duckdb.py @@ -30,6 +30,8 @@ logger = get_logger(__name__) ENABLE_COPY_TO = os.environ.get("ENABLE_COPY_TO", "0") == "1" +USE_DELTA_EXT = os.getenv("DUCKDB_DELTA_USE_EXT", "0") == "1" +USE_FSSPEC = os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1" DUCK_CONFIG = {} DUCK_INIT_SCRIPTS: list[str] = [] @@ -237,6 +239,7 @@ def __init__( ): super().__init__(chunk_size=chunk_size, engine_name="duckdb") self.con = con.cursor() + self.con.execute("set azure_transport_option_type='curl'") for ins in DUCK_INIT_SCRIPTS: self.con.execute(ins) self.res_con = None @@ -402,7 +405,7 @@ def register_datasource( self.con, remote_uri, remote_opts, - use_fsspec=os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1", + use_fsspec=USE_FSSPEC, ) if file_type == "json": @@ -436,12 +439,13 @@ def register_datasource( ab_uri, uri_opts = uri.get_uri_options(flavor="original") duckdb_create_view_for_delta( self.con, - get_deltalake_meta(False, uri), + get_deltalake_meta(False, uri) if not USE_DELTA_EXT else ab_uri, target_name, storage_options=uri_opts, # type: ignore conditions=filters, - use_fsspec=os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1", + use_fsspec=USE_FSSPEC, limit=limit, + use_delta_ext=USE_DELTA_EXT, ) return if file_type == "duckdb": diff --git a/bmsdna/lakeapi/context/df_polars.py b/bmsdna/lakeapi/context/df_polars.py index ae060fe..e462f02 100644 --- a/bmsdna/lakeapi/context/df_polars.py +++ b/bmsdna/lakeapi/context/df_polars.py @@ -23,6 +23,9 @@ import json from .source_uri import SourceUri from sqlglot.dialects.postgres import Postgres +import os + +USE_NATIVE_POLARS = os.getenv("USE_NATIVE_POLARS", "0") == "1" class Polars(Postgres): @@ -254,6 +257,12 @@ def register_datasource( storage_options=db_opts if db_opts else None, ) df = pl.DataFrame(schema=schema) + elif USE_NATIVE_POLARS: + 
db_uri, db_ots = uri.get_uri_options(flavor="object_store") + + df = pl.scan_delta( + db_uri, storage_options=dict(db_ots) if db_ots else None + ) else: df = polars_scan_delta( db_uri, diff --git a/bmsdna/lakeapi/utils/meta_cache.py b/bmsdna/lakeapi/utils/meta_cache.py index 639d1b4..374c8a4 100644 --- a/bmsdna/lakeapi/utils/meta_cache.py +++ b/bmsdna/lakeapi/utils/meta_cache.py @@ -9,6 +9,8 @@ ) from typing import Optional +from bmsdna.lakeapi.core.config import BasicConfig + _cached_meta: dict[SourceUri, DeltaTableMeta] = {} @@ -26,6 +28,7 @@ def duck_meta_engine(): global _global_duck_meta_engine if _global_duck_con is None: _global_duck_con = duckdb.connect(":memory:") + _global_duck_con.execute("set azure_transport_option_type='curl'") _global_duck_meta_engine = DuckDBMetaEngine( _global_duck_con, use_fsspec=os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1", @@ -53,3 +56,15 @@ def duck_meta_engine(): mt = _get_deltalake_meta(meta_engine, ab_uri, ab_opts) _cached_meta[uri] = mt return mt + + +def clear_deltalake_meta_cache(): + global _cached_meta + global _global_duck_meta_engine + global _global_duck_con + _cached_meta.clear() + if _global_duck_meta_engine is not None: + _global_duck_meta_engine = None + if _global_duck_con is not None: + _global_duck_con.close() + _global_duck_con = None diff --git a/pyproject.toml b/pyproject.toml index 832e296..f5b02a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "bmsdna-lakeapi" -version = "0.27.5" +version = "0.27.6" description = "" authors = [{ name = "DWH Team", email = "you@example.com" }] dependencies = [ diff --git a/uv.lock b/uv.lock index bc577e1..4081af4 100644 --- a/uv.lock +++ b/uv.lock @@ -368,7 +368,7 @@ wheels = [ [[package]] name = "bmsdna-lakeapi" -version = "0.27.3" +version = "0.27.6" source = { editable = "." 
} dependencies = [ { name = "adlfs" }, @@ -439,7 +439,7 @@ requires-dist = [ { name = "argon2-cffi", marker = "extra == 'auth'" }, { name = "arrow-odbc", specifier = ">=5.0.0" }, { name = "arrow-odbc", marker = "extra == 'odbc'" }, - { name = "deltalake2db", specifier = ">=1.2.0" }, + { name = "deltalake2db", specifier = ">=1.2.2" }, { name = "duckdb", specifier = ">=1.1.0,<2" }, { name = "expandvars", specifier = ">=0.12.0" }, { name = "fastapi", specifier = ">=0.110.0" }, @@ -812,15 +812,15 @@ wheels = [ [[package]] name = "deltalake2db" -version = "1.2.0" +version = "1.2.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "polars" }, { name = "sqlglot" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/f6/29/a84d78807dcb71890ba8c343d1af3d9d2955ac838c824fba697f808ce42f/deltalake2db-1.2.0.tar.gz", hash = "sha256:12e721d1e4c829525f109b932ae5320c72354c0b1734bb8db9b1622266ca9982", size = 247226, upload-time = "2025-10-01T07:17:55.281Z" } +sdist = { url = "https://files.pythonhosted.org/packages/10/81/af0d41aa9ee1be747b499bc7b2e29619f6c8f2902f46e53e25b2e2629e4f/deltalake2db-1.2.2.tar.gz", hash = "sha256:72bc5ce0720401ee2d3fb79dd84c3730be90fe5eaae821b6629180a8fd42b50c", size = 247592, upload-time = "2025-10-02T12:55:34.578Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/7a/28/efcb5e6b5fddfcb591f936cf5d3153769acd9021d4ef5373338618931f88/deltalake2db-1.2.0-py3-none-any.whl", hash = "sha256:acb88980924bf4c7300943fb5b485a09909acc2bddf1eac3769172505cb38868", size = 19906, upload-time = "2025-10-01T07:17:53.85Z" }, + { url = "https://files.pythonhosted.org/packages/c8/12/9e2e080fe15e047131f5025725664f2532f662af9f7aa9cfc364e5935b2e/deltalake2db-1.2.2-py3-none-any.whl", hash = "sha256:5440f42bce34499c4c1e3fb2e8594d44436a577f95060f73d96fab7cca9e5c4b", size = 20059, upload-time = "2025-10-02T12:55:33.298Z" }, ] [[package]]