Skip to content

Commit 1958d17

Browse files
committed
PR Feedback 2
1 parent 59b0bf7 commit 1958d17

File tree

4 files changed

+14
-22
lines changed

4 files changed

+14
-22
lines changed

sqlmesh/core/context.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ def __init__(
274274
deployability_index: t.Optional[DeployabilityIndex] = None,
275275
default_dialect: t.Optional[str] = None,
276276
default_catalog: t.Optional[str] = None,
277-
is_restatement_plan: t.Optional[bool] = None,
277+
is_restatement: t.Optional[bool] = None,
278278
variables: t.Optional[t.Dict[str, t.Any]] = None,
279279
blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
280280
):
@@ -285,7 +285,7 @@ def __init__(
285285
self._default_dialect = default_dialect
286286
self._variables = variables or {}
287287
self._blueprint_variables = blueprint_variables or {}
288-
self._is_restatement_plan = is_restatement_plan
288+
self._is_restatement = is_restatement
289289

290290
@property
291291
def default_dialect(self) -> t.Optional[str]:
@@ -311,8 +311,8 @@ def gateway(self) -> t.Optional[str]:
311311
return self.var(c.GATEWAY)
312312

313313
@property
314-
def is_restatement_plan(self) -> t.Optional[bool]:
315-
return self._is_restatement_plan
314+
def is_restatement(self) -> t.Optional[bool]:
315+
return self._is_restatement
316316

317317
def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
318318
"""Returns a variable value."""
@@ -334,7 +334,7 @@ def with_variables(
334334
self.deployability_index,
335335
self._default_dialect,
336336
self._default_catalog,
337-
self._is_restatement_plan,
337+
self._is_restatement,
338338
variables=variables,
339339
blueprint_variables=blueprint_variables,
340340
)

sqlmesh/core/plan/evaluator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
258258
allow_additive_snapshots=plan.allow_additive_models,
259259
selected_snapshot_ids=stage.selected_snapshot_ids,
260260
selected_models=plan.selected_models,
261-
is_restatement_plan=bool(plan.restatements),
261+
is_restatement=bool(plan.restatements),
262262
)
263263
if errors:
264264
raise PlanError("Plan application failed.")

sqlmesh/core/scheduler.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,6 @@ def evaluate(
200200
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
201201
allow_additive_snapshots: t.Optional[t.Set[str]] = None,
202202
target_table_exists: t.Optional[bool] = None,
203-
is_restatement_plan: bool = False,
204203
**kwargs: t.Any,
205204
) -> t.List[AuditResult]:
206205
"""Evaluate a snapshot and add the processed interval to the state sync.
@@ -338,7 +337,7 @@ def batch_intervals(
338337
deployability_index: t.Optional[DeployabilityIndex],
339338
environment_naming_info: EnvironmentNamingInfo,
340339
dag: t.Optional[DAG[SnapshotId]] = None,
341-
is_restatement_plan: bool = False,
340+
is_restatement: bool = False,
342341
) -> t.Dict[Snapshot, Intervals]:
343342
dag = dag or snapshots_to_dag(merged_intervals)
344343

@@ -371,15 +370,14 @@ def batch_intervals(
371370
deployability_index,
372371
default_dialect=adapter.dialect,
373372
default_catalog=self.default_catalog,
374-
is_restatement_plan=is_restatement_plan,
373+
is_restatement=is_restatement,
375374
)
376375

377376
intervals = self._check_ready_intervals(
378377
snapshot,
379378
intervals,
380379
context,
381380
environment_naming_info,
382-
is_restatement_plan=is_restatement_plan,
383381
)
384382
unready -= set(intervals)
385383

@@ -428,7 +426,7 @@ def run_merged_intervals(
428426
run_environment_statements: bool = False,
429427
audit_only: bool = False,
430428
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {},
431-
is_restatement_plan: bool = False,
429+
is_restatement: bool = False,
432430
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
433431
"""Runs precomputed batches of missing intervals.
434432
@@ -466,7 +464,7 @@ def run_merged_intervals(
466464
deployability_index,
467465
environment_naming_info,
468466
dag=snapshot_dag,
469-
is_restatement_plan=is_restatement_plan,
467+
is_restatement=is_restatement,
470468
)
471469
self.console.start_evaluation_progress(
472470
batched_intervals,
@@ -552,7 +550,6 @@ def run_node(node: SchedulingUnit) -> None:
552550
allow_additive_snapshots=allow_additive_snapshots,
553551
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
554552
selected_models=selected_models,
555-
is_restatement_plan=is_restatement_plan,
556553
)
557554

558555
evaluation_duration_ms = now_timestamp() - execution_start_ts
@@ -926,7 +923,6 @@ def _check_ready_intervals(
926923
intervals: Intervals,
927924
context: ExecutionContext,
928925
environment_naming_info: EnvironmentNamingInfo,
929-
is_restatement_plan: bool = False,
930926
) -> Intervals:
931927
"""Checks if the intervals are ready for evaluation for the given snapshot.
932928
@@ -948,8 +944,6 @@ def _check_ready_intervals(
948944
if not (signals and signals.signals_to_kwargs):
949945
return intervals
950946

951-
signal_names = signals.signals_to_kwargs.keys()
952-
953947
self.console.start_signal_progress(
954948
snapshot,
955949
self.default_catalog,

sqlmesh/core/signal.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from sqlmesh.core.context import ExecutionContext
88
from sqlmesh.core.snapshot.definition import Snapshot
99
from sqlmesh.utils.date import DatetimeRanges
10+
from sqlmesh.core.snapshot.definition import DeployabilityIndex
1011

1112

1213
class signal(registry_decorator):
@@ -42,15 +43,12 @@ class signal(registry_decorator):
4243

4344
@signal()
4445
def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionContext) -> bool:
45-
if context.is_restatement_plan:
46-
return True
47-
48-
deployability_index = context.deployability_index
4946
adapter = context.engine_adapter
50-
51-
if not deployability_index or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS:
47+
if context.is_restatement or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS:
5248
return True
5349

50+
deployability_index = context.deployability_index or DeployabilityIndex.all_deployable()
51+
5452
last_altered_ts = (
5553
snapshot.last_altered_ts
5654
if deployability_index.is_deployable(snapshot)

0 commit comments

Comments
 (0)