diff --git a/be/app/insights/rules.py b/be/app/insights/rules.py index 1ecf72c..f88cfc0 100644 --- a/be/app/insights/rules.py +++ b/be/app/insights/rules.py @@ -10,7 +10,7 @@ from statistics import median from typing import Dict -from app.models import Rule +from app.models import Rule, RuleLevel from app.models import Insight rules_yaml_path = os.path.join(os.path.dirname(__file__), "rules.yaml") @@ -252,6 +252,23 @@ def rule_skewed_or_largest_partitions_table(table: Table) -> Optional[Insight]: return None +def rule_multiple_tables_same_location(tables_by_location: Dict[str, list[str]]) -> list[Insight]: + result = [] + meta = INSIGHT_META["MULTIPLE_TABLES_SAME_LOCATION"] + for location, tables in tables_by_location.items(): + if len(tables) > 1: + for tablename in tables: + result.append( + Insight( + code="MULTIPLE_TABLES_SAME_LOCATION", + table=tablename, + message=meta["message"].format(tables=", ".join(tables)), + severity=meta["severity"], + suggested_action=meta["suggested_action"] + ) + ) + return result + ALL_RULES = [ rule_small_files, @@ -261,7 +278,8 @@ def rule_skewed_or_largest_partitions_table(table: Table) -> Optional[Insight]: rule_column_uuid_table, rule_no_rows_table, rule_too_many_snapshot_table, - rule_skewed_or_largest_partitions_table + rule_skewed_or_largest_partitions_table, + rule_multiple_tables_same_location ] ALL_RULES_OBJECT = [ @@ -272,5 +290,6 @@ def rule_skewed_or_largest_partitions_table(table: Table) -> Optional[Insight]: Rule("UUID_COLUMN", "UUID type not universally supported", "UUID column type may not be supported in all environments, especially Spark and older Presto", rule_column_uuid_table), Rule("NO_ROWS_TABLE", "Empty table", "Table has been declared but has no data. Possibly intentional.", rule_no_rows_table), Rule("SNAPSHOT_SPRAWL_TABLE", "Too many snapshots", "A high snapshot count for a table creates memory and process overhead for the catalog service and catalog results processing on clients. 
Expire snapshots or adjust snapshot age configuration if practical.", rule_too_many_snapshot_table), - Rule("SKEWED_OR_LARGEST_PARTITIONS_TABLE", "Large partition", "The table contains one partition that is considerably larger than the rest. Evaluate if the table can be repartitioned by another column/criteria.", rule_skewed_or_largest_partitions_table) + Rule("SKEWED_OR_LARGEST_PARTITIONS_TABLE", "Large partition", "The table contains one partition that is considerably larger than the rest. Evaluate if the table can be repartitioned by another column/criteria.", rule_skewed_or_largest_partitions_table), + Rule("MULTIPLE_TABLES_SAME_LOCATION", "Multiple tables with the same location", "More than one table cataloged using the same location. Review and resolve duplicate table locations to avoid data corruption.", rule_multiple_tables_same_location, RuleLevel.LAKEHOUSE) ] \ No newline at end of file diff --git a/be/app/insights/rules.yaml b/be/app/insights/rules.yaml index 425b154..f8a373f 100644 --- a/be/app/insights/rules.yaml +++ b/be/app/insights/rules.yaml @@ -29,4 +29,8 @@ SNAPSHOT_SPRAWL_TABLE: SKEWED_OR_LARGEST_PARTITIONS_TABLE: severity: Info message: "This table has {partitions} partitions. The largest partition is much bigger than the typical one. Median partition size: {median_size} bytes, largest partition: {largest_size} bytes. Median records: {median_records}, largest partition: {largest_records} records." - suggested_action: "Review partitioning strategy; consider rebalancing data" \ No newline at end of file + suggested_action: "Review partitioning strategy; consider rebalancing data" +MULTIPLE_TABLES_SAME_LOCATION: + severity: Warning + message: "More than one table cataloged using the same location. 
Tables sharing location: {tables}" + suggested_action: "Review and resolve duplicate table locations to avoid data corruption" diff --git a/be/app/insights/runner.py b/be/app/insights/runner.py index 7932d82..d037160 100644 --- a/be/app/insights/runner.py +++ b/be/app/insights/runner.py @@ -1,9 +1,9 @@ from typing import List, Dict, Any, Optional, Set from collections import defaultdict -from app.insights.rules import ALL_RULES_OBJECT, INSIGHT_META +from app.insights.rules import ALL_RULES_OBJECT, INSIGHT_META, rule_multiple_tables_same_location from app.insights.utils import qualified_table_name, get_namespace_and_table_name -from app.models import Insight, InsightRun, InsightRecord, JobSchedule, InsightRunOut, ActiveInsight, InsightOccurrence, RuleSummaryOut +from app.models import Insight, InsightRun, InsightRecord, JobSchedule, InsightRunOut, ActiveInsight, InsightOccurrence, RuleSummaryOut, RuleLevel from app.lakeviewer import LakeView from app.storage.interface import StorageInterface from sqlalchemy import text, bindparam @@ -17,6 +17,7 @@ def __init__(self, lakeview, self.run_storage = run_storage self.insight_storage = insight_storage self.active_insight_storage = active_insight_storage + self._tables_by_location: Dict[str, List[str]] = {} def get_latest_run(self, namespace: str, size: int, table_name: str = None, showEmpty: bool = True) -> List[InsightRun]: """ @@ -109,30 +110,8 @@ def get_summary_by_rule(self, ) return final_summary - - def run_for_table(self, table_identifier, rule_ids: List[str] = None, type: str = "manual") -> List[Insight]: - table = self.lakeview.load_table(table_identifier) - - all_valid_ids: Set[str] = {rule.id for rule in ALL_RULES_OBJECT} - ids_to_run: Set[str] - - if rule_ids is None: - ids_to_run = all_valid_ids - else: - provided_ids = set(rule_ids) - invalid_ids = provided_ids - all_valid_ids - if invalid_ids: - raise ValueError(f"Invalid rule IDs provided: {', '.join(sorted(invalid_ids))}") - ids_to_run = provided_ids - - 
namespace, table_name = get_namespace_and_table_name(table_identifier) - - run_result: List[Insight] = [ - insight - for rule in ALL_RULES_OBJECT - if rule.id in ids_to_run and (insight := rule.method(table)) - ] - + + def _store_results(self, run_result: List[Insight], ids_to_run: Set[str], type: str, namespace: Optional[str], table_name: Optional[str]): run = InsightRun( namespace=namespace, table_name=table_name, @@ -168,19 +147,57 @@ def run_for_table(self, table_identifier, rule_ids: List[str] = None, type: str ] self.active_insight_storage.save_many(new_active_insights) + def _get_valid_run_ids(self, rule_ids: List[str] = None) -> Set[str]: + all_valid_ids: Set[str] = {rule.id for rule in ALL_RULES_OBJECT} + ids_to_run: Set[str] + + if rule_ids is None: + ids_to_run = all_valid_ids + else: + provided_ids = set(rule_ids) + invalid_ids = provided_ids - all_valid_ids + if invalid_ids: + raise ValueError(f"Invalid rule IDs provided: {', '.join(sorted(invalid_ids))}") + ids_to_run = provided_ids + + return ids_to_run + + def run_for_table(self, table_identifier, rule_ids: List[str] = None, type: str = "manual") -> List[Insight]: + table = self.lakeview.load_table(table_identifier) + + ids_to_run = self._get_valid_run_ids(rule_ids) + + if "MULTIPLE_TABLES_SAME_LOCATION" in ids_to_run: + location = getattr(table, "location", None) + if location in self._tables_by_location: + self._tables_by_location[location].append(table_identifier) + else: + self._tables_by_location[location] = [table_identifier] + + namespace, table_name = get_namespace_and_table_name(table_identifier) + + run_result: List[Insight] = [ + insight + for rule in ALL_RULES_OBJECT + if rule.id in ids_to_run and rule.level == RuleLevel.TABLE and (insight := rule.method(table)) + ] + + self._store_results(run_result, ids_to_run, type, namespace, table_name) + + return run_result - def run_for_namespace(self, namespace: str, rule_ids: List[str] = None, recursive: bool = True, type: str = "manual") -> 
Dict[str, List[Insight]]: + def run_for_namespace(self, namespace: str, rule_ids: List[str] = None, recursive: bool = False, type: str = "manual") -> Dict[str, List[Insight]]: tables = self.lakeview.get_tables(namespace) results = [] for t_ident in tables: qualified = qualified_table_name(t_ident) results.extend(self.run_for_table(qualified, rule_ids, type)) if recursive: - nested_namespaces = self.lakeview._get_nested_namespaces(namespace) + ns = tuple(namespace.split('.')) + nested_namespaces = self.lakeview._get_nested_namespaces(ns, len(ns)) for ns in nested_namespaces: ns_str = ".".join(ns) - results.extend(self.run_for_namespace(ns_str, rule_ids, recursive=False, type=type)) + results.extend(self.run_for_namespace(ns_str, rule_ids, recursive=recursive, type=type)) return results def run_for_lakehouse(self, rule_ids: List[str] = None, type: str = "manual") -> Dict[str, List[Insight]]: @@ -189,4 +206,10 @@ def run_for_lakehouse(self, rule_ids: List[str] = None, type: str = "manual") -> for ns in namespaces: ns_str = ".".join(ns) if isinstance(ns, (tuple, list)) else str(ns) results.extend(self.run_for_namespace(ns_str, rule_ids, recursive=True, type=type)) + + ids_to_run = self._get_valid_run_ids(rule_ids) + if "MULTIPLE_TABLES_SAME_LOCATION" in ids_to_run: + run_result: List[Insight] = rule_multiple_tables_same_location(self._tables_by_location) + self._store_results(run_result, ids_to_run, type, None, None) + return results \ No newline at end of file diff --git a/be/app/models.py b/be/app/models.py index 1a08e8c..e317af6 100644 --- a/be/app/models.py +++ b/be/app/models.py @@ -4,6 +4,7 @@ import dataclasses from dataclasses import dataclass, field import uuid +from enum import Enum from pyiceberg.table import FileScanTask from pyiceberg.typedef import Record @@ -90,12 +91,17 @@ class JobSchedule: id: str = field(default_factory=lambda: str(uuid.uuid4())) is_enabled: bool = True +class RuleLevel(Enum): + TABLE = 'TABLE' + LAKEHOUSE = 'LAKEHOUSE' + @dataclass class Rule: 
id: str name: str description: str method: Any + level: RuleLevel = RuleLevel.TABLE class RuleOut(BaseModel): id: str diff --git a/be/app/routers/insights.py b/be/app/routers/insights.py index 2add0d5..2eb151b 100644 --- a/be/app/routers/insights.py +++ b/be/app/routers/insights.py @@ -6,7 +6,7 @@ from app.insights.rules import ALL_RULES_OBJECT from app.dependencies import get_runner from app.exceptions import LVException -from app.models import RuleOut, RuleSummaryOut, InsightRun, InsightRunOut +from app.models import RuleOut, RuleSummaryOut, InsightRun, InsightRunOut, RuleLevel router = APIRouter() @@ -55,5 +55,8 @@ def get_insights_summary( return summary_data @router.get("/api/lakehouse/insights/rules", response_model=List[RuleOut]) -def get_insight_rules(): - return ALL_RULES_OBJECT \ No newline at end of file +def get_insight_rules(level: Optional[RuleLevel]= None): + if level: + return [rule for rule in ALL_RULES_OBJECT if rule.level == level] + else: + return ALL_RULES_OBJECT \ No newline at end of file diff --git a/fe/src/routes/+layout.svelte b/fe/src/routes/+layout.svelte index 2feac2b..3bd2486 100644 --- a/fe/src/routes/+layout.svelte +++ b/fe/src/routes/+layout.svelte @@ -28,6 +28,7 @@ let AUTH_ENABLED = false; let CHAT_ENABLED = false; let CHAT_NAME = 'Chat'; + let CHAT_DESCRIPTION = ''; let extra_link; let extra_link_text; let company = 'Apache Iceberg'; @@ -75,6 +76,9 @@ if (env.PUBLIC_CHAT_NAME) { CHAT_NAME = env.PUBLIC_CHAT_NAME; } + if (env.PUBLIC_CHAT_DESCRIPTION) { + CHAT_DESCRIPTION = env.PUBLIC_CHAT_DESCRIPTION; + } CHAT_ENABLED = true; } if (env.PUBLIC_EXTRA_LINK) { @@ -201,11 +205,12 @@ size='lg' passiveModal bind:open={isChatOpen} - modalHeading="{CHAT_NAME} - Chat with your Lakehouse" + modalHeading="{CHAT_NAME}" on:open={handleChatOpen} on:close={handleChatClose} on:click:overlay={handleChatClose} > +
{CHAT_DESCRIPTION}
{#if user}