
feat(bigquery): add normalize_case option for URN handling in lineage… #14082

Status: Draft · wants to merge 2 commits into base: master
```diff
@@ -713,6 +713,12 @@ def _create_lineage_map(
                 query,
                 schema_resolver=self.schema_resolver,
                 default_db=e.project_id,
+                normalize_case=(
+                    "lower"
+                    if self.config.convert_urns_to_lowercase
+                    and not self.config.lineage_sql_parser_use_raw_names
+                    else None
+                ),
             )
             logger.debug(
                 f"Input tables: {raw_lineage.in_tables}, Output tables: {raw_lineage.out_tables}"
```
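The same conditional appears at three call sites in this PR (here, in BigQueryQueriesExtractor, and in get_tables_from_query below). As a review aid, here is a minimal sketch of the derivation, assuming only the two config flags named in the diff; the helper function itself is hypothetical and not part of the PR:

```python
from typing import Optional


def derive_normalize_case(
    convert_urns_to_lowercase: bool,
    lineage_sql_parser_use_raw_names: bool,
) -> Optional[str]:
    """Hypothetical helper mirroring the inline expression in this diff.

    URNs are lowercased only when lowercasing was requested AND the SQL
    parser was not asked to preserve raw (original-case) names.
    """
    if convert_urns_to_lowercase and not lineage_sql_parser_use_raw_names:
        return "lower"
    return None


# Truth table implied by the expression:
#   convert_urns_to_lowercase  lineage_sql_parser_use_raw_names  -> result
#   True                       False                             -> "lower"
#   True                       True                              -> None
#   False                      (either)                          -> None
```

Extracting a shared helper along these lines would also keep the three call sites from drifting apart.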
```diff
@@ -120,12 +120,7 @@ class BigQueryQueriesExtractor(Closeable):
     """
     Extracts query audit log and generates usage/lineage/operation workunits.
 
-    Some notable differences in this wrt older usage extraction method are:
-    1. For every lineage/operation workunit, corresponding query id is also present
-    2. Operation aspect for a particular query is emitted at max once(last occurence) for a day
-    3. "DROP" operation accounts for usage here
-    4. userEmail is not populated in datasetUsageStatistics aspect, only user urn
-
+    parent_config: The main BigQueryV2Config, used for options like convert_urns_to_lowercase and lineage_sql_parser_use_raw_names.
     """
 
     def __init__(
@@ -139,9 +134,9 @@ def __init__(
         graph: Optional[DataHubGraph] = None,
         schema_resolver: Optional[SchemaResolver] = None,
         discovered_tables: Optional[Collection[str]] = None,
+        parent_config=None,  # <-- new parameter
     ):
         self.connection = connection
-
         self.config = config
         self.filters = filters
         self.identifiers = identifiers
@@ -157,8 +152,8 @@ def __init__(
             if discovered_tables
             else None
         )
-
         self.structured_report = structured_report
+        self.parent_config = parent_config
 
         self.aggregator = SqlParsingAggregator(
             platform=self.identifiers.platform,
@@ -182,8 +177,14 @@
             is_temp_table=self.is_temp_table,
             is_allowed_table=self.is_allowed_table,
             format_queries=False,
+            normalize_case=(
+                "lower"
+                if self.parent_config
+                and self.parent_config.convert_urns_to_lowercase
+                and not self.parent_config.lineage_sql_parser_use_raw_names
+                else None
+            ),
         )
-
         self.report.sql_aggregator = self.aggregator.report
         self.report.num_discovered_tables = (
            len(self.discovered_tables) if self.discovered_tables else None
```
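For context, a sketch of how the new parent_config parameter might be wired at the call site. The keyword names all come from the constructor shown in this diff; the values are placeholders, and this call is not part of the PR:

```python
# Hypothetical call-site wiring: the BigQuery source passes its top-level
# BigQueryV2Config down so the extractor can read
# convert_urns_to_lowercase / lineage_sql_parser_use_raw_names.
queries_extractor = BigQueryQueriesExtractor(
    connection=connection,            # placeholder objects from the source
    config=queries_extractor_config,
    structured_report=report,
    filters=filters,
    identifiers=identifiers,
    schema_resolver=schema_resolver,
    discovered_tables=discovered_tables,
    parent_config=config,             # the main BigQueryV2Config
)
```

An alternative worth considering is passing the derived normalize_case value directly instead of the whole parent config, which would keep BigQueryQueriesExtractor decoupled from BigQueryV2Config.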
```diff
@@ -933,6 +933,12 @@ def get_tables_from_query(
                 self.schema_resolver,
                 default_db=default_project,
                 default_schema=default_dataset,
+                normalize_case=(
+                    "lower"
+                    if self.config.convert_urns_to_lowercase
+                    and not self.config.lineage_sql_parser_use_raw_names
+                    else None
+                ),
             )
         except Exception:
             logger.debug(
```
61 changes: 35 additions & 26 deletions metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
```diff
@@ -88,7 +88,11 @@ def schema_count(self) -> int:
         )
 
     def get_urn_for_table(
-        self, table: _TableName, lower: bool = False, mixed: bool = False
+        self,
+        table: _TableName,
+        lower: bool = False,
+        mixed: bool = False,
+        normalize_case: str = None,
     ) -> str:
         # TODO: Validate that this is the correct 2/3 layer hierarchy for the platform.
 
@@ -98,7 +102,15 @@ def get_urn_for_table(
 
         platform_instance = self.platform_instance
 
-        if lower:
+        if normalize_case == "upper":
+            table_name = table_name.upper()
+            if platform_instance:
+                platform_instance = platform_instance.upper()
+        elif normalize_case == "lower":
+            table_name = table_name.lower()
+            if platform_instance:
+                platform_instance = platform_instance.lower()
+        elif lower:
             table_name = table_name.lower()
             if not mixed:
                 platform_instance = (
```
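To make the new branch concrete, here is a condensed, runnable model of just the case-normalization step. table_name and platform_instance stand in for values computed earlier in get_urn_for_table; the standalone function is illustrative, not the PR's code:

```python
from typing import Optional, Tuple


def apply_normalize_case(
    table_name: str,
    platform_instance: Optional[str],
    normalize_case: Optional[str],
) -> Tuple[str, Optional[str]]:
    # Mirrors the diff: explicit "upper"/"lower" normalization applies to
    # both the table name and the platform instance, unlike the legacy
    # lower/mixed path, where platform-instance casing depends on the
    # `mixed` flag.
    if normalize_case == "upper":
        table_name = table_name.upper()
        if platform_instance:
            platform_instance = platform_instance.upper()
    elif normalize_case == "lower":
        table_name = table_name.lower()
        if platform_instance:
            platform_instance = platform_instance.lower()
    return table_name, platform_instance


assert apply_normalize_case("MyProject.DataSet.Orders", "Prod", "lower") == (
    "myproject.dataset.orders",
    "prod",
)
```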
```diff
@@ -127,35 +139,32 @@ def resolve_urn(self, urn: str) -> Tuple[str, Optional[SchemaInfo]]:
 
         return urn, None
 
-    def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
-        urn = self.get_urn_for_table(table)
+    def resolve_table(
+        self, table: _TableName, normalize_case: str = None
+    ) -> Tuple[str, Optional[SchemaInfo]]:
+        urn = self.get_urn_for_table(table, normalize_case=normalize_case)
 
         schema_info = self._resolve_schema_info(urn)
         if schema_info:
             return urn, schema_info
 
-        urn_lower = self.get_urn_for_table(table, lower=True)
-        if urn_lower != urn:
-            schema_info = self._resolve_schema_info(urn_lower)
-            if schema_info:
-                return urn_lower, schema_info
-
-        # Our treatment of platform instances when lowercasing urns
-        # is inconsistent. In some places (e.g. Snowflake), we lowercase
-        # the table names but not the platform instance. In other places
-        # (e.g. Databricks), we lowercase everything because it happens
-        # via the automatic lowercasing helper.
-        # See https://github.com/datahub-project/datahub/pull/8928.
-        # While we have this sort of inconsistency, we should also
-        # check the mixed case urn, as a last resort.
-        urn_mixed = self.get_urn_for_table(table, lower=True, mixed=True)
-        if urn_mixed not in {urn, urn_lower}:
-            schema_info = self._resolve_schema_info(urn_mixed)
-            if schema_info:
-                return urn_mixed, schema_info
-
-        if self._prefers_urn_lower():
-            return urn_lower, None
+        if not normalize_case:
+            urn_lower = self.get_urn_for_table(table, lower=True)
+            if urn_lower != urn:
+                schema_info = self._resolve_schema_info(urn_lower)
+                if schema_info:
+                    return urn_lower, schema_info
+
+            urn_mixed = self.get_urn_for_table(table, lower=True, mixed=True)
+            if urn_mixed not in {urn, urn_lower}:
+                schema_info = self._resolve_schema_info(urn_mixed)
+                if schema_info:
+                    return urn_mixed, schema_info
+
+            if self._prefers_urn_lower():
+                return urn_lower, None
+            else:
+                return urn, None
         else:
             return urn, None
```
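This control-flow change is the behavioral core of the PR: when normalize_case is set, resolution commits to a single canonical URN and skips the legacy lowercase/mixed-case fallback probing. A toy model of the new flow, assuming lookup plays the role of _resolve_schema_info (the _prefers_urn_lower tiebreak is elided for brevity):

```python
from typing import Callable, Dict, Optional, Tuple

SchemaInfo = Dict[str, str]  # stand-in for the real SchemaInfo type


def resolve_with_fallbacks(
    urn_exact: str,
    urn_lower: str,
    urn_mixed: str,
    lookup: Callable[[str], Optional[SchemaInfo]],
    normalize_case: Optional[str] = None,
) -> Tuple[str, Optional[SchemaInfo]]:
    schema = lookup(urn_exact)
    if schema:
        return urn_exact, schema
    if normalize_case:
        # Explicit normalization: the caller chose one canonical casing,
        # so do not probe other casings of the same table.
        return urn_exact, None
    for candidate in (urn_lower, urn_mixed):
        if candidate != urn_exact and (schema := lookup(candidate)):
            return candidate, schema
    return urn_exact, None
```

One subtlety: urn_exact here is already built with normalize_case applied, so a miss means the schema registry genuinely has no entry under the requested casing. Note also that the diff annotates the new parameters as normalize_case: str = None; Optional[str] is the accurate annotation and is what mypy would likely expect.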
```diff
@@ -384,6 +384,7 @@ def __init__(
         is_allowed_table: Optional[Callable[[str], bool]] = None,
         format_queries: bool = True,
         query_log: QueryLogSetting = _DEFAULT_QUERY_LOG_SETTING,
+        normalize_case: str = None,
     ) -> None:
         self.platform = DataPlatformUrn(platform)
         self.platform_instance = platform_instance
@@ -416,6 +417,7 @@
 
         self.format_queries = format_queries
         self.query_log = query_log
+        self.normalize_case = normalize_case
 
         # The exit stack helps ensure that we close all the resources we open.
         self._exit_stack = contextlib.ExitStack()
@@ -1174,6 +1176,7 @@ def _run_sql_parser(
             schema_resolver=schema_resolver,
             default_db=default_db,
             default_schema=default_schema,
+            normalize_case=self.normalize_case,
         )
         self.report.num_sql_parsed += 1
```
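Putting the aggregator-side changes together, construction might look like the following. Only platform, format_queries, and normalize_case are taken from the diff; this assumes the remaining constructor arguments can be left at their defaults:

```python
# Illustrative construction: the aggregator stores normalize_case and
# forwards it to sqlglot_lineage on every parse via _run_sql_parser.
aggregator = SqlParsingAggregator(
    platform="bigquery",
    format_queries=False,
    normalize_case="lower",
)
```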
30 changes: 20 additions & 10 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
```diff
@@ -1257,6 +1257,7 @@ def _sqlglot_lineage_inner(
     default_db: Optional[str] = None,
     default_schema: Optional[str] = None,
     override_dialect: Optional[DialectOrStr] = None,
+    normalize_case: str = None,
 ) -> SqlParsingResult:
     if override_dialect:
         dialect = get_dialect(override_dialect)
@@ -1326,19 +1327,15 @@
     table_name_schema_mapping: Dict[_TableName, SchemaInfo] = {}
 
     for table in tables | modified:
-        # For select statements, qualification will be a no-op. For other statements, this
-        # is where the qualification actually happens.
         qualified_table = table.qualified(
             dialect=dialect, default_db=default_db, default_schema=default_schema
         )
-
-        urn, schema_info = schema_resolver.resolve_table(qualified_table)
-
+        urn, schema_info = schema_resolver.resolve_table(
+            qualified_table, normalize_case=normalize_case
+        )
         table_name_urn_mapping[qualified_table] = urn
         if schema_info:
             table_name_schema_mapping[qualified_table] = schema_info
-
-        # Also include the original, non-qualified table name in the urn mapping.
         table_name_urn_mapping[table] = urn
 
     total_tables_discovered = len(tables | modified)
@@ -1449,6 +1446,7 @@ def _sqlglot_lineage_nocache(
     default_db: Optional[str] = None,
     default_schema: Optional[str] = None,
     override_dialect: Optional[DialectOrStr] = None,
+    normalize_case: str = None,
 ) -> SqlParsingResult:
     """Parse a SQL statement and generate lineage information.
 
@@ -1508,6 +1506,7 @@
             default_db=default_db,
             default_schema=default_schema,
             override_dialect=override_dialect,
+            normalize_case=normalize_case,
         )
     except Exception as e:
         return SqlParsingResult.make_from_error(e)
@@ -1546,14 +1545,25 @@ def sqlglot_lineage(
     default_db: Optional[str] = None,
     default_schema: Optional[str] = None,
     override_dialect: Optional[DialectOrStr] = None,
+    normalize_case: str = None,
 ) -> SqlParsingResult:
     if schema_resolver.includes_temp_tables():
         return _sqlglot_lineage_nocache(
-            sql, schema_resolver, default_db, default_schema, override_dialect
+            sql,
+            schema_resolver,
+            default_db,
+            default_schema,
+            override_dialect,
+            normalize_case=normalize_case,
         )
     else:
-        return _sqlglot_lineage_cached(
-            sql, schema_resolver, default_db, default_schema, override_dialect
+        return _sqlglot_lineage_nocache(
+            sql,
+            schema_resolver,
+            default_db,
+            default_schema,
+            override_dialect,
+            normalize_case=normalize_case,
         )
```
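Two things stand out in this last hunk. First, the new keyword is threaded through the public entry point, so a caller can now do something like the following (hedged sketch; the SQL string and resolver setup are placeholders, and the resolver is assumed constructible from just a platform):

```python
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage

# Assumed minimal setup: an empty resolver for the bigquery platform
# (no schemas registered, so column-level lineage will be best-effort).
resolver = SchemaResolver(platform="bigquery")

result = sqlglot_lineage(
    "INSERT INTO Proj.DS.Out SELECT * FROM Proj.DS.In",
    schema_resolver=resolver,
    default_db="my-project",
    default_schema="my_dataset",
    normalize_case="lower",
)
print(result.in_tables, result.out_tables)
```

Second, after this change both branches call _sqlglot_lineage_nocache, so the cached path (_sqlglot_lineage_cached) is no longer reachable from this function. If that is intentional (for example, because the cache key would not include normalize_case), it deserves a comment; otherwise the cached variant should gain the new parameter instead.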