diff --git a/src/databricks/labs/lakebridge/assessments/_constants.py b/src/databricks/labs/lakebridge/assessments/_constants.py
index 9a793b19d1..e1299ccbae 100644
--- a/src/databricks/labs/lakebridge/assessments/_constants.py
+++ b/src/databricks/labs/lakebridge/assessments/_constants.py
@@ -5,10 +5,11 @@
 PLATFORM_TO_SOURCE_TECHNOLOGY_CFG = {
     "synapse": "src/databricks/labs/lakebridge/resources/assessments/synapse/pipeline_config.yml",
+    "mssql": "src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml",
 }
 
 # TODO modify this to PLATFORM_TO_SOURCE_TECHNOLOGY.keys() once all platforms are supported
-PROFILER_SOURCE_SYSTEM = ["synapse"]
+PROFILER_SOURCE_SYSTEM = ["synapse", "mssql"]
 
 # This flag indicates whether a connector is required for the source system when the pipeline is triggered
diff --git a/src/databricks/labs/lakebridge/assessments/configure_assessment.py b/src/databricks/labs/lakebridge/assessments/configure_assessment.py
index 0da1c28efa..5923d8d96b 100644
--- a/src/databricks/labs/lakebridge/assessments/configure_assessment.py
+++ b/src/databricks/labs/lakebridge/assessments/configure_assessment.py
@@ -87,18 +87,21 @@ def _configure_credentials(self) -> str:
         secret_vault_type = str(self.prompts.choice("Enter secret vault type (local | env)", ["local", "env"])).lower()
         secret_vault_name = None
 
-        logger.info("Please refer to the documentation to understand the difference between local and env.")
-
         credential = {
             "secret_vault_type": secret_vault_type,
             "secret_vault_name": secret_vault_name,
             source: {
-                "database": self.prompts.question("Enter the database name"),
-                "driver": self.prompts.question("Enter the driver details"),
-                "server": self.prompts.question("Enter the server or host details"),
+                "auth_type": "sql_authentication",
+                "fetch_size": self.prompts.question("Enter fetch size", default="1000"),
+                "login_timeout": self.prompts.question("Enter login timeout (seconds)", default="30"),
+                "server": self.prompts.question("Enter the fully-qualified server name"),
                 "port": int(self.prompts.question("Enter the port details", valid_number=True)),
-                "user": self.prompts.question("Enter the user details"),
-                "password": self.prompts.password("Enter the password details"),
+                "user": self.prompts.question("Enter the SQL username"),
+                "password": self.prompts.password("Enter the SQL password"),
+                "tz_info": self.prompts.question("Enter timezone (e.g. 
America/New_York)", default="UTC"), + "driver": self.prompts.question( + "Enter the ODBC driver installed locally", default="ODBC Driver 18 for SQL Server" + ), }, } diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/__init__.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py new file mode 100644 index 0000000000..e9f3b4f815 --- /dev/null +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py @@ -0,0 +1,72 @@ +import json +import sys + +from databricks.labs.lakebridge.connections.credential_manager import create_credential_manager +from databricks.labs.lakebridge.assessments import PRODUCT_NAME +from databricks.labs.lakebridge.resources.assessments.mssql.common.connector import get_sqlserver_reader +from databricks.labs.lakebridge.resources.assessments.mssql.common.queries import MSSQLQueries +from databricks.labs.lakebridge.resources.assessments.synapse.common.duckdb_helpers import save_resultset_to_db +from databricks.labs.lakebridge.resources.assessments.synapse.common.functions import arguments_loader, set_logger + + +def execute(): + logger = set_logger(__file__) + + db_path, creds_file = arguments_loader(desc="MSSQL Server Activity Extract Script") + cred_manager = create_credential_manager(PRODUCT_NAME, creds_file) + mssql_settings = cred_manager.get_credentials("mssql") + auth_type = mssql_settings.get("auth_type", "sql_authentication") + server_name = mssql_settings.get("server", "") + try: + + # TODO: get the last time the profiler was executed + # For now, we'll default to None, but this will eventually need + # input from a scheduler component. 
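+        # Until then, any supplied value must be a string that SQL Server can
+        # cast to DATETIME2(6), e.g. '2024-01-01 00:00:00' (see the predicates
+        # built in MSSQLQueries).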
+        last_execution_time = None
+        mode = "overwrite"
+
+        # Extract activity metrics
+        logger.info(f"Extracting activity metrics for: {server_name}")
+        print(f"Extracting activity metrics for: {server_name}")
+        connection = get_sqlserver_reader(
+            mssql_settings, db_name="master", server_name=server_name, auth_type=auth_type
+        )
+
+        # Query stats
+        table_name = "query_stats"
+        table_query = MSSQLQueries.get_query_stats(last_execution_time)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Stored procedure stats
+        table_name = "proc_stats"
+        table_query = MSSQLQueries.get_procedure_stats(last_execution_time)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Session info
+        table_name = "sessions"
+        table_query = MSSQLQueries.get_sessions(last_execution_time)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # CPU utilization
+        table_name = "cpu_utilization"
+        table_query = MSSQLQueries.get_cpu_utilization(last_execution_time)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        print(json.dumps({"status": "success", "message": "All data loaded successfully"}))
+
+    except Exception as e:
+        logger.error(f"Failed to extract activity info for SQL server: {str(e)}")
+        print(json.dumps({"status": "error", "message": str(e)}), file=sys.stderr)
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    execute()
diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/__init__.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py
new file mode 100644
index 0000000000..4b5f2eb496
--- /dev/null
+++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py
@@ -0,0 +1,22 @@
+from databricks.labs.lakebridge.connections.database_manager import DatabaseManager
+
+
+def get_sqlserver_reader(
+    input_cred: dict,
+    db_name: str,
+    *,
+    server_name: str,
+    auth_type: str = 'sql_authentication',
+) -> DatabaseManager:
+    config = {
+        "driver": input_cred['driver'],
+        "server": server_name,
+        "database": db_name,
+        "user": input_cred['user'],
+        "password": input_cred['password'],
+        "port": input_cred.get('port', 1433),
+        "auth_type": auth_type,
+    }
+    source = "mssql"
+
+    return DatabaseManager(source, config)
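+
+
+# Illustrative usage (hypothetical values), mirroring the credential keys
+# collected by configure_assessment.py:
+#
+#   reader = get_sqlserver_reader(
+#       {"driver": "ODBC Driver 18 for SQL Server", "user": "profiler",
+#        "password": "...", "port": 1433},
+#       db_name="master",
+#       server_name="myserver.example.com",
+#   )
+#   result = reader.fetch("SELECT @@VERSION")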
diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/functions.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/functions.py
new file mode 100644
index 0000000000..33e63e446e
--- /dev/null
+++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/functions.py
@@ -0,0 +1,9 @@
+from azure.identity import DefaultAzureCredential
+from azure.mgmt.sql import SqlManagementClient
+
+
+def create_mssql_sql_client(config: dict) -> SqlManagementClient:
+    """
+    Creates an Azure SQL management client for the
+    provided subscription using the default Azure credential.
+    """
+    return SqlManagementClient(credential=DefaultAzureCredential(), subscription_id=config["subscription_id"])
diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py
new file mode 100644
index 0000000000..7ba18dc7fe
--- /dev/null
+++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py
@@ -0,0 +1,385 @@
+class MSSQLQueries:
+
+    @staticmethod
+    def get_query_stats(last_execution_time: str | None) -> str:
+        """
+        Retrieves and classifies recently executed SQL statements from `sys.dm_exec_query_stats`.
+        Includes execution metrics (count, duration, CPU time, rows) and categorizes each statement as QUERY, DML,
+        DDL, ROUTINE, TRANSACTION_CONTROL, or OTHER based on its command type.
+        """
+        predicate = (
+            f"WHERE qs.last_execution_time > CAST('{last_execution_time}' AS DATETIME2(6))"
+            if last_execution_time
+            else ""
+        )
+
+        return f"""
+            with query_stats as (
+                SELECT
+                    CONVERT(VARCHAR(64), HASHBYTES('SHA2_256', qs.sql_handle), 1) as sql_handle,
+                    st.dbid,
+                    qs.creation_time,
+                    qs.last_execution_time,
+                    qs.execution_count,
+                    qs.total_worker_time,
+                    qs.total_elapsed_time,
+                    qs.total_rows,
+                    SUBSTRING(st.text, (qs.statement_start_offset/2) + 1,
+                        ((CASE qs.statement_end_offset
+                            WHEN -1 THEN DATALENGTH(st.text)
+                            ELSE qs.statement_end_offset END
+                        - qs.statement_start_offset)/2) + 1) AS statement_text
+                FROM sys.dm_exec_query_stats AS qs
+                CROSS APPLY sys.dm_exec_sql_text(qs.sql_handle) AS st
+                {predicate}
+            ),
+            query_stats_ex as (
+                select
+                    dbid,
+                    creation_time,
+                    last_execution_time,
+                    execution_count,
+                    total_worker_time,
+                    total_elapsed_time,
+                    total_rows,
+                    UPPER(SUBSTRING(LTRIM(RTRIM(statement_text)), 1, 40)) as command
+                from query_stats
+            )
+            SELECT
+                *,
+                CASE
+                    WHEN command like 'SELECT%' THEN 'QUERY'
+                    WHEN command like 'WITH%' THEN 'QUERY'
+                    WHEN command like 'INSERT%' THEN 'DML'
+                    WHEN command like 'UPDATE%' THEN 'DML'
+                    WHEN command like 'MERGE%' THEN 'DML'
+                    WHEN command like 'DELETE%' THEN 'DML'
+                    WHEN command like 'TRUNCATE%' THEN 'DML'
+                    WHEN command like 'COPY%' THEN 'DML'
+                    -- CASE arms are evaluated in order: match transaction control
+                    -- before the generic 'BEGIN%' pattern below.
+                    WHEN command like 'BEGIN%TRAN%' THEN 'TRANSACTION_CONTROL'
+                    WHEN command like 'END%TRAN%' THEN 'TRANSACTION_CONTROL'
+                    WHEN command like 'COMMIT%' THEN 'TRANSACTION_CONTROL'
+                    WHEN command like 'ROLLBACK%' THEN 'TRANSACTION_CONTROL'
+                    WHEN command like 'IF%' THEN 'DML'
+                    WHEN command like 'BEGIN%' THEN 'DML'
+                    WHEN command like 'DECLARE%' THEN 'DML'
+                    WHEN command like 'BUILDREPLICATEDTABLECACHE%' THEN 'DML'
+                    WHEN command like 'CREATE%' THEN 'DDL'
+                    WHEN command like 'DROP%' THEN 'DDL'
+                    WHEN command like 'ALTER%' THEN 'DDL'
+                    WHEN command like 'EXEC%' THEN 'ROUTINE'
+                    WHEN command like 'EXECUTE %' THEN 'ROUTINE'
+                    ELSE 'OTHER'
+                END as command_type,
+                SYSDATETIME() as extract_ts
+            FROM query_stats_ex
+            ORDER BY last_execution_time
+        """
+ """ + predicate = ( + f"WHERE last_execution_time > CAST('{last_execution_time}' AS DATETIME2(6))" if last_execution_time else "" + ) + return f""" + SELECT + database_id, + DB_NAME(database_id) AS db_name, + object_id, + OBJECT_NAME(object_id, database_id) AS object_name, + type, + last_execution_time, + execution_count, + total_worker_time, + total_elapsed_time, + SYSDATETIME() as extract_ts + FROM + sys.dm_exec_procedure_stats + {predicate} + ORDER BY + last_execution_time + """ + + @staticmethod + def get_sessions(last_execution_time: str | None): + """ + Retrieves active user session details from `sys.dm_exec_sessions`, including login info, + program and client names, CPU and memory usage, request timing, row counts, and database context. + Excludes system sessions and orders results by the end time of the last request. + """ + predicate = ( + f"AND last_request_end_time > CAST('{last_execution_time}' AS DATETIME2(6))" if last_execution_time else "" + ) + return f""" + SELECT + session_id, + login_time, + program_name, + client_interface_name, + CONVERT(VARCHAR(64), HASHBYTES('SHA2_256', login_name), 1) as login_name, + status, + cpu_time, + memory_usage, + total_scheduled_time, + total_elapsed_time, + last_request_start_time, + last_request_end_time, + is_user_process, + row_count, + database_id, + DB_NAME(database_id) AS db_name, + SYSDATETIME() as extract_ts + FROM + sys.dm_exec_sessions + WHERE + is_user_process <> 0 {predicate} + ORDER BY + last_request_end_time + """ + + @staticmethod + def get_cpu_utilization(last_execution_time: str | None): + """ + Extracts SQL Server CPU and system utilization metrics from `sys.dm_os_ring_buffers`, + including system idle and SQL process utilization over time. + """ + predicate = f"WHERE EventTime > CAST('{last_execution_time}' AS DATETIME2(6))" if last_execution_time else "" + return f""" + WITH process_utilization_info + AS (SELECT record.value('(./Record/@id)[1]', 'int') + AS record_id, + [timestamp], + record.value('(./Record/SchedulerMonitorEvent/SystemHealth/SystemIdle)[1]', 'int') AS SystemIdle, + record.value('(./Record/SchedulerMonitorEvent/SystemHealth/ProcessUtilization)[1]', 'int') AS SQLProcessUtilization + FROM (SELECT [timestamp], + CONVERT(XML, record) AS record + FROM sys.dm_os_ring_buffers + WHERE ring_buffer_type = N'RING_BUFFER_SCHEDULER_MONITOR' + AND record LIKE '%%') X), + os_sysinfo + AS (SELECT TOP 1 ms_ticks + FROM sys.dm_os_sys_info), + cpu_utilization + AS (SELECT record_id, + Dateadd (ms, ( [timestamp] - ms_ticks ), Getdate()) AS EventTime, + systemidle, + sqlprocessutilization + FROM process_utilization_info + CROSS JOIN os_sysinfo) + SELECT *, + Sysdatetime() AS extract_ts + FROM cpu_utilization + {predicate} + ORDER BY eventtime + """ + + @staticmethod + def get_sys_info(): + """ + Retrieves system-level information from SQL Server using sys.dm_os_sys_info. + Returns details about memory, CPU, scheduler count, and other OS-related + metadata for the SQL Server instance, along with a timestamp indicating when + the data was extracted. + """ + return """ + SELECT *, + Sysdatetime() AS extract_ts + FROM sys.dm_os_sys_info + """ + + @staticmethod + def get_databases(): + """ + Retrieve metadata for all user databases, excluding system databases. + Returns each database's ID, name, collation, creation date, + and a timestamp indicating when the data was extracted. 
+ """ + return """ + SELECT DB_ID(NAME) AS db_id, + NAME, + collation_name, + create_date, + SYSDATETIME() AS extract_ts + FROM sys.databases + WHERE NAME NOT IN ( 'master', 'tempdb', 'model', 'msdb' ); + """ + + @staticmethod + def get_tables(): + """ + Retrieves metadata for all tables in the specified database by querying + INFORMATION_SCHEMA.TABLES. Returns table definitions along with a timestamp + indicating when the data was extracted. + """ + return """ + SELECT + TABLE_CATALOG, + TABLE_SCHEMA, + TABLE_NAME, + TABLE_TYPE + FROM INFORMATION_SCHEMA.TABLES ; + """ + + @staticmethod + def get_views(): + """ + Retrieves metadata for all views in the specified database by querying + `INFORMATION_SCHEMA.VIEWS`. Returns view definitions along with a timestamp + indicating when the data was extracted. + """ + return """ + SELECT + TABLE_CATALOG, + TABLE_SCHEMA, + TABLE_NAME, + CHECK_OPTION, + IS_UPDATABLE, + '[REDACTED]' as VIEW_DEFINITION + FROM INFORMATION_SCHEMA.VIEWS + """ + + @staticmethod + def get_columns(): + """ + Retrieves column-level metadata for all tables and views in the specified + database by querying INFORMATION_SCHEMA.COLUMNS. Returns column attributes + along with a timestamp indicating when the data was extracted. + """ + return """ + SELECT + TABLE_CATALOG, + TABLE_SCHEMA, + TABLE_NAME, + COLUMN_NAME, + ORDINAL_POSITION, + COLUMN_DEFAULT, + IS_NULLABLE, + DATA_TYPE, + CHARACTER_MAXIMUM_LENGTH, + CHARACTER_OCTET_LENGTH, + NUMERIC_PRECISION, + NUMERIC_PRECISION_RADIX, + NUMERIC_SCALE, + DATETIME_PRECISION, + CHARACTER_SET_CATALOG, + CHARACTER_SET_SCHEMA, + CHARACTER_SET_NAME, + COLLATION_CATALOG, + COLLATION_SCHEMA, + COLLATION_NAME, + DOMAIN_CATALOG, + DOMAIN_SCHEMA, + DOMAIN_NAME + FROM INFORMATION_SCHEMA.COLUMNS ; + """ + + @staticmethod + def get_indexed_views(): + """ + Retrieves metadata for all indexed views in the specified database by joining + `sys.views` with `sys.indexes`. Returns view details for those with a clustered + index (index_id = 1) along with a timestamp indicating when the data was extracted. + """ + return """ + SELECT + v.[name] AS indexed_view_name, + s.[name] AS schema_name, + i.[name] AS index_name, + i.[type_desc] AS index_type, + i.[index_id], + SYSDATETIME() as extract_ts + FROM sys.views AS v + JOIN sys.schemas AS s + ON v.[schema_id] = s.[schema_id] + JOIN sys.indexes AS i + ON v.[object_id] = i.[object_id] + WHERE i.[index_id] = 1; + """ + + @staticmethod + def get_routines(): + """ + Retrieves metadata for all routines (stored procedures and functions) in the + specified database by querying INFORMATION_SCHEMA.ROUTINES. Returns routine + details along with a timestamp indicating when the data was extracted. + """ + return """ + SELECT + CREATED, + DATA_TYPE, + IS_DETERMINISTIC, + IS_IMPLICITLY_INVOCABLE, + IS_NULL_CALL, + IS_USER_DEFINED_CAST, + LAST_ALTERED, + MAX_DYNAMIC_RESULT_SETS, + NUMERIC_PRECISION, + NUMERIC_PRECISION_RADIX, + NUMERIC_SCALE, + ROUTINE_BODY, + ROUTINE_CATALOG, + '[REDACTED]' as ROUTINE_DEFINITION, + ROUTINE_NAME, + ROUTINE_SCHEMA, + ROUTINE_TYPE, + SCHEMA_LEVEL_ROUTINE, + SPECIFIC_CATALOG, + SPECIFIC_NAME, + SPECIFIC_SCHEMA, + SQL_DATA_ACCESS + FROM information_schema.routines + """ + + @staticmethod + def get_db_sizes(): + """ + Retrieves metadata for all data files (type = 0) in the specified database + from sys.database_files. Returns file name, type, current size, free space, + maximum size, and a timestamp indicating when the data was extracted. 
+ """ + return """ + SELECT + DB_NAME() AS database_name, + name AS FileName, + type_desc, + size/128.0 AS CurrentSizeMB, + size/128.0 - CAST(FILEPROPERTY(name, 'SpaceUsed') AS int)/128.0 AS FreeSpaceInMB, + max_size as MaxSize, + SYSDATETIME() as extract_ts + FROM sys.database_files WHERE type=0 + """ + + @staticmethod + def get_table_sizes(): + """ + Retrieves storage and row count statistics for all user tables in the specified + database by querying sys.dm_db_partition_stats and sys.objects. Returns table + name, total rows, reserved, used, and unused space (MB), breakdown of data vs. + index space, and a timestamp indicating when the data was extracted. + """ + return """ + SELECT + o.name AS TableName, + SUM(ps.row_count) AS [RowCount], + SUM(ps.reserved_page_count) * 8 / 1024 AS ReservedMB, + SUM(ps.used_page_count) * 8 / 1024 AS UsedMB, + (SUM(ps.reserved_page_count) - SUM(ps.used_page_count)) * 8 / 1024 AS UnusedMB, + SUM(CASE + WHEN ps.index_id < 2 THEN ps.in_row_data_page_count + ps.lob_used_page_count + ps.row_overflow_used_page_count + ELSE 0 + END) * 8 / 1024 AS DataMB, + SUM(CASE + WHEN ps.index_id >= 2 THEN ps.in_row_data_page_count + ELSE 0 + END) * 8 / 1024 AS IndexMB, + SYSDATETIME() as extract_ts + FROM sys.dm_db_partition_stats AS ps + JOIN sys.objects AS o ON ps.object_id = o.object_id + WHERE o.type = 'U' + GROUP BY schema_name(o.schema_id), o.name + """ diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py new file mode 100644 index 0000000000..57d5841b1a --- /dev/null +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py @@ -0,0 +1,108 @@ +import json +import sys + + +from databricks.labs.lakebridge.connections.credential_manager import create_credential_manager +from databricks.labs.lakebridge.assessments import PRODUCT_NAME +from databricks.labs.lakebridge.resources.assessments.mssql.common.connector import get_sqlserver_reader +from databricks.labs.lakebridge.resources.assessments.mssql.common.queries import MSSQLQueries +from databricks.labs.lakebridge.resources.assessments.synapse.common.duckdb_helpers import save_resultset_to_db +from databricks.labs.lakebridge.resources.assessments.synapse.common.functions import arguments_loader, set_logger + + +def execute(): + logger = set_logger(__file__) + + db_path, creds_file = arguments_loader(desc="MSSQL Server Info Extract Script") + cred_manager = create_credential_manager(PRODUCT_NAME, creds_file) + mssql_settings = cred_manager.get_credentials("mssql") + auth_type = mssql_settings.get("auth_type", "sql_authentication") + server_name = mssql_settings.get("server", "") + + try: + # TODO: get the last time the profiler was executed + # For now, we'll default to None, but this will eventually need + # input from a scheduler component. 
+        mode = "overwrite"
+
+        # Extract info metrics
+        logger.info(f"Extracting info metrics for: {server_name}")
+        print(f"Extracting info metrics for: {server_name}")
+        connection = get_sqlserver_reader(
+            mssql_settings, db_name="master", server_name=server_name, auth_type=auth_type
+        )
+
+        # System info
+        table_name = "sys_info"
+        table_query = MSSQLQueries.get_sys_info()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        print(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Databases
+        table_name = "databases"
+        table_query = MSSQLQueries.get_databases()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Tables
+        table_name = "tables"
+        table_query = MSSQLQueries.get_tables()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Views
+        table_name = "views"
+        table_query = MSSQLQueries.get_views()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Columns
+        table_name = "columns"
+        table_query = MSSQLQueries.get_columns()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Indexed views
+        table_name = "indexed_views"
+        table_query = MSSQLQueries.get_indexed_views()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Routines
+        table_name = "routines"
+        table_query = MSSQLQueries.get_routines()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Database sizes
+        table_name = "db_sizes"
+        table_query = MSSQLQueries.get_db_sizes()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Table sizes
+        table_name = "table_sizes"
+        table_query = MSSQLQueries.get_table_sizes()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        print(json.dumps({"status": "success", "message": "All data loaded successfully"}))
+
+    except Exception as e:
+        logger.error(f"Failed to execute info extract for SQL server: {str(e)}")
+        print(json.dumps({"status": "error", "message": str(e)}), file=sys.stderr)
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    execute()
diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml
new file mode 100644
index 0000000000..4d69689729
--- /dev/null
+++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml
@@ -0,0 +1,38 @@
+name: mssql_assessment
+version: "1.0"
+extract_folder: 
"/tmp/data/mssql_assessment" +steps: + - name: activity_extract + type: python + extract_source: src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py + mode: overwrite + frequency: once + flag: active + dependencies: + - pandas + - duckdb + - databricks-sdk + - databricks-labs-blueprint[yaml]>=0.10.1 + - sqlalchemy + - azure-synapse-artifacts + - azure-mgmt-sql + - azure-identity + - azure-monitor-query==1.3.0b1 + - pyodbc + - name: info_extract + type: python + extract_source: src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py + mode: overwrite + frequency: once + flag: active + dependencies: + - pandas + - duckdb + - databricks-sdk + - databricks-labs-blueprint[yaml]>=0.10.1 + - sqlalchemy + - azure-synapse-artifacts + - azure-mgmt-sql + - azure-identity + - azure-monitor-query==1.3.0b1 + - pyodbc diff --git a/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py b/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py index 52448c2b11..372559ee35 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py +++ b/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py @@ -66,6 +66,21 @@ def save_resultset_to_db( "serverless_requests_history": "STATUS STRING, TRANSACTION_ID BIGINT, DISTRIBUTED_STATEMENT_ID STRING, QUERY_HASH STRING, LOGIN_NAME STRING, START_TIME STRING, ERROR_CODE INTEGER, REJECTED_ROWS_PATH STRING, END_TIME STRING, COMMAND STRING, QUERY_TEXT STRING, TOTAL_ELAPSED_TIME_MS BIGINT, DATA_PROCESSED_MB BIGINT, ERROR STRING", # Data processed info "serverless_data_processed": "DATA_PROCESSED_MB BIGINT, TYPE STRING, POOL_NAME STRING, EXTRACT_TS STRING", + # MSSQL activity extract + "mssql_query_stats": "DBID VARCHAR, CREATION_TIME TIMESTAMP, LAST_EXECUTION_TIME TIMESTAMP, EXECUTION_COUNT BIGINT, TOTAL_WORKER_TIME BIGINT, TOTAL_ELAPSED_TIME BIGINT, TOTAL_ROWS BIGINT, COMMAND VARCHAR, COMMAND_TYPE VARCHAR, EXTRACT_TS TIMESTAMP", + "mssql_proc_stats": "DATABASE_ID BIGINT, DB_NAME VARCHAR, OBJECT_ID BIGINT, OBJECT_NAME VARCHAR, TYPE VARCHAR, LAST_EXECUTION_TIME TIMESTAMP, EXECUTION_COUNT BIGINT, TOTAL_WORKER_TIME BIGINT, TOTAL_ELAPSED_TIME BIGINT, EXTRACT_TS TIMESTAMP", + "mssql_sessions": "SESSION_ID BIGINT, LOGIN_TIME TIMESTAMP, PROGRAM_NAME VARCHAR, CLIENT_INTERFACE_NAME VARCHAR, LOGIN_NAME VARCHAR, STATUS VARCHAR, CPU_TIME BIGINT, MEMORY_USAGE BIGINT, TOTAL_SCHEDULED_TIME BIGINT, TOTAL_ELAPSED_TIME BIGINT, LAST_REQUEST_START_TIME TIMESTAMP, LAST_REQUEST_END_TIME TIMESTAMP, IS_USER_PROCESS BOOLEAN, ROW_COUNT BIGINT, DATABASE_ID BIGINT, DB_NAME VARCHAR, EXTRACT_TS TIMESTAMP", + "mssql_cpu_utilization": "RECORD_ID BIGINT, EVENTTIME TIMESTAMP, SYSTEMIDLE BIGINT, SQLPROCESSUTILIZATION BIGINT, EXTRACT_TS TIMESTAMP", + # MSSQL info extract + "mssql_sys_info": "CPU_TICKS BIGINT, MS_TICKS BIGINT, CPU_COUNT BIGINT, HYPERTHREAD_RATIO BIGINT, PHYSICAL_MEMORY_KB BIGINT, VIRTUAL_MEMORY_KB BIGINT, COMMITTED_KB BIGINT, COMMITTED_TARGET_KB BIGINT, VISIBLE_TARGET_KB BIGINT, STACK_SIZE_IN_BYTES BIGINT, OS_QUANTUM BIGINT, OS_ERROR_MODE BIGINT, OS_PRIORITY_CLASS BIGINT, MAX_WORKERS_COUNT BIGINT, SCHEDULER_COUNT BIGINT, SCHEDULER_TOTAL_COUNT BIGINT, DEADLOCK_MONITOR_SERIAL_NUMBER BIGINT, SQLSERVER_START_TIME_MS_TICKS BIGINT, SQLSERVER_START_TIME TIMESTAMP, AFFINITY_TYPE BIGINT, AFFINITY_TYPE_DESC VARCHAR, PROCESS_KERNEL_TIME_MS BIGINT, PROCESS_USER_TIME_MS BIGINT, TIME_SOURCE BIGINT, TIME_SOURCE_DESC VARCHAR, 
VIRTUAL_MACHINE_TYPE BIGINT, VIRTUAL_MACHINE_TYPE_DESC VARCHAR, SOFTNUMA_CONFIGURATION BIGINT, SOFTNUMA_CONFIGURATION_DESC VARCHAR, PROCESS_PHYSICAL_AFFINITY VARCHAR, SQL_MEMORY_MODEL BIGINT, SQL_MEMORY_MODEL_DESC VARCHAR, SOCKET_COUNT BIGINT, CORES_PER_SOCKET BIGINT, NUMA_NODE_COUNT BIGINT, CONTAINER_TYPE BIGINT, CONTAINER_TYPE_DESC VARCHAR, EXTRACT_TS TIMESTAMP", + "mssql_databases": "DB_ID VARCHAR, NAME VARCHAR, COLLATION_NAME VARCHAR, CREATE_DATE TIMESTAMP, EXTRACT_TS TIMESTAMP", + "mssql_tables": "TABLE_CATALOG VARCHAR, TABLE_SCHEMA VARCHAR, TABLE_NAME VARCHAR, TABLE_TYPE VARCHAR", + "mssql_views": "TABLE_CATALOG VARCHAR, TABLE_SCHEMA VARCHAR, TABLE_NAME VARCHAR, CHECK_OPTION VARCHAR, IS_UPDATABLE VARCHAR, VIEW_DEFINITION VARCHAR", + "mssql_columns": "TABLE_CATALOG VARCHAR, TABLE_SCHEMA VARCHAR, TABLE_NAME VARCHAR, COLUMN_NAME VARCHAR, ORDINAL_POSITION BIGINT, COLUMN_DEFAULT VARCHAR, IS_NULLABLE VARCHAR, DATA_TYPE VARCHAR, CHARACTER_MAXIMUM_LENGTH DOUBLE, CHARACTER_OCTET_LENGTH DOUBLE, NUMERIC_PRECISION DOUBLE, NUMERIC_PRECISION_RADIX DOUBLE, NUMERIC_SCALE DOUBLE, DATETIME_PRECISION DOUBLE, CHARACTER_SET_CATALOG VARCHAR, CHARACTER_SET_SCHEMA VARCHAR, CHARACTER_SET_NAME VARCHAR, COLLATION_CATALOG VARCHAR, COLLATION_SCHEMA VARCHAR, COLLATION_NAME VARCHAR, DOMAIN_CATALOG VARCHAR, DOMAIN_SCHEMA VARCHAR, DOMAIN_NAME VARCHAR", + "mssql_indexed_views": "INDEXED_VIEW_NAME VARCHAR, SCHEMA_NAME VARCHAR, INDEX_NAME VARCHAR, INDEX_TYPE VARCHAR, INDEX_ID VARCHAR, EXTRACT_TS VARCHAR", + "mssql_routines": "CREATED TIMESTAMP, DATA_TYPE VARCHAR, IS_DETERMINISTIC VARCHAR, IS_IMPLICITLY_INVOCABLE VARCHAR, IS_NULL_CALL VARCHAR, IS_USER_DEFINED_CAST VARCHAR, LAST_ALTERED TIMESTAMP, MAX_DYNAMIC_RESULT_SETS BIGINT, NUMERIC_PRECISION DOUBLE, NUMERIC_PRECISION_RADIX DOUBLE, NUMERIC_SCALE DOUBLE, ROUTINE_BODY VARCHAR, ROUTINE_CATALOG VARCHAR, ROUTINE_DEFINITION VARCHAR, ROUTINE_NAME VARCHAR, ROUTINE_SCHEMA VARCHAR, ROUTINE_TYPE VARCHAR, SCHEMA_LEVEL_ROUTINE VARCHAR, SPECIFIC_CATALOG VARCHAR, SPECIFIC_NAME VARCHAR, SPECIFIC_SCHEMA VARCHAR, SQL_DATA_ACCESS VARCHAR", + "mssql_db_sizes": "DATABASE_NAME VARCHAR, FILENAME VARCHAR, TYPE_DESC VARCHAR, CURRENTSIZEMB VARCHAR, FREESPACEINMB VARCHAR, MAXSIZE BIGINT, EXTRACT_TS TIMESTAMP", + "mssql_table_sizes": "TABLENAME VARCHAR, ROWCOUNT BIGINT, RESERVEDMB BIGINT, USEDMB BIGINT, UNUSEDMB BIGINT, DATAMB BIGINT, INDEXMB BIGINT, EXTRACT_TS TIMESTAMP", } try: columns = list(result.columns)