@@ -7583,6 +7583,107 @@ def test_forward_only_on_destructive_change_config() -> None:
75837583 assert context_model .on_destructive_change .is_allow
75847584
75857585
7586+ def test_batch_concurrency_config () -> None :
7587+ # No batch_concurrency default for incremental models
7588+ config = Config (model_defaults = ModelDefaultsConfig (dialect = "duckdb" ))
7589+ context = Context (config = config )
7590+
7591+ expressions = d .parse (
7592+ """
7593+ MODEL (
7594+ name memory.db.table,
7595+ kind INCREMENTAL_BY_TIME_RANGE (
7596+ time_column c
7597+ ),
7598+ );
7599+ SELECT a, b, c FROM source_table;
7600+ """
7601+ )
7602+ model = load_sql_based_model (expressions , defaults = config .model_defaults .dict ())
7603+ context .upsert_model (model )
7604+ context_model = context .get_model ("memory.db.table" )
7605+ assert context_model .batch_concurrency is None
7606+
7607+ # batch_concurrency specified in model defaults applies to incremental models
7608+ config = Config (model_defaults = ModelDefaultsConfig (dialect = "duckdb" , batch_concurrency = 5 ))
7609+ context = Context (config = config )
7610+
7611+ expressions = d .parse (
7612+ """
7613+ MODEL (
7614+ name memory.db.table,
7615+ kind INCREMENTAL_BY_TIME_RANGE (
7616+ time_column c
7617+ ),
7618+ );
7619+ SELECT a, b, c FROM source_table;
7620+ """
7621+ )
7622+ model = load_sql_based_model (expressions , defaults = config .model_defaults .dict ())
7623+ context .upsert_model (model )
7624+ context_model = context .get_model ("memory.db.table" )
7625+ assert context_model .batch_concurrency == 5
7626+
7627+ # batch_concurrency specified in model definition overrides default
7628+ config = Config (model_defaults = ModelDefaultsConfig (dialect = "duckdb" , batch_concurrency = 5 ))
7629+ context = Context (config = config )
7630+
7631+ expressions = d .parse (
7632+ """
7633+ MODEL (
7634+ name memory.db.table,
7635+ kind INCREMENTAL_BY_TIME_RANGE (
7636+ time_column c,
7637+ batch_concurrency 10
7638+ ),
7639+ );
7640+ SELECT a, b, c FROM source_table;
7641+ """
7642+ )
7643+ model = load_sql_based_model (expressions , defaults = config .model_defaults .dict ())
7644+ context .upsert_model (model )
7645+ context_model = context .get_model ("memory.db.table" )
7646+ assert context_model .batch_concurrency == 10
7647+
7648+ # batch_concurrency default does not apply to non-incremental models
7649+ config = Config (model_defaults = ModelDefaultsConfig (dialect = "duckdb" , batch_concurrency = 5 ))
7650+ context = Context (config = config )
7651+
7652+ expressions = d .parse (
7653+ """
7654+ MODEL (
7655+ name memory.db.table,
7656+ kind FULL,
7657+ );
7658+ SELECT a, b, c FROM source_table;
7659+ """
7660+ )
7661+ model = load_sql_based_model (expressions , defaults = config .model_defaults .dict ())
7662+ context .upsert_model (model )
7663+ context_model = context .get_model ("memory.db.table" )
7664+ assert context_model .batch_concurrency is None
7665+
7666+ # batch_concurrency default does not apply to INCREMENTAL_BY_UNIQUE_KEY models
7667+ config = Config (model_defaults = ModelDefaultsConfig (dialect = "duckdb" , batch_concurrency = 5 ))
7668+ context = Context (config = config )
7669+
7670+ expressions = d .parse (
7671+ """
7672+ MODEL (
7673+ name memory.db.table,
7674+ kind INCREMENTAL_BY_UNIQUE_KEY (
7675+ unique_key a
7676+ ),
7677+ );
7678+ SELECT a, b, c FROM source_table;
7679+ """
7680+ )
7681+ model = load_sql_based_model (expressions , defaults = config .model_defaults .dict ())
7682+ context .upsert_model (model )
7683+ context_model = context .get_model ("memory.db.table" )
7684+ assert context_model .batch_concurrency == 1
7685+
7686+
75867687def test_model_meta_on_additive_change_property () -> None :
75877688 """Test that ModelMeta has on_additive_change property that works like on_destructive_change."""
75887689 from sqlmesh .core .model .kind import IncrementalByTimeRangeKind , OnAdditiveChange
0 commit comments