From acfeea05a118b7d032dabfafe9b0c6fbf6e5c160 Mon Sep 17 00:00:00 2001 From: fresioAS Date: Wed, 8 Oct 2025 23:43:28 +0200 Subject: [PATCH 1/8] feat: implement no-op alter_table method for FabricEngineAdapter --- sqlmesh/core/engine_adapter/fabric.py | 13 ++++++++++- tests/core/engine_adapter/test_fabric.py | 28 ++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 8e2fb0e496..ca11b89b1a 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -14,7 +14,7 @@ ) from sqlmesh.utils.errors import SQLMeshError from sqlmesh.utils.connection_pool import ConnectionPool - +from sqlmesh.core.schema_diff import TableAlterOperation logger = logging.getLogger(__name__) @@ -154,6 +154,17 @@ def set_current_catalog(self, catalog_name: str) -> None: f"Unable to switch catalog to {catalog_name}, catalog ended up as {catalog_after_switch}" ) + def alter_table( + self, + alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]], + ) -> None: + """ + Disables ALTER TABLE for Fabric since it has limited support. + By making this a no-op, we signal to the caller to fall back + to a more reliable drop-and-recreate strategy to apply schema changes. + """ + return + class FabricHttpClient: def __init__(self, tenant_id: str, workspace_id: str, client_id: str, client_secret: str): diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 6b80ef7337..094c11e65a 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -9,6 +9,7 @@ from sqlmesh.core.engine_adapter import FabricEngineAdapter from tests.core.engine_adapter import to_sql_calls from sqlmesh.core.engine_adapter.shared import DataObject +from sqlmesh.core.schema_diff import TableAlterOperation pytestmark = [pytest.mark.engine, pytest.mark.fabric] @@ -88,3 +89,30 @@ def test_replace_query(adapter: FabricEngineAdapter, mocker: MockerFixture): "TRUNCATE TABLE [test_table];", "INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];", ] + + +def test_alter_table_is_noop(adapter: FabricEngineAdapter): + """ + Tests that alter_table is a no-op for Fabric. + + The adapter should not execute any SQL, signaling to the caller + that it should use a fallback strategy (like drop/recreate) + to apply schema changes. + """ + adapter.alter_table( + [ + TableAlterOperation( + expression=exp.AlterTable( + this=exp.to_table("test_table"), + actions=[ + exp.AddColumn( + this=exp.to_column("new_col"), + def_col_type=exp.DataType(this=exp.DataType.Type.INT), + ) + ], + ) + ) + ] + ) + + adapter.cursor.execute.assert_not_called() \ No newline at end of file From 445c775456ba2f36f052efc5cf8741a136574a75 Mon Sep 17 00:00:00 2001 From: fresioAS Date: Wed, 8 Oct 2025 23:59:26 +0200 Subject: [PATCH 2/8] proper test --- tests/core/engine_adapter/test_fabric.py | 27 ++++++++++-------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 094c11e65a..129b475a47 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -9,7 +9,6 @@ from sqlmesh.core.engine_adapter import FabricEngineAdapter from tests.core.engine_adapter import to_sql_calls from sqlmesh.core.engine_adapter.shared import DataObject -from sqlmesh.core.schema_diff import TableAlterOperation pytestmark = [pytest.mark.engine, pytest.mark.fabric] @@ -96,23 +95,19 @@ def test_alter_table_is_noop(adapter: FabricEngineAdapter): Tests that alter_table is a no-op for Fabric. The adapter should not execute any SQL, signaling to the caller - that it should use a fallback strategy (like drop/recreate) + that it should use the fallback strategy (like drop/add) to apply schema changes. """ - adapter.alter_table( - [ - TableAlterOperation( - expression=exp.AlterTable( - this=exp.to_table("test_table"), - actions=[ - exp.AddColumn( - this=exp.to_column("new_col"), - def_col_type=exp.DataType(this=exp.DataType.Type.INT), - ) - ], - ) + alter_expression = exp.Alter( + this=exp.to_table("test_table"), + actions=[ + exp.ColumnDef( + this=exp.to_column("new_col"), + kind=exp.DataType(this=exp.DataType.Type.INT), ) - ] + ], ) - adapter.cursor.execute.assert_not_called() \ No newline at end of file + adapter.alter_table([alter_expression]) + + adapter.cursor.execute.assert_not_called() From fa0fdc624a70cb74f871421ba7220dfa8c0b2fd7 Mon Sep 17 00:00:00 2001 From: fresioAS Date: Thu, 9 Oct 2025 00:03:40 +0200 Subject: [PATCH 3/8] cosmetics on descriptions --- sqlmesh/core/engine_adapter/fabric.py | 4 ++-- tests/core/engine_adapter/test_fabric.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index ca11b89b1a..7c07c08451 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -160,8 +160,8 @@ def alter_table( ) -> None: """ Disables ALTER TABLE for Fabric since it has limited support. - By making this a no-op, we signal to the caller to fall back - to a more reliable drop-and-recreate strategy to apply schema changes. + By making this a no-op, we signal to the caller to fall back to a + more reliable drop/add strategy for columns to apply schema changes. """ return diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 129b475a47..9ce3dd555c 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -95,7 +95,7 @@ def test_alter_table_is_noop(adapter: FabricEngineAdapter): Tests that alter_table is a no-op for Fabric. The adapter should not execute any SQL, signaling to the caller - that it should use the fallback strategy (like drop/add) + that it should use the fallback strategy (drop/add) to apply schema changes. """ alter_expression = exp.Alter( From f0acd3b6ab2484a0734000c29ae1470b2bc737b9 Mon Sep 17 00:00:00 2001 From: fresioAS Date: Thu, 9 Oct 2025 00:11:45 +0200 Subject: [PATCH 4/8] ruff --- sqlmesh/core/engine_adapter/fabric.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 7c07c08451..9c676a9b90 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -16,6 +16,7 @@ from sqlmesh.utils.connection_pool import ConnectionPool from sqlmesh.core.schema_diff import TableAlterOperation + logger = logging.getLogger(__name__) @@ -160,7 +161,7 @@ def alter_table( ) -> None: """ Disables ALTER TABLE for Fabric since it has limited support. - By making this a no-op, we signal to the caller to fall back to a + By making this a no-op, we signal to the caller to fall back to a more reliable drop/add strategy for columns to apply schema changes. """ return From 7cdbf81154d203e81faf9a3427216af1a459ebef Mon Sep 17 00:00:00 2001 From: fresioAS Date: Thu, 23 Oct 2025 20:20:03 +0200 Subject: [PATCH 5/8] add new column, copy to new column, drop original column, rename new column to old --- sqlmesh/core/engine_adapter/fabric.py | 109 +++++++++++++++++++++-- tests/core/engine_adapter/test_fabric.py | 68 ++++++++------ 2 files changed, 144 insertions(+), 33 deletions(-) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 9c676a9b90..8f84dab19f 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -15,6 +15,7 @@ from sqlmesh.utils.errors import SQLMeshError from sqlmesh.utils.connection_pool import ConnectionPool from sqlmesh.core.schema_diff import TableAlterOperation +from sqlmesh.utils import random_id logger = logging.getLogger(__name__) @@ -156,15 +157,111 @@ def set_current_catalog(self, catalog_name: str) -> None: ) def alter_table( - self, - alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]], + self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]] ) -> None: """ - Disables ALTER TABLE for Fabric since it has limited support. - By making this a no-op, we signal to the caller to fall back to a - more reliable drop/add strategy for columns to apply schema changes. + Applies alter expressions to a table. Fabric has limited support for ALTER TABLE, + so this method implements a workaround for column type changes. + This method is self-contained and sets its own catalog context. """ - return + if not alter_expressions: + return + + # Get the target table from the first expression to determine the correct catalog. + first_op = alter_expressions[0] + expression = first_op.expression if isinstance(first_op, TableAlterOperation) else first_op + if not isinstance(expression, exp.Alter) or not expression.this.catalog: + # Fallback for unexpected scenarios + logger.warning( + "Could not determine catalog from alter expression, executing with current context." + ) + super().alter_table(alter_expressions) + return + + target_catalog = expression.this.catalog + self.set_current_catalog(target_catalog) + + with self.transaction(): + for op in alter_expressions: + expression = op.expression if isinstance(op, TableAlterOperation) else op + + if not isinstance(expression, exp.Alter): + self.execute(expression) + continue + + for action in expression.actions: + table_name = expression.this + + table_name_without_catalog = table_name.copy() + table_name_without_catalog.set("catalog", None) + + is_type_change = isinstance(action, exp.AlterColumn) and action.args.get( + "dtype" + ) + + if is_type_change: + column_to_alter = action.this + new_type = action.args["dtype"] + temp_column_name_str = f"{column_to_alter.name}__{random_id(short=True)}" + temp_column_name = exp.to_identifier(temp_column_name_str) + + logger.info( + "Applying workaround for column '%s' on table '%s' to change type to '%s'.", + column_to_alter.sql(), + table_name.sql(), + new_type.sql(), + ) + + # Step 1: Add a temporary column. + add_column_expr = exp.Alter( + this=table_name_without_catalog.copy(), + kind="TABLE", + actions=[ + exp.ColumnDef(this=temp_column_name.copy(), kind=new_type.copy()) + ], + ) + add_sql = self._to_sql(add_column_expr) + self.execute(add_sql) + + # Step 2: Copy and cast data. + update_sql = self._to_sql( + exp.Update( + this=table_name_without_catalog.copy(), + expressions=[ + exp.EQ( + this=temp_column_name.copy(), + expression=exp.Cast( + this=column_to_alter.copy(), to=new_type.copy() + ), + ) + ], + ) + ) + self.execute(update_sql) + + # Step 3: Drop the original column. + drop_sql = self._to_sql( + exp.Alter( + this=table_name_without_catalog.copy(), + kind="TABLE", + actions=[exp.Drop(this=column_to_alter.copy(), kind="COLUMN")], + ) + ) + self.execute(drop_sql) + + # Step 4: Rename the temporary column. + old_name_qualified = f"{table_name_without_catalog.sql(dialect=self.dialect)}.{temp_column_name.sql(dialect=self.dialect)}" + new_name_unquoted = column_to_alter.sql( + dialect=self.dialect, identify=False + ) + rename_sql = f"EXEC sp_rename '{old_name_qualified}', '{new_name_unquoted}', 'COLUMN'" + self.execute(rename_sql) + else: + # For other alterations, execute directly. + direct_alter_expr = exp.Alter( + this=table_name_without_catalog.copy(), kind="TABLE", actions=[action] + ) + self.execute(direct_alter_expr) class FabricHttpClient: diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 9ce3dd555c..8590c5638d 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -8,7 +8,6 @@ from sqlmesh.core.engine_adapter import FabricEngineAdapter from tests.core.engine_adapter import to_sql_calls -from sqlmesh.core.engine_adapter.shared import DataObject pytestmark = [pytest.mark.engine, pytest.mark.fabric] @@ -73,41 +72,56 @@ def test_insert_overwrite_by_time_partition(adapter: FabricEngineAdapter): ] -def test_replace_query(adapter: FabricEngineAdapter, mocker: MockerFixture): - mocker.patch.object( - adapter, - "_get_data_objects", - return_value=[DataObject(schema="", name="test_table", type="table")], - ) - adapter.replace_query( - "test_table", parse_one("SELECT a FROM tbl"), {"a": exp.DataType.build("int")} +def test_alter_table_column_type_workaround(adapter: FabricEngineAdapter, mocker: MockerFixture): + """ + Tests the alter_table method's workaround for changing a column's data type. + """ + # Mock set_current_catalog to avoid connection pool side effects + set_catalog_mock = mocker.patch.object(adapter, "set_current_catalog") + # Mock random_id to have a predictable temporary column name + mocker.patch("sqlmesh.core.engine_adapter.fabric.random_id", return_value="abcdef") + + alter_expression = exp.Alter( + this=exp.to_table("my_db.my_schema.my_table"), + actions=[ + exp.AlterColumn( + this=exp.to_column("col_a"), + dtype=exp.DataType.build("BIGINT"), + ) + ], ) - # This behavior is inherited from MSSQLEngineAdapter and should be TRUNCATE + INSERT - assert to_sql_calls(adapter) == [ - "TRUNCATE TABLE [test_table];", - "INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];", + adapter.alter_table([alter_expression]) + + set_catalog_mock.assert_called_once_with("my_db") + + expected_calls = [ + "ALTER TABLE [my_schema].[my_table] ADD [col_a__abcdef] BIGINT;", + "UPDATE [my_schema].[my_table] SET [col_a__abcdef] = CAST([col_a] AS BIGINT);", + "ALTER TABLE [my_schema].[my_table] DROP COLUMN [col_a];", + "EXEC sp_rename 'my_schema.my_table.col_a__abcdef', 'col_a', 'COLUMN'", ] + assert to_sql_calls(adapter) == expected_calls -def test_alter_table_is_noop(adapter: FabricEngineAdapter): - """ - Tests that alter_table is a no-op for Fabric. - The adapter should not execute any SQL, signaling to the caller - that it should use the fallback strategy (drop/add) - to apply schema changes. +def test_alter_table_direct_alteration(adapter: FabricEngineAdapter, mocker: MockerFixture): + """ + Tests the alter_table method for direct alterations like adding a column. """ + set_catalog_mock = mocker.patch.object(adapter, "set_current_catalog") + alter_expression = exp.Alter( - this=exp.to_table("test_table"), - actions=[ - exp.ColumnDef( - this=exp.to_column("new_col"), - kind=exp.DataType(this=exp.DataType.Type.INT), - ) - ], + this=exp.to_table("my_db.my_schema.my_table"), + actions=[exp.ColumnDef(this=exp.to_column("new_col"), kind=exp.DataType.build("INT"))], ) adapter.alter_table([alter_expression]) - adapter.cursor.execute.assert_not_called() + set_catalog_mock.assert_called_once_with("my_db") + + expected_calls = [ + "ALTER TABLE [my_schema].[my_table] ADD [new_col] INT;", + ] + + assert to_sql_calls(adapter) == expected_calls From 2607f7cfde01982c01a09c74ece1fbbee3f18c2b Mon Sep 17 00:00:00 2001 From: Andreas <65893109+fresioAS@users.noreply.github.com> Date: Thu, 23 Oct 2025 20:53:59 +0200 Subject: [PATCH 6/8] revert test deleted by accident --- tests/core/engine_adapter/test_fabric.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 8590c5638d..bb3a06768b 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -8,6 +8,7 @@ from sqlmesh.core.engine_adapter import FabricEngineAdapter from tests.core.engine_adapter import to_sql_calls +from sqlmesh.core.engine_adapter.shared import DataObject pytestmark = [pytest.mark.engine, pytest.mark.fabric] @@ -72,6 +73,16 @@ def test_insert_overwrite_by_time_partition(adapter: FabricEngineAdapter): ] +def test_replace_query(adapter: FabricEngineAdapter, mocker: MockerFixture): + mocker.patch.object( + adapter, + "_get_data_objects", + return_value=[DataObject(schema="", name="test_table", type="table")], + ) + adapter.replace_query( + "test_table", parse_one("SELECT a FROM tbl"), {"a": exp.DataType.build("int")} + + def test_alter_table_column_type_workaround(adapter: FabricEngineAdapter, mocker: MockerFixture): """ Tests the alter_table method's workaround for changing a column's data type. From 768716c1fa44cc7b574a0e3df6725d9b4e6cef58 Mon Sep 17 00:00:00 2001 From: Andreas <65893109+fresioAS@users.noreply.github.com> Date: Thu, 23 Oct 2025 20:56:25 +0200 Subject: [PATCH 7/8] revert#2 --- tests/core/engine_adapter/test_fabric.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index bb3a06768b..9e19bd7ef8 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -81,7 +81,13 @@ def test_replace_query(adapter: FabricEngineAdapter, mocker: MockerFixture): ) adapter.replace_query( "test_table", parse_one("SELECT a FROM tbl"), {"a": exp.DataType.build("int")} + ) + # This behavior is inherited from MSSQLEngineAdapter and should be TRUNCATE + INSERT + assert to_sql_calls(adapter) == [ + "TRUNCATE TABLE [test_table];", + "INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];", + ] def test_alter_table_column_type_workaround(adapter: FabricEngineAdapter, mocker: MockerFixture): """ From 1835883b3fec6fd297639babfe26c649c8495f8d Mon Sep 17 00:00:00 2001 From: Andreas <65893109+fresioAS@users.noreply.github.com> Date: Thu, 23 Oct 2025 21:06:44 +0200 Subject: [PATCH 8/8] ruff --- tests/core/engine_adapter/test_fabric.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 9e19bd7ef8..8d7f804bd0 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -89,6 +89,7 @@ def test_replace_query(adapter: FabricEngineAdapter, mocker: MockerFixture): "INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];", ] + def test_alter_table_column_type_workaround(adapter: FabricEngineAdapter, mocker: MockerFixture): """ Tests the alter_table method's workaround for changing a column's data type.