Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions aiven_mysql_migrate/dump_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Copyright (c) 2025 Aiven, Helsinki, Finland. https://aiven.io/
from abc import ABC, abstractmethod
from aiven_mysql_migrate.migration_executor import ProcessExecutor
from aiven_mysql_migrate.utils import MySQLConnectionInfo
from enum import Enum, unique
from typing import List, Optional

import logging
import shlex

LOGGER = logging.getLogger(__name__)


@unique
class MySQLMigrateMethod(str, Enum):
    """Supported ways of moving data from the source to the target server.

    Members are also ``str`` instances, so they compare equal to their plain
    string values (``MySQLMigrateMethod.dump == "dump"``).

    ``@unique`` makes the class definition fail with ``ValueError`` if two
    members are ever given the same value, instead of silently turning the
    second one into an alias of the first.
    """
    dump = "dump"
    replication = "replication"


class DumpToolBase(ABC):
    """Shared driver for tools that dump a source server and load the dump into a target.

    Concrete subclasses provide the dump and import command lines. This class passes
    them to a ``ProcessExecutor`` and keeps the GTID value the executor returns.
    """
    def __init__(
        self,
        source: MySQLConnectionInfo,
        target: MySQLConnectionInfo,
        databases: List[str],
        skip_column_stats: bool,
    ):
        """
        Store the migration settings and create the process executor.
        Args:
            source: Connection info of the server to dump from
            target: Connection info of the server to import into
            databases: Names of the databases to migrate
            skip_column_stats: Option made available to subclasses when building the dump command
        """
        self.source, self.target = source, target
        self.databases = databases
        self.skip_column_stats = skip_column_stats
        self.process_executor = ProcessExecutor()
        # Populated by execute_migration() once the executor has returned.
        self._gtid: Optional[str] = None

    @abstractmethod
    def get_dump_command(self, migration_method: MySQLMigrateMethod) -> List[str]:
        """Return the argument list of the command that produces the dump."""

    @abstractmethod
    def get_import_command(self) -> List[str]:
        """Return the argument list of the command that consumes the dump."""

    def execute_migration(self, migration_method: MySQLMigrateMethod) -> Optional[str]:
        """
        Run the dump command piped into the import command.
        Args:
            migration_method: The migration method (dump or replication)
        Returns:
            The GTID value returned by the process executor (may be None)
        Raises:
            Exception: whatever the executor raised, re-raised after cleanup() has run
        """
        # Both commands are built before the guarded section, so a failure while
        # building them does not trigger cleanup().
        export_cmd = self.get_dump_command(migration_method)
        load_cmd = self.get_import_command()

        try:
            _, _, extracted_gtid = self.process_executor.execute_pipe_commands(
                dump_cmd=export_cmd,
                import_cmd=load_cmd,
                target=self.target,
            )
            self._gtid = extracted_gtid
        except Exception as e:
            LOGGER.error("Error during migration: %s", e)
            self.cleanup()
            raise
        else:
            return self._gtid

    def cleanup(self) -> None:
        """Ask the process executor to terminate its processes."""
        self.process_executor.terminate_processes()

    def get_gtid(self) -> Optional[str]:
        """Return the GTID stored by the last successful execute_migration(), or None."""
        return self._gtid


class MySQLDumpTool(DumpToolBase):
    """Dump tool built on the ``mysqldump`` and ``mysql`` command-line clients."""
    def get_dump_command(self, migration_method: MySQLMigrateMethod) -> List[str]:
        """
        Assemble the mysqldump argument list for the source server.
        Args:
            migration_method: Selects --set-gtid-purged=ON for replication, OFF otherwise
        Returns:
            The command as a list of arguments, ending with the database names
        """
        src = self.source

        # "--flush-logs" and "--master-data=2" would be good options to add, but they do not work for RDS admin
        # user - require extra permissions for `FLUSH TABLES WITH READ LOCK`
        argv = ["mysqldump", "-h", src.hostname, "-P", str(src.port), "-u", src.username, f"-p{src.password}"]
        argv.extend([
            "--compress",
            "--skip-lock-tables",
            "--single-transaction",
            "--hex-blob",
            "--routines",
            "--triggers",
            "--events",
        ])

        wants_replication = migration_method == MySQLMigrateMethod.replication
        argv.append("--set-gtid-purged=ON" if wants_replication else "--set-gtid-purged=OFF")

        if src.ssl:
            argv.append("--ssl-mode=REQUIRED")

        # Newer mysqldump versions dump column statistics by default, which MySQL < 8.0 does not support.
        if self.skip_column_stats:
            argv.append("--skip-column-statistics")

        # The bare "--" ends option parsing, so a database name is never read as an option.
        # NOTE(review): shlex.quote() adds literal quotes to names containing shell-special characters;
        # that is only correct if the executor runs this list through a shell - confirm against ProcessExecutor.
        argv.extend(["--databases", "--"])
        argv.extend(shlex.quote(db_name) for db_name in self.databases)

        return argv

    def get_import_command(self) -> List[str]:
        """
        Assemble the mysql client argument list for the target server.
        Returns:
            The command as a list of arguments, with --ssl-mode=REQUIRED appended when the target uses SSL
        """
        dst = self.target
        argv = ["mysql", "-h", dst.hostname, "-P", str(dst.port), "-u", dst.username, f"-p{dst.password}", "--compress"]
        if dst.ssl:
            argv.append("--ssl-mode=REQUIRED")

        return argv
157 changes: 22 additions & 135 deletions aiven_mysql_migrate/migration.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,28 @@
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/
from aiven_mysql_migrate import config
from aiven_mysql_migrate.dump_tools import DumpToolBase, MySQLDumpTool, MySQLMigrateMethod
from aiven_mysql_migrate.exceptions import (
DatabaseTooLargeException, EndpointConnectionException, GTIDModeDisabledException, MissingReplicationGrants,
MySQLDumpException, MySQLImportException, NothingToMigrateException, ReplicaSetupException,
ReplicationNotAvailableException, ServerIdsOverlappingException, SSLNotSupportedException, TooManyDatabasesException,
UnsupportedBinLogFormatException, UnsupportedMySQLEngineException, UnsupportedMySQLVersionException,
WrongMigrationConfigurationException
NothingToMigrateException, ReplicaSetupException, ReplicationNotAvailableException, ServerIdsOverlappingException,
SSLNotSupportedException, TooManyDatabasesException, UnsupportedBinLogFormatException, UnsupportedMySQLEngineException,
UnsupportedMySQLVersionException, WrongMigrationConfigurationException
)
from aiven_mysql_migrate.utils import MySQLConnectionInfo, MySQLDumpProcessor, PrivilegeCheckUser, select_global_var
from concurrent import futures
from aiven_mysql_migrate.utils import MySQLConnectionInfo, PrivilegeCheckUser, select_global_var
from looseversion import LooseVersion
from pathlib import Path
from pymysql.constants.ER import HANDSHAKE_ERROR
from subprocess import Popen
from typing import List, Optional

import concurrent
import enum
import json
import logging
import os
import pymysql
import resource
import shlex
import signal
import subprocess
import sys
import time

LOGGER = logging.getLogger(__name__)


@enum.unique
class MySQLMigrateMethod(str, enum.Enum):
dump = "dump"
replication = "replication"


class MySQLMigration:
source: MySQLConnectionInfo
target: MySQLConnectionInfo
Expand All @@ -53,9 +39,10 @@ def __init__(
filter_dbs: Optional[str] = None,
privilege_check_user: Optional[str] = None,
output_meta_file: Optional[Path] = None,
dump_tool: str = "mysqldump",
):
self.mysqldump_proc: Optional[Popen] = None
self.mysql_proc: Optional[Popen] = None
self.dump_tool_name = dump_tool
self.dump_tool: Optional[DumpToolBase] = None

self.source = MySQLConnectionInfo.from_uri(source_uri, name="source")
self.target = MySQLConnectionInfo.from_uri(target_uri, name="target")
Expand All @@ -81,10 +68,8 @@ def setup_signal_handlers(self):

def _stop_migration(self, signum, frame):
LOGGER.info("Received signal: %s", signum)
for subproc in (self.mysqldump_proc, self.mysql_proc):
if subproc:
LOGGER.warning("Terminating subprocess with pid: %s", subproc.pid)
subproc.kill()
if self.dump_tool:
self.dump_tool.cleanup()

def list_databases(self) -> List[str]:
with self.source.cur() as cur:
Expand All @@ -105,8 +90,8 @@ def _check_versions_replication_support(self):
LOGGER.info("Checking MySQL versions for replication support")

if (
LooseVersion("5.7.0") <= LooseVersion(self.source.version) < LooseVersion("8.1")
and LooseVersion("8.0.0") <= LooseVersion(self.target.version) < LooseVersion("8.1")
LooseVersion("5.7.0") <= LooseVersion(self.source.version) < LooseVersion("8.1")
and LooseVersion("8.0.0") <= LooseVersion(self.target.version) < LooseVersion("8.1")
):
LOGGER.info("\tSource - %s, target - %s -- OK", self.source.version, self.target.version)
else:
Expand Down Expand Up @@ -189,8 +174,7 @@ def _check_database_size(self, max_size: float):
with self.source.cur() as cur:
cur.execute(
"SELECT SUM(DATA_LENGTH + INDEX_LENGTH) AS size FROM INFORMATION_SCHEMA.TABLES "
f"WHERE TABLE_SCHEMA NOT IN ({', '.join(['%s'] * len(self.ignore_dbs))})",
tuple(self.ignore_dbs)
f"WHERE TABLE_SCHEMA NOT IN ({', '.join(['%s'] * len(self.ignore_dbs))})", tuple(self.ignore_dbs)
)
source_size = cur.fetchone()["size"] or 0
if source_size > max_size:
Expand Down Expand Up @@ -266,114 +250,17 @@ def _stop_replication(self):

self._stop_and_reset_slave()

def _get_dump_command(self, migration_method: MySQLMigrateMethod) -> List[str]:
# "--flush-logs" and "--master-data=2" would be good options to add, but they do not work for RDS admin
# user - require extra permissions for `FLUSH TABLES WITH READ LOCK`
cmd = [
"mysqldump",
"-h",
self.source.hostname,
"-P",
str(self.source.port),
"-u",
self.source.username,
f"-p{self.source.password}",
"--compress",
"--skip-lock-tables",
"--single-transaction",
"--hex-blob",
"--routines",
"--triggers",
"--events",
]
if migration_method == MySQLMigrateMethod.replication:
cmd += ["--set-gtid-purged=ON"]
else:
cmd += ["--set-gtid-purged=OFF"]
if self.source.ssl:
cmd += ["--ssl-mode=REQUIRED"]
# Dumping column statistics is not supported by MySQL < 8.0 (which is default behaviour for newer versions)
if self.skip_column_stats:
cmd += ["--skip-column-statistics"]
cmd += ["--databases", "--", *[shlex.quote(db) for db in self.databases]]

return cmd

def _get_import_command(self) -> List[str]:
cmd = [
"mysql", "-h", self.target.hostname, "-P",
str(self.target.port), "-u", self.target.username, f"-p{self.target.password}", "--compress"
]
if self.target.ssl:
cmd += ["--ssl-mode=REQUIRED"]

return cmd

def _migrate_data(self, migration_method: MySQLMigrateMethod) -> Optional[str]:
"""Migrate data using mysqldump/mysql cli into the target database, return GTID from the dump"""
LOGGER.info("Starting import MySQL dump file into target database")

dump_processor = MySQLDumpProcessor()
self.mysqldump_proc = Popen( # pylint: disable=consider-using-with
self._get_dump_command(migration_method=migration_method),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
self.mysql_proc = Popen( # pylint: disable=consider-using-with
self._get_import_command(),
stdin=subprocess.PIPE,
stderr=subprocess.PIPE, text=True
)

# Disallow creating child processes in migration target when this runs as non-root user.
resource.prlimit(self.mysql_proc.pid, resource.RLIMIT_NPROC, (0, 0))

# make mypy happy
assert self.mysqldump_proc.stdout
assert self.mysqldump_proc.stderr
assert self.mysql_proc.stdin

# If sql_require_primary_key is ON globally - it's not possible to import tables without a primary key
with self.target.cur() as cur:
if select_global_var(cur, "sql_require_primary_key") == 1:
self.mysql_proc.stdin.write("SET SESSION sql_require_primary_key = 0;")

def _reader_stdout():
for line in self.mysqldump_proc.stdout:
line = dump_processor.process_line(line.rstrip())

if not line:
continue

LOGGER.debug("dump: %s", line)
self.mysql_proc.stdin.write(line + "\n")

self.mysql_proc.stdin.flush()
self.mysql_proc.stdin.close()

def _reader_stderr(proc):
for line in proc.stderr:
sys.stderr.write(line)

with futures.ThreadPoolExecutor(max_workers=3) as executor:
for future in concurrent.futures.as_completed([
executor.submit(_reader_stdout),
executor.submit(_reader_stderr, self.mysqldump_proc),
executor.submit(_reader_stderr, self.mysql_proc)
]):
future.result()

export_code = self.mysqldump_proc.wait()
import_code = self.mysql_proc.wait()

if export_code != 0:
raise MySQLDumpException(f"Error while importing data from the source database, exit code: {export_code}")

if import_code != 0:
raise MySQLImportException(f"Error while importing data into the target database, exit code: {import_code}")
"""Migrate data using the configured dump tool, return GTID from the dump"""
if self.dump_tool_name == "mysqldump":
self.dump_tool = MySQLDumpTool(
source=self.source, target=self.target, databases=self.databases, skip_column_stats=self.skip_column_stats
)
else:
raise ValueError(f"Unknown dump tool: {self.dump_tool_name}")

return dump_processor.get_gtid()
# Execute migration
return self.dump_tool.execute_migration(migration_method)

def _set_gtid(self, gtid: str):
LOGGER.info("GTID from the dump is `%s`", gtid)
Expand Down
Loading