diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index 02dc0b0160..520800cae7 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -45,7 +45,7 @@ jobs: codecov_yml_path: codecov.yml token: ${{ secrets.CODECOV_TOKEN }} - integration-python: + test-reconcile: runs-on: group: databrickslabs-protected-runner-group labels: linux-ubuntu-latest @@ -68,8 +68,8 @@ jobs: chmod +x $GITHUB_WORKSPACE/.github/scripts/setup_spark_remote.sh $GITHUB_WORKSPACE/.github/scripts/setup_spark_remote.sh - - name: Run integration tests - run: hatch run integration + - name: Run reconcile tests + run: hatch run test-reconcile - name: Publish test coverage uses: codecov/codecov-action@v5 @@ -175,7 +175,7 @@ jobs: echo "No file produced in tests-reports/" exit 1 - end-to-end: + test-install: runs-on: ${{ matrix.runner }} env: # this is a temporary hack diff --git a/Makefile b/Makefile index f131f99ce7..5d2446e659 100644 --- a/Makefile +++ b/Makefile @@ -36,8 +36,8 @@ test: test-install: hatch run test-install -integration: setup_spark_remote - hatch run integration +test-reconcile: setup_spark_remote + hatch run test-reconcile coverage: hatch run coverage && open htmlcov/index.html diff --git a/pyproject.toml b/pyproject.toml index a484596ec1..66898f6526 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,18 +97,18 @@ reconcile = "databricks.labs.lakebridge.reconcile.execute:main" profiler_dashboards = "databricks.labs.lakebridge.assessments.dashboards.execute:main" [tool.hatch.envs.default.scripts] -test = "pytest --cov src --cov-report=xml tests/unit" -test-install = "pytest --cov src --cov-report=xml tests/integration/install" -coverage = "pytest --cov src tests --cov-report=html --ignore=tests/integration/install --ignore=tests/integration/connections --ignore=tests/integration/assessments" -integration = "pytest --cov src tests/integration/reconcile --durations 20" -fmt = ["black .", - "ruff check . --fix", - "mypy --disable-error-code 'annotation-unchecked' .", - "pylint --output-format=colorized -j 0 src tests"] -verify = ["black --check .", - "ruff check .", - "mypy --disable-error-code 'annotation-unchecked' .", - "pylint --output-format=colorized -j 0 src tests"] +test = "pytest --cov src --cov-report=xml tests/unit" +test-install = "pytest --cov src --cov-report=xml tests/integration/install" +test-reconcile = "pytest --cov src tests/integration/reconcile --durations 20" +coverage = "pytest --cov src tests --cov-report=html --ignore=tests/integration/install --ignore=tests/integration/connections --ignore=tests/integration/assessments" +fmt = ["black .", + "ruff check . --fix", + "mypy --disable-error-code 'annotation-unchecked' .", + "pylint --output-format=colorized -j 0 src tests"] +verify = ["black --check .", + "ruff check .", + "mypy --disable-error-code 'annotation-unchecked' .", + "pylint --output-format=colorized -j 0 src tests"] [tool.hatch.envs.sqlglot-latest] python="3.10" diff --git a/src/databricks/labs/lakebridge/cli.py b/src/databricks/labs/lakebridge/cli.py index a2d198f47f..c1b8edbbcf 100644 --- a/src/databricks/labs/lakebridge/cli.py +++ b/src/databricks/labs/lakebridge/cli.py @@ -7,7 +7,8 @@ import re import sys import time -from collections.abc import Mapping +import webbrowser +from collections.abc import Mapping, Callable from pathlib import Path from typing import NoReturn, TextIO @@ -642,9 +643,11 @@ def _override_workspace_client_config(ctx: ApplicationContext, overrides: dict[s @lakebridge.command -def reconcile(*, w: WorkspaceClient) -> None: +def reconcile( + *, w: WorkspaceClient, application_ctx_factory: Callable[[WorkspaceClient], ApplicationContext] = ApplicationContext +) -> None: """[EXPERIMENTAL] Reconciles source to Databricks datasets""" - ctx = ApplicationContext(w) + ctx = application_ctx_factory(w) ctx.add_user_agent_extra("cmd", "execute-reconcile") user = ctx.current_user logger.debug(f"User: {user}") @@ -652,15 +655,19 @@ def reconcile(*, w: WorkspaceClient) -> None: ctx.workspace_client, ctx.installation, ctx.install_state, - ctx.prompts, ) - recon_runner.run(operation_name=RECONCILE_OPERATION_NAME) + + _, job_run_url = recon_runner.run(operation_name=RECONCILE_OPERATION_NAME) + if ctx.prompts.confirm(f"Would you like to open the job run URL `{job_run_url}` in the browser?"): + webbrowser.open(job_run_url) @lakebridge.command -def aggregates_reconcile(*, w: WorkspaceClient) -> None: +def aggregates_reconcile( + *, w: WorkspaceClient, application_ctx_factory: Callable[[WorkspaceClient], ApplicationContext] = ApplicationContext +) -> None: """[EXPERIMENTAL] Reconciles Aggregated source to Databricks datasets""" - ctx = ApplicationContext(w) + ctx = application_ctx_factory(w) ctx.add_user_agent_extra("cmd", "execute-aggregates-reconcile") user = ctx.current_user logger.debug(f"User: {user}") @@ -668,10 +675,11 @@ def aggregates_reconcile(*, w: WorkspaceClient) -> None: ctx.workspace_client, ctx.installation, ctx.install_state, - ctx.prompts, ) - recon_runner.run(operation_name=AGG_RECONCILE_OPERATION_NAME) + _, job_run_url = recon_runner.run(operation_name=AGG_RECONCILE_OPERATION_NAME) + if ctx.prompts.confirm(f"Would you like to open the job run URL `{job_run_url}` in the browser?"): + webbrowser.open(job_run_url) @lakebridge.command diff --git a/src/databricks/labs/lakebridge/config.py b/src/databricks/labs/lakebridge/config.py index a27e7b13d8..a552bab1ab 100644 --- a/src/databricks/labs/lakebridge/config.py +++ b/src/databricks/labs/lakebridge/config.py @@ -259,6 +259,12 @@ class ReconcileMetadataConfig: volume: str = "reconcile_volume" +@dataclass +class DeployReconcileConfig: + existing_cluster_id: str + tags: dict[str, str] + + @dataclass class ReconcileConfig: __file__ = "reconcile.yml" @@ -271,6 +277,7 @@ class ReconcileConfig: metadata_config: ReconcileMetadataConfig job_id: str | None = None tables: ReconcileTablesConfig | None = None + deployment_overrides: DeployReconcileConfig | None = None @dataclass diff --git a/src/databricks/labs/lakebridge/deployment/job.py b/src/databricks/labs/lakebridge/deployment/job.py index e20c4b7d1f..8c4e8339ab 100644 --- a/src/databricks/labs/lakebridge/deployment/job.py +++ b/src/databricks/labs/lakebridge/deployment/job.py @@ -1,5 +1,4 @@ import logging -from datetime import datetime, timezone, timedelta from typing import Any from databricks.labs.blueprint.installation import Installation @@ -20,10 +19,11 @@ logger = logging.getLogger(__name__) -_TEST_JOBS_PURGE_TIMEOUT = timedelta(hours=1, minutes=15) - class JobDeployment: + + DEFAULT_CLUSTER_NAME = "Remorph_Reconciliation_Cluster" + def __init__( self, ws: WorkspaceClient, @@ -73,30 +73,17 @@ def _recon_job_settings( recon_config: ReconcileConfig, lakebridge_wheel_path: str, ) -> dict[str, Any]: - latest_lts_spark = self._ws.clusters.select_spark_version(latest=True, long_term_support=True) version = self._product_info.version() version = version if not self._ws.config.is_gcp else version.replace("+", "-") tags = {"version": f"v{version}"} - if self._is_testing(): - # Add RemoveAfter tag for test job cleanup - date_to_remove = self._get_test_purge_time() - tags.update({"RemoveAfter": date_to_remove}) + if recon_config.deployment_overrides: + logger.debug(f"Applying deployment overrides: {recon_config.deployment_overrides}") + tags.update(recon_config.deployment_overrides.tags) return { "name": self._name_with_prefix(job_name), "tags": tags, - "job_clusters": [ - JobCluster( - job_cluster_key="Remorph_Reconciliation_Cluster", - new_cluster=compute.ClusterSpec( - data_security_mode=compute.DataSecurityMode.USER_ISOLATION, - spark_conf={}, - node_type_id=self._get_default_node_type_id(), - autoscale=compute.AutoScale(min_workers=2, max_workers=10), - spark_version=latest_lts_spark, - ), - ) - ], + "job_clusters": [] if recon_config.deployment_overrides else [self._default_job_cluster()], "tasks": [ self._job_recon_task( task_key, @@ -125,10 +112,13 @@ def _job_recon_task( ), ) - return Task( + task = Task( task_key=task_key, description=description, - job_cluster_key="Remorph_Reconciliation_Cluster", + job_cluster_key=None if recon_config.deployment_overrides else self.DEFAULT_CLUSTER_NAME, + existing_cluster_id=( + recon_config.deployment_overrides.existing_cluster_id if recon_config.deployment_overrides else None + ), libraries=libraries, python_wheel_task=PythonWheelTask( package_name=self.parse_package_name(lakebridge_wheel_path), @@ -136,14 +126,21 @@ def _job_recon_task( parameters=["{{job.parameters.[operation_name]}}"], ), ) + logger.debug(f"Reconciliation job task cluster: existing: {task.existing_cluster_id} or name: {task.job_cluster_key}") + return task - # TODO: DRY: delete as it is already implemented in install.py - def _is_testing(self): - return self._product_info.product_name() != "lakebridge" - - @staticmethod - def _get_test_purge_time() -> str: - return (datetime.now(timezone.utc) + _TEST_JOBS_PURGE_TIMEOUT).strftime("%Y%m%d%H") + def _default_job_cluster(self) -> JobCluster: + latest_lts_spark = self._ws.clusters.select_spark_version(latest=True, long_term_support=True) + return JobCluster( + job_cluster_key=self.DEFAULT_CLUSTER_NAME, + new_cluster=compute.ClusterSpec( + data_security_mode=compute.DataSecurityMode.USER_ISOLATION, + spark_conf={}, + node_type_id=self._get_default_node_type_id(), + autoscale=compute.AutoScale(min_workers=2, max_workers=10), + spark_version=latest_lts_spark, + ), + ) def _get_default_node_type_id(self) -> str: return self._ws.clusters.select_node_type(local_disk=True, min_memory_gb=16) @@ -230,10 +227,6 @@ def _profiler_ingestion_job_settings( version = self._product_info.version() version = version if not self._ws.config.is_gcp else version.replace("+", "-") tags = {"version": f"v{version}"} - if self._is_testing(): - # Add RemoveAfter tag for test job cleanup - date_to_remove = self._get_test_purge_time() - tags.update({"RemoveAfter": date_to_remove}) return { "name": self._name_with_prefix(job_name), diff --git a/src/databricks/labs/lakebridge/reconcile/runner.py b/src/databricks/labs/lakebridge/reconcile/runner.py index 20919ec526..4279e14bca 100644 --- a/src/databricks/labs/lakebridge/reconcile/runner.py +++ b/src/databricks/labs/lakebridge/reconcile/runner.py @@ -1,12 +1,12 @@ import logging -import webbrowser from databricks.labs.blueprint.installation import Installation from databricks.labs.blueprint.installation import SerdeError from databricks.labs.blueprint.installer import InstallState -from databricks.labs.blueprint.tui import Prompts from databricks.sdk import WorkspaceClient from databricks.sdk.errors import NotFound, PermissionDenied +from databricks.sdk.service._internal import Wait +from databricks.sdk.service.jobs import Run from databricks.labs.lakebridge.config import ReconcileConfig, TableRecon from databricks.labs.lakebridge.deployment.recon import RECON_JOB_NAME @@ -23,14 +23,12 @@ def __init__( ws: WorkspaceClient, installation: Installation, install_state: InstallState, - prompts: Prompts, ): self._ws = ws self._installation = installation self._install_state = install_state - self._prompts = prompts - def run(self, operation_name=RECONCILE_OPERATION_NAME): + def run(self, operation_name: str = RECONCILE_OPERATION_NAME) -> tuple[Wait[Run], str]: reconcile_config = self._get_verified_recon_config() job_id = self._get_recon_job_id(reconcile_config) logger.info(f"Triggering the reconcile job with job_id: `{job_id}`") @@ -42,8 +40,7 @@ def run(self, operation_name=RECONCILE_OPERATION_NAME): logger.info( f"'{operation_name.upper()}' job started. Please check the job_url `{job_run_url}` for the current status." ) - if self._prompts.confirm(f"Would you like to open the job run URL `{job_run_url}` in the browser?"): - webbrowser.open(job_run_url) + return wait, job_run_url def _get_verified_recon_config(self) -> ReconcileConfig: try: diff --git a/tests/integration/deployment/__init__.py b/tests/integration/deployment/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/integration/deployment/test_job.py b/tests/integration/deployment/test_job.py new file mode 100644 index 0000000000..71d37cc60a --- /dev/null +++ b/tests/integration/deployment/test_job.py @@ -0,0 +1,68 @@ +from unittest.mock import create_autospec + +import pytest +from databricks.labs.blueprint.installation import MockInstallation +from databricks.labs.blueprint.installer import InstallState +from databricks.labs.blueprint.wheels import ProductInfo +from databricks.sdk import WorkspaceClient +from databricks.sdk.errors import InvalidParameterValue +from databricks.sdk.service.jobs import Job + +from databricks.labs.lakebridge.config import ( + LakebridgeConfiguration, + ReconcileConfig, + DatabaseConfig, + ReconcileMetadataConfig, +) +from databricks.labs.lakebridge.deployment.job import JobDeployment + + +@pytest.fixture +def snowflake_recon_config() -> ReconcileConfig: + return ReconcileConfig( + data_source="snowflake", + report_type="all", + secret_scope="remorph_snowflake9", + database_config=DatabaseConfig( + source_schema="tpch_sf10009", + target_catalog="tpch9", + target_schema="1000gb9", + source_catalog="snowflake_sample_data9", + ), + metadata_config=ReconcileMetadataConfig( + catalog="remorph9", + schema="reconcile9", + volume="reconcile_volume9", + ), + ) + + +def test_deploy_existing_job(snowflake_recon_config): + workspace_client = create_autospec(WorkspaceClient) + workspace_client.config.is_gcp = True + job_id = 1234 + job = Job(job_id=job_id) + name = "Recon Job" + installation = MockInstallation({"state.json": {"resources": {"jobs": {name: str(job_id)}}, "version": 1}}) + install_state = InstallState.from_installation(installation) + product_info = ProductInfo.for_testing(LakebridgeConfiguration) + job_deployer = JobDeployment(workspace_client, installation, install_state, product_info) + job_deployer.deploy_recon_job(name, snowflake_recon_config, "lakebridge-x.y.z-py3-none-any.whl") + workspace_client.jobs.reset.assert_called_once() + assert install_state.jobs[name] == str(job.job_id) + + +def test_deploy_missing_job(snowflake_recon_config): + workspace_client = create_autospec(WorkspaceClient) + job_id = 1234 + job = Job(job_id=job_id) + workspace_client.jobs.create.return_value = job + workspace_client.jobs.reset.side_effect = InvalidParameterValue("Job not found") + name = "Recon Job" + installation = MockInstallation({"state.json": {"resources": {"jobs": {name: "5678"}}, "version": 1}}) + install_state = InstallState.from_installation(installation) + product_info = ProductInfo.for_testing(LakebridgeConfiguration) + job_deployer = JobDeployment(workspace_client, installation, install_state, product_info) + job_deployer.deploy_recon_job(name, snowflake_recon_config, "lakebridge-x.y.z-py3-none-any.whl") + workspace_client.jobs.create.assert_called_once() + assert install_state.jobs[name] == str(job.job_id) diff --git a/tests/integration/reconcile_system_tests/__init__.py b/tests/integration/reconcile_system_tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/integration/reconcile/test_oracle_reconcile.py b/tests/integration/reconcile_system_tests/test_oracle_reconcile.py similarity index 97% rename from tests/integration/reconcile/test_oracle_reconcile.py rename to tests/integration/reconcile_system_tests/test_oracle_reconcile.py index 841b130168..47de4dd201 100644 --- a/tests/integration/reconcile/test_oracle_reconcile.py +++ b/tests/integration/reconcile_system_tests/test_oracle_reconcile.py @@ -12,7 +12,7 @@ from databricks.labs.lakebridge.reconcile.schema_compare import SchemaCompare from databricks.labs.lakebridge.reconcile.trigger_recon_service import TriggerReconService from databricks.labs.lakebridge.transpiler.sqlglot.dialect_utils import get_dialect -from tests.integration.reconcile.connectors.test_read_schema import OracleDataSourceUnderTest +from tests.integration.reconcile_system_tests.test_read_schema import OracleDataSourceUnderTest class DatabricksDataSourceUnderTest(DatabricksDataSource): diff --git a/tests/integration/reconcile/connectors/test_read_schema.py b/tests/integration/reconcile_system_tests/test_read_schema.py similarity index 95% rename from tests/integration/reconcile/connectors/test_read_schema.py rename to tests/integration/reconcile_system_tests/test_read_schema.py index b1d2752c7f..6233b0137a 100644 --- a/tests/integration/reconcile/connectors/test_read_schema.py +++ b/tests/integration/reconcile_system_tests/test_read_schema.py @@ -75,7 +75,6 @@ def _get_snowflake_options(self): return opts -@pytest.mark.skip(reason="Add the creds to Github secrets and populate the actions' env to enable this test") def test_sql_server_read_schema_happy(mock_spark): mock_ws = create_autospec(WorkspaceClient) connector = TSQLServerDataSourceUnderTest(mock_spark, mock_ws) @@ -96,6 +95,13 @@ def test_databricks_read_schema_happy(mock_spark): assert columns +def test_databricks_read_schema_happy_sandbox(spark, ws): + connector = DatabricksDataSource(get_dialect("databricks"), spark, ws, "my_secret") + + columns = connector.get_schema("main", "lakebridge", "diamonds") + assert columns + + # FIXME # 1. Deploy Oracle Free # 2. Add credentials to the test env getter diff --git a/tests/integration/reconcile_system_tests/test_recon_databricks.py b/tests/integration/reconcile_system_tests/test_recon_databricks.py new file mode 100644 index 0000000000..5a88a5fc8b --- /dev/null +++ b/tests/integration/reconcile_system_tests/test_recon_databricks.py @@ -0,0 +1,84 @@ +from datetime import datetime, timezone, timedelta + +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.jobs import Run, TerminationTypeType + +from databricks.labs.lakebridge.config import ( + ReconcileConfig, + DatabaseConfig, + ReconcileMetadataConfig, + LakebridgeConfiguration, + DeployReconcileConfig, +) +from databricks.labs.lakebridge.contexts.application import ApplicationContext +from databricks.labs.lakebridge.reconcile.recon_config import RECONCILE_OPERATION_NAME +from databricks.labs.lakebridge.reconcile.runner import ReconcileRunner +from databricks.labs.blueprint.wheels import ProductInfo + +from tests.integration.debug_envgetter import TestEnvGetter + +TEST_JOBS_PURGE_TIMEOUT = timedelta(hours=1, minutes=15) + + +def get_test_purge_time() -> str: + return (datetime.now(timezone.utc) + TEST_JOBS_PURGE_TIMEOUT).strftime("%Y%m%d%H") + + +TABLE_RECON_JSON = """ +{ + "source_schema": "test_source", + "target_catalog": "sandbox", + "target_schema": "test_target", + "tables": [ + { + "source_name": "diamonds", + "target_name": "diamonds", + "join_columns": ["color", "clarity"] + } + ] +} +""" + +test_env = TestEnvGetter(True) +cluster = test_env.get("TEST_DEFAULT_CLUSTER_ID") +date_to_remove = get_test_purge_time() +tags = {"RemoveAfter": date_to_remove} +deployment_overrides = DeployReconcileConfig(existing_cluster_id=cluster, tags=tags) + +recon_config = ReconcileConfig( + data_source="databricks", + report_type="all", + secret_scope="NOT_NEEDED", + database_config=DatabaseConfig( + source_catalog="sandbox", source_schema="test_source", target_catalog="sandbox", target_schema="test_target" + ), + metadata_config=ReconcileMetadataConfig(catalog="sandbox", schema="reconcile"), + deployment_overrides=deployment_overrides, +) +config = LakebridgeConfiguration(None, recon_config) +source_catalog_or_schema = ( + recon_config.database_config.source_catalog + if recon_config.database_config.source_catalog + else recon_config.database_config.source_schema +) +filename = f"recon_config_{recon_config.data_source}_{source_catalog_or_schema}_{recon_config.report_type}.json" + + +def test_recon_databricks_job_succeeds(ws: WorkspaceClient) -> None: + ctx = ApplicationContext(ws) + ctx.replace(product_info=ProductInfo.for_testing(LakebridgeConfiguration)) + ctx.installation.save(recon_config) + ctx.installation.upload(filename, TABLE_RECON_JSON.encode()) + ctx.workspace_installation.install(config) + + recon_runner = ReconcileRunner( + ctx.workspace_client, + ctx.installation, + ctx.install_state, + ) + run, _ = recon_runner.run(operation_name=RECONCILE_OPERATION_NAME) + result = run.result() + + assert result.status.termination_details + assert result.status.termination_details.type + assert result.status.termination_details.type.value == TerminationTypeType.SUCCESS.value diff --git a/tests/unit/deployment/test_job.py b/tests/unit/deployment/test_job.py index e5f263e0f2..dd82594b63 100644 --- a/tests/unit/deployment/test_job.py +++ b/tests/unit/deployment/test_job.py @@ -36,26 +36,6 @@ def oracle_recon_config() -> ReconcileConfig: ) -@pytest.fixture -def snowflake_recon_config() -> ReconcileConfig: - return ReconcileConfig( - data_source="snowflake", - report_type="all", - secret_scope="remorph_snowflake9", - database_config=DatabaseConfig( - source_schema="tpch_sf10009", - target_catalog="tpch9", - target_schema="1000gb9", - source_catalog="snowflake_sample_data9", - ), - metadata_config=ReconcileMetadataConfig( - catalog="remorph9", - schema="reconcile9", - volume="reconcile_volume9", - ), - ) - - def test_deploy_new_job(oracle_recon_config): workspace_client = create_autospec(WorkspaceClient) job = Job(job_id=1234) @@ -70,37 +50,6 @@ def test_deploy_new_job(oracle_recon_config): assert install_state.jobs[name] == str(job.job_id) -def test_deploy_existing_job(snowflake_recon_config): - workspace_client = create_autospec(WorkspaceClient) - workspace_client.config.is_gcp = True - job_id = 1234 - job = Job(job_id=job_id) - name = "Recon Job" - installation = MockInstallation({"state.json": {"resources": {"jobs": {name: str(job_id)}}, "version": 1}}) - install_state = InstallState.from_installation(installation) - product_info = ProductInfo.for_testing(LakebridgeConfiguration) - job_deployer = JobDeployment(workspace_client, installation, install_state, product_info) - job_deployer.deploy_recon_job(name, snowflake_recon_config, "lakebridge-x.y.z-py3-none-any.whl") - workspace_client.jobs.reset.assert_called_once() - assert install_state.jobs[name] == str(job.job_id) - - -def test_deploy_missing_job(snowflake_recon_config): - workspace_client = create_autospec(WorkspaceClient) - job_id = 1234 - job = Job(job_id=job_id) - workspace_client.jobs.create.return_value = job - workspace_client.jobs.reset.side_effect = InvalidParameterValue("Job not found") - name = "Recon Job" - installation = MockInstallation({"state.json": {"resources": {"jobs": {name: "5678"}}, "version": 1}}) - install_state = InstallState.from_installation(installation) - product_info = ProductInfo.for_testing(LakebridgeConfiguration) - job_deployer = JobDeployment(workspace_client, installation, install_state, product_info) - job_deployer.deploy_recon_job(name, snowflake_recon_config, "lakebridge-x.y.z-py3-none-any.whl") - workspace_client.jobs.create.assert_called_once() - assert install_state.jobs[name] == str(job.job_id) - - def test_parse_package_name() -> None: workspace_client = create_autospec(WorkspaceClient) installation = MockInstallation(is_global=False) diff --git a/tests/unit/reconcile/test_runner.py b/tests/unit/reconcile/test_runner.py index 46a64a0df0..d025fb4b9e 100644 --- a/tests/unit/reconcile/test_runner.py +++ b/tests/unit/reconcile/test_runner.py @@ -2,7 +2,6 @@ import pytest from databricks.labs.blueprint.installation import MockInstallation from databricks.labs.blueprint.installer import InstallState -from databricks.labs.blueprint.tui import MockPrompts from databricks.sdk import WorkspaceClient from databricks.labs.lakebridge.reconcile.runner import ReconcileRunner from databricks.labs.lakebridge.deployment.recon import RECON_JOB_NAME @@ -12,15 +11,15 @@ def test_run_with_missing_recon_config(): ws = create_autospec(WorkspaceClient) installation = MockInstallation() install_state = InstallState.from_installation(installation) - prompts = MockPrompts({}) - recon_runner = ReconcileRunner(ws, installation, install_state, prompts) + + recon_runner = ReconcileRunner(ws, installation, install_state) with pytest.raises(SystemExit): recon_runner.run() def test_run_with_corrupt_recon_config(): ws = create_autospec(WorkspaceClient) - prompts = MockPrompts({}) + installation = MockInstallation( { "reconcile.yml": { @@ -42,7 +41,7 @@ def test_run_with_corrupt_recon_config(): } ) install_state = InstallState.from_installation(installation) - recon_runner = ReconcileRunner(ws, installation, install_state, prompts) + recon_runner = ReconcileRunner(ws, installation, install_state) with pytest.raises(SystemExit): recon_runner.run() @@ -76,8 +75,8 @@ def test_run_with_missing_table_config(): } ) install_state = InstallState.from_installation(installation) - prompts = MockPrompts({}) - recon_runner = ReconcileRunner(ws, installation, install_state, prompts) + + recon_runner = ReconcileRunner(ws, installation, install_state) with pytest.raises(SystemExit): recon_runner.run() @@ -129,8 +128,8 @@ def test_run_with_corrupt_table_config(): } ) install_state = InstallState.from_installation(installation) - prompts = MockPrompts({}) - recon_runner = ReconcileRunner(ws, installation, install_state, prompts) + + recon_runner = ReconcileRunner(ws, installation, install_state) with pytest.raises(SystemExit): recon_runner.run() @@ -181,19 +180,14 @@ def test_run_with_missing_job_id(): } ) install_state = InstallState.from_installation(installation) - prompts = MockPrompts({}) - recon_runner = ReconcileRunner(ws, installation, install_state, prompts) + + recon_runner = ReconcileRunner(ws, installation, install_state) with pytest.raises(SystemExit): recon_runner.run() def test_run_with_job_id_in_config(): ws = create_autospec(WorkspaceClient) - prompts = MockPrompts( - { - r"Would you like to open the job run URL .*": "no", - } - ) installation = MockInstallation( { "reconcile.yml": { @@ -243,7 +237,7 @@ def test_run_with_job_id_in_config(): wait.run_id = "rid" ws.jobs.run_now.return_value = wait - recon_runner = ReconcileRunner(ws, installation, install_state, prompts) + recon_runner = ReconcileRunner(ws, installation, install_state) recon_runner.run() ws.jobs.run_now.assert_called_once_with(1234, job_parameters={'operation_name': 'reconcile'}) @@ -251,11 +245,6 @@ def test_run_with_job_id_in_config(): def test_run_with_job_id_in_state(monkeypatch): monkeypatch.setattr("webbrowser.open", lambda url: None) ws = create_autospec(WorkspaceClient) - prompts = MockPrompts( - { - r"Would you like to open the job run URL .*": "yes", - } - ) installation = MockInstallation( { "state.json": { @@ -308,7 +297,7 @@ def test_run_with_job_id_in_state(monkeypatch): wait.run_id = "rid" ws.jobs.run_now.return_value = wait - recon_runner = ReconcileRunner(ws, installation, install_state, prompts) + recon_runner = ReconcileRunner(ws, installation, install_state) recon_runner.run() ws.jobs.run_now.assert_called_once_with(1234, job_parameters={'operation_name': 'reconcile'}) @@ -363,12 +352,12 @@ def test_run_with_failed_execution(): } ) install_state = InstallState.from_installation(installation) - prompts = MockPrompts({}) + wait = Mock() wait.run_id = None ws.jobs.run_now.return_value = wait - recon_runner = ReconcileRunner(ws, installation, install_state, prompts) + recon_runner = ReconcileRunner(ws, installation, install_state) with pytest.raises(SystemExit): recon_runner.run() ws.jobs.run_now.assert_called_once_with(1234, job_parameters={'operation_name': 'reconcile'}) @@ -377,11 +366,6 @@ def test_run_with_failed_execution(): def test_aggregates_reconcile_run_with_job_id_in_state(monkeypatch): monkeypatch.setattr("webbrowser.open", lambda url: None) ws = create_autospec(WorkspaceClient) - prompts = MockPrompts( - { - r"Would you like to open the job run URL .*": "yes", - } - ) state = { "resources": {"jobs": {RECON_JOB_NAME: "1234"}}, "version": 1, @@ -449,6 +433,6 @@ def test_aggregates_reconcile_run_with_job_id_in_state(monkeypatch): wait.run_id = "rid" ws.jobs.run_now.return_value = wait - recon_runner = ReconcileRunner(ws, installation, install_state, prompts) + recon_runner = ReconcileRunner(ws, installation, install_state) recon_runner.run(operation_name="aggregates-reconcile") ws.jobs.run_now.assert_called_once_with(1234, job_parameters={'operation_name': 'aggregates-reconcile'}) diff --git a/tests/unit/test_cli_other.py b/tests/unit/test_cli_other.py index 5d184bffc0..eb916e3177 100644 --- a/tests/unit/test_cli_other.py +++ b/tests/unit/test_cli_other.py @@ -1,5 +1,5 @@ import io -from unittest.mock import Mock, patch +from unittest.mock import Mock, patch, MagicMock import pytest @@ -66,14 +66,25 @@ def test_cli_configure_secrets_config(mock_workspace_client): mock_recon_config.assert_called_once_with(mock_workspace_client) +def app_factory(_): + ctx_mock = MagicMock() + prompts = MockPrompts( + { + r"Would you like to open the job run URL .*": "no", + } + ) + ctx_mock.prompts = prompts + return ctx_mock + + def test_cli_reconcile(mock_workspace_client): - with patch("databricks.labs.lakebridge.reconcile.runner.ReconcileRunner.run", return_value=True): - cli.reconcile(w=mock_workspace_client) + with patch("databricks.labs.lakebridge.reconcile.runner.ReconcileRunner.run", return_value=(MagicMock(), True)): + cli.reconcile(w=mock_workspace_client, application_ctx_factory=app_factory) def test_cli_aggregates_reconcile(mock_workspace_client): - with patch("databricks.labs.lakebridge.reconcile.runner.ReconcileRunner.run", return_value=True): - cli.aggregates_reconcile(w=mock_workspace_client) + with patch("databricks.labs.lakebridge.reconcile.runner.ReconcileRunner.run", return_value=(MagicMock(), True)): + cli.aggregates_reconcile(w=mock_workspace_client, application_ctx_factory=app_factory) def test_prompts_question():