SEA: Allow large metadata responses #653

Open

wants to merge 85 commits into base: main
Commits (85), showing changes from all commits
5bf5d4c
Separate Session related functionality from Connection class (#571)
varun-edachali-dbx May 28, 2025
400a8bd
Introduce Backend Interface (DatabricksClient) (#573)
varun-edachali-dbx May 30, 2025
3c78ed7
Implement ResultSet Abstraction (backend interfaces for fetch phase) …
varun-edachali-dbx Jun 3, 2025
9625229
Introduce Sea HTTP Client and test script (#583)
varun-edachali-dbx Jun 4, 2025
0887bc1
Introduce `SeaDatabricksClient` (Session Implementation) (#582)
varun-edachali-dbx Jun 9, 2025
6d63df0
Normalise Execution Response (clean backend interfaces) (#587)
varun-edachali-dbx Jun 11, 2025
ba8d9fd
Introduce models for `SeaDatabricksClient` (#595)
varun-edachali-dbx Jun 12, 2025
bb3f15a
Introduce preliminary SEA Result Set (#588)
varun-edachali-dbx Jun 12, 2025
19f1fae
Merge branch 'main' into sea-migration
varun-edachali-dbx Jun 17, 2025
6c5ba6d
remove invalid ExecuteResponse import
varun-edachali-dbx Jun 17, 2025
5e5147b
Separate Session related functionality from Connection class (#571)
varun-edachali-dbx May 28, 2025
57370b3
Introduce Backend Interface (DatabricksClient) (#573)
varun-edachali-dbx May 30, 2025
75752bf
Implement ResultSet Abstraction (backend interfaces for fetch phase) …
varun-edachali-dbx Jun 3, 2025
450b80d
remove un-necessary initialisation assertions
varun-edachali-dbx Jun 18, 2025
a926f02
remove un-necessary line break s
varun-edachali-dbx Jun 18, 2025
55ad001
more un-necessary line breaks
varun-edachali-dbx Jun 18, 2025
fa15730
constrain diff of test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
019c7fb
reduce diff of test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
726abe7
use pytest-like assertions for test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
bf6d41c
ensure command_id is not None
varun-edachali-dbx Jun 18, 2025
5afa733
line breaks after multi-line pyfocs
varun-edachali-dbx Jun 18, 2025
e3dfd36
ensure non null operationHandle for commandId creation
varun-edachali-dbx Jun 18, 2025
63360b3
use command_id methods instead of explicit guid_to_hex_id conversion
varun-edachali-dbx Jun 18, 2025
13ffb8d
remove un-necessary artifacts in test_session, add back assertion
varun-edachali-dbx Jun 18, 2025
a74d279
Implement SeaDatabricksClient (Complete Execution Spec) (#590)
varun-edachali-dbx Jun 18, 2025
d759050
add from __future__ import annotations to remove string literals arou…
varun-edachali-dbx Jun 19, 2025
1e21434
move docstring of DatabricksClient within class
varun-edachali-dbx Jun 24, 2025
cd4015b
move ThriftResultSet import to top of file
varun-edachali-dbx Jun 24, 2025
ed8b610
make backend/utils __init__ file empty
varun-edachali-dbx Jun 24, 2025
94d951e
use from __future__ import annotations to remove string literals arou…
varun-edachali-dbx Jun 24, 2025
c20058e
use lazy logging
varun-edachali-dbx Jun 24, 2025
fe3acb1
replace getters with property tag
varun-edachali-dbx Jun 24, 2025
9fb6a76
Merge branch 'main' into backend-refactors
varun-edachali-dbx Jun 24, 2025
61dfc4d
set active_command_id to None, not active_op_handle
varun-edachali-dbx Jun 24, 2025
64fb9b2
align test_session with pytest instead of unittest
varun-edachali-dbx Jun 24, 2025
cbf63f9
Merge branch 'main' into sea-migration
varun-edachali-dbx Jun 26, 2025
59b4825
remove duplicate test, correct active_command_id attribute
varun-edachali-dbx Jun 26, 2025
e380654
SeaDatabricksClient: Add Metadata Commands (#593)
varun-edachali-dbx Jun 26, 2025
677a7b0
SEA volume operations fix: assign `manifest.is_volume_operation` to `…
varun-edachali-dbx Jun 26, 2025
45585d4
Introduce manual SEA test scripts for Exec Phase (#589)
varun-edachali-dbx Jun 27, 2025
70c7dc8
Complete Fetch Phase (for `INLINE` disposition and `JSON_ARRAY` forma…
varun-edachali-dbx Jul 2, 2025
abf9aab
Merge branch 'main' into sea-migration
varun-edachali-dbx Jul 3, 2025
9b4b606
Merge branch 'main' into backend-refactors
varun-edachali-dbx Jul 3, 2025
4f11ff0
Introduce `row_limit` param (#607)
varun-edachali-dbx Jul 7, 2025
45f5c26
Merge branch 'main' into backend-refactors
varun-edachali-dbx Jul 10, 2025
2c9368a
formatting (black)
varun-edachali-dbx Jul 10, 2025
9b1b1f5
remove repetition from Session.__init__
varun-edachali-dbx Jul 10, 2025
77e23d3
Merge branch 'backend-refactors' into sea-migration
varun-edachali-dbx Jul 11, 2025
3bd3aef
fix merge artifacts
varun-edachali-dbx Jul 11, 2025
6d4701f
correct patch paths
varun-edachali-dbx Jul 11, 2025
dc1cb6d
fix type issues
varun-edachali-dbx Jul 14, 2025
5d04cd0
Merge branch 'main' into sea-migration
varun-edachali-dbx Jul 15, 2025
922c448
explicitly close result queue
varun-edachali-dbx Jul 15, 2025
1a0575a
Complete Fetch Phase (`EXTERNAL_LINKS` disposition and `ARROW` format…
varun-edachali-dbx Jul 16, 2025
c07beb1
SEA Session Configuration Fix: Explicitly convert values to `str` (#…
varun-edachali-dbx Jul 16, 2025
640cc82
SEA: add support for `Hybrid` disposition (#631)
varun-edachali-dbx Jul 17, 2025
8fbca9d
SEA: Reduce network calls for synchronous commands (#633)
varun-edachali-dbx Jul 19, 2025
806e5f5
SEA: Decouple Link Fetching (#632)
varun-edachali-dbx Jul 21, 2025
b57c3f3
Chunk download latency (#634)
saishreeeee Jul 21, 2025
ef5836b
acquire lock before notif + formatting (black)
varun-edachali-dbx Jul 21, 2025
4fd2a3f
Merge branch 'main' into sea-migration
varun-edachali-dbx Jul 23, 2025
26f8947
fix imports
varun-edachali-dbx Jul 23, 2025
2d44596
add get_chunk_link s
varun-edachali-dbx Jul 23, 2025
99e7435
simplify description extraction
varun-edachali-dbx Jul 23, 2025
54ec080
pass session_id_hex to ThriftResultSet
varun-edachali-dbx Jul 23, 2025
f9f9f31
revert to main's extract description
varun-edachali-dbx Jul 23, 2025
51cef2b
validate row count for sync query tests as well
varun-edachali-dbx Jul 23, 2025
387102d
guid_hex -> hex_guid
varun-edachali-dbx Jul 23, 2025
d53d1ea
reduce diff
varun-edachali-dbx Jul 23, 2025
c7810aa
reduce diff
varun-edachali-dbx Jul 23, 2025
b3072bd
reduce diff
varun-edachali-dbx Jul 23, 2025
8be5264
set .value in compression
varun-edachali-dbx Jul 23, 2025
80692e3
reduce diff
varun-edachali-dbx Jul 23, 2025
83e45ae
is_direct_results -> has_more_rows
varun-edachali-dbx Jul 25, 2025
89de17a
preliminary large metadata results
varun-edachali-dbx Jul 27, 2025
d5ccf13
account for empty table in arrow table filter
varun-edachali-dbx Jul 28, 2025
e6b256c
align flows
varun-edachali-dbx Jul 28, 2025
20c9fbd
align flow of json with arrow
varun-edachali-dbx Jul 28, 2025
a90bfeb
case sensitive support for arrow table
varun-edachali-dbx Jul 28, 2025
11f2c54
remove un-necessary comment
varun-edachali-dbx Jul 28, 2025
ea8ae9f
Merge branch 'main' into robust-metadata-sea
varun-edachali-dbx Jul 29, 2025
12a5ff8
fix merge artifacts
varun-edachali-dbx Jul 29, 2025
f126f4b
remove redundant method
varun-edachali-dbx Jul 29, 2025
2b42ea0
remove incorrect docstring
varun-edachali-dbx Jul 29, 2025
3ca9678
Merge branch 'main' into robust-metadata-sea
varun-edachali-dbx Jul 31, 2025
9 changes: 5 additions & 4 deletions src/databricks/sql/backend/sea/backend.py
@@ -158,6 +158,7 @@ def __init__(
)

self.use_hybrid_disposition = kwargs.get("use_hybrid_disposition", True)
self.use_cloud_fetch = kwargs.get("use_cloud_fetch", True)

# Extract warehouse ID from http_path
self.warehouse_id = self._extract_warehouse_id(http_path)
@@ -694,7 +695,7 @@ def get_catalogs(
max_bytes=max_bytes,
lz4_compression=False,
cursor=cursor,
use_cloud_fetch=False,
use_cloud_fetch=self.use_cloud_fetch,
parameters=[],
async_op=False,
enforce_embedded_schema_correctness=False,
@@ -727,7 +728,7 @@ def get_schemas(
max_bytes=max_bytes,
lz4_compression=False,
cursor=cursor,
use_cloud_fetch=False,
use_cloud_fetch=self.use_cloud_fetch,
parameters=[],
async_op=False,
enforce_embedded_schema_correctness=False,
@@ -768,7 +769,7 @@ def get_tables(
max_bytes=max_bytes,
lz4_compression=False,
cursor=cursor,
use_cloud_fetch=False,
use_cloud_fetch=self.use_cloud_fetch,
parameters=[],
async_op=False,
enforce_embedded_schema_correctness=False,
@@ -815,7 +816,7 @@ def get_columns(
max_bytes=max_bytes,
lz4_compression=False,
cursor=cursor,
use_cloud_fetch=False,
use_cloud_fetch=self.use_cloud_fetch,
parameters=[],
async_op=False,
enforce_embedded_schema_correctness=False,
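The backend.py changes above read `use_cloud_fetch` from the connection kwargs (defaulting to True) and forward it to the four SEA metadata commands instead of hard-coding `use_cloud_fetch=False`, which is what lets large catalog/schema/table/column listings come back through cloud fetch. A minimal usage sketch, assuming the connector's standard `connect()` kwargs and PEP 249-style metadata helpers; hostnames, paths, and tokens below are placeholders:

from databricks import sql

# Sketch only: placeholder credentials; use_cloud_fetch is assumed to be accepted
# as a connection kwarg, matching the kwargs.get("use_cloud_fetch", True) in the diff.
with sql.connect(
    server_hostname="<workspace-hostname>",
    http_path="/sql/1.0/warehouses/<warehouse-id>",
    access_token="<personal-access-token>",
    use_cloud_fetch=True,  # metadata commands now honour this instead of forcing False
) as connection:
    with connection.cursor() as cursor:
        # catalogs()/schemas()/tables()/columns() all route through the same path.
        cursor.tables(catalog_name="main", schema_name="default")
        rows = cursor.fetchall()
        print(f"fetched {len(rows)} table rows")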
238 changes: 187 additions & 51 deletions src/databricks/sql/backend/sea/utils/filters.py
@@ -6,12 +6,13 @@

from __future__ import annotations

import io
import logging
from copy import deepcopy
from typing import (
List,
Optional,
Any,
Callable,
cast,
TYPE_CHECKING,
)
@@ -20,6 +21,16 @@
from databricks.sql.backend.sea.result_set import SeaResultSet

from databricks.sql.backend.types import ExecuteResponse
from databricks.sql.backend.sea.models.base import ResultData
from databricks.sql.backend.sea.backend import SeaDatabricksClient
from databricks.sql.utils import CloudFetchQueue, ArrowQueue

try:
import pyarrow
import pyarrow.compute as pc
except ImportError:
pyarrow = None
pc = None

logger = logging.getLogger(__name__)

@@ -30,32 +41,18 @@ class ResultSetFilter:
"""

@staticmethod
def _filter_sea_result_set(
result_set: SeaResultSet, filter_func: Callable[[List[Any]], bool]
) -> SeaResultSet:
def _create_execute_response(result_set: SeaResultSet) -> ExecuteResponse:
"""
Filter a SEA result set using the provided filter function.
Create an ExecuteResponse with parameters from the original result set.

Args:
result_set: The SEA result set to filter
filter_func: Function that takes a row and returns True if the row should be included
result_set: Original result set to copy parameters from

Returns:
A filtered SEA result set
ExecuteResponse: New execute response object
"""

# Get all remaining rows
all_rows = result_set.results.remaining_rows()

# Filter rows
filtered_rows = [row for row in all_rows if filter_func(row)]

# Reuse the command_id from the original result set
command_id = result_set.command_id

# Create an ExecuteResponse for the filtered data
execute_response = ExecuteResponse(
command_id=command_id,
return ExecuteResponse(
command_id=result_set.command_id,
status=result_set.status,
description=result_set.description,
has_been_closed_server_side=result_set.has_been_closed_server_side,
@@ -64,32 +61,147 @@ def _filter_sea_result_set(
is_staging_operation=False,
)

# Create a new ResultData object with filtered data
from databricks.sql.backend.sea.models.base import ResultData
@staticmethod
def _create_filtered_manifest(result_set: SeaResultSet, new_row_count: int):
"""
Create a copy of the manifest with updated row count.

Args:
result_set: Original result set to copy manifest from
new_row_count: New total row count for filtered data

result_data = ResultData(data=filtered_rows, external_links=None)
Returns:
Updated manifest copy
"""
filtered_manifest = deepcopy(result_set.manifest)
filtered_manifest.total_row_count = new_row_count
return filtered_manifest

from databricks.sql.backend.sea.backend import SeaDatabricksClient
@staticmethod
def _create_filtered_result_set(
result_set: SeaResultSet,
result_data: ResultData,
row_count: int,
) -> "SeaResultSet":
"""
Create a new filtered SeaResultSet with the provided data.

Args:
result_set: Original result set to copy parameters from
result_data: New result data for the filtered set
row_count: Number of rows in the filtered data

Returns:
New filtered SeaResultSet
"""
from databricks.sql.backend.sea.result_set import SeaResultSet

# Create a new SeaResultSet with the filtered data
manifest = result_set.manifest
manifest.total_row_count = len(filtered_rows)
execute_response = ResultSetFilter._create_execute_response(result_set)
filtered_manifest = ResultSetFilter._create_filtered_manifest(
result_set, row_count
)

filtered_result_set = SeaResultSet(
return SeaResultSet(
connection=result_set.connection,
execute_response=execute_response,
sea_client=cast(SeaDatabricksClient, result_set.backend),
result_data=result_data,
manifest=manifest,
manifest=filtered_manifest,
buffer_size_bytes=result_set.buffer_size_bytes,
arraysize=result_set.arraysize,
)

return filtered_result_set
@staticmethod
def _filter_arrow_table(
table: Any, # pyarrow.Table
column_name: str,
allowed_values: List[str],
case_sensitive: bool = True,
) -> Any: # returns pyarrow.Table
"""
Filter a PyArrow table by column values.

Args:
table: The PyArrow table to filter
column_name: The name of the column to filter on
allowed_values: List of allowed values for the column
case_sensitive: Whether to perform case-sensitive comparison

Returns:
A filtered PyArrow table
"""
if not pyarrow:
raise ImportError("PyArrow is required for Arrow table filtering")

if table.num_rows == 0:
return table

# Handle case-insensitive filtering by normalizing both column and allowed values
if not case_sensitive:
# Convert allowed values to uppercase
allowed_values = [v.upper() for v in allowed_values]
# Get column values as uppercase
column = pc.utf8_upper(table[column_name])
else:
# Use column as-is
column = table[column_name]

# Convert allowed_values to PyArrow Array
allowed_array = pyarrow.array(allowed_values)

# Construct a boolean mask: True where column is in allowed_list
mask = pc.is_in(column, value_set=allowed_array)
return table.filter(mask)

@staticmethod
def _filter_arrow_result_set(
result_set: SeaResultSet,
column_index: int,
allowed_values: List[str],
case_sensitive: bool = True,
) -> SeaResultSet:
"""
Filter a SEA result set that contains Arrow tables.

Args:
result_set: The SEA result set to filter (containing Arrow data)
column_index: The index of the column to filter on
allowed_values: List of allowed values for the column
case_sensitive: Whether to perform case-sensitive comparison

Returns:
A filtered SEA result set
"""
# Validate column index and get column name
if column_index >= len(result_set.description):
raise ValueError(f"Column index {column_index} is out of bounds")
column_name = result_set.description[column_index][0]

# Get all remaining rows as Arrow table and filter it
arrow_table = result_set.results.remaining_rows()
filtered_table = ResultSetFilter._filter_arrow_table(
arrow_table, column_name, allowed_values, case_sensitive
)

# Convert the filtered table to Arrow stream format for ResultData
sink = io.BytesIO()
with pyarrow.ipc.new_stream(sink, filtered_table.schema) as writer:
writer.write_table(filtered_table)
arrow_stream_bytes = sink.getvalue()

# Create ResultData with attachment containing the filtered data
result_data = ResultData(
data=None, # No JSON data
external_links=None, # No external links
attachment=arrow_stream_bytes, # Arrow data as attachment
)

return ResultSetFilter._create_filtered_result_set(
result_set, result_data, filtered_table.num_rows
)

@staticmethod
def filter_by_column_values(
def _filter_json_result_set(
result_set: SeaResultSet,
column_index: int,
allowed_values: List[str],
@@ -107,22 +219,35 @@ def filter_by_column_values(
Returns:
A filtered result set
"""
# Validate column index (optional - not in arrow version but good practice)
if column_index >= len(result_set.description):
raise ValueError(f"Column index {column_index} is out of bounds")

# Convert to uppercase for case-insensitive comparison if needed
# Extract rows
all_rows = result_set.results.remaining_rows()

# Convert allowed values if case-insensitive
if not case_sensitive:
allowed_values = [v.upper() for v in allowed_values]
# Helper lambda to get column value based on case sensitivity
get_column_value = (
lambda row: row[column_index].upper()
if not case_sensitive
else row[column_index]
)

# Filter rows based on allowed values
filtered_rows = [
row
for row in all_rows
if len(row) > column_index and get_column_value(row) in allowed_values
]

# Create filtered result set
result_data = ResultData(data=filtered_rows, external_links=None)

return ResultSetFilter._filter_sea_result_set(
result_set,
lambda row: (
len(row) > column_index
and (
row[column_index].upper()
if not case_sensitive
else row[column_index]
)
in allowed_values
),
return ResultSetFilter._create_filtered_result_set(
result_set, result_data, len(filtered_rows)
)

@staticmethod
Expand All @@ -143,14 +268,25 @@ def filter_tables_by_type(
Returns:
A filtered result set containing only tables of the specified types
"""

# Default table types if none specified
DEFAULT_TABLE_TYPES = ["TABLE", "VIEW", "SYSTEM TABLE"]
valid_types = (
table_types if table_types and len(table_types) > 0 else DEFAULT_TABLE_TYPES
)
valid_types = table_types if table_types else DEFAULT_TABLE_TYPES

# Check if we have an Arrow table (cloud fetch) or JSON data
# Table type is the 6th column (index 5)
return ResultSetFilter.filter_by_column_values(
result_set, 5, valid_types, case_sensitive=True
)
if isinstance(result_set.results, (CloudFetchQueue, ArrowQueue)):
# For Arrow tables, we need to handle filtering differently
return ResultSetFilter._filter_arrow_result_set(
result_set,
column_index=5,
allowed_values=valid_types,
case_sensitive=True,
)
else:
# For JSON data, use the existing filter method
return ResultSetFilter._filter_json_result_set(
result_set,
column_index=5,
allowed_values=valid_types,
case_sensitive=True,
)
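The new Arrow path in filters.py builds a boolean mask with `pyarrow.compute.is_in` (upper-casing both the column and the allowed values for case-insensitive matches), filters the table, and serializes the result back into an Arrow IPC stream so it can be carried in `ResultData` as an attachment. A self-contained sketch of that technique, with made-up table contents standing in for a metadata result:

import io

import pyarrow as pa
import pyarrow.compute as pc

# Stand-in for the Arrow table a metadata command might return (made-up data).
table = pa.table(
    {
        "TABLE_NAME": ["t1", "t2", "t3"],
        "TABLE_TYPE": ["TABLE", "SYSTEM TABLE", "INDEX"],
    }
)

# Build a boolean mask with pc.is_in and keep only the allowed table types,
# mirroring _filter_arrow_table (case-sensitive path shown here).
allowed = pa.array(["TABLE", "VIEW", "SYSTEM TABLE"])
mask = pc.is_in(table["TABLE_TYPE"], value_set=allowed)
filtered = table.filter(mask)

# Serialize the filtered table to Arrow IPC stream bytes, the same format
# _filter_arrow_result_set places in the ResultData attachment.
sink = io.BytesIO()
with pa.ipc.new_stream(sink, filtered.schema) as writer:
    writer.write_table(filtered)
stream_bytes = sink.getvalue()

# Round-trip to confirm the attachment bytes rehydrate into the filtered table.
rehydrated = pa.ipc.open_stream(io.BytesIO(stream_bytes)).read_all()
assert rehydrated.num_rows == 2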