Skip to content

Commit b62dfab

Browse files
authored
Fix(mysql): use 8-character random ID for temp table names (#1589)
* Use 8-character random ID for temp table names * Tidy tests * Convert temp_table_name test function to fixture * PR feedback
1 parent 5162ac0 commit b62dfab

File tree

7 files changed

+113
-89
lines changed

7 files changed

+113
-89
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ install-dev:
44
pip3 install -e ".[dev,web,slack]"
55

66
install-engine-integration:
7-
pip3 install -e ".[dev,web,slack,mysql,postgres,databricks,redshift,bigquery,snowflake]"
7+
pip3 install -e ".[dev,web,slack,mysql,postgres,databricks,redshift,bigquery,snowflake,trino]"
88

99
install-pre-commit:
1010
pre-commit install

sqlmesh/core/engine_adapter/base.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import sys
1515
import types
1616
import typing as t
17-
import uuid
1817
from datetime import datetime, timezone
1918
from enum import Enum
2019
from functools import partial
@@ -34,7 +33,7 @@
3433
from sqlmesh.core.engine_adapter.shared import DataObject, set_catalog
3534
from sqlmesh.core.model.kind import TimeColumn
3635
from sqlmesh.core.schema_diff import SchemaDiffer
37-
from sqlmesh.utils import double_escape
36+
from sqlmesh.utils import double_escape, random_id
3837
from sqlmesh.utils.connection_pool import create_connection_pool
3938
from sqlmesh.utils.date import TimeLike, make_inclusive, to_ts
4039
from sqlmesh.utils.errors import SQLMeshError, UnsupportedCatalogOperationError
@@ -1431,7 +1430,7 @@ def _get_temp_table(
14311430
Returns the name of the temp table that should be used for the given table name.
14321431
"""
14331432
table = t.cast(exp.Table, exp.to_table(table).copy())
1434-
table.set("this", exp.to_identifier(f"__temp_{table.name}_{uuid.uuid4().hex}"))
1433+
table.set("this", exp.to_identifier(f"__temp_{table.name}_{random_id(short=True)}"))
14351434

14361435
if table_only:
14371436
table.set("db", None)

sqlmesh/utils/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import copy
44
import importlib
55
import os
6+
import random
67
import re
8+
import string
79
import sys
810
import time
911
import traceback
@@ -22,6 +24,8 @@
2224
VALUE = t.TypeVar("VALUE")
2325
DECORATOR_RETURN_TYPE = t.TypeVar("DECORATOR_RETURN_TYPE")
2426

27+
ALPHANUMERIC = string.ascii_lowercase + string.digits
28+
2529

2630
def optional_import(name: str) -> t.Optional[types.ModuleType]:
2731
"""Optionally import a module.
@@ -47,7 +51,10 @@ def unique(iterable: t.Iterable[T], by: t.Callable[[T], t.Any] = lambda i: i) ->
4751
return list({by(i): None for i in iterable})
4852

4953

50-
def random_id() -> str:
54+
def random_id(short: bool = False) -> str:
55+
if short:
56+
return "".join(random.choices(ALPHANUMERIC, k=8))
57+
5158
return uuid.uuid4().hex
5259

5360

tests/conftest.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,3 +260,13 @@ def delete_cache(project_paths: str | t.List[str]) -> None:
260260
rmtree(path + "/.cache")
261261
except FileNotFoundError:
262262
pass
263+
264+
265+
@pytest.fixture
266+
def make_temp_table_name(mocker: MockerFixture) -> t.Callable:
267+
def _make_function(table_name: str, random_id: str) -> exp.Table:
268+
temp_table = exp.to_table(table_name)
269+
temp_table.set("this", exp.to_identifier(f"__temp_{temp_table.name}_{random_id}"))
270+
return temp_table
271+
272+
return _make_function

tests/core/engine_adapter/test_bigquery.py

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# type: ignore
22
import sys
33
import typing as t
4-
import uuid
54

65
import pandas as pd
76
import pytest
@@ -44,19 +43,20 @@ def test_insert_overwrite_by_time_partition_query(
4443

4544

4645
def test_insert_overwrite_by_partition_query(
47-
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
46+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable
4847
):
4948
adapter = make_mocked_engine_adapter(BigQueryEngineAdapter)
5049
execute_mock = mocker.patch(
5150
"sqlmesh.core.engine_adapter.bigquery.BigQueryEngineAdapter.execute"
5251
)
5352

54-
temp_table_uuid = uuid.uuid4()
55-
uuid4_mock = mocker.patch("uuid.uuid4")
56-
uuid4_mock.return_value = temp_table_uuid
53+
temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table")
54+
table_name = "test_schema.test_table"
55+
temp_table_id = "abcdefgh"
56+
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
5757

5858
adapter.insert_overwrite_by_partition(
59-
"test_schema.test_table",
59+
table_name,
6060
parse_one("SELECT a, ds FROM tbl"),
6161
partitioned_by=[
6262
d.parse_one("DATETIME_TRUNC(ds, MONTH)"),
@@ -70,15 +70,15 @@ def test_insert_overwrite_by_partition_query(
7070
sql_calls = _to_sql_calls(execute_mock)
7171
assert sql_calls == [
7272
"CREATE SCHEMA IF NOT EXISTS `test_schema`",
73-
f"CREATE TABLE IF NOT EXISTS `test_schema`.`__temp_test_table_{temp_table_uuid.hex}` PARTITION BY DATETIME_TRUNC(`ds`, MONTH) AS SELECT `a`, `ds` FROM `tbl`",
74-
f"DECLARE _sqlmesh_target_partitions_ ARRAY<DATETIME> DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_schema`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_uuid.hex}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
75-
f"MERGE INTO `test_schema`.`test_table` AS `__MERGE_TARGET__` USING (SELECT `a`, `ds` FROM (SELECT * FROM `test_schema`.`__temp_test_table_{temp_table_uuid.hex}`) AS `_subquery` WHERE DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`)) AS `__MERGE_SOURCE__` ON FALSE WHEN NOT MATCHED BY SOURCE AND DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`) THEN DELETE WHEN NOT MATCHED THEN INSERT (`a`, `ds`) VALUES (`a`, `ds`)",
76-
f"DROP TABLE IF EXISTS `test_schema`.`__temp_test_table_{temp_table_uuid.hex}`",
73+
f"CREATE TABLE IF NOT EXISTS `test_schema`.`__temp_test_table_{temp_table_id}` PARTITION BY DATETIME_TRUNC(`ds`, MONTH) AS SELECT `a`, `ds` FROM `tbl`",
74+
f"DECLARE _sqlmesh_target_partitions_ ARRAY<DATETIME> DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_schema`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_id}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
75+
f"MERGE INTO `test_schema`.`test_table` AS `__MERGE_TARGET__` USING (SELECT `a`, `ds` FROM (SELECT * FROM `test_schema`.`__temp_test_table_{temp_table_id}`) AS `_subquery` WHERE DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`)) AS `__MERGE_SOURCE__` ON FALSE WHEN NOT MATCHED BY SOURCE AND DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`) THEN DELETE WHEN NOT MATCHED THEN INSERT (`a`, `ds`) VALUES (`a`, `ds`)",
76+
f"DROP TABLE IF EXISTS `test_schema`.`__temp_test_table_{temp_table_id}`",
7777
]
7878

7979

8080
def test_insert_overwrite_by_partition_query_unknown_column_types(
81-
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
81+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable
8282
):
8383
adapter = make_mocked_engine_adapter(BigQueryEngineAdapter)
8484
execute_mock = mocker.patch(
@@ -93,12 +93,13 @@ def test_insert_overwrite_by_partition_query_unknown_column_types(
9393
"ds": exp.DataType.build("DATETIME"),
9494
}
9595

96-
temp_table_uuid = uuid.uuid4()
97-
uuid4_mock = mocker.patch("uuid.uuid4")
98-
uuid4_mock.return_value = temp_table_uuid
96+
temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table")
97+
table_name = "test_schema.test_table"
98+
temp_table_id = "abcdefgh"
99+
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
99100

100101
adapter.insert_overwrite_by_partition(
101-
"test_schema.test_table",
102+
table_name,
102103
parse_one("SELECT a, ds FROM tbl"),
103104
partitioned_by=[
104105
d.parse_one("DATETIME_TRUNC(ds, MONTH)"),
@@ -110,16 +111,16 @@ def test_insert_overwrite_by_partition_query_unknown_column_types(
110111
)
111112

112113
columns_mock.assert_called_once_with(
113-
exp.to_table(f"test_schema.__temp_test_table_{temp_table_uuid.hex}")
114+
exp.to_table(f"test_schema.__temp_test_table_{temp_table_id}")
114115
)
115116

116117
sql_calls = _to_sql_calls(execute_mock)
117118
assert sql_calls == [
118119
"CREATE SCHEMA IF NOT EXISTS `test_schema`",
119-
f"CREATE TABLE IF NOT EXISTS `test_schema`.`__temp_test_table_{temp_table_uuid.hex}` PARTITION BY DATETIME_TRUNC(`ds`, MONTH) AS SELECT `a`, `ds` FROM `tbl`",
120-
f"DECLARE _sqlmesh_target_partitions_ ARRAY<DATETIME> DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_schema`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_uuid.hex}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
121-
f"MERGE INTO `test_schema`.`test_table` AS `__MERGE_TARGET__` USING (SELECT `a`, `ds` FROM (SELECT * FROM `test_schema`.`__temp_test_table_{temp_table_uuid.hex}`) AS `_subquery` WHERE DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`)) AS `__MERGE_SOURCE__` ON FALSE WHEN NOT MATCHED BY SOURCE AND DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`) THEN DELETE WHEN NOT MATCHED THEN INSERT (`a`, `ds`) VALUES (`a`, `ds`)",
122-
f"DROP TABLE IF EXISTS `test_schema`.`__temp_test_table_{temp_table_uuid.hex}`",
120+
f"CREATE TABLE IF NOT EXISTS `test_schema`.`__temp_test_table_{temp_table_id}` PARTITION BY DATETIME_TRUNC(`ds`, MONTH) AS SELECT `a`, `ds` FROM `tbl`",
121+
f"DECLARE _sqlmesh_target_partitions_ ARRAY<DATETIME> DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_schema`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_id}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
122+
f"MERGE INTO `test_schema`.`test_table` AS `__MERGE_TARGET__` USING (SELECT `a`, `ds` FROM (SELECT * FROM `test_schema`.`__temp_test_table_{temp_table_id}`) AS `_subquery` WHERE DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`)) AS `__MERGE_SOURCE__` ON FALSE WHEN NOT MATCHED BY SOURCE AND DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`) THEN DELETE WHEN NOT MATCHED THEN INSERT (`a`, `ds`) VALUES (`a`, `ds`)",
123+
f"DROP TABLE IF EXISTS `test_schema`.`__temp_test_table_{temp_table_id}`",
123124
]
124125

125126

@@ -232,10 +233,6 @@ def test_replace_query_pandas(make_mocked_engine_adapter: t.Callable, mocker: Mo
232233
"sqlmesh.core.engine_adapter.bigquery.BigQueryEngineAdapter.execute"
233234
)
234235

235-
temp_table_uuid = uuid.uuid4()
236-
uuid4_mock = mocker.patch("uuid.uuid4")
237-
uuid4_mock.return_value = temp_table_uuid
238-
239236
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
240237
adapter.replace_query(
241238
"test_table", df, {"a": exp.DataType.build("int"), "b": exp.DataType.build("int")}

tests/core/engine_adapter/test_mixins.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# type: ignore
22
import typing as t
3-
import uuid
43
from unittest.mock import call
54

65
from pytest_mock.plugin import MockerFixture
@@ -53,13 +52,15 @@ def test_logical_replace_query_does_not_exist(
5352

5453

5554
def test_logical_replace_self_reference(
56-
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
55+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable
5756
):
5857
adapter = make_mocked_engine_adapter(LogicalReplaceQueryMixin, "postgres")
5958
adapter.cursor.fetchone.return_value = (1,)
60-
temp_table_uuid = uuid.uuid4()
61-
uuid4_mock = mocker.patch("uuid.uuid4")
62-
uuid4_mock.return_value = temp_table_uuid
59+
60+
temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table")
61+
table_name = "db.table"
62+
temp_table_id = "abcdefgh"
63+
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
6364

6465
mocker.patch(
6566
"sqlmesh.core.engine_adapter.postgres.LogicalReplaceQueryMixin.table_exists",
@@ -70,14 +71,14 @@ def test_logical_replace_self_reference(
7071
return_value={"col": exp.DataType(this=exp.DataType.Type.INT)},
7172
)
7273

73-
adapter.replace_query("db.table", parse_one("SELECT col + 1 AS col FROM db.table"))
74+
adapter.replace_query(table_name, parse_one(f"SELECT col + 1 AS col FROM {table_name}"))
7475

7576
assert to_sql_calls(adapter) == [
7677
f'CREATE SCHEMA IF NOT EXISTS "db"',
77-
f'CREATE TABLE IF NOT EXISTS "db"."__temp_table_{temp_table_uuid.hex}" AS SELECT "col" FROM "db"."table"',
78+
f'CREATE TABLE IF NOT EXISTS "db"."__temp_table_{temp_table_id}" AS SELECT "col" FROM "db"."table"',
7879
'TRUNCATE "db"."table"',
79-
f'INSERT INTO "db"."table" ("col") SELECT "col" + 1 AS "col" FROM "db"."__temp_table_{temp_table_uuid.hex}"',
80-
f'DROP TABLE IF EXISTS "db"."__temp_table_{temp_table_uuid.hex}"',
80+
f'INSERT INTO "db"."table" ("col") SELECT "col" + 1 AS "col" FROM "db"."__temp_table_{temp_table_id}"',
81+
f'DROP TABLE IF EXISTS "db"."__temp_table_{temp_table_id}"',
8182
]
8283

8384

0 commit comments

Comments
 (0)