Skip to content

Commit cf695a3

Browse files
refactor: added abstractions for CLI tools used for migration
introduce DumpToolBase for migration process and mysqldump integration
1 parent 68ba19c commit cf695a3

File tree

4 files changed

+262
-137
lines changed

4 files changed

+262
-137
lines changed

aiven_mysql_migrate/dump_tools.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
# Copyright (c) 2025 Aiven, Helsinki, Finland. https://aiven.io/
2+
from abc import ABC, abstractmethod
3+
from aiven_mysql_migrate.migration_executor import ProcessExecutor
4+
from aiven_mysql_migrate.utils import MySQLConnectionInfo
5+
from enum import Enum
6+
from subprocess import Popen
7+
from typing import List, Optional
8+
9+
import logging
10+
import shlex
11+
12+
LOGGER = logging.getLogger(__name__)
13+
14+
15+
class MySQLMigrateMethod(str, Enum):
16+
dump = "dump"
17+
replication = "replication"
18+
19+
20+
class DumpToolBase(ABC):
21+
"""Abstract base class for database dump tools."""
22+
def __init__(
23+
self,
24+
source: MySQLConnectionInfo,
25+
target: MySQLConnectionInfo,
26+
databases: List[str],
27+
skip_column_stats: bool,
28+
):
29+
self.source = source
30+
self.target = target
31+
self.databases = databases
32+
self.skip_column_stats = skip_column_stats
33+
self._gtid: Optional[str] = None
34+
# Store process references for cleanup
35+
self._processes: List[Popen] = []
36+
37+
@abstractmethod
38+
def get_dump_command(self, migration_method: MySQLMigrateMethod) -> List[str]:
39+
"""Build dump command."""
40+
41+
@abstractmethod
42+
def get_import_command(self) -> List[str]:
43+
"""Build import command."""
44+
45+
def execute_migration(self, migration_method: MySQLMigrateMethod) -> Optional[str]:
46+
"""
47+
Execute the complete migration process (dump and import).
48+
49+
Args:
50+
migration_method: The migration method (dump or replication)
51+
52+
Returns:
53+
GTID string for replication setup, or None for dump method
54+
"""
55+
dump_cmd = self.get_dump_command(migration_method)
56+
import_cmd = self.get_import_command()
57+
58+
try:
59+
_, _, gtid = ProcessExecutor.execute_pipe_commands(
60+
dump_cmd=dump_cmd, import_cmd=import_cmd, source=self.source, target=self.target
61+
)
62+
self._gtid = gtid
63+
return self._gtid
64+
except Exception as e:
65+
LOGGER.error("Error during migration: %s", e)
66+
self.cleanup()
67+
raise
68+
69+
def cleanup(self) -> None:
70+
"""Cleanup any temporary resources."""
71+
ProcessExecutor.terminate_processes(self._processes)
72+
73+
def get_gtid(self) -> Optional[str]:
74+
"""Return the extracted GTID for replication setup."""
75+
return self._gtid
76+
77+
78+
class MySQLDumpTool(DumpToolBase):
79+
"""MySQL dump tool using mysqldump/mysql."""
80+
def get_dump_command(self, migration_method: MySQLMigrateMethod) -> List[str]:
81+
"""Build mysqldump command."""
82+
# "--flush-logs" and "--master-data=2" would be good options to add, but they do not work for RDS admin
83+
# user - require extra permissions for `FLUSH TABLES WITH READ LOCK`
84+
cmd = [
85+
"mysqldump",
86+
"-h",
87+
self.source.hostname,
88+
"-P",
89+
str(self.source.port),
90+
"-u",
91+
self.source.username,
92+
f"-p{self.source.password}",
93+
"--compress",
94+
"--skip-lock-tables",
95+
"--single-transaction",
96+
"--hex-blob",
97+
"--routines",
98+
"--triggers",
99+
"--events",
100+
]
101+
if migration_method == MySQLMigrateMethod.replication:
102+
cmd += ["--set-gtid-purged=ON"]
103+
else:
104+
cmd += ["--set-gtid-purged=OFF"]
105+
if self.source.ssl:
106+
cmd += ["--ssl-mode=REQUIRED"]
107+
# Dumping column statistics is not supported by MySQL < 8.0 (which is default behaviour for newer versions)
108+
if self.skip_column_stats:
109+
cmd += ["--skip-column-statistics"]
110+
cmd += ["--databases", "--", *[shlex.quote(db) for db in self.databases]]
111+
112+
return cmd
113+
114+
def get_import_command(self) -> List[str]:
115+
"""Build mysql import command."""
116+
cmd = [
117+
"mysql", "-h", self.target.hostname, "-P",
118+
str(self.target.port), "-u", self.target.username, f"-p{self.target.password}", "--compress"
119+
]
120+
if self.target.ssl:
121+
cmd += ["--ssl-mode=REQUIRED"]
122+
123+
return cmd

aiven_mysql_migrate/migration.py

Lines changed: 22 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,28 @@
11
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/
22
from aiven_mysql_migrate import config
3+
from aiven_mysql_migrate.dump_tools import DumpToolBase, MySQLDumpTool, MySQLMigrateMethod
34
from aiven_mysql_migrate.exceptions import (
45
DatabaseTooLargeException, EndpointConnectionException, GTIDModeDisabledException, MissingReplicationGrants,
5-
MySQLDumpException, MySQLImportException, NothingToMigrateException, ReplicaSetupException,
6-
ReplicationNotAvailableException, ServerIdsOverlappingException, SSLNotSupportedException, TooManyDatabasesException,
7-
UnsupportedBinLogFormatException, UnsupportedMySQLEngineException, UnsupportedMySQLVersionException,
8-
WrongMigrationConfigurationException
6+
NothingToMigrateException, ReplicaSetupException, ReplicationNotAvailableException, ServerIdsOverlappingException,
7+
SSLNotSupportedException, TooManyDatabasesException, UnsupportedBinLogFormatException, UnsupportedMySQLEngineException,
8+
UnsupportedMySQLVersionException, WrongMigrationConfigurationException
99
)
10-
from aiven_mysql_migrate.utils import MySQLConnectionInfo, MySQLDumpProcessor, PrivilegeCheckUser, select_global_var
11-
from concurrent import futures
10+
from aiven_mysql_migrate.utils import MySQLConnectionInfo, PrivilegeCheckUser, select_global_var
1211
from looseversion import LooseVersion
1312
from pathlib import Path
1413
from pymysql.constants.ER import HANDSHAKE_ERROR
15-
from subprocess import Popen
1614
from typing import List, Optional
1715

18-
import concurrent
19-
import enum
2016
import json
2117
import logging
2218
import os
2319
import pymysql
24-
import resource
25-
import shlex
2620
import signal
27-
import subprocess
28-
import sys
2921
import time
3022

3123
LOGGER = logging.getLogger(__name__)
3224

3325

34-
@enum.unique
35-
class MySQLMigrateMethod(str, enum.Enum):
36-
dump = "dump"
37-
replication = "replication"
38-
39-
4026
class MySQLMigration:
4127
source: MySQLConnectionInfo
4228
target: MySQLConnectionInfo
@@ -53,9 +39,10 @@ def __init__(
5339
filter_dbs: Optional[str] = None,
5440
privilege_check_user: Optional[str] = None,
5541
output_meta_file: Optional[Path] = None,
42+
dump_tool: str = "mysqldump",
5643
):
57-
self.mysqldump_proc: Optional[Popen] = None
58-
self.mysql_proc: Optional[Popen] = None
44+
self.dump_tool_name = dump_tool
45+
self.dump_tool: Optional[DumpToolBase] = None
5946

6047
self.source = MySQLConnectionInfo.from_uri(source_uri, name="source")
6148
self.target = MySQLConnectionInfo.from_uri(target_uri, name="target")
@@ -81,10 +68,8 @@ def setup_signal_handlers(self):
8168

8269
def _stop_migration(self, signum, frame):
8370
LOGGER.info("Received signal: %s", signum)
84-
for subproc in (self.mysqldump_proc, self.mysql_proc):
85-
if subproc:
86-
LOGGER.warning("Terminating subprocess with pid: %s", subproc.pid)
87-
subproc.kill()
71+
if self.dump_tool:
72+
self.dump_tool.cleanup()
8873

8974
def list_databases(self) -> List[str]:
9075
with self.source.cur() as cur:
@@ -105,8 +90,8 @@ def _check_versions_replication_support(self):
10590
LOGGER.info("Checking MySQL versions for replication support")
10691

10792
if (
108-
LooseVersion("5.7.0") <= LooseVersion(self.source.version) < LooseVersion("8.1")
109-
and LooseVersion("8.0.0") <= LooseVersion(self.target.version) < LooseVersion("8.1")
93+
LooseVersion("5.7.0") <= LooseVersion(self.source.version) < LooseVersion("8.1")
94+
and LooseVersion("8.0.0") <= LooseVersion(self.target.version) < LooseVersion("8.1")
11095
):
11196
LOGGER.info("\tSource - %s, target - %s -- OK", self.source.version, self.target.version)
11297
else:
@@ -189,8 +174,7 @@ def _check_database_size(self, max_size: float):
189174
with self.source.cur() as cur:
190175
cur.execute(
191176
"SELECT SUM(DATA_LENGTH + INDEX_LENGTH) AS size FROM INFORMATION_SCHEMA.TABLES "
192-
f"WHERE TABLE_SCHEMA NOT IN ({', '.join(['%s'] * len(self.ignore_dbs))})",
193-
tuple(self.ignore_dbs)
177+
f"WHERE TABLE_SCHEMA NOT IN ({', '.join(['%s'] * len(self.ignore_dbs))})", tuple(self.ignore_dbs)
194178
)
195179
source_size = cur.fetchone()["size"] or 0
196180
if source_size > max_size:
@@ -266,114 +250,17 @@ def _stop_replication(self):
266250

267251
self._stop_and_reset_slave()
268252

269-
def _get_dump_command(self, migration_method: MySQLMigrateMethod) -> List[str]:
270-
# "--flush-logs" and "--master-data=2" would be good options to add, but they do not work for RDS admin
271-
# user - require extra permissions for `FLUSH TABLES WITH READ LOCK`
272-
cmd = [
273-
"mysqldump",
274-
"-h",
275-
self.source.hostname,
276-
"-P",
277-
str(self.source.port),
278-
"-u",
279-
self.source.username,
280-
f"-p{self.source.password}",
281-
"--compress",
282-
"--skip-lock-tables",
283-
"--single-transaction",
284-
"--hex-blob",
285-
"--routines",
286-
"--triggers",
287-
"--events",
288-
]
289-
if migration_method == MySQLMigrateMethod.replication:
290-
cmd += ["--set-gtid-purged=ON"]
291-
else:
292-
cmd += ["--set-gtid-purged=OFF"]
293-
if self.source.ssl:
294-
cmd += ["--ssl-mode=REQUIRED"]
295-
# Dumping column statistics is not supported by MySQL < 8.0 (which is default behaviour for newer versions)
296-
if self.skip_column_stats:
297-
cmd += ["--skip-column-statistics"]
298-
cmd += ["--databases", "--", *[shlex.quote(db) for db in self.databases]]
299-
300-
return cmd
301-
302-
def _get_import_command(self) -> List[str]:
303-
cmd = [
304-
"mysql", "-h", self.target.hostname, "-P",
305-
str(self.target.port), "-u", self.target.username, f"-p{self.target.password}", "--compress"
306-
]
307-
if self.target.ssl:
308-
cmd += ["--ssl-mode=REQUIRED"]
309-
310-
return cmd
311-
312253
def _migrate_data(self, migration_method: MySQLMigrateMethod) -> Optional[str]:
313-
"""Migrate data using mysqldump/mysql cli into the target database, return GTID from the dump"""
314-
LOGGER.info("Starting import MySQL dump file into target database")
315-
316-
dump_processor = MySQLDumpProcessor()
317-
self.mysqldump_proc = Popen( # pylint: disable=consider-using-with
318-
self._get_dump_command(migration_method=migration_method),
319-
stdout=subprocess.PIPE,
320-
stderr=subprocess.PIPE,
321-
universal_newlines=True
322-
)
323-
self.mysql_proc = Popen( # pylint: disable=consider-using-with
324-
self._get_import_command(),
325-
stdin=subprocess.PIPE,
326-
stderr=subprocess.PIPE, text=True
327-
)
328-
329-
# Disallow creating child processes in migration target when this runs as non-root user.
330-
resource.prlimit(self.mysql_proc.pid, resource.RLIMIT_NPROC, (0, 0))
331-
332-
# make mypy happy
333-
assert self.mysqldump_proc.stdout
334-
assert self.mysqldump_proc.stderr
335-
assert self.mysql_proc.stdin
336-
337-
# If sql_require_primary_key is ON globally - it's not possible to import tables without a primary key
338-
with self.target.cur() as cur:
339-
if select_global_var(cur, "sql_require_primary_key") == 1:
340-
self.mysql_proc.stdin.write("SET SESSION sql_require_primary_key = 0;")
341-
342-
def _reader_stdout():
343-
for line in self.mysqldump_proc.stdout:
344-
line = dump_processor.process_line(line.rstrip())
345-
346-
if not line:
347-
continue
348-
349-
LOGGER.debug("dump: %s", line)
350-
self.mysql_proc.stdin.write(line + "\n")
351-
352-
self.mysql_proc.stdin.flush()
353-
self.mysql_proc.stdin.close()
354-
355-
def _reader_stderr(proc):
356-
for line in proc.stderr:
357-
sys.stderr.write(line)
358-
359-
with futures.ThreadPoolExecutor(max_workers=3) as executor:
360-
for future in concurrent.futures.as_completed([
361-
executor.submit(_reader_stdout),
362-
executor.submit(_reader_stderr, self.mysqldump_proc),
363-
executor.submit(_reader_stderr, self.mysql_proc)
364-
]):
365-
future.result()
366-
367-
export_code = self.mysqldump_proc.wait()
368-
import_code = self.mysql_proc.wait()
369-
370-
if export_code != 0:
371-
raise MySQLDumpException(f"Error while importing data from the source database, exit code: {export_code}")
372-
373-
if import_code != 0:
374-
raise MySQLImportException(f"Error while importing data into the target database, exit code: {import_code}")
254+
"""Migrate data using the configured dump tool, return GTID from the dump"""
255+
if self.dump_tool_name == "mysqldump":
256+
self.dump_tool = MySQLDumpTool(
257+
source=self.source, target=self.target, databases=self.databases, skip_column_stats=self.skip_column_stats
258+
)
259+
else:
260+
raise ValueError(f"Unknown dump tool: {self.dump_tool_name}")
375261

376-
return dump_processor.get_gtid()
262+
# Execute migration
263+
return self.dump_tool.execute_migration(migration_method)
377264

378265
def _set_gtid(self, gtid: str):
379266
LOGGER.info("GTID from the dump is `%s`", gtid)

0 commit comments

Comments
 (0)