Skip to content

Commit fff244a

Browse files
etonlels and claude committed
feat: add batch_concurrency to ModelDefaultsConfig
This change allows users to set a default `batch_concurrency` value in their project configuration that will be applied to incremental models.

- Added a `batch_concurrency` field to `ModelDefaultsConfig`
- Updated `create_model_kind()` to apply the `batch_concurrency` default to models inheriting from `_IncrementalBy` when it is not explicitly set
- Skips applying the default for subclasses with a hardcoded `batch_concurrency` (e.g., `IncrementalByUniqueKeyKind`, which has `batch_concurrency: Literal[1]`)
- Added comprehensive tests covering default behavior and edge cases

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
1 parent 285b8f2 commit fff244a

File tree

3 files changed

+115
-0
lines changed

3 files changed

+115
-0
lines changed

sqlmesh/core/config/model.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class ModelDefaultsConfig(BaseConfig):
4545
allow_partials: Whether the models can process partial (incomplete) data intervals.
4646
enabled: Whether the models are enabled.
4747
interval_unit: The temporal granularity of the models' data intervals. By default computed from cron.
48+
batch_concurrency: The maximum number of batches that can run concurrently for an incremental model.
4849
pre_statements: The list of SQL statements that get executed before a model runs.
4950
post_statements: The list of SQL statements that get executed after a model runs.
5051
on_virtual_update: The list of SQL statements to be executed after the virtual update.
@@ -69,6 +70,7 @@ class ModelDefaultsConfig(BaseConfig):
6970
interval_unit: t.Optional[t.Union[str, IntervalUnit]] = None
7071
enabled: t.Optional[t.Union[str, bool]] = None
7172
formatting: t.Optional[t.Union[str, bool]] = None
73+
batch_concurrency: t.Optional[int] = None
7274
pre_statements: t.Optional[t.List[t.Union[str, exp.Expression]]] = None
7375
post_statements: t.Optional[t.List[t.Union[str, exp.Expression]]] = None
7476
on_virtual_update: t.Optional[t.List[t.Union[str, exp.Expression]]] = None

sqlmesh/core/model/kind.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,6 +1105,18 @@ def create_model_kind(v: t.Any, dialect: str, defaults: t.Dict[str, t.Any]) -> M
11051105
):
11061106
props[on_change_property] = defaults.get(on_change_property)
11071107

1108+
# only pass the batch_concurrency user default to models inheriting from _IncrementalBy
1109+
# that don't explicitly set it in the model definition, but ignore subclasses of _IncrementalBy
1110+
# that hardcode a specific batch_concurrency
1111+
if issubclass(kind_type, _IncrementalBy):
1112+
BATCH_CONCURRENCY: t.Final = "batch_concurrency"
1113+
if (
1114+
props.get(BATCH_CONCURRENCY) is None
1115+
and defaults.get(BATCH_CONCURRENCY) is not None
1116+
and kind_type.all_field_infos()[BATCH_CONCURRENCY].default is None
1117+
):
1118+
props[BATCH_CONCURRENCY] = defaults.get(BATCH_CONCURRENCY)
1119+
11081120
if kind_type == CustomKind:
11091121
# load the custom materialization class and check if it uses a custom kind type
11101122
from sqlmesh.core.snapshot.evaluator import get_custom_materialization_type

tests/core/test_model.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7583,6 +7583,107 @@ def test_forward_only_on_destructive_change_config() -> None:
75837583
assert context_model.on_destructive_change.is_allow
75847584

75857585

7586+
def test_batch_concurrency_config() -> None:
7587+
# No batch_concurrency default for incremental models
7588+
config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
7589+
context = Context(config=config)
7590+
7591+
expressions = d.parse(
7592+
"""
7593+
MODEL (
7594+
name memory.db.table,
7595+
kind INCREMENTAL_BY_TIME_RANGE (
7596+
time_column c
7597+
),
7598+
);
7599+
SELECT a, b, c FROM source_table;
7600+
"""
7601+
)
7602+
model = load_sql_based_model(expressions, defaults=config.model_defaults.dict())
7603+
context.upsert_model(model)
7604+
context_model = context.get_model("memory.db.table")
7605+
assert context_model.batch_concurrency is None
7606+
7607+
# batch_concurrency specified in model defaults applies to incremental models
7608+
config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb", batch_concurrency=5))
7609+
context = Context(config=config)
7610+
7611+
expressions = d.parse(
7612+
"""
7613+
MODEL (
7614+
name memory.db.table,
7615+
kind INCREMENTAL_BY_TIME_RANGE (
7616+
time_column c
7617+
),
7618+
);
7619+
SELECT a, b, c FROM source_table;
7620+
"""
7621+
)
7622+
model = load_sql_based_model(expressions, defaults=config.model_defaults.dict())
7623+
context.upsert_model(model)
7624+
context_model = context.get_model("memory.db.table")
7625+
assert context_model.batch_concurrency == 5
7626+
7627+
# batch_concurrency specified in model definition overrides default
7628+
config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb", batch_concurrency=5))
7629+
context = Context(config=config)
7630+
7631+
expressions = d.parse(
7632+
"""
7633+
MODEL (
7634+
name memory.db.table,
7635+
kind INCREMENTAL_BY_TIME_RANGE (
7636+
time_column c,
7637+
batch_concurrency 10
7638+
),
7639+
);
7640+
SELECT a, b, c FROM source_table;
7641+
"""
7642+
)
7643+
model = load_sql_based_model(expressions, defaults=config.model_defaults.dict())
7644+
context.upsert_model(model)
7645+
context_model = context.get_model("memory.db.table")
7646+
assert context_model.batch_concurrency == 10
7647+
7648+
# batch_concurrency default does not apply to non-incremental models
7649+
config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb", batch_concurrency=5))
7650+
context = Context(config=config)
7651+
7652+
expressions = d.parse(
7653+
"""
7654+
MODEL (
7655+
name memory.db.table,
7656+
kind FULL,
7657+
);
7658+
SELECT a, b, c FROM source_table;
7659+
"""
7660+
)
7661+
model = load_sql_based_model(expressions, defaults=config.model_defaults.dict())
7662+
context.upsert_model(model)
7663+
context_model = context.get_model("memory.db.table")
7664+
assert context_model.batch_concurrency is None
7665+
7666+
# batch_concurrency default does not apply to INCREMENTAL_BY_UNIQUE_KEY models
7667+
config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb", batch_concurrency=5))
7668+
context = Context(config=config)
7669+
7670+
expressions = d.parse(
7671+
"""
7672+
MODEL (
7673+
name memory.db.table,
7674+
kind INCREMENTAL_BY_UNIQUE_KEY (
7675+
unique_key a
7676+
),
7677+
);
7678+
SELECT a, b, c FROM source_table;
7679+
"""
7680+
)
7681+
model = load_sql_based_model(expressions, defaults=config.model_defaults.dict())
7682+
context.upsert_model(model)
7683+
context_model = context.get_model("memory.db.table")
7684+
assert context_model.batch_concurrency == 1
7685+
7686+
75867687
def test_model_meta_on_additive_change_property() -> None:
75877688
"""Test that ModelMeta has on_additive_change property that works like on_destructive_change."""
75887689
from sqlmesh.core.model.kind import IncrementalByTimeRangeKind, OnAdditiveChange

0 commit comments

Comments
 (0)