diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 8341f8466f..93fdb8516b 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1058,6 +1058,7 @@ class BigQueryConnectionConfig(ConnectionConfig): job_retry_deadline_seconds: t.Optional[int] = None priority: t.Optional[BigQueryPriority] = None maximum_bytes_billed: t.Optional[int] = None + reservation_id: t.Optional[str] = None concurrent_tasks: int = 1 register_comments: bool = True @@ -1167,6 +1168,7 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]: "job_retry_deadline_seconds", "priority", "maximum_bytes_billed", + "reservation_id", } } diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index 59a56b6ace..8fa543a9f7 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -1106,7 +1106,13 @@ def _execute( else [] ) + # Create job config with reservation support job_config = QueryJobConfig(**self._job_params, connection_properties=connection_properties) + + # Set reservation directly on the job_config object if specified + reservation_id = self._extra_config.get("reservation_id") + if reservation_id: + job_config.reservation = reservation_id self._query_job = self._db_call( self.client.query, query=sql, diff --git a/tests/core/test_connection_config.py b/tests/core/test_connection_config.py index 4e1397b7f1..7241f0fd7c 100644 --- a/tests/core/test_connection_config.py +++ b/tests/core/test_connection_config.py @@ -1042,6 +1042,27 @@ def test_bigquery(make_config): assert config.get_catalog() == "project" assert config.is_recommended_for_state_sync is False + # Test reservation_id + config_with_reservation = make_config( + type="bigquery", + project="project", + reservation_id="projects/my-project/locations/us-central1/reservations/my-reservation", + check_import=False, + ) + assert isinstance(config_with_reservation, BigQueryConnectionConfig) + assert ( + config_with_reservation.reservation_id + == "projects/my-project/locations/us-central1/reservations/my-reservation" + ) + + # Test that reservation_id is included in _extra_engine_config + extra_config = config_with_reservation._extra_engine_config + assert "reservation_id" in extra_config + assert ( + extra_config["reservation_id"] + == "projects/my-project/locations/us-central1/reservations/my-reservation" + ) + with pytest.raises(ConfigError, match="you must also specify the `project` field"): make_config(type="bigquery", execution_project="execution_project", check_import=False)