diff --git a/bmsdna/lakeapi/context/df_duckdb.py b/bmsdna/lakeapi/context/df_duckdb.py index f1a4f8a..0209c14 100644 --- a/bmsdna/lakeapi/context/df_duckdb.py +++ b/bmsdna/lakeapi/context/df_duckdb.py @@ -22,6 +22,7 @@ from bmsdna.lakeapi.core.log import get_logger import multiprocessing from .source_uri import SourceUri +import csv logger = get_logger(__name__) @@ -154,7 +155,19 @@ async def write_nd_json(self, file_name: str): async def write_csv(self, file_name: str, *, separator: str): if not ENABLE_COPY_TO: - return await super().write_csv(file_name, separator=separator) + query = get_sql(self.original_sql, dialect="duckdb") + await run_in_threadpool(self.con.execute, query) + assert self.con.description is not None + col_names = [d[0] for d in self.con.description] + with open(file_name, "w", newline="", encoding="utf-8") as csvfile: + writer = csv.DictWriter( + csvfile, fieldnames=col_names, delimiter=separator + ) + writer.writeheader() + while chunk := self.con.fetchmany(self.chunk_size): + for row in chunk: + writer.writerow(dict(zip(col_names, row))) + return query = get_sql(self.original_sql, dialect="duckdb") uuidstr = _get_temp_table_name() full_query = f"""CREATE TEMP VIEW {uuidstr} AS {query}; diff --git a/bmsdna/lakeapi/utils/meta_cache.py b/bmsdna/lakeapi/utils/meta_cache.py index efd95f0..7a837fc 100644 --- a/bmsdna/lakeapi/utils/meta_cache.py +++ b/bmsdna/lakeapi/utils/meta_cache.py @@ -8,6 +8,8 @@ DeltaTableMeta, duckdb_apply_storage_options, ) +from deltalake2db.duckdb import apply_storage_options_fsspec +from deltalake2db.azure_helper import get_account_name_from_path from typing import Optional @@ -28,12 +30,22 @@ def get_deltalake_meta(use_polars: bool, uri: SourceUri): ab_uri, ab_opts = uri.get_uri_options(flavor="original") if not uri.is_local(): - duckdb_apply_storage_options( - _global_duck_con, - ab_uri, - ab_opts, - use_fsspec=os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1", - ) + if os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1" and "://" in ab_uri: + account_name_path = get_account_name_from_path(ab_uri) + fake_protocol = apply_storage_options_fsspec( + _global_duck_con, + ab_uri, + ab_opts or {}, + account_name_path=account_name_path, + ) + ab_uri = fake_protocol + "://" + ab_uri.split("://")[1] + else: + duckdb_apply_storage_options( + _global_duck_con, + ab_uri, + ab_opts, + use_fsspec=os.getenv("DUCKDB_DELTA_USE_FSSPEC", "0") == "1", + ) meta_engine = DuckDBMetaEngine(_global_duck_con) if mt := _cached_meta.get(uri): diff --git a/pyproject.toml b/pyproject.toml index f429a07..09501c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "bmsdna-lakeapi" -version = "0.27.0" +version = "0.27.1" description = "" authors = [{ name = "DWH Team", email = "you@example.com" }] dependencies = [ diff --git a/tests/test_performance.py b/tests/test_performance.py index de80c55..9fcaea3 100644 --- a/tests/test_performance.py +++ b/tests/test_performance.py @@ -25,7 +25,7 @@ def call_api_1(engine, format): duration = end - start print(f"Engine {engine} took {duration} seconds with format {format}") - assert duration < 1.0 + assert duration < 1.5 def call_api_2(engine, format): start = time.time() @@ -37,7 +37,7 @@ def call_api_2(engine, format): duration = end - start print(f"Engine {engine} took {duration} seconds with format {format}") - assert duration < 1.0 + assert duration < 1.5 def call_api_3(engine, format): start = time.time() @@ -49,7 +49,7 @@ def call_api_3(engine, format): duration = end - start print(f"Engine {engine} took {duration} seconds with format {format}") - assert duration < 1.0 + assert duration < 1.5 def call_api_4(engine, format): start = time.time() @@ -61,7 +61,7 @@ def call_api_4(engine, format): duration = end - start print(f"Engine {engine} took {duration} seconds with format {format}") - assert duration < 1.0 + assert duration < 1.5 tasks = [] for _ in range(100): diff --git a/uv.lock b/uv.lock index 389cbf5..d7c62ba 100644 --- a/uv.lock +++ b/uv.lock @@ -432,7 +432,7 @@ wheels = [ [[package]] name = "bmsdna-lakeapi" -version = "0.27.0" +version = "0.27.1" source = { editable = "." } dependencies = [ { name = "adlfs" },