diff --git a/poetry.lock b/poetry.lock index 8789e119a4..5a0162c560 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.2.0 and should not be changed by hand. [[package]] name = "allure-pytest" @@ -2703,4 +2703,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "131f7b1456e6ba797c1a52cd94e0d27061f27b2d84ddbc1478c24358b51b08e2" +content-hash = "bef619044adc8a3882653cd8f000016a66b22b0f66d11d5140b7b66e42256055" diff --git a/pyproject.toml b/pyproject.toml index d5757060c0..54d07b4124 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,7 +71,7 @@ psycopg2-binary = "^2.9.10" boto3 = "*" tenacity = "^9.1.2" allure-pytest = "^2.15.0" -jubilant = "^1.3.0" +jubilant = "^1.4.0" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index 24239442f6..0e79d41e2c 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -43,6 +43,10 @@ class PostgreSQLLogicalReplication(Object): """Defines the logical-replication logic.""" + def _identity(self) -> str: + """Return unique identity of this application in the model.""" + return f"{self.model.uuid}:{self.model.app.name}" + def __init__(self, charm: "PostgresqlOperatorCharm"): super().__init__(charm, "postgresql_logical_replication") self.charm = charm @@ -193,6 +197,8 @@ def _on_relation_joined(self, event: RelationJoinedEvent) -> None: event.relation.data[self.model.app]["subscription-request"] = ( self.charm.config.logical_replication_subscription_request or "" ) + # Share our identity with the publisher to prevent cyclic replication + event.relation.data[self.model.app]["requester-id"] = self._identity() def _on_relation_changed(self, event: RelationChangedEvent) -> None: if not 
self._relation_changed_checks(event): @@ -247,6 +253,8 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: self.charm.app_peer_data["logical-replication-subscriptions"] = json.dumps({ str(event.relation.id): subscriptions }) + # Rebuild subscribed upstream provenance after any changes + self._rebuild_subscribed_upstream() def _on_relation_departed(self, event: RelationDepartedEvent) -> None: if event.departing_unit == self.charm.unit and self.charm._peers is not None: @@ -274,9 +282,38 @@ def _on_relation_broken(self, event: RelationBrokenEvent) -> None: f"Dropped subscription {subscription} from database {database} due to relation break" ) self.charm.app_peer_data["logical-replication-subscriptions"] = "" + # Clear provenance as subscriptions are gone + self._rebuild_subscribed_upstream() # endregion + def _rebuild_subscribed_upstream(self) -> None: + """Aggregate upstream provenance for all subscribed tables. + + Stores mapping in app peer data as logical-replication-subscribed-upstream + with keys formatted as ":." and values equal to + the upstream identity ":". 
+ """ + mapping: dict[str, str] = {} + for relation in self.model.relations.get(LOGICAL_REPLICATION_RELATION, ()): + pubs = json.loads(relation.data[relation.app].get("publications", "{}")) + for database, pub in pubs.items(): + upstream_by_table = pub.get("upstream", {}) if isinstance(pub, dict) else {} + publisher_id = pub.get("publisher-id", "") if isinstance(pub, dict) else "" + for schematable in pub.get("tables", []): + upstream = upstream_by_table.get(schematable) or publisher_id + if upstream: + mapping[f"{database}:{schematable}"] = upstream + self.charm.app_peer_data["logical-replication-subscribed-upstream"] = json.dumps(mapping) + + def _get_subscribed_upstream(self) -> dict[str, str]: + try: + return json.loads( + self.charm.app_peer_data.get("logical-replication-subscribed-upstream", "{}") + ) + except json.JSONDecodeError: + return {} + # region Events def _on_secret_changed(self, event: SecretChangedEvent) -> None: @@ -465,7 +502,7 @@ def _relation_changed_checks(self, event: RelationChangedEvent) -> bool: return False return True - def _process_offer(self, relation: Relation) -> None: + def _process_offer(self, relation: Relation) -> None: # noqa: C901 logger.debug( f"Started processing offer for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" ) @@ -473,10 +510,12 @@ def _process_offer(self, relation: Relation) -> None: subscriptions_request = json.loads( relation.data[relation.app].get("subscription-request", "{}") ) + requester_id = relation.data[relation.app].get("requester-id", "") publications = json.loads(relation.data[self.model.app].get("publications", "{}")) secret = self._get_secret(relation.id) user = secret.peek_content()["username"] errors = [] + subscribed_upstream = self._get_subscribed_upstream() for database, publication in publications.copy().items(): if database in subscriptions_request: @@ -494,6 +533,27 @@ def _process_offer(self, relation: Relation) -> None: ) for database, tables in subscriptions_request.items(): + # 
Cycle detection: if our local upstream for any requested table equals requester, reject + cycle_detected = False + upstream_map: dict[str, str] = {} + for schematable in tables: + key = f"{database}:{schematable}" + local_upstream = subscribed_upstream.get(key) or self._identity() + upstream_map[schematable] = local_upstream + if requester_id and requester_id == local_upstream: + cycle_detected = True + if cycle_detected: + error = ( + f"cyclic logical replication detected for database {database}: " + f"requested tables would replicate back to their upstream ({requester_id})" + ) + errors.append(error) + logger.error( + f"Cannot create/alter publication for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}: {error}" + ) + # Skip creating/altering this publication to avoid loop + continue + if database not in publications: if validation_error := self._validate_new_publication(database, tables): errors.append(validation_error) @@ -521,6 +581,8 @@ def _process_offer(self, relation: Relation) -> None: "publication-name": publication_name, "replication-slot-name": self._replication_slot_name(relation.id, database), "tables": tables, + "publisher-id": self._identity(), + "upstream": upstream_map, } elif sorted(publication_tables := publications[database]["tables"]) != sorted(tables): publication_name = publications[database]["publication-name"] @@ -551,6 +613,12 @@ def _process_offer(self, relation: Relation) -> None: ) self.charm.postgresql.alter_publication(database, publication_name, tables) publications[database]["tables"] = tables + publications[database]["publisher-id"] = self._identity() + publications[database]["upstream"] = upstream_map + else: + # Tables unchanged; still update provenance and publisher id to propagate upstream + publications[database]["publisher-id"] = self._identity() + publications[database]["upstream"] = upstream_map self._save_published_resources_info(str(relation.id), secret.id, publications) # type: ignore 
relation.data[self.model.app]["publications"] = json.dumps(publications) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1fe2d64ee5..3373a948bb 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -5,8 +5,8 @@ import uuid import boto3 +import jubilant import pytest -from pytest_operator.plugin import OpsTest from . import architecture from .helpers import construct_endpoint @@ -17,6 +17,36 @@ logger = logging.getLogger(__name__) +@pytest.fixture(scope="module") +def juju(request: pytest.FixtureRequest): + """Pytest fixture that wraps :meth:`jubilant.with_model`. + + This adds command line parameter ``--keep-models`` (see help for details). + """ + controller = request.config.getoption("--controller") + model = request.config.getoption("--model") + controller_and_model = None + if controller and model: + controller_and_model = f"{controller}:{model}" + elif controller: + controller_and_model = controller + elif model: + controller_and_model = model + keep_models = bool(request.config.getoption("--keep-models")) + + if controller_and_model: + juju = jubilant.Juju(model=controller_and_model) # type: ignore + yield juju + log = juju.debug_log(limit=1000) + else: + with jubilant.temp_model(keep=keep_models) as juju: + yield juju + log = juju.debug_log(limit=1000) + + if request.session.testsfailed: + print(log, end="") + + @pytest.fixture(scope="session") def charm(): # Return str instead of pathlib.Path since python-libjuju's model.deploy(), juju deploy, and @@ -67,7 +97,7 @@ def cleanup_cloud(config: dict[str, str], credentials: dict[str, str]) -> None: @pytest.fixture(scope="module") -async def aws_cloud_configs(ops_test: OpsTest) -> None: +async def aws_cloud_configs(): if ( not os.environ.get("AWS_ACCESS_KEY", "").strip() or not os.environ.get("AWS_SECRET_KEY", "").strip() @@ -82,7 +112,7 @@ async def aws_cloud_configs(ops_test: OpsTest) -> None: @pytest.fixture(scope="module") -async def 
gcp_cloud_configs(ops_test: OpsTest) -> None: +async def gcp_cloud_configs(): if ( not os.environ.get("GCP_ACCESS_KEY", "").strip() or not os.environ.get("GCP_SECRET_KEY", "").strip() diff --git a/tests/integration/ha_tests/test_logical_replication_cycle.py b/tests/integration/ha_tests/test_logical_replication_cycle.py new file mode 100644 index 0000000000..9bd6303355 --- /dev/null +++ b/tests/integration/ha_tests/test_logical_replication_cycle.py @@ -0,0 +1,262 @@ +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. + +import json +import logging +import subprocess + +import jubilant +import psycopg2 +import pytest +import yaml +from tenacity import Retrying, stop_after_delay, wait_fixed + +from ..helpers import METADATA + +DATABASE_APP_NAME = "postgresql" +SECOND_DATABASE_APP_NAME = "postgresql2" +THIRD_DATABASE_APP_NAME = "postgresql3" + +DATA_INTEGRATOR_APP_NAME = "data-integrator" +SECOND_DATA_INTEGRATOR_APP_NAME = "data-integrator2" +THIRD_DATA_INTEGRATOR_APP_NAME = "data-integrator3" +DATA_INTEGRATOR_RELATION = "postgresql" + +DATABASE_APP_CONFIG = {"profile": "testing"} + +TESTING_DATABASE = "testdb" +TIMEOUT = 2500 + +logger = logging.getLogger(__name__) + + +def _all_active(status: jubilant.Status, apps: list[str]) -> bool: + return all(jubilant.all_active(status, app) for app in apps) + + +def _model_name() -> str: + status_raw = subprocess.run(["juju", "status", "--format", "json"], capture_output=True).stdout + data = json.loads(status_raw or b"{}") + return data.get("model", {}).get("name") + + +def _build_connection_string(application_name: str, relation_name: str, database: str) -> str: + # Fetch relation data via juju show-unit + unit_name = f"{application_name}/0" + show_unit_raw = subprocess.run(["juju", "show-unit", unit_name], capture_output=True).stdout + if not show_unit_raw: + raise RuntimeError(f"Unable to retrieve unit info for {unit_name}") + data = yaml.safe_load(show_unit_raw) + + relation_infos = [ + r for r in 
data[unit_name]["relation-info"] if r["endpoint"] == relation_name + ] + if not relation_infos: + raise RuntimeError("No relation data found to build connection string") + + app_data = relation_infos[0]["application-data"] + # Handle both secret-user and plain username/password + if secret_uri := app_data.get("secret-user"): + secret_id = secret_uri.split("/")[-1] + show_secret_raw = subprocess.run( + ["juju", "show-secret", "--format", "json", "--reveal", secret_id], capture_output=True + ).stdout + secret = json.loads(show_secret_raw) + secret_data = secret[secret_id]["content"]["Data"] + username = secret_data["username"] + password = secret_data["password"] + else: + username = app_data["username"] + password = app_data["password"] + + endpoints = app_data.get("endpoints") or app_data.get("read-only-endpoints") + host = endpoints.split(",")[0].split(":")[0] + + # Translate service hostname to ClusterIP via kubectl + name = host.split(".")[0] + namespace = _model_name() + svc_json = subprocess.run( + ["kubectl", "-n", namespace, "get", "svc", name, "-o", "json"], capture_output=True + ).stdout + svc = json.loads(svc_json) + ip = svc["spec"]["clusterIP"] + + return f"dbname='{database}' user='{username}' host='{ip}' password='{password}' connect_timeout=10" + + +@pytest.mark.abort_on_fail +def test_cycle_detection_three_clusters(juju: jubilant.Juju, charm): + # Deploy three PostgreSQL clusters and three data-integrators (to create tables) + resources = { + "postgresql-image": METADATA["resources"]["postgresql-image"]["upstream-source"], + } + + if DATABASE_APP_NAME not in juju.status().apps: + juju.deploy( + charm, + app=DATABASE_APP_NAME, + num_units=1, + resources=resources, + trust=True, + config=DATABASE_APP_CONFIG.copy(), + ) + if SECOND_DATABASE_APP_NAME not in juju.status().apps: + juju.deploy( + charm, + app=SECOND_DATABASE_APP_NAME, + num_units=1, + resources=resources, + trust=True, + config=DATABASE_APP_CONFIG.copy(), + ) + if THIRD_DATABASE_APP_NAME 
not in juju.status().apps: + juju.deploy( + charm, + app=THIRD_DATABASE_APP_NAME, + num_units=1, + resources=resources, + trust=True, + config=DATABASE_APP_CONFIG.copy(), + ) + + for app_name in [ + DATA_INTEGRATOR_APP_NAME, + SECOND_DATA_INTEGRATOR_APP_NAME, + THIRD_DATA_INTEGRATOR_APP_NAME, + ]: + if app_name not in juju.status().apps: + juju.deploy( + DATA_INTEGRATOR_APP_NAME, + app=app_name, + num_units=1, + channel="latest/stable", + config={"database-name": TESTING_DATABASE}, + ) + + juju.wait( + lambda status: _all_active( + status, [DATABASE_APP_NAME, SECOND_DATABASE_APP_NAME, THIRD_DATABASE_APP_NAME] + ), + timeout=TIMEOUT, + ) + + # Integrate data-integrators for table creation + for provider, requirer in [ + (DATABASE_APP_NAME, DATA_INTEGRATOR_APP_NAME), + (SECOND_DATABASE_APP_NAME, SECOND_DATA_INTEGRATOR_APP_NAME), + (THIRD_DATABASE_APP_NAME, THIRD_DATA_INTEGRATOR_APP_NAME), + ]: + # avoid duplicate relations + existing = [ + relation + for relation in juju.status().apps.get(provider).relations.values() + if any(True for r in relation if r.related_app == requirer) + ] + if not existing: + juju.integrate(provider, requirer) + juju.wait( + lambda status: _all_active( + status, + [ + DATABASE_APP_NAME, + SECOND_DATABASE_APP_NAME, + THIRD_DATABASE_APP_NAME, + DATA_INTEGRATOR_APP_NAME, + SECOND_DATA_INTEGRATOR_APP_NAME, + THIRD_DATA_INTEGRATOR_APP_NAME, + ], + ), + timeout=600, + ) + + _create_test_table(DATA_INTEGRATOR_APP_NAME, TESTING_DATABASE, "public.test_cycle") + _create_test_table(SECOND_DATA_INTEGRATOR_APP_NAME, TESTING_DATABASE, "public.test_cycle") + _create_test_table(THIRD_DATA_INTEGRATOR_APP_NAME, TESTING_DATABASE, "public.test_cycle") + + print("A -> B subscription") + juju.integrate( + f"{DATABASE_APP_NAME}:logical-replication-offer", + f"{SECOND_DATABASE_APP_NAME}:logical-replication", + ) + juju.wait(lambda status: jubilant.all_active(status, SECOND_DATABASE_APP_NAME), timeout=600) + + pg2_config = DATABASE_APP_CONFIG.copy() + 
pg2_config["logical_replication_subscription_request"] = json.dumps({ + TESTING_DATABASE: ["public.test_cycle"], + }) + juju.config(app=SECOND_DATABASE_APP_NAME, values=pg2_config) + + print("B -> C subscription") + juju.integrate( + f"{SECOND_DATABASE_APP_NAME}:logical-replication-offer", + f"{THIRD_DATABASE_APP_NAME}:logical-replication", + ) + juju.wait(lambda status: jubilant.all_active(status, THIRD_DATABASE_APP_NAME), timeout=600) + + pg3_config = DATABASE_APP_CONFIG.copy() + pg3_config["logical_replication_subscription_request"] = json.dumps({ + TESTING_DATABASE: ["public.test_cycle"], + }) + juju.config(app=THIRD_DATABASE_APP_NAME, values=pg3_config) + + print("Attempt C -> A subscription should be blocked due to cycle detection") + juju.integrate( + f"{THIRD_DATABASE_APP_NAME}:logical-replication-offer", + f"{DATABASE_APP_NAME}:logical-replication", + ) + juju.wait(lambda status: jubilant.all_active(status, DATABASE_APP_NAME), timeout=600) + + pg1_config = DATABASE_APP_CONFIG.copy() + pg1_config["logical_replication_subscription_request"] = json.dumps({ + TESTING_DATABASE: ["public.test_cycle"], + }) + juju.config(app=DATABASE_APP_NAME, values=pg1_config) + + # Expect unit of A to go into blocked state (single unit deployment) + def unit_blocked(status: jubilant.Status) -> bool: + unit = status.get_units(DATABASE_APP_NAME).get(f"{DATABASE_APP_NAME}/0") + return unit.workload_status.current == "blocked" + + juju.wait(unit_blocked, timeout=180) + + logger.info("Success in test_cycle_detection_three_clusters") + + +def test_cycle_unblocks_with_different_table(juju: jubilant.Juju, charm): + # Create a different table to be replicated that is not part of the A->B->C chain + other_table = "public.test_cycle_2" + _create_test_table(DATA_INTEGRATOR_APP_NAME, TESTING_DATABASE, other_table) + _create_test_table(SECOND_DATA_INTEGRATOR_APP_NAME, TESTING_DATABASE, other_table) + _create_test_table(THIRD_DATA_INTEGRATOR_APP_NAME, TESTING_DATABASE, other_table) + + # 
Update A's subscription request to use the different table; this should clear the blocked status + pg1_config = DATABASE_APP_CONFIG.copy() + pg1_config["logical_replication_subscription_request"] = json.dumps({ + TESTING_DATABASE: [other_table], + }) + juju.config(app=DATABASE_APP_NAME, values=pg1_config) + + # Wait for A to become active (unblocked) + juju.wait(lambda status: jubilant.all_active(status, DATABASE_APP_NAME), timeout=180) + + logger.info("Success in test_cycle_unblocks_with_different_table") + + +def _create_test_table(data_integrator_app_name: str, database: str, qualified_table: str) -> None: + connection_string = _build_connection_string( + data_integrator_app_name, + DATA_INTEGRATOR_RELATION, + database=database, + ) + connection = None + try: + for attempt in Retrying(stop=stop_after_delay(120), wait=wait_fixed(3), reraise=True): + with attempt: + connection = psycopg2.connect(connection_string) + connection.autocommit = True + with connection.cursor() as cursor: + schema, table = qualified_table.split(".") + cursor.execute(f"CREATE TABLE IF NOT EXISTS {schema}.{table} (test_column text);") + finally: + if connection is not None: + connection.close() diff --git a/tests/spread/test_logical_replication_cycle.py/task.yaml b/tests/spread/test_logical_replication_cycle.py/task.yaml new file mode 100644 index 0000000000..f515a109e5 --- /dev/null +++ b/tests/spread/test_logical_replication_cycle.py/task.yaml @@ -0,0 +1,7 @@ +summary: test_logical_replication_cycle.py +environment: + TEST_MODULE: ha_tests/test_logical_replication_cycle.py +execute: | + tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results" +artifacts: + - allure-results