Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions be/app/insights/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from statistics import median
from typing import Dict

from app.models import Rule
from app.models import Rule, RuleLevel
from app.models import Insight

rules_yaml_path = os.path.join(os.path.dirname(__file__), "rules.yaml")
Expand Down Expand Up @@ -252,6 +252,23 @@ def rule_skewed_or_largest_partitions_table(table: Table) -> Optional[Insight]:

return None

def rule_multiple_tables_same_location(tables_by_location: Dict[str, list[str]]) -> list[Insight]:
    """Emit an insight for every table that shares its storage location with another table.

    Args:
        tables_by_location: mapping of storage location -> list of table
            identifiers cataloged at that location.

    Returns:
        One Insight per affected table (so the finding surfaces on each
        table), listing all tables that share the location.
    """
    result: list[Insight] = []
    meta = INSIGHT_META["MULTIPLE_TABLES_SAME_LOCATION"]
    for location, tables in tables_by_location.items():
        # Only locations genuinely shared by more than one table are a
        # problem; the previous `> 0` check flagged every table, including
        # ones with a unique location.
        if len(tables) > 1:
            for tablename in tables:
                result.append(
                    Insight(
                        code="MULTIPLE_TABLES_SAME_LOCATION",
                        table=tablename,
                        message=meta["message"].format(tables=", ".join(tables)),
                        severity=meta["severity"],
                        suggested_action=meta["suggested_action"]
                    )
                )
    return result


ALL_RULES = [
rule_small_files,
Expand All @@ -261,7 +278,8 @@ def rule_skewed_or_largest_partitions_table(table: Table) -> Optional[Insight]:
rule_column_uuid_table,
rule_no_rows_table,
rule_too_many_snapshot_table,
rule_skewed_or_largest_partitions_table
rule_skewed_or_largest_partitions_table,
rule_multiple_tables_same_location
]

ALL_RULES_OBJECT = [
Expand All @@ -272,5 +290,6 @@ def rule_skewed_or_largest_partitions_table(table: Table) -> Optional[Insight]:
Rule("UUID_COLUMN", "UUID type not universally supported", "UUID column type may not be supported in all environments, especially Spark and older Presto", rule_column_uuid_table),
Rule("NO_ROWS_TABLE", "Empty table", "Table has been declared but has no data. Possibly intentional.", rule_no_rows_table),
Rule("SNAPSHOT_SPRAWL_TABLE", "Too many snapshots", "A high snapshot count for a table creates memory and process overhead for the catalog service and catalog results processing on clients. Expire snapshots or adjust snapshot age configuration if practical.", rule_too_many_snapshot_table),
Rule("SKEWED_OR_LARGEST_PARTITIONS_TABLE", "Large partition", "The table contains one partition that is considerably larger than the rest. Evaluate if the table can be repartitioned by another column/criteria.", rule_skewed_or_largest_partitions_table)
Rule("SKEWED_OR_LARGEST_PARTITIONS_TABLE", "Large partition", "The table contains one partition that is considerably larger than the rest. Evaluate if the table can be repartitioned by another column/criteria.", rule_skewed_or_largest_partitions_table),
Rule("MULTIPLE_TABLES_SAME_LOCATION", "Multiple tables with the same location", "More than one table cataloged using the same location. Review and resolve duplicate table locations to avoid data corruption.", rule_multiple_tables_same_location, RuleLevel.LAKEHOUSE)
]
6 changes: 5 additions & 1 deletion be/app/insights/rules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ SNAPSHOT_SPRAWL_TABLE:
SKEWED_OR_LARGEST_PARTITIONS_TABLE:
severity: Info
message: "This table has {partitions} partitions. The largest partition is much bigger than the typical one. Median partition size: {median_size} bytes, largest partition: {largest_size} bytes. Median records: {median_records}, largest partition: {largest_records} records."
suggested_action: "Review partitioning strategy; consider rebalancing data"
suggested_action: "Review partitioning strategy; consider rebalancing data"
MULTIPLE_TABLES_SAME_LOCATION:
severity: Warning
message: "More than one table cataloged using the same location. Tables sharing location: {tables}"
suggested_action: "Review and resolve duplicate table locations to avoid data corruption"
81 changes: 52 additions & 29 deletions be/app/insights/runner.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import List, Dict, Any, Optional, Set
from collections import defaultdict

from app.insights.rules import ALL_RULES_OBJECT, INSIGHT_META
from app.insights.rules import ALL_RULES_OBJECT, INSIGHT_META, rule_multiple_tables_same_location
from app.insights.utils import qualified_table_name, get_namespace_and_table_name
from app.models import Insight, InsightRun, InsightRecord, JobSchedule, InsightRunOut, ActiveInsight, InsightOccurrence, RuleSummaryOut
from app.models import Insight, InsightRun, InsightRecord, JobSchedule, InsightRunOut, ActiveInsight, InsightOccurrence, RuleSummaryOut, RuleLevel
from app.lakeviewer import LakeView
from app.storage.interface import StorageInterface
from sqlalchemy import text, bindparam
Expand All @@ -17,6 +17,7 @@ def __init__(self, lakeview,
self.run_storage = run_storage
self.insight_storage = insight_storage
self.active_insight_storage = active_insight_storage
self._tables_by_location: Dict[str, List[str]] = {}

def get_latest_run(self, namespace: str, size: int, table_name: str = None, showEmpty: bool = True) -> List[InsightRun]:
"""
Expand Down Expand Up @@ -109,30 +110,8 @@ def get_summary_by_rule(self,
)

return final_summary

def run_for_table(self, table_identifier, rule_ids: List[str] = None, type: str = "manual") -> List[Insight]:
table = self.lakeview.load_table(table_identifier)

all_valid_ids: Set[str] = {rule.id for rule in ALL_RULES_OBJECT}
ids_to_run: Set[str]

if rule_ids is None:
ids_to_run = all_valid_ids
else:
provided_ids = set(rule_ids)
invalid_ids = provided_ids - all_valid_ids
if invalid_ids:
raise ValueError(f"Invalid rule IDs provided: {', '.join(sorted(invalid_ids))}")
ids_to_run = provided_ids

namespace, table_name = get_namespace_and_table_name(table_identifier)

run_result: List[Insight] = [
insight
for rule in ALL_RULES_OBJECT
if rule.id in ids_to_run and (insight := rule.method(table))
]


def _store_results(self, run_result: List[Insight], ids_to_run: Set[str], type: str, namespace: Optional[str], table_name: Optional[str]):
run = InsightRun(
namespace=namespace,
table_name=table_name,
Expand Down Expand Up @@ -168,19 +147,57 @@ def run_for_table(self, table_identifier, rule_ids: List[str] = None, type: str
]
self.active_insight_storage.save_many(new_active_insights)

def _get_valid_run_ids(self, rule_ids: List[str] = None) -> Set[str]:
    """Resolve the set of rule IDs to execute.

    Args:
        rule_ids: optional rule IDs requested by the caller; None means
            "run every known rule".

    Returns:
        The validated set of rule IDs to run.

    Raises:
        ValueError: if any requested ID is not a known rule ID.
    """
    known_ids: Set[str] = {rule.id for rule in ALL_RULES_OBJECT}
    if rule_ids is None:
        return known_ids
    requested = set(rule_ids)
    unknown = requested - known_ids
    if unknown:
        raise ValueError(f"Invalid rule IDs provided: {', '.join(sorted(unknown))}")
    return requested

def run_for_table(self, table_identifier, rule_ids: List[str] = None, type: str = "manual") -> List[Insight]:
    """Run table-level insight rules against a single table and persist the results.

    Args:
        table_identifier: qualified identifier of the table to load and inspect.
        rule_ids: optional subset of rule IDs to run; None runs every rule.
        type: origin of the run (e.g. "manual"), stored with the results.

    Returns:
        Insights produced by the table-level rules that fired.

    Raises:
        ValueError: if rule_ids contains an unknown rule ID.
    """
    table = self.lakeview.load_table(table_identifier)

    # Resolve/validate the rule IDs up front: the previous membership test on
    # rule_ids raised TypeError when rule_ids was None (the default), and a
    # None rule_ids means "run all rules", which must also collect locations.
    ids_to_run = self._get_valid_run_ids(rule_ids)

    if "MULTIPLE_TABLES_SAME_LOCATION" in ids_to_run:
        # Accumulate locations across calls; the lakehouse-level rule consumes
        # this mapping once every table has been visited.
        location = getattr(table, "location", None)
        if location is not None:
            # Skip tables without a location: grouping them all under the
            # None key would falsely report them as sharing a location.
            self._tables_by_location.setdefault(location, []).append(table_identifier)

    namespace, table_name = get_namespace_and_table_name(table_identifier)

    # Only TABLE-level rules run here; LAKEHOUSE-level rules need the whole
    # lakehouse picture and are executed separately in run_for_lakehouse.
    run_result: List[Insight] = [
        insight
        for rule in ALL_RULES_OBJECT
        if rule.id in ids_to_run and rule.level == RuleLevel.TABLE and (insight := rule.method(table))
    ]

    self._store_results(run_result, ids_to_run, type, namespace, table_name)

    return run_result

def run_for_namespace(self, namespace: str, rule_ids: List[str] = None, recursive: bool = True, type: str = "manual") -> Dict[str, List[Insight]]:
def run_for_namespace(self, namespace: str, rule_ids: List[str] = None, recursive: bool = False, type: str = "manual") -> List[Insight]:
    """Run table-level insight rules for every table in a namespace.

    Args:
        namespace: dot-separated namespace to scan.
        rule_ids: optional subset of rule IDs to run; None runs every rule.
        recursive: when True, also descend into nested namespaces.
        type: origin of the run, forwarded to run_for_table.

    Returns:
        A flat list of insights across all tables visited.
        NOTE(review): the prior annotation said Dict[str, List[Insight]],
        but the method builds and returns a flat list.
    """
    tables = self.lakeview.get_tables(namespace)
    results = []
    for t_ident in tables:
        qualified = qualified_table_name(t_ident)
        results.extend(self.run_for_table(qualified, rule_ids, type))
    if recursive:
        # NOTE(review): relies on a private LakeView helper. Assumes
        # _get_nested_namespaces(ns, len(ns)) yields only direct children;
        # if it already returns all descendants, recursing again here would
        # process namespaces more than once — confirm against LakeView.
        ns = tuple(namespace.split('.'))
        nested_namespaces = self.lakeview._get_nested_namespaces(ns, len(ns))
        # The loop variable deliberately reuses the name `ns` for each
        # nested namespace tuple returned by the helper.
        for ns in nested_namespaces:
            ns_str = ".".join(ns)
            results.extend(self.run_for_namespace(ns_str, rule_ids, recursive=recursive, type=type))
    return results

def run_for_lakehouse(self, rule_ids: List[str] = None, type: str = "manual") -> Dict[str, List[Insight]]:
Expand All @@ -189,4 +206,10 @@ def run_for_lakehouse(self, rule_ids: List[str] = None, type: str = "manual") ->
for ns in namespaces:
ns_str = ".".join(ns) if isinstance(ns, (tuple, list)) else str(ns)
results.extend(self.run_for_namespace(ns_str, rule_ids, recursive=True, type=type))

if "MULTIPLE_TABLES_SAME_LOCATION" in rule_ids:
run_result: List[Insight] = rule_multiple_tables_same_location(self._tables_by_location)
ids_to_run = self._get_valid_run_ids(rule_ids)
self._store_results(run_result, ids_to_run, type)

return results
6 changes: 6 additions & 0 deletions be/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import dataclasses
from dataclasses import dataclass, field
import uuid
from enum import Enum

from pyiceberg.table import FileScanTask
from pyiceberg.typedef import Record
Expand Down Expand Up @@ -90,12 +91,17 @@ class JobSchedule:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
is_enabled: bool = True

class RuleLevel(Enum):
    """Scope at which an insight rule operates."""
    TABLE = 'TABLE'          # rule is evaluated against a single table
    LAKEHOUSE = 'LAKEHOUSE'  # rule is evaluated across the whole lakehouse

@dataclass
class Rule:
    """Definition of an insight rule and the callable that implements it."""
    id: str           # stable rule identifier, e.g. "UUID_COLUMN"
    name: str         # short human-readable title
    description: str  # longer explanation of what the rule detects
    method: Any       # callable implementing the rule's check
    # TABLE rules run per-table; LAKEHOUSE rules run once over the whole lakehouse.
    level: RuleLevel = RuleLevel.TABLE

class RuleOut(BaseModel):
id: str
Expand Down
9 changes: 6 additions & 3 deletions be/app/routers/insights.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from app.insights.rules import ALL_RULES_OBJECT
from app.dependencies import get_runner
from app.exceptions import LVException
from app.models import RuleOut, RuleSummaryOut, InsightRun, InsightRunOut
from app.models import RuleOut, RuleSummaryOut, InsightRun, InsightRunOut, RuleLevel

router = APIRouter()

Expand Down Expand Up @@ -55,5 +55,8 @@ def get_insights_summary(
return summary_data

@router.get("/api/lakehouse/insights/rules", response_model=List[RuleOut])
def get_insight_rules():
return ALL_RULES_OBJECT
def get_insight_rules(level: Optional[RuleLevel] = None):
    """Return all insight rules, optionally filtered to a single rule level."""
    if level is None:
        return ALL_RULES_OBJECT
    return [rule for rule in ALL_RULES_OBJECT if rule.level == level]
7 changes: 6 additions & 1 deletion fe/src/routes/+layout.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
let AUTH_ENABLED = false;
let CHAT_ENABLED = false;
let CHAT_NAME = 'Chat';
let CHAT_DESCRIPTION = '';
let extra_link;
let extra_link_text;
let company = 'Apache Iceberg';
Expand Down Expand Up @@ -75,6 +76,9 @@
if (env.PUBLIC_CHAT_NAME) {
CHAT_NAME = env.PUBLIC_CHAT_NAME;
}
if (env.PUBLIC_CHAT_DESCRIPTION) {
CHAT_DESCRIPTION = env.PUBLIC_CHAT_DESCRIPTION;
}
CHAT_ENABLED = true;
}
if (env.PUBLIC_EXTRA_LINK) {
Expand Down Expand Up @@ -201,11 +205,12 @@
size='lg'
passiveModal
bind:open={isChatOpen}
modalHeading="{CHAT_NAME} - Chat with your Lakehouse"
modalHeading="{CHAT_NAME}"
on:open={handleChatOpen}
on:close={handleChatClose}
on:click:overlay={handleChatClose}
>
<p>{CHAT_DESCRIPTION}</p>
{#if user}
<Chat {user} />
{/if}
Expand Down
2 changes: 1 addition & 1 deletion fe/src/routes/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@

async function fetchAllRules() {
try {
const response = await fetch('/api/lakehouse/insights/rules');
const response = await fetch('/api/lakehouse/insights/rules?level=TABLE');
if (!response.ok) throw new Error('Failed to fetch rules');
allRules = await response.json();
ruleIdToNameMap = new Map(allRules.map((rule) => [rule.id, rule.name]));
Expand Down
1 change: 1 addition & 0 deletions my.env
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ AUTHZ_CLASS_NAME=
#Chat config
PUBLIC_CHAT_ENABLED=false
PUBLIC_CHAT_NAME=Chat
PUBLIC_CHAT_DESCRIPTION=Chat with your Lakehouse
PUBLIC_CHAT_SERVER=ws://localhost:8000/ws

#Any extra link we want to put, would go in top right app selector
Expand Down