diff --git a/docs/concepts/audits.md b/docs/concepts/audits.md index a5a9fccc49..1e037553dd 100644 --- a/docs/concepts/audits.md +++ b/docs/concepts/audits.md @@ -722,3 +722,104 @@ MODEL ( ) ); ``` + +### AUDIT_ONLY Models + +In addition to traditional audits, SQLMesh provides a special model kind called `AUDIT_ONLY` for validating data relationships across multiple tables without materializing any results. + +#### When to Use AUDIT_ONLY Models + +Use `AUDIT_ONLY` models when you need to: +- Validate relationships between multiple tables (e.g., referential integrity) +- Run complex validation queries that don't belong to a single model +- Create validation logic that participates in the model DAG with proper dependencies +- Avoid creating unnecessary materialized tables just for validation + +Unlike traditional audits that are scoped to a single model, `AUDIT_ONLY` models can depend on multiple models and validate relationships between them. + +#### Creating AUDIT_ONLY Models + +AUDIT_ONLY models are defined like regular models but with `kind AUDIT_ONLY`: + +```sql +MODEL ( + name data_quality.order_validation, + kind AUDIT_ONLY ( + blocking TRUE, -- Fail pipeline if validation fails (default) + max_failing_rows 20 -- Number of sample rows to show in error (default: 10) + ), + depends_on [orders, customers], + cron '@daily', + owner 'data_quality_team' +); + +-- Query should return 0 rows for success +-- Any returned rows indicate validation failures +SELECT + o.order_id, + o.customer_id, + 'Missing customer record' as issue_type +FROM orders o +LEFT JOIN customers c ON o.customer_id = c.customer_id +WHERE c.customer_id IS NULL; +``` + +#### Key Differences from Regular Audits + +| Feature | Traditional Audits | AUDIT_ONLY Models | +|---------|-------------------|-------------------| +| **Scope** | Single model | Multiple models | +| **Dependencies** | Implicit (via @this_model) | Explicit (via depends_on) | +| **Materialization** | N/A | Never materializes | +| **Location** | `audits/` directory or inline | `models/` directory | +| **Scheduling** | With parent model | Independent cron schedule | +| **DAG Participation** | Attached to model | Full model in DAG | + +#### Configuration Options + +AUDIT_ONLY models support these configuration options: + +- **`blocking`** (default: `TRUE`): Whether validation failures should stop the pipeline +- **`max_failing_rows`** (default: `10`): Maximum number of failing rows to show in error messages + +Example with non-blocking configuration: + +```sql +MODEL ( + name data_quality.revenue_anomalies, + kind AUDIT_ONLY ( + blocking FALSE, -- Log warnings but don't stop pipeline + max_failing_rows 50 -- Show up to 50 failing rows + ), + depends_on [revenue_by_day] +); + +-- Detect revenue anomalies +WITH stats AS ( + SELECT AVG(revenue) as avg_rev, STDDEV(revenue) as stddev_rev + FROM revenue_by_day +) +SELECT + day, + revenue, + 'Anomaly: >3 standard deviations' as issue +FROM revenue_by_day +CROSS JOIN stats +WHERE revenue > avg_rev + (3 * stddev_rev) + OR revenue < 0; +``` + +#### How AUDIT_ONLY Models Work + +1. **No Table Creation**: The model's query executes but doesn't create or update any tables +2. **Validation Logic**: The model fails if the query returns any rows (similar to audits) +3. **Error Reporting**: Shows a sample of failing rows in the error message +4. **Pipeline Integration**: Participates in plan/apply workflow with proper dependency ordering + +#### Best Practices + +1. 
**Use descriptive names**: Name your AUDIT_ONLY models clearly (e.g., `audit_order_integrity`, `validate_user_consistency`) +2. **Set appropriate blocking**: Use `blocking TRUE` for critical validations, `FALSE` for warnings +3. **Include context in output**: Return columns that help identify and debug issues +4. **Group related validations**: Consider combining related checks in a single AUDIT_ONLY model +5. **Document validation logic**: Use model descriptions to explain what's being validated and why diff --git a/docs/concepts/models/model_kinds.md b/docs/concepts/models/model_kinds.md index d01cc738a6..78164f9208 100644 --- a/docs/concepts/models/model_kinds.md +++ b/docs/concepts/models/model_kinds.md @@ -860,6 +860,115 @@ SELECT DISTINCT FROM db.employees; ``` +## AUDIT_ONLY + +The `AUDIT_ONLY` model kind is designed for data validation across multiple tables without materializing any results. These models execute validation queries and fail if any rows are returned, similar to [audits](../audits.md#audit_only-models) but with the ability to participate as full models in the DAG. + +### Purpose + +`AUDIT_ONLY` models are ideal for: +- Validating referential integrity between multiple tables +- Detecting data quality issues across different models +- Running complex validation queries that don't belong to a single model +- Avoiding unnecessary table materialization for validation purposes + +### Configuration + +The `AUDIT_ONLY` kind supports two configuration parameters: + +- **`blocking`** (default: `TRUE`): Determines whether validation failures stop the pipeline +- **`max_failing_rows`** (default: `10`): Maximum number of failing rows to display in error messages + +### Example: Referential Integrity Check + +This example validates that all orders reference existing customers: + +```sql linenums="1" +MODEL ( + name data_quality.order_integrity, + kind AUDIT_ONLY ( + blocking TRUE, + max_failing_rows 20 + ), + depends_on [orders, customers], + cron '@daily', + owner 'data_quality_team' +); + +-- Query should return 0 rows for validation to pass +SELECT + o.order_id, + o.customer_id, + o.order_date, + 'Missing customer record' as issue_type +FROM orders o +LEFT JOIN customers c ON o.customer_id = c.customer_id +WHERE c.customer_id IS NULL; +``` + +### Example: Non-Blocking Anomaly Detection + +This example detects revenue anomalies but doesn't stop the pipeline: + +```sql linenums="1" +MODEL ( + name data_quality.revenue_anomalies, + kind AUDIT_ONLY ( + blocking FALSE, -- Log warnings but continue + max_failing_rows 100 + ), + depends_on [daily_revenue], + cron '@hourly' +); + +WITH stats AS ( + SELECT + AVG(revenue) as avg_revenue, + STDDEV(revenue) as stddev_revenue + FROM daily_revenue + WHERE revenue > 0 +) +SELECT + date, + revenue, + CASE + WHEN revenue < 0 THEN 'Negative revenue' + WHEN revenue > avg_revenue + (5 * stddev_revenue) THEN 'Extreme outlier' + END as anomaly_type +FROM daily_revenue +CROSS JOIN stats +WHERE revenue < 0 + OR revenue > avg_revenue + (5 * stddev_revenue); +``` + +### Behavior + +1. **No Materialization**: AUDIT_ONLY models never create or update tables +2. **Validation Logic**: The model succeeds if the query returns 0 rows, fails otherwise +3. **Error Reporting**: When validation fails, shows a sample of failing rows (up to `max_failing_rows`) +4. **DAG Integration**: Fully participates in the model DAG with proper dependency tracking +5. 
**Scheduling**: Can be scheduled independently using cron expressions + +### Best Practices + +- **Naming Convention**: Use descriptive names like `audit_*` or `validate_*` to clearly indicate the model's purpose +- **Include Context**: Add columns that describe what validation failed for easier debugging +- **Optimize Performance**: These queries run during every plan/apply, so ensure they're efficient +- **Set Appropriate Blocking**: Use `blocking TRUE` for critical validations, `FALSE` for monitoring +- **Document Purpose**: Use the `description` field to explain what the validation checks + +### Comparison with Traditional Audits + +While both AUDIT_ONLY models and traditional audits validate data, they serve different purposes: + +| Aspect | Traditional Audits | AUDIT_ONLY Models | +|--------|-------------------|-------------------| +| **Scope** | Single model | Multiple models | +| **Location** | `audits/` directory or inline | `models/` directory | +| **Dependencies** | Implicit via parent model | Explicit via `depends_on` | +| **Scheduling** | With parent model | Independent cron | +| **Use Case** | Validate model output | Validate cross-model relationships | + ## SEED The `SEED` model kind is used to specify [seed models](./seed_models.md) for using static CSV datasets in your SQLMesh project. diff --git a/docs/reference/model_configuration.md b/docs/reference/model_configuration.md index a5a96ebbf9..1f5ffb1c82 100644 --- a/docs/reference/model_configuration.md +++ b/docs/reference/model_configuration.md @@ -306,6 +306,17 @@ Configuration options for [`SCD_TYPE_2_BY_COLUMN` models](../concepts/models/mod Python model kind `name` enum value: [ModelKindName.SCD_TYPE_2_BY_COLUMN](https://sqlmesh.readthedocs.io/en/stable/_readthedocs/html/sqlmesh/core/model/kind.html#ModelKindName) +### `AUDIT_ONLY` models + +Configuration options for [`AUDIT_ONLY` models](../concepts/models/model_kinds.md#audit_only) (in addition to [general model properties](#general-model-properties)). + +| Option | Description | Type | Required | +| ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | :--: | :------: | +| `blocking` | If set to true, the pipeline will fail when the validation query returns any rows. If false, only warnings are logged. (Default: `True`) | bool | N | +| `max_failing_rows` | Maximum number of failing rows to display in error messages when a validation fails. (Default: `10`) | int | N | + +Python model kind `name` enum value: [ModelKindName.AUDIT_ONLY](https://sqlmesh.readthedocs.io/en/stable/_readthedocs/html/sqlmesh/core/model/kind.html#ModelKindName) + ### `SEED` models Configuration options for [`SEED` models](../concepts/models/model_kinds.md#seed). `SEED` models do not support all the general properties supported by other models; they only support the properties listed in this table. 
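
Note on the Python interface: the `AUDIT_ONLY` options documented above are also reachable from Python models through the `@model` decorator's `kind` argument, as exercised by `test_audit_only_python_model` later in this diff. A minimal sketch follows — the model name, column spec, and queried tables are illustrative only, and `blocking`/`max_failing_rows` fall back to their defaults:

```python
import typing as t
from datetime import datetime

import pandas as pd
from sqlmesh import ExecutionContext, model


@model(
    "data_quality.orphan_order_check",  # hypothetical model name
    kind="audit_only",  # parsed into AuditOnlyKind (ModelKindName.AUDIT_ONLY)
    columns={"order_id": "int", "issue": "text"},
)
def execute(
    context: ExecutionContext,
    start: datetime,
    end: datetime,
    execution_time: datetime,
    **kwargs: t.Any,
) -> pd.DataFrame:
    # AUDIT_ONLY contract: return zero rows on success; every returned row
    # is reported as a validation failure (up to max_failing_rows shown).
    return context.fetchdf(
        """
        SELECT o.order_id, 'Missing customer record' AS issue
        FROM orders o
        LEFT JOIN customers c ON o.customer_id = c.customer_id
        WHERE c.customer_id IS NULL
        """
    )
```

The example models added below show the equivalent SQL form of the same checks.
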
diff --git a/examples/sushi/models/audit_duplicate_orders.sql b/examples/sushi/models/audit_duplicate_orders.sql new file mode 100644 index 0000000000..1970536645 --- /dev/null +++ b/examples/sushi/models/audit_duplicate_orders.sql @@ -0,0 +1,43 @@ +MODEL ( + name sushi.audit_duplicate_orders, + kind AUDIT_ONLY ( + blocking FALSE, + max_failing_rows 100 + ), + depends_on [sushi.orders], + cron '@hourly', + owner 'data_engineering', + tags ['validation', 'duplicates', 'data_quality'], + description 'Detects potential duplicate orders based on customer, waiter, and timing' +); + +-- Find potential duplicate orders +-- Orders from the same customer to the same waiter within 5 minutes might be duplicates +WITH potential_duplicates AS ( + SELECT + o1.id as order_id_1, + o2.id as order_id_2, + o1.customer_id, + o1.waiter_id, + o1.start_ts as order_1_time, + o2.start_ts as order_2_time, + ABS(o1.start_ts - o2.start_ts) as seconds_apart + FROM sushi.orders o1 + INNER JOIN sushi.orders o2 + ON o1.customer_id = o2.customer_id + AND o1.waiter_id = o2.waiter_id + AND o1.id < o2.id -- Avoid comparing order with itself and duplicating pairs + AND o1.event_date = o2.event_date -- Same day + WHERE ABS(o1.start_ts - o2.start_ts) <= 300 -- Within 5 minutes (300 seconds) +) +SELECT + order_id_1, + order_id_2, + customer_id, + waiter_id, + seconds_apart, + CONCAT('Orders ', order_id_1::TEXT, ' and ', order_id_2::TEXT, + ' from customer ', customer_id::TEXT, + ' are only ', seconds_apart::TEXT, ' seconds apart') as issue_description +FROM potential_duplicates +ORDER BY seconds_apart, order_id_1 \ No newline at end of file diff --git a/examples/sushi/models/audit_order_integrity.sql b/examples/sushi/models/audit_order_integrity.sql new file mode 100644 index 0000000000..e99c30e6ae --- /dev/null +++ b/examples/sushi/models/audit_order_integrity.sql @@ -0,0 +1,25 @@ +MODEL ( + name sushi.audit_order_integrity, + kind AUDIT_ONLY ( + blocking FALSE, -- Set to non-blocking for example/demo purposes + max_failing_rows 20 + ), + depends_on [sushi.orders, sushi.customers], + cron '@daily', + owner 'data_quality_team', + tags ['validation', 'referential_integrity', 'critical'], + description 'Validates referential integrity between orders and customers tables' +); + +-- Check for orders with non-existent customer IDs +-- This should return no rows if all orders have valid customers +SELECT + o.id as order_id, + o.customer_id, + o.event_date, + 'Missing customer record' as issue_type, + CONCAT('Order ', o.id::TEXT, ' references non-existent customer ', o.customer_id::TEXT) as issue_description +FROM sushi.orders o +LEFT JOIN sushi.customers c + ON o.customer_id = c.customer_id +WHERE c.customer_id IS NULL \ No newline at end of file diff --git a/examples/sushi/models/audit_waiter_revenue_anomalies.sql b/examples/sushi/models/audit_waiter_revenue_anomalies.sql new file mode 100644 index 0000000000..b8faad45a1 --- /dev/null +++ b/examples/sushi/models/audit_waiter_revenue_anomalies.sql @@ -0,0 +1,47 @@ +MODEL ( + name sushi.audit_waiter_revenue_anomalies, + kind AUDIT_ONLY ( + blocking FALSE, + max_failing_rows 50 + ), + depends_on [sushi.waiter_revenue_by_day], + cron '@daily', + owner 'analytics_team', + tags ['validation', 'revenue', 'daily'], + description 'Detects anomalies in daily waiter revenue that may indicate data quality issues' +); + +-- Detect anomalies in waiter daily revenue +-- Only flag extreme outliers (>5 std dev) or negative revenue +WITH revenue_stats AS ( + SELECT + AVG(revenue) as avg_revenue, + 
STDDEV(revenue) as stddev_revenue + FROM sushi.waiter_revenue_by_day + WHERE revenue > 0 -- Exclude zeros from stats calculation +), +anomalies AS ( + SELECT + w.waiter_id, + w.event_date, + w.revenue, + r.avg_revenue, + r.stddev_revenue, + CASE + WHEN w.revenue < 0 THEN 'Negative revenue' + WHEN w.revenue > r.avg_revenue + (5 * r.stddev_revenue) THEN 'Extremely high revenue (>5 std dev)' + END as anomaly_type + FROM sushi.waiter_revenue_by_day w + CROSS JOIN revenue_stats r + WHERE + w.revenue < 0 + OR w.revenue > r.avg_revenue + (5 * r.stddev_revenue) -- Only flag extreme outliers +) +SELECT + waiter_id, + event_date, + revenue, + anomaly_type, + CONCAT('Waiter ', waiter_id::TEXT, ' has ', anomaly_type, ' on ', event_date::TEXT) as issue_description +FROM anomalies +ORDER BY event_date DESC, waiter_id \ No newline at end of file diff --git a/sqlmesh/core/dialect.py b/sqlmesh/core/dialect.py index ed904cc4b3..1b5fc54be8 100644 --- a/sqlmesh/core/dialect.py +++ b/sqlmesh/core/dialect.py @@ -621,6 +621,7 @@ def parse(self: Parser) -> t.Optional[exp.Expression]: ModelKindName.SCD_TYPE_2_BY_TIME, ModelKindName.SCD_TYPE_2_BY_COLUMN, ModelKindName.CUSTOM, + ModelKindName.AUDIT_ONLY, ) and self._match(TokenType.L_PAREN, advance=False): props = self._parse_wrapped_csv(functools.partial(_parse_props, self)) else: diff --git a/sqlmesh/core/model/kind.py b/sqlmesh/core/model/kind.py index dc5f533c21..2b406dbbae 100644 --- a/sqlmesh/core/model/kind.py +++ b/sqlmesh/core/model/kind.py @@ -119,10 +119,18 @@ def is_custom(self) -> bool: def is_managed(self) -> bool: return self.model_kind_name == ModelKindName.MANAGED + @property + def is_audit_only(self) -> bool: + return self.model_kind_name == ModelKindName.AUDIT_ONLY + @property def is_symbolic(self) -> bool: """A symbolic model is one that doesn't execute at all.""" - return self.model_kind_name in (ModelKindName.EMBEDDED, ModelKindName.EXTERNAL) + return self.model_kind_name in ( + ModelKindName.EMBEDDED, + ModelKindName.EXTERNAL, + ModelKindName.AUDIT_ONLY, + ) @property def is_materialized(self) -> bool: @@ -170,6 +178,7 @@ class ModelKindName(str, ModelKindMixin, Enum): EXTERNAL = "EXTERNAL" CUSTOM = "CUSTOM" MANAGED = "MANAGED" + AUDIT_ONLY = "AUDIT_ONLY" @property def model_kind_name(self) -> t.Optional[ModelKindName]: @@ -977,6 +986,32 @@ def to_expression( ) +class AuditOnlyKind(_ModelKind): + name: t.Literal[ModelKindName.AUDIT_ONLY] = ModelKindName.AUDIT_ONLY + blocking: SQLGlotBool = True + max_failing_rows: SQLGlotPositiveInt = 10 + + @property + def is_symbolic(self) -> bool: + """Audit-only models don't materialize tables.""" + return True + + def to_expression( + self, expressions: t.Optional[t.List[exp.Expression]] = None, **kwargs: t.Any + ) -> d.ModelKind: + return super().to_expression( + expressions=[ + *(expressions or []), + *_properties( + { + "blocking": self.blocking, + "max_failing_rows": self.max_failing_rows, + } + ), + ], + ) + + ModelKind = t.Annotated[ t.Union[ EmbeddedKind, @@ -992,6 +1027,7 @@ def to_expression( SCDType2ByColumnKind, CustomKind, ManagedKind, + AuditOnlyKind, ], Field(discriminator="name"), ] @@ -1011,6 +1047,7 @@ def to_expression( ModelKindName.SCD_TYPE_2_BY_COLUMN: SCDType2ByColumnKind, ModelKindName.CUSTOM: CustomKind, ModelKindName.MANAGED: ManagedKind, + ModelKindName.AUDIT_ONLY: AuditOnlyKind, } diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index aaf6ec5dc0..ac854bcc8c 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -187,7 
+187,7 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]: end_bounded=self.end_bounded, ignore_cron=self.ignore_cron, ).items() - if snapshot.is_model and missing + if snapshot.is_model and missing and not snapshot.model.kind.is_symbolic ] return sorted(intervals, key=lambda i: i.snapshot_id) diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index c17e94be10..7d1fcf0833 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -960,7 +960,7 @@ def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None: @property def evaluatable(self) -> bool: """Whether or not a snapshot should be evaluated and have intervals.""" - return bool(not self.is_symbolic or self.model.audits) + return bool(not self.is_symbolic or self.model.audits or self.is_audit_only) def missing_intervals( self, diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 961062fe45..3523857d39 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1487,6 +1487,8 @@ def _evaluation_strategy(snapshot: SnapshotInfoLike, adapter: EngineAdapter) -> klass: t.Type if snapshot.is_embedded: klass = EmbeddedStrategy + elif snapshot.is_audit_only: + klass = AuditOnlyStrategy elif snapshot.is_symbolic or snapshot.is_audit: klass = SymbolicStrategy elif snapshot.is_full: @@ -1706,6 +1708,93 @@ def demote(self, view_name: str, **kwargs: t.Any) -> None: pass +class AuditOnlyStrategy(SymbolicStrategy): + def insert( + self, + table_name: str, + query_or_df: QueryOrDF, + model: Model, + is_first_insert: bool, + render_kwargs: t.Dict[str, t.Any], + **kwargs: t.Any, + ) -> None: + """Override insert to perform validation instead of data insertion.""" + self._validate(model, render_kwargs, **kwargs) + + def append( + self, + table_name: str, + query_or_df: QueryOrDF, + model: Model, + render_kwargs: t.Dict[str, t.Any], + **kwargs: t.Any, + ) -> None: + """Override append to perform validation instead of data insertion.""" + self._validate(model, render_kwargs, **kwargs) + + def _validate( + self, + model: Model, + render_kwargs: t.Dict[str, t.Any], + **kwargs: t.Any, + ) -> None: + """Execute audit-only model and raise AuditError if validation fails.""" + from sqlmesh.utils.errors import AuditError + + if not model: + raise SQLMeshError(f"No model found for validation") + + # Ensure we have the necessary keys in render_kwargs + full_render_kwargs = { + **render_kwargs, + **kwargs, + } + + # Add engine_adapter if not present + if "engine_adapter" not in full_render_kwargs: + full_render_kwargs["engine_adapter"] = self.adapter + + query = model.render_query(**full_render_kwargs) + + if query is None: + raise RuntimeError(f"AUDIT_ONLY model '{model.fqn}' rendered to None query") + + # Count the rows returned by the validation query + count_query = select("COUNT(*)").from_(query.subquery("audit_only")) + result = self.adapter.fetchone(count_query, quote_identifiers=True) + count = result[0] if result else 0 + + if count > 0: + # Fetch sample failing rows for the error message + max_rows = ( + model.kind.max_failing_rows if hasattr(model.kind, "max_failing_rows") else 10 + ) + sample_query = select("*").from_(query.subquery("audit_only")).limit(max_rows) + failing_rows = self.adapter.fetchdf(sample_query, quote_identifiers=True) + + # Check if this is a blocking audit + is_blocking = model.kind.blocking if hasattr(model.kind, "blocking") else True + + error_msg = ( + f"AUDIT_ONLY 
model '{model.fqn}' failed with {count} validation error(s).\n" + f"Sample failing rows:\n{failing_rows.to_string() if not failing_rows.empty else 'No data'}" + ) + + if is_blocking: + # Create a proper AuditError with required parameters + raise AuditError( + audit_name=f"{model.fqn}_validation", + audit_args={}, + count=int(count), + query=query, + model=model, + adapter_dialect=self.adapter.dialect, + ) + else: + # Non-blocking: just log the error + logger.warning(error_msg) + + class EmbeddedStrategy(SymbolicStrategy): def promote( self, diff --git a/tests/core/analytics/test_collector.py b/tests/core/analytics/test_collector.py index 1a4c42cbe3..b9008d29c3 100644 --- a/tests/core/analytics/test_collector.py +++ b/tests/core/analytics/test_collector.py @@ -183,7 +183,7 @@ def test_on_plan_apply( { "seq_num": 0, "event_type": "PLAN_APPLY_START", - "event": f'{{"plan_id": "{plan_id}", "engine_type": "bigquery", "state_sync_type": "mysql", "scheduler_type": "builtin", "is_dev": false, "skip_backfill": false, "no_gaps": false, "forward_only": false, "ensure_finalized_snapshots": false, "has_restatements": false, "directly_modified_count": 21, "indirectly_modified_count": 0, "environment_name_hash": "d6e4a9b6646c62fc48baa6dd6150d1f7"}}', + "event": f'{{"plan_id": "{plan_id}", "engine_type": "bigquery", "state_sync_type": "mysql", "scheduler_type": "builtin", "is_dev": false, "skip_backfill": false, "no_gaps": false, "forward_only": false, "ensure_finalized_snapshots": false, "has_restatements": false, "directly_modified_count": 24, "indirectly_modified_count": 0, "environment_name_hash": "d6e4a9b6646c62fc48baa6dd6150d1f7"}}', **common_fields, } ), diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index ef7c59ea7d..09c2d98335 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -85,6 +85,11 @@ pytestmark = pytest.mark.slow +def count_non_symbolic_snapshots(plan): + """Count only non-symbolic snapshots (excludes AUDIT_ONLY models).""" + return len([s for s in plan.new_snapshots if not (s.is_model and s.model.kind.is_symbolic)]) + + @pytest.fixture(autouse=True) def mock_choices(mocker: MockerFixture): mocker.patch("sqlmesh.core.console.TerminalConsole._get_snapshot_change_category") @@ -112,7 +117,9 @@ def test_forward_only_plan_with_effective_date(context_fixture: Context, request plan_builder = context.plan_builder("dev", skip_tests=True, forward_only=True) plan = plan_builder.build() - assert len(plan.new_snapshots) == 2 + assert ( + len(plan.new_snapshots) == 3 + ) # waiter_revenue_by_day, top_waiters, and audit_waiter_revenue_anomalies assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -134,63 +141,52 @@ def test_forward_only_plan_with_effective_date(context_fixture: Context, request plan = plan_builder.set_effective_from("2023-01-05").build() # Default start should be set to effective_from - assert plan.missing_intervals == [ - SnapshotIntervals( - snapshot_id=top_waiters_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-05"), to_timestamp("2023-01-06")), - (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), - SnapshotIntervals( - snapshot_id=snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-05"), to_timestamp("2023-01-06")), - (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), + # Note: The 
order of snapshots in missing_intervals may vary, so we sort by snapshot_id + expected_intervals = [ + (to_timestamp("2023-01-05"), to_timestamp("2023-01-06")), + (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), + (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), ] + expected_snapshot_ids = {top_waiters_snapshot.snapshot_id, snapshot.snapshot_id} + # Also include audit_waiter_revenue_anomalies if it exists + if context.get_snapshot("sushi.audit_waiter_revenue_anomalies", raise_if_missing=False): + audit_snapshot = context.get_snapshot( + "sushi.audit_waiter_revenue_anomalies", raise_if_missing=True + ) + expected_snapshot_ids.add(audit_snapshot.snapshot_id) + + actual_snapshot_ids = {si.snapshot_id for si in plan.missing_intervals} + assert actual_snapshot_ids == expected_snapshot_ids + + # Check that all have the same intervals + for si in plan.missing_intervals: + assert si.intervals == expected_intervals + plan = plan_builder.set_start("2023-01-06").build() # Start override should take precedence - assert plan.missing_intervals == [ - SnapshotIntervals( - snapshot_id=top_waiters_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), - SnapshotIntervals( - snapshot_id=snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), + expected_intervals_2 = [ + (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), + (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), ] + actual_snapshot_ids = {si.snapshot_id for si in plan.missing_intervals} + assert actual_snapshot_ids == expected_snapshot_ids + + # Check that all have the same intervals + for si in plan.missing_intervals: + assert si.intervals == expected_intervals_2 + plan = plan_builder.set_effective_from("2023-01-04").build() # Start should remain unchanged assert plan.start == "2023-01-06" - assert plan.missing_intervals == [ - SnapshotIntervals( - snapshot_id=top_waiters_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), - SnapshotIntervals( - snapshot_id=snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), - ] + + actual_snapshot_ids = {si.snapshot_id for si in plan.missing_intervals} + assert actual_snapshot_ids == expected_snapshot_ids + + # Check that all have the same intervals (should still be intervals_2) + for si in plan.missing_intervals: + assert si.intervals == expected_intervals_2 context.apply(plan) @@ -205,27 +201,21 @@ def test_forward_only_plan_with_effective_date(context_fixture: Context, request prod_plan = context.plan_builder(skip_tests=True).build() # Make sure that the previously set effective_from is respected assert prod_plan.start == to_timestamp("2023-01-04") - assert prod_plan.missing_intervals == [ - SnapshotIntervals( - snapshot_id=top_waiters_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-04"), to_timestamp("2023-01-05")), - (to_timestamp("2023-01-05"), to_timestamp("2023-01-06")), - (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), - SnapshotIntervals( - snapshot_id=snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-04"), 
to_timestamp("2023-01-05")), - (to_timestamp("2023-01-05"), to_timestamp("2023-01-06")), - (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), + + expected_prod_intervals = [ + (to_timestamp("2023-01-04"), to_timestamp("2023-01-05")), + (to_timestamp("2023-01-05"), to_timestamp("2023-01-06")), + (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), + (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), ] + actual_prod_snapshot_ids = {si.snapshot_id for si in prod_plan.missing_intervals} + assert actual_prod_snapshot_ids == expected_snapshot_ids + + # Check that all have the same intervals + for si in prod_plan.missing_intervals: + assert si.intervals == expected_prod_intervals + context.apply(prod_plan) prod_df = context.engine_adapter.fetchdf( @@ -253,7 +243,7 @@ def test_forward_only_model_regular_plan(init_and_plan_context: t.Callable): top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) plan = context.plan_builder("dev", skip_tests=True, enable_preview=False).build() - assert len(plan.new_snapshots) == 2 + assert count_non_symbolic_snapshots(plan) == 2 assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -362,7 +352,7 @@ def test_forward_only_model_regular_plan_preview_enabled(init_and_plan_context: top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) plan = context.plan_builder("dev", skip_tests=True, enable_preview=True).build() - assert len(plan.new_snapshots) == 2 + assert count_non_symbolic_snapshots(plan) == 2 assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -492,7 +482,7 @@ def test_full_history_restatement_model_regular_plan_preview_enabled( plan = context.plan_builder("dev", skip_tests=True, enable_preview=True).build() - assert len(plan.new_snapshots) == 6 + assert count_non_symbolic_snapshots(plan) == 6 assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -539,7 +529,7 @@ def test_metadata_changed_regular_plan_preview_enabled(init_and_plan_context: t. 
top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) plan = context.plan_builder("dev", skip_tests=True, enable_preview=True).build() - assert len(plan.new_snapshots) == 2 + assert count_non_symbolic_snapshots(plan) == 2 assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category == SnapshotChangeCategory.METADATA @@ -902,7 +892,7 @@ def test_forward_only_parent_created_in_dev_child_created_in_prod( top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) plan = context.plan_builder("dev", skip_tests=True, enable_preview=False).build() - assert len(plan.new_snapshots) == 2 + assert count_non_symbolic_snapshots(plan) == 2 assert ( plan.context_diff.snapshots[waiter_revenue_by_day_snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -925,7 +915,7 @@ def test_forward_only_parent_created_in_dev_child_created_in_prod( top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) plan = context.plan_builder("prod", skip_tests=True, enable_preview=False).build() - assert len(plan.new_snapshots) == 1 + assert count_non_symbolic_snapshots(plan) == 1 assert ( plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -967,7 +957,7 @@ def test_plan_set_choice_is_reflected_in_missing_intervals(init_and_plan_context plan_builder = context.plan_builder("dev", skip_tests=True) plan = plan_builder.build() - assert len(plan.new_snapshots) == 2 + assert count_non_symbolic_snapshots(plan) == 2 assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -1115,7 +1105,7 @@ def test_non_breaking_change_after_forward_only_in_dev( top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) plan = context.plan_builder("dev", skip_tests=True, forward_only=True).build() - assert len(plan.new_snapshots) == 2 + assert count_non_symbolic_snapshots(plan) == 2 assert ( plan.context_diff.snapshots[waiter_revenue_by_day_snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -1148,7 +1138,7 @@ def test_non_breaking_change_after_forward_only_in_dev( top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) plan = context.plan_builder("dev", skip_tests=True).build() - assert len(plan.new_snapshots) == 1 + assert count_non_symbolic_snapshots(plan) == 1 assert ( plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -1247,7 +1237,7 @@ def test_indirect_non_breaking_change_after_forward_only_in_dev(init_and_plan_co top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) plan = context.plan_builder("dev", skip_tests=True, enable_preview=False).build() - assert len(plan.new_snapshots) == 1 + assert count_non_symbolic_snapshots(plan) == 1 assert ( plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -1280,7 +1270,7 @@ def test_indirect_non_breaking_change_after_forward_only_in_dev(init_and_plan_co top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) plan = context.plan_builder("dev", skip_tests=True, enable_preview=False).build() - assert len(plan.new_snapshots) == 2 + assert count_non_symbolic_snapshots(plan) == 2 assert ( 
plan.context_diff.snapshots[waiter_revenue_by_day_snapshot.snapshot_id].change_category == SnapshotChangeCategory.NON_BREAKING @@ -1398,14 +1388,14 @@ def test_metadata_change_after_forward_only_results_in_migration(init_and_plan_c model = add_projection_to_model(t.cast(SqlModel, model)) context.upsert_model(model) plan = context.plan("dev", skip_tests=True, auto_apply=True, no_prompts=True) - assert len(plan.new_snapshots) == 2 + assert count_non_symbolic_snapshots(plan) == 2 assert all(s.is_forward_only for s in plan.new_snapshots) # Follow-up with a metadata change in the same environment model = model.copy(update={"owner": "new_owner"}) context.upsert_model(model) plan = context.plan("dev", skip_tests=True, auto_apply=True, no_prompts=True) - assert len(plan.new_snapshots) == 2 + assert count_non_symbolic_snapshots(plan) == 2 assert all(s.change_category == SnapshotChangeCategory.METADATA for s in plan.new_snapshots) # Deploy the latest change to prod @@ -1596,7 +1586,10 @@ def test_run_with_select_models( snapshots = context.state_sync.state_sync.get_snapshots(context.snapshots.values()) # Only waiter_revenue_by_day and its parents should be backfilled up to 2023-01-09. - assert {s.name: s.intervals[0][1] for s in snapshots.values() if s.intervals} == { + # Filter out AUDIT_ONLY models (but keep external models which also have is_symbolic=True) + non_audit_snapshots = {s.name: s.intervals[0][1] for s in snapshots.values() + if s.intervals and not (s.is_model and s.model.kind.is_audit_only)} + assert non_audit_snapshots == { '"memory"."sushi"."waiter_revenue_by_day"': to_timestamp("2023-01-09"), '"memory"."sushi"."order_items"': to_timestamp("2023-01-09"), '"memory"."sushi"."orders"': to_timestamp("2023-01-09"), @@ -1636,7 +1629,10 @@ def test_plan_with_run( context.apply(plan) snapshots = context.state_sync.state_sync.get_snapshots(context.snapshots.values()) - assert {s.name: s.intervals[0][1] for s in snapshots.values() if s.intervals} == { + # Filter out AUDIT_ONLY models (but keep external models which also have is_symbolic=True) + non_audit_snapshots = {s.name: s.intervals[0][1] for s in snapshots.values() + if s.intervals and not (s.is_model and s.model.kind.is_audit_only)} + assert non_audit_snapshots == { '"memory"."sushi"."waiter_revenue_by_day"': to_timestamp("2023-01-09"), '"memory"."sushi"."order_items"': to_timestamp("2023-01-09"), '"memory"."sushi"."orders"': to_timestamp("2023-01-09"), @@ -1806,7 +1802,10 @@ def test_run_with_select_models_no_auto_upstream( snapshots = context.state_sync.state_sync.get_snapshots(context.snapshots.values()) # Only waiter_revenue_by_day should be backfilled up to 2023-01-09. 
- assert {s.name: s.intervals[0][1] for s in snapshots.values() if s.intervals} == { + # Filter out AUDIT_ONLY models (but keep external models which also have is_symbolic=True) + non_audit_snapshots = {s.name: s.intervals[0][1] for s in snapshots.values() + if s.intervals and not (s.is_model and s.model.kind.is_audit_only)} + assert non_audit_snapshots == { '"memory"."sushi"."waiter_revenue_by_day"': to_timestamp("2023-01-09"), '"memory"."sushi"."order_items"': to_timestamp("2023-01-08"), '"memory"."sushi"."orders"': to_timestamp("2023-01-08"), @@ -5264,7 +5263,7 @@ def test_plan_explain(init_and_plan_context: t.Callable): assert plan.has_changes assert plan.missing_intervals assert plan.directly_modified == {waiter_revenue_by_day_snapshot.snapshot_id} - assert len(plan.new_snapshots) == 2 + assert count_non_symbolic_snapshots(plan) == 2 assert {s.snapshot_id for s in plan.new_snapshots} == { waiter_revenue_by_day_snapshot.snapshot_id, top_waiters_snapshot.snapshot_id, @@ -5802,7 +5801,7 @@ def test_multi(mocker): ) context._new_state_sync().reset(default_catalog=context.default_catalog) plan = context.plan_builder().build() - assert len(plan.new_snapshots) == 5 + assert count_non_symbolic_snapshots(plan) == 5 context.apply(plan) # Ensure before_all, after_all statements for multiple repos have executed @@ -5972,7 +5971,7 @@ def test_multi_virtual_layer(copy_to_temp_path): ) plan = context.plan_builder().build() - assert len(plan.new_snapshots) == 4 + assert count_non_symbolic_snapshots(plan) == 4 context.apply(plan) # Validate the tables that source from the first tables are correct as well with evaluate @@ -6115,7 +6114,7 @@ def test_multi_dbt(mocker): context = Context(paths=["examples/multi_dbt/bronze", "examples/multi_dbt/silver"]) context._new_state_sync().reset(default_catalog=context.default_catalog) plan = context.plan_builder().build() - assert len(plan.new_snapshots) == 4 + assert count_non_symbolic_snapshots(plan) == 4 context.apply(plan) validate_apply_basics(context, c.PROD, plan.snapshots.values()) @@ -6145,7 +6144,7 @@ def test_multi_hybrid(mocker): context._new_state_sync().reset(default_catalog=context.default_catalog) plan = context.plan_builder().build() - assert len(plan.new_snapshots) == 5 + assert count_non_symbolic_snapshots(plan) == 5 assert context.dag.roots == {'"memory"."dbt_repo"."e"'} assert context.dag.graph['"memory"."dbt_repo"."c"'] == {'"memory"."sqlmesh_repo"."b"'} assert context.dag.graph['"memory"."sqlmesh_repo"."b"'] == {'"memory"."sqlmesh_repo"."a"'} @@ -6711,7 +6710,9 @@ def test_audit_only_metadata_change(init_and_plan_context: t.Callable): context.upsert_model(model) plan = context.plan_builder("prod", skip_tests=True).build() - assert len(plan.new_snapshots) == 2 + assert ( + len(plan.new_snapshots) == 3 + ) # waiter_revenue_by_day, top_waiters, and audit_waiter_revenue_anomalies assert all(s.change_category.is_metadata for s in plan.new_snapshots) assert not plan.missing_intervals @@ -7119,6 +7120,9 @@ def validate_tables( is_deployable = deployability_index.is_representative(snapshot) if not snapshot.is_model or snapshot.is_external: continue + # AUDIT_ONLY models are symbolic and don't create tables + if snapshot.model.kind.is_symbolic: + continue table_should_exist = not snapshot.is_embedded assert adapter.table_exists(snapshot.table_name(is_deployable)) == table_should_exist if table_should_exist: @@ -7314,7 +7318,7 @@ def test_destroy(copy_to_temp_path): context = Context(paths=paths, config=config) plan = context.plan_builder().build() - 
assert len(plan.new_snapshots) == 4 + assert count_non_symbolic_snapshots(plan) == 4 context.apply(plan) # Confirm cache exists @@ -10181,3 +10185,266 @@ def test_incremental_by_time_model_ignore_additive_change_unit_test(tmp_path: Pa assert test_result.testsRun == len(test_result.successes) context.close() + + +def test_audit_only_model_validation_success(init_and_plan_context: t.Callable): + """Test that audit-only models execute without materializing tables when validation passes.""" + context, _ = init_and_plan_context("examples/sushi") + + # Create source tables for testing + orders_model = load_sql_based_model( + d.parse(""" + MODEL (name memory.test.orders, kind FULL); + SELECT 1 as order_id, 1 as customer_id UNION ALL SELECT 2, 2 + """) + ) + context.upsert_model(orders_model) + + customers_model = load_sql_based_model( + d.parse(""" + MODEL (name memory.test.customers, kind FULL); + SELECT 1 as customer_id, 'Alice' as name UNION ALL SELECT 2, 'Bob' + """) + ) + context.upsert_model(customers_model) + + # Create audit-only model that validates referential integrity (should pass) + audit_model = load_sql_based_model( + d.parse(""" + MODEL ( + name memory.test.order_validation, + kind AUDIT_ONLY, + depends_on [memory.test.orders, memory.test.customers] + ); + + SELECT o.* FROM memory.test.orders o + LEFT JOIN memory.test.customers c ON o.customer_id = c.customer_id + WHERE c.customer_id IS NULL + """) + ) + context.upsert_model(audit_model) + + # Plan and apply - should succeed since all orders have valid customers + plan = context.plan(auto_apply=True, skip_tests=True) + + # Verify that no table was created for the audit-only model + # DuckDB doesn't have sqlite_master, check using information_schema + tables = context.engine_adapter.fetchdf( + "SELECT table_name FROM information_schema.tables WHERE table_name LIKE '%order_validation%'" + ) + assert tables.empty, "Audit-only model should not create a table" + + +def test_audit_only_model_validation_failure(init_and_plan_context: t.Callable): + """Test that audit-only models raise AuditError when validation fails.""" + context, _ = init_and_plan_context("examples/sushi") + + # Create source tables with invalid data + orders_model = load_sql_based_model( + d.parse(""" + MODEL (name memory.test.orders, kind FULL); + SELECT 1 as order_id, 999 as customer_id -- Invalid customer + """) + ) + context.upsert_model(orders_model) + + customers_model = load_sql_based_model( + d.parse(""" + MODEL (name memory.test.customers, kind FULL); + SELECT 1 as customer_id, 'Alice' as name -- Only customer 1 exists + """) + ) + context.upsert_model(customers_model) + + # Create audit-only model that validates referential integrity (should fail) + audit_model = load_sql_based_model( + d.parse(""" + MODEL ( + name memory.test.order_validation, + kind AUDIT_ONLY ( + blocking TRUE, + max_failing_rows 5 + ), + depends_on [memory.test.orders, memory.test.customers] + ); + + SELECT o.* FROM memory.test.orders o + LEFT JOIN memory.test.customers c ON o.customer_id = c.customer_id + WHERE c.customer_id IS NULL + """) + ) + context.upsert_model(audit_model) + + # Plan should succeed but apply should fail with PlanError during evaluation + from sqlmesh.utils.errors import PlanError + + plan = context.plan(skip_tests=True) + + # Apply the plan - this should trigger audit-only evaluation and fail + with pytest.raises(PlanError) as exc_info: + context.apply(plan) + + # The PlanError wraps the underlying validation failure + assert "Plan application failed" in 
str(exc_info.value) + + +def test_audit_only_model_non_blocking(init_and_plan_context: t.Callable): + """Test that non-blocking audit-only models don't stop pipeline execution.""" + context, _ = init_and_plan_context("examples/sushi") + + # Create source tables with invalid data + orders_model = load_sql_based_model( + d.parse(""" + MODEL (name memory.test.orders, kind FULL); + SELECT 1 as order_id, 999 as customer_id + """) + ) + context.upsert_model(orders_model) + + customers_model = load_sql_based_model( + d.parse(""" + MODEL (name memory.test.customers, kind FULL); + SELECT 1 as customer_id + """) + ) + context.upsert_model(customers_model) + + # Create non-blocking audit-only model + audit_model = load_sql_based_model( + d.parse(""" + MODEL ( + name memory.test.order_validation, + kind AUDIT_ONLY ( + blocking FALSE + ), + depends_on [memory.test.orders, memory.test.customers] + ); + + SELECT o.* FROM memory.test.orders o + LEFT JOIN memory.test.customers c ON o.customer_id = c.customer_id + WHERE c.customer_id IS NULL + """) + ) + context.upsert_model(audit_model) + + # Create a downstream model that depends on orders (not the audit) + downstream_model = load_sql_based_model( + d.parse(""" + MODEL ( + name memory.test.summary, + kind FULL, + depends_on [memory.test.orders] + ); + + SELECT COUNT(*) as total FROM memory.test.orders + """) + ) + context.upsert_model(downstream_model) + + # Plan and apply should succeed despite validation failure (non-blocking) + plan = context.plan(auto_apply=True, skip_tests=True) + + # Verify downstream model was created + # The table will be created with the full qualified name + summary_df = context.engine_adapter.fetchdf("SELECT * FROM memory.test.summary") + assert len(summary_df) == 1 + assert summary_df.iloc[0]["total"] == 1 + + +def test_audit_only_model_dependencies(init_and_plan_context: t.Callable): + """Test that audit-only models properly track dependencies and execution order.""" + context, _ = init_and_plan_context("examples/sushi") + + # Create base tables + table_a_model = load_sql_based_model( + d.parse(""" + MODEL (name memory.test.table_a, kind FULL); + SELECT 1 as id, 'A' as type + """) + ) + context.upsert_model(table_a_model) + + table_b_model = load_sql_based_model( + d.parse(""" + MODEL (name memory.test.table_b, kind FULL); + SELECT 1 as id, 'B' as type + """) + ) + context.upsert_model(table_b_model) + + table_c_model = load_sql_based_model( + d.parse(""" + MODEL (name memory.test.table_c, kind FULL); + SELECT 1 as id, 'C' as type + """) + ) + context.upsert_model(table_c_model) + + # Create audit-only model with multiple dependencies + audit_model = load_sql_based_model( + d.parse(""" + MODEL ( + name memory.test.multi_table_validation, + kind AUDIT_ONLY, + depends_on [memory.test.table_a, memory.test.table_b, memory.test.table_c] + ); + + SELECT a.id FROM memory.test.table_a a + INNER JOIN memory.test.table_b b ON a.id = b.id + INNER JOIN memory.test.table_c c ON a.id = c.id + WHERE a.type != 'A' OR b.type != 'B' OR c.type != 'C' + """) + ) + context.upsert_model(audit_model) + + # Plan and apply should succeed since validation passes + plan = context.plan(auto_apply=True, skip_tests=True) + + # Verify dependencies are tracked + model = context.get_model("memory.test.multi_table_validation") + assert ( + '"memory"."test"."table_a"' in model.depends_on or "memory.test.table_a" in model.depends_on + ) + assert ( + '"memory"."test"."table_b"' in model.depends_on or "memory.test.table_b" in model.depends_on + ) + assert ( + 
'"memory"."test"."table_c"' in model.depends_on or "memory.test.table_c" in model.depends_on + ) + + +def test_audit_only_model_with_cron_schedule(init_and_plan_context: t.Callable): + """Test that audit-only models can be scheduled with cron.""" + context, _ = init_and_plan_context("examples/sushi") + + # Create source table with recent date (no validation errors) + events_model = load_sql_based_model( + d.parse(""" + MODEL (name memory.test.events, kind FULL); + SELECT 1 as event_id, CURRENT_DATE as event_date + """) + ) + context.upsert_model(events_model) + + # Create scheduled audit-only model + audit_model = load_sql_based_model( + d.parse(""" + MODEL ( + name memory.test.daily_validation, + kind AUDIT_ONLY, + cron '@daily', + depends_on [memory.test.events] + ); + + SELECT * FROM memory.test.events + WHERE CAST(event_date AS DATE) < CURRENT_DATE - INTERVAL '30 days' + """) + ) + context.upsert_model(audit_model) + + # Plan and apply should succeed + plan = context.plan(auto_apply=True, skip_tests=True) + + # Verify cron is set + model = context.get_model("memory.test.daily_validation") + assert model.cron == "@daily" diff --git a/tests/core/test_model.py b/tests/core/test_model.py index 1511e37c53..6a50b62009 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -11623,3 +11623,162 @@ def test_use_original_sql(): assert model.query_.sql == "SELECT 1 AS one, 2 AS two" assert model.pre_statements_[0].sql == "CREATE TABLE pre (a INT)" assert model.post_statements_[0].sql == "CREATE TABLE post (b INT)" + + +def test_audit_only_kind(): + """Test AUDIT_ONLY model kind parsing and properties.""" + expressions = d.parse( + """ + MODEL ( + name data_quality.order_validation, + kind AUDIT_ONLY, + depends_on [orders, customers] + ); + + SELECT o.* FROM orders o + LEFT JOIN customers c ON o.customer_id = c.customer_id + WHERE c.customer_id IS NULL + """ + ) + + model = load_sql_based_model(expressions) + + # Check model properties + assert model.kind.name == ModelKindName.AUDIT_ONLY + assert model.kind.is_symbolic is True + assert model.kind.blocking is True # Default should be blocking + assert model.kind.is_audit_only is True + assert model.kind.is_symbolic is True + assert model.kind.is_materialized is False + + # Check dependencies are correctly parsed + assert '"orders"' in model.depends_on or "orders" in model.depends_on + assert '"customers"' in model.depends_on or "customers" in model.depends_on + + +def test_audit_only_kind_with_blocking_config(): + """Test AUDIT_ONLY model kind with explicit blocking configuration.""" + expressions = d.parse( + """ + MODEL ( + name data_quality.validation, + kind AUDIT_ONLY ( + blocking FALSE + ) + ); + + SELECT 1 WHERE 1=0 + """ + ) + + model = load_sql_based_model(expressions) + + assert model.kind.name == ModelKindName.AUDIT_ONLY + assert model.kind.blocking is False + + +def test_audit_only_kind_with_max_failing_rows(): + """Test AUDIT_ONLY model kind with max_failing_rows configuration.""" + expressions = d.parse( + """ + MODEL ( + name data_quality.validation, + kind AUDIT_ONLY ( + max_failing_rows 20 + ) + ); + + SELECT 1 WHERE 1=0 + """ + ) + + model = load_sql_based_model(expressions) + + assert model.kind.name == ModelKindName.AUDIT_ONLY + assert model.kind.max_failing_rows == 20 + + +def test_audit_only_python_model(): + """Test that AUDIT_ONLY kind works with Python models.""" + + @model("test_audit_only_python", kind="audit_only", columns={"issue": "string"}) + def execute( + context: ExecutionContext, + start: datetime, 
+ end: datetime, + execution_time: datetime, + **kwargs: t.Any, + ) -> pd.DataFrame: + # This would normally check data quality and return issues + return pd.DataFrame({"issue": []}) + + m = model.get_registry()["test_audit_only_python"].model( + module_path=Path("."), + path=Path("."), + ) + + assert m.kind.name == ModelKindName.AUDIT_ONLY + assert m.kind.is_symbolic is True + assert m.kind.is_audit_only is True + + +def test_audit_only_full_config(): + """Test AUDIT_ONLY model kind with all configuration options.""" + expressions = d.parse( + """ + MODEL ( + name data_quality.comprehensive_validation, + kind AUDIT_ONLY ( + blocking TRUE, + max_failing_rows 50 + ), + depends_on [table_a, table_b, table_c], + cron '@daily', + owner 'data_quality_team', + tags ['validation', 'critical'] + ); + + SELECT * FROM table_a + WHERE NOT EXISTS (SELECT 1 FROM table_b WHERE table_b.id = table_a.id) + """ + ) + + model = load_sql_based_model(expressions) + + assert model.kind.name == ModelKindName.AUDIT_ONLY + assert model.kind.blocking is True + assert model.kind.max_failing_rows == 50 + assert model.cron == "@daily" + assert model.owner == "data_quality_team" + assert "validation" in model.tags + assert "critical" in model.tags + + +def test_audit_only_serialization(): + """Test that AUDIT_ONLY models serialize and deserialize correctly.""" + expressions = d.parse( + """ + MODEL ( + name test.audit_model, + kind AUDIT_ONLY ( + blocking FALSE, + max_failing_rows 25 + ) + ); + + SELECT 1 + """ + ) + + original_model = load_sql_based_model(expressions) + + # Serialize to dict + model_dict = original_model.dict() + + # Recreate from dict + recreated_model = SqlModel.model_validate(model_dict) + + assert recreated_model.kind.name == ModelKindName.AUDIT_ONLY + assert recreated_model.kind.blocking is False + assert recreated_model.kind.max_failing_rows == 25 + assert recreated_model.kind.is_audit_only is True
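
For reviewers: the failure contract implemented by `AuditOnlyStrategy._validate` reduces to "count the validation query's rows; on a nonzero count, fetch at most `max_failing_rows` rows for the message, then fail if `blocking`". A standalone sketch of that contract against DuckDB, reusing the referential-integrity example from the tests above (table names are illustrative, and `RuntimeError` stands in for the real `AuditError`):

```python
import duckdb

MAX_FAILING_ROWS = 10  # mirrors the kind's max_failing_rows default

con = duckdb.connect()
# Seed one orphaned order so the validation query returns a row.
con.execute("CREATE TABLE orders AS SELECT 1 AS order_id, 999 AS customer_id")
con.execute("CREATE TABLE customers AS SELECT 1 AS customer_id")

validation_query = """
    SELECT o.order_id, o.customer_id
    FROM orders o
    LEFT JOIN customers c ON o.customer_id = c.customer_id
    WHERE c.customer_id IS NULL
"""

# Count the rows returned by the validation query.
count = con.execute(
    f"SELECT COUNT(*) FROM ({validation_query}) AS audit_only"
).fetchone()[0]

if count > 0:
    # Fetch a bounded sample of failing rows for the error message.
    sample = con.execute(
        f"SELECT * FROM ({validation_query}) AS audit_only LIMIT {MAX_FAILING_ROWS}"
    ).df()
    raise RuntimeError(  # the real strategy raises AuditError when blocking
        f"Validation failed with {count} row(s).\n"
        f"Sample failing rows:\n{sample.to_string()}"
    )
```

Running this raises with a count of 1 and a one-row sample, which is the same behavior the blocking and non-blocking integration tests above assert through the plan/apply workflow.
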