diff --git a/actions.yaml b/actions.yaml index 7069c4668e..8a3a7fd076 100644 --- a/actions.yaml +++ b/actions.yaml @@ -1,6 +1,16 @@ # Copyright 2021 Canonical Ltd. # See LICENSE file for licensing details. +# TODO: add descriptions for logical replication actions + +add-publication: + params: + name: + type: string + database: + type: string + tables: + type: string create-backup: description: Creates a backup to s3 storage. params: @@ -30,6 +40,8 @@ get-password: Possible values - operator, replication, rewind, patroni. list-backups: description: Lists backups in s3 storage. +list-publications: +list-subscriptions: pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. promote-to-primary: @@ -43,6 +55,10 @@ promote-to-primary: force: type: boolean description: Force the promotion of a cluster when there is already a primary cluster. +remove-publication: + params: + name: + type: string restore: description: Restore a database backup using pgBackRest. S3 credentials are retrieved from a relation with the S3 integrator charm. @@ -70,3 +86,11 @@ set-tls-private-key: private-key: type: string description: The content of private key for communications with clients. Content will be auto-generated if this option is not specified. 
+subscribe: + params: + name: + type: string +unsubscribe: + params: + name: + type: string \ No newline at end of file diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 9fe1957e4f..28d8c59b9d 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -35,7 +35,19 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 46 +LIBPATCH = 48 + +# Groups to distinguish HBA access +ACCESS_GROUP_IDENTITY = "identity_access" +ACCESS_GROUP_INTERNAL = "internal_access" +ACCESS_GROUP_RELATION = "relation_access" + +# List of access groups to filter role assignments by +ACCESS_GROUPS = [ + ACCESS_GROUP_IDENTITY, + ACCESS_GROUP_INTERNAL, + ACCESS_GROUP_RELATION, +] # Groups to distinguish database permissions PERMISSIONS_GROUP_ADMIN = "admin" @@ -57,10 +69,18 @@ logger = logging.getLogger(__name__) +class PostgreSQLAssignGroupError(Exception): + """Exception raised when assigning to a group fails.""" + + class PostgreSQLCreateDatabaseError(Exception): """Exception raised when creating a database fails.""" +class PostgreSQLCreateGroupError(Exception): + """Exception raised when creating a group fails.""" + + class PostgreSQLCreateUserError(Exception): """Exception raised when creating a user fails.""" @@ -93,6 +113,10 @@ class PostgreSQLGetPostgreSQLVersionError(Exception): """Exception raised when retrieving PostgreSQL version fails.""" +class PostgreSQLListGroupsError(Exception): + """Exception raised when retrieving PostgreSQL groups list fails.""" + + class PostgreSQLListUsersError(Exception): """Exception raised when retrieving PostgreSQL users list fails.""" @@ -100,6 +124,33 @@ class PostgreSQLListUsersError(Exception): class PostgreSQLUpdateUserPasswordError(Exception): """Exception raised when updating a user password fails.""" +class PostgreSQLDatabaseExistsError(Exception): + 
"""Exception raised during database existence check.""" + +class PostgreSQLTableExistsError(Exception): + """Exception raised during table existence check.""" + +class PostgreSQLIsTableEmptyError(Exception): + """Exception raised during table emptiness check.""" + +class PostgreSQLCreatePublicationError(Exception): + """Exception raised when creating PostgreSQL publication.""" + +class PostgreSQLDropPublicationError(Exception): + """Exception raised when dropping PostgreSQL publication.""" + +class PostgreSQLCreateSubscriptionError(Exception): + """Exception raised when creating PostgreSQL subscription.""" + +class PostgreSQLSubscriptionExistsError(Exception): + """Exception raised during subscription existence check.""" + +class PostgreSQLUpdateSubscriptionError(Exception): + """Exception raised when updating PostgreSQL subscription.""" + +class PostgreSQLDropSubscriptionError(Exception): + """Exception raised when dropping PostgreSQL subscription.""" + class PostgreSQL: """Class to encapsulate all operations related to interacting with PostgreSQL instance.""" @@ -160,6 +211,24 @@ def _connect_to_database( connection.autocommit = True return connection + def create_access_groups(self) -> None: + """Create access groups to distinguish HBA authentication methods.""" + connection = None + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + for group in ACCESS_GROUPS: + cursor.execute( + SQL("CREATE ROLE {} NOLOGIN;").format( + Identifier(group), + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to create access groups: {e}") + raise PostgreSQLCreateGroupError() from e + finally: + if connection is not None: + connection.close() + def create_database( self, database: str, @@ -321,6 +390,50 @@ def delete_user(self, user: str) -> None: logger.error(f"Failed to delete user: {e}") raise PostgreSQLDeleteUserError() from e + def grant_internal_access_group_memberships(self) -> None: + """Grant membership to the internal 
access-group to existing internal users.""" + connection = None + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + for user in self.system_users: + cursor.execute( + SQL("GRANT {} TO {};").format( + Identifier(ACCESS_GROUP_INTERNAL), + Identifier(user), + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to grant internal access group memberships: {e}") + raise PostgreSQLAssignGroupError() from e + finally: + if connection is not None: + connection.close() + + def grant_relation_access_group_memberships(self) -> None: + """Grant membership to the relation access-group to existing relation users.""" + rel_users = self.list_users_from_relation() + if not rel_users: + return + + connection = None + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + rel_groups = SQL(",").join(Identifier(group) for group in [ACCESS_GROUP_RELATION]) + rel_users = SQL(",").join(Identifier(user) for user in rel_users) + + cursor.execute( + SQL("GRANT {groups} TO {users};").format( + groups=rel_groups, + users=rel_users, + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to grant relation access group memberships: {e}") + raise PostgreSQLAssignGroupError() from e + finally: + if connection is not None: + connection.close() + def enable_disable_extensions( self, extensions: Dict[str, bool], database: Optional[str] = None ) -> None: @@ -534,6 +647,26 @@ def is_tls_enabled(self, check_current_host: bool = False) -> bool: # Connection errors happen when PostgreSQL has not started yet. return False + def list_access_groups(self) -> Set[str]: + """Returns the list of PostgreSQL database access groups. + + Returns: + List of PostgreSQL database access groups. 
+ """ + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute( + "SELECT groname FROM pg_catalog.pg_group WHERE groname LIKE '%_access';" + ) + access_groups = cursor.fetchall() + return {group[0] for group in access_groups} + except psycopg2.Error as e: + logger.error(f"Failed to list PostgreSQL database access groups: {e}") + raise PostgreSQLListGroupsError() from e + finally: + if connection is not None: + connection.close() + def list_users(self) -> Set[str]: """Returns the list of PostgreSQL database users. @@ -548,6 +681,29 @@ def list_users(self) -> Set[str]: except psycopg2.Error as e: logger.error(f"Failed to list PostgreSQL database users: {e}") raise PostgreSQLListUsersError() from e + finally: + if connection is not None: + connection.close() + + def list_users_from_relation(self) -> Set[str]: + """Returns the list of PostgreSQL database users that were created by a relation. + + Returns: + List of PostgreSQL database users. + """ + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute( + "SELECT usename FROM pg_catalog.pg_user WHERE usename LIKE 'relation_id_%';" + ) + usernames = cursor.fetchall() + return {username[0] for username in usernames} + except psycopg2.Error as e: + logger.error(f"Failed to list PostgreSQL database users: {e}") + raise PostgreSQLListUsersError() from e + finally: + if connection is not None: + connection.close() def list_valid_privileges_and_roles(self) -> Tuple[Set[str], Set[str]]: """Returns two sets with valid privileges and roles. 
@@ -644,6 +800,134 @@ def is_restart_pending(self) -> bool: if connection: connection.close() + def database_exists(self, db: str) -> bool: + """Check whether specified database exists.""" + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute( + SQL("SELECT datname FROM pg_database WHERE datname={};").format(Literal(db)) + ) + return cursor.fetchone() is not None + except psycopg2.Error as e: + logger.error(f"Failed to check Postgresql database existence: {e}") + raise PostgreSQLDatabaseExistsError() from e + + def table_exists(self, db: str, schema: str, table: str) -> bool: + """Check whether specified table in database exists.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("SELECT tablename FROM pg_tables WHERE schemaname={} AND tablename={};").format(Literal(schema), Literal(table)) + ) + return cursor.fetchone() is not None + except psycopg2.Error as e: + logger.error(f"Failed to check Postgresql table existence: {e}") + raise PostgreSQLTableExistsError() from e + + def is_table_empty(self, db: str, schema: str, table: str) -> bool: + """Check whether table is empty.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("SELECT COUNT(1) FROM {};").format(Identifier(schema, table)) + ) + return cursor.fetchone()[0] == 0 + except psycopg2.Error as e: + logger.error(f"Failed to check whether table is empty: {e}") + raise PostgreSQLIsTableEmptyError() from e + + def create_publication(self, db: str, name: str, schematables: list[str]) -> None: + """Create PostgreSQL publication.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("CREATE PUBLICATION {} FOR TABLE {};").format( + Identifier(name), + SQL(",").join(Identifier(schematable.split(".")[0], schematable.split(".")[1]) for schematable in 
schematables) + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to create Postgresql publication: {e}") + raise PostgreSQLCreatePublicationError() from e + + def drop_publication(self, db: str, publication: str) -> None: + """Drop PostgreSQL publication.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("DROP PUBLICATION IF EXISTS {};").format( + Identifier(publication), + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to drop Postgresql publication: {e}") + raise PostgreSQLDropPublicationError() from e + + def create_subscription(self, subscription: str, host: str, db: str, user: str, password: str, replication_slot: str) -> None: + """Create PostgreSQL subscription.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("CREATE SUBSCRIPTION {} CONNECTION {} PUBLICATION {} WITH (copy_data=true,create_slot=false,enabled=true,slot_name={});").format( + Identifier(subscription), + Literal(f"host={host} dbname={db} user={user} password={password}"), + Identifier(subscription), + Identifier(replication_slot) + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to create Postgresql subscription: {e}") + raise PostgreSQLCreateSubscriptionError() from e + + def subscription_exists(self, db: str, subscription: str) -> bool: + """Check whether specified subscription in database exists.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("SELECT subname FROM pg_subscription WHERE subname={};").format(Literal(subscription)) + ) + return cursor.fetchone() is not None + except psycopg2.Error as e: + logger.error(f"Failed to check Postgresql subscription existence: {e}") + raise PostgreSQLSubscriptionExistsError() from e + + def update_subscription(self, db: str, subscription: str, host: str, user: str, password: str): + 
"""Update PostgreSQL subscription connection details.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("ALTER SUBSCRIPTION {} CONNECTION {}").format( + Identifier(subscription), + Literal(f"host={host} dbname={db} user={user} password={password}"), + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to update Postgresql subscription: {e}") + raise PostgreSQLUpdateSubscriptionError() from e + + def drop_subscription(self, db: str, subscription: str) -> None: + """Drop PostgreSQL subscription.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("ALTER SUBSCRIPTION {} DISABLE;").format( + Identifier(subscription), + ) + ) + cursor.execute( + SQL("ALTER SUBSCRIPTION {} SET (slot_name=NONE);").format( + Identifier(subscription), + ) + ) + cursor.execute( + SQL("DROP SUBSCRIPTION {};").format( + Identifier(subscription), + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to drop Postgresql subscription: {e}") + raise PostgreSQLDropSubscriptionError() from e + @staticmethod def build_postgresql_parameters( config_options: dict, available_memory: int, limit_memory: Optional[int] = None diff --git a/metadata.yaml b/metadata.yaml index 94cb47ec89..76e9721929 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -30,6 +30,9 @@ provides: interface: postgresql_async limit: 1 optional: true + logical-replication-offer: + interface: postgresql_logical_replication + optional: true database: interface: postgresql_client db: @@ -45,6 +48,10 @@ requires: interface: postgresql_async limit: 1 optional: true + logical-replication: + interface: postgresql_logical_replication + limit: 1 + optional: true certificates: interface: tls-certificates limit: 1 diff --git a/src/backups.py b/src/backups.py index 903ba028f6..70439fa631 100644 --- a/src/backups.py +++ b/src/backups.py @@ -44,6 +44,11 @@ PGBACKREST_LOGS_PATH, 
POSTGRESQL_DATA_PATH, ) +from relations.async_replication import REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION +from relations.logical_replication import ( + LOGICAL_REPLICATION_OFFER_RELATION, + LOGICAL_REPLICATION_RELATION, +) logger = logging.getLogger(__name__) @@ -1211,6 +1216,18 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: event.fail(error_message) return False + logger.info("Checking that the cluster is not replicating data to a standby cluster") + for relation in [ + self.model.get_relation(REPLICATION_CONSUMER_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), + ]: + if not relation: + continue + error_message = "Unit cannot restore backup as the cluster is replicating data to a standby cluster" + logger.error(f"Restore failed: {error_message}") + event.fail(error_message) + return False + logger.info("Checking that this unit was already elected the leader unit") if not self.charm.unit.is_leader(): error_message = "Unit cannot restore backup as it was not elected the leader unit yet" @@ -1218,6 +1235,16 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: event.fail(error_message) return False + if self.model.get_relation(LOGICAL_REPLICATION_RELATION) or len( + self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()) + ): + error_message = ( + "Cannot proceed with restore with an active logical replication connection" + ) + logger.error(f"Restore failed: {error_message}") + event.fail(error_message) + return False + return True def _render_pgbackrest_conf_file(self) -> bool: diff --git a/src/charm.py b/src/charm.py index 30ca8117a9..787b8080e4 100755 --- a/src/charm.py +++ b/src/charm.py @@ -104,6 +104,7 @@ PostgreSQLAsyncReplication, ) from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides +from relations.logical_replication import PostgreSQLLogicalReplication from relations.postgresql_provider import PostgreSQLProvider from rotate_logs import RotateLogs from upgrade import 
PostgreSQLUpgrade, get_postgresql_dependencies_model @@ -204,6 +205,7 @@ def __init__(self, *args): self.backup = PostgreSQLBackups(self, "s3-parameters") self.tls = PostgreSQLTLS(self, PEER) self.async_replication = PostgreSQLAsyncReplication(self) + self.logical_replication = PostgreSQLLogicalReplication(self) self.restart_manager = RollingOpsManager( charm=self, relation="restart", callback=self._restart ) @@ -1879,6 +1881,12 @@ def update_config(self, is_creating_backup: bool = False, no_peers: bool = False self.model.config, self.get_available_memory(), limit_memory ) + replication_slots_json = ( + json.loads(self.app_peer_data["replication-slots"]) + if "replication-slots" in self.app_peer_data + else None + ) + # Update and reload configuration based on TLS files availability. self._patroni.render_patroni_yml_file( connectivity=self.unit_peer_data.get("connectivity", "on") == "on", @@ -1892,6 +1900,7 @@ def update_config(self, is_creating_backup: bool = False, no_peers: bool = False restore_stanza=self.app_peer_data.get("restore-stanza"), parameters=pg_parameters, no_peers=no_peers, + slots=replication_slots_json, ) if no_peers: return True @@ -1927,6 +1936,8 @@ def update_config(self, is_creating_backup: bool = False, no_peers: bool = False "max_prepared_transactions": self.config.memory_max_prepared_transactions, }) + self._patroni.ensure_slots_controller_by_patroni(replication_slots_json or {}) + self._handle_postgresql_restart_need(enable_tls) # Restart the monitoring service if the password was rotated diff --git a/src/cluster.py b/src/cluster.py index b321a4cac4..eb294dd1fa 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -620,6 +620,7 @@ def render_patroni_yml_file( restore_to_latest: bool = False, parameters: dict[str, str] | None = None, no_peers: bool = False, + slots: dict[str, str] | None = None, ) -> None: """Render the Patroni configuration file. 
@@ -636,6 +637,7 @@ def render_patroni_yml_file( restore_to_latest: restore all the WAL transaction logs from the stanza. parameters: PostgreSQL parameters to be added to the postgresql.conf file. no_peers: Don't include peers. + slots: replication slots (keys) with assigned database name (values). """ # Open the template patroni.yml file. with open("templates/patroni.yml.j2") as file: @@ -678,6 +680,7 @@ def render_patroni_yml_file( extra_replication_endpoints=self.charm.async_replication.get_standby_endpoints(), raft_password=self.raft_password, patroni_password=self.patroni_password, + slots=slots, ) self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) @@ -989,6 +992,35 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: dict[str, Any timeout=PATRONI_TIMEOUT, ) + def ensure_slots_controller_by_patroni(self, slots: dict[str, str]) -> None: + """Synchronises slots controlled by Patroni with the provided state by removing unneeded slots and creating new ones. + + Args: + slots: dictionary of slots in the {slot: database} format. 
+ """ + current_config = requests.get( + f"{self._patroni_url}/config", + verify=self.verify, + timeout=API_REQUEST_TIMEOUT, + auth=self._patroni_auth, + ) + slots_patch: dict[str, dict[str, str] | None] = { + slot: None for slot in current_config.json().get("slots", ()) + } + for slot, database in slots.items(): + slots_patch[slot] = { + "database": database, + "plugin": "pgoutput", + "type": "logical", + } + requests.patch( + f"{self._patroni_url}/config", + verify=self.verify, + json={"slots": slots_patch}, + auth=self._patroni_auth, + timeout=PATRONI_TIMEOUT, + ) + @property def _synchronous_node_count(self) -> int: planned_units = self.charm.app.planned_units() diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py new file mode 100644 index 0000000000..dd7868d4bd --- /dev/null +++ b/src/relations/logical_replication.py @@ -0,0 +1,611 @@ +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Logical Replication implementation. + +TODO: add description after specification is accepted. 
+""" + +import json +import logging +import re +from typing import ( + Any, +) + +from ops import ( + ActionEvent, + LeaderElectedEvent, + Object, + RelationBrokenEvent, + RelationChangedEvent, + RelationDepartedEvent, + RelationJoinedEvent, + Secret, + SecretChangedEvent, + SecretNotFoundError, +) + +from cluster_topology_observer import ClusterTopologyChangeEvent +from constants import ( + APP_SCOPE, + PEER, + USER, + USER_PASSWORD_KEY, +) + +logger = logging.getLogger(__name__) + +LOGICAL_REPLICATION_OFFER_RELATION = "logical-replication-offer" +LOGICAL_REPLICATION_RELATION = "logical-replication" +SECRET_LABEL = "logical-replication-secret" # noqa: S105 + + +class PostgreSQLLogicalReplication(Object): + """Defines the logical-replication logic.""" + + def __init__(self, charm): + super().__init__(charm, "postgresql_logical_replication") + self.charm = charm + # Relations + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_joined, + self._on_offer_relation_joined, + ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_changed, + self._on_offer_relation_changed, + ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_departed, + self._on_offer_relation_departed, + ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_broken, + self._on_offer_relation_broken, + ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_RELATION].relation_changed, self._on_relation_changed + ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_RELATION].relation_departed, + self._on_relation_departed, + ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_RELATION].relation_broken, self._on_relation_broken + ) + # Events + self.charm.framework.observe( + self.charm.on.cluster_topology_change, self._on_cluster_topology_change + ) + self.charm.framework.observe( + 
self.charm.on.leader_elected, self._on_cluster_topology_change + ) + self.framework.observe(self.charm.on.secret_changed, self._on_secret_changed) + # Actions + self.charm.framework.observe( + self.charm.on.add_publication_action, self._on_add_publication + ) + self.charm.framework.observe( + self.charm.on.list_publications_action, self._on_list_publications + ) + self.charm.framework.observe( + self.charm.on.remove_publication_action, self._on_remove_publication + ) + self.charm.framework.observe(self.charm.on.subscribe_action, self._on_subscribe) + self.charm.framework.observe( + self.charm.on.list_subscriptions_action, self._on_list_subscriptions + ) + self.charm.framework.observe(self.charm.on.unsubscribe_action, self._on_unsubscribe) + + # region Relations + + def _on_offer_relation_joined(self, event: RelationJoinedEvent): + if not self.charm.unit.is_leader(): + return + if not self.charm.primary_endpoint: + event.defer() + logger.debug( + f"{LOGICAL_REPLICATION_OFFER_RELATION}: joined event deferred as primary is unavailable right now" + ) + return + + secret = self._get_secret() + secret.grant(event.relation) + event.relation.data[self.model.app].update({ + "publications": self.charm.app_peer_data.get("publications", ""), + "secret-id": secret.id, + }) + + def _on_offer_relation_changed(self, event: RelationChangedEvent): + if not self.charm.unit.is_leader(): + return + + subscriptions_str = event.relation.data[event.app].get("subscriptions", "") + subscriptions = subscriptions_str.split(",") if subscriptions_str else () + publications = self._get_publications_from_str( + self.charm.app_peer_data.get("publications") + ) + relation_replication_slots = self._get_dict_from_str( + event.relation.data[self.model.app].get("replication-slots") + ) + global_replication_slots = self._get_dict_from_str( + self.charm.app_peer_data.get("replication-slots") + ) + + for publication in subscriptions: + if publication not in publications: + logger.error( + f"Logical 
Replication: requested subscription for non-existing publication {publication}" + ) + continue + if publication not in relation_replication_slots: + replication_slot_name = f"{event.relation.id}_{publication}" + global_replication_slots[replication_slot_name] = publications[publication][ + "database" + ] + relation_replication_slots[publication] = replication_slot_name + for publication in relation_replication_slots.copy(): + if publication not in subscriptions: + del global_replication_slots[relation_replication_slots[publication]] + del relation_replication_slots[publication] + + self.charm.app_peer_data["replication-slots"] = json.dumps(global_replication_slots) + event.relation.data[self.model.app]["replication-slots"] = json.dumps( + relation_replication_slots + ) + self.charm.update_config() + + def _on_offer_relation_departed(self, event: RelationDepartedEvent): + if event.departing_unit == self.charm.unit and self.charm._peers is not None: + self.charm.unit_peer_data.update({"departing": "True"}) + + def _on_offer_relation_broken(self, event: RelationBrokenEvent): + if not self.charm._peers or self.charm.is_unit_departing: + logger.debug( + f"{LOGICAL_REPLICATION_OFFER_RELATION}: skipping departing unit in broken event" + ) + return + if not self.charm.unit.is_leader(): + return + + global_replication_slots = self._get_dict_from_str( + self.charm.app_peer_data.get("replication-slots") + ) + if len(global_replication_slots) == 0: + return + + used_replication_slots = [] + for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): + if rel.id == event.relation.id: + continue + used_replication_slots += self._get_dict_from_str( + rel.data[self.model.app].get("replication-slots") + ).values() + + deleting_replication_slots = [ + k for k in global_replication_slots if k not in used_replication_slots + ] + for deleting_replication_slot in deleting_replication_slots: + global_replication_slots.pop(deleting_replication_slot, None) + 
self.charm.app_peer_data["replication-slots"] = json.dumps(global_replication_slots) + self.charm.update_config() + + def _on_relation_changed(self, event: RelationChangedEvent): + if not self._relation_changed_checks(event): + return + secret_content = self.model.get_secret( + id=event.relation.data[event.app]["secret-id"], label=SECRET_LABEL + ).get_content(refresh=True) + publications = self._get_publications_from_str( + event.relation.data[event.app].get("publications") + ) + replication_slots = self._get_dict_from_str( + event.relation.data[event.app].get("replication-slots") + ) + global_subscriptions = self._get_dict_from_str( + self.charm.app_peer_data.get("subscriptions") + ) + for subscription in self._get_str_list( + event.relation.data[self.model.app].get("subscriptions") + ): + db = publications[subscription]["database"] + if subscription in replication_slots and not self.charm.postgresql.subscription_exists( + db, subscription + ): + self.charm.postgresql.create_subscription( + subscription, + secret_content["logical-replication-primary"], + db, + secret_content["logical-replication-user"], + secret_content["logical-replication-password"], + replication_slots[subscription], + ) + global_subscriptions[subscription] = db + self.charm.app_peer_data["subscriptions"] = json.dumps(global_subscriptions) + + def _on_relation_departed(self, event: RelationDepartedEvent): + if event.departing_unit == self.charm.unit and self.charm._peers is not None: + self.charm.unit_peer_data.update({"departing": "True"}) + + def _on_relation_broken(self, event: RelationBrokenEvent): + if not self.charm._peers or self.charm.is_unit_departing: + logger.debug( + f"{LOGICAL_REPLICATION_RELATION}: skipping departing unit in broken event" + ) + return + if not self.charm.unit.is_leader(): + return + if not self.charm.primary_endpoint: + logger.debug( + f"{LOGICAL_REPLICATION_RELATION}: broken event deferred as primary is unavailable right now" + ) + event.defer() + return False + 
+ subscriptions = self._get_dict_from_str(self.charm.app_peer_data.get("subscriptions")) + for subscription, db in subscriptions.copy().items(): + self.charm.postgresql.drop_subscription(db, subscription) + del subscriptions[subscription] + self.charm.app_peer_data["subscriptions"] = json.dumps(subscriptions) + + # endregion + + # region Events + + def _on_cluster_topology_change(self, event: ClusterTopologyChangeEvent | LeaderElectedEvent): + if not self.charm.unit.is_leader(): + return + if not len(self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ())): + return + if not self.charm.primary_endpoint: + event.defer() + return + self._get_secret() + + def _on_secret_changed(self, event: SecretChangedEvent): + if not self.charm.unit.is_leader(): + return + if not self.charm.primary_endpoint: + event.defer() + return + + if ( + len(self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ())) + and event.secret.label == f"{PEER}.{self.model.app.name}.app" + ): + logger.info("Internal secret changed, updating logical replication secret") + self._get_secret() + + if ( + relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION) + ) and event.secret.label == SECRET_LABEL: + logger.info("Logical replication secret changed, updating subscriptions") + secret_content = self.model.get_secret( + id=relation.data[relation.app]["secret-id"], label=SECRET_LABEL + ).get_content(refresh=True) + replication_slots = self._get_dict_from_str( + relation.data[relation.app].get("replication-slots") + ) + publications = self._get_publications_from_str( + relation.data[relation.app].get("publications") + ) + for subscription in self._get_str_list( + relation.data[self.model.app].get("subscriptions") + ): + if subscription in replication_slots: + self.charm.postgresql.update_subscription( + publications[subscription]["database"], + subscription, + secret_content["logical-replication-primary"], + secret_content["logical-replication-user"], + 
secret_content["logical-replication-password"], + ) + + # endregion + + # region Actions + + def _on_add_publication(self, event: ActionEvent): + if not self._add_publication_validation(event): + return + if not self.charm.postgresql.database_exists(event.params["database"]): + event.fail(f"No such database {event.params['database']}") + return + for schematable in event.params["tables"].split(","): + if len(schematable_split := schematable.split(".")) != 2: + event.fail("All tables should be in schema.table format") + return + if not self.charm.postgresql.table_exists( + event.params["database"], schematable_split[0], schematable_split[1] + ): + event.fail(f"No such table {schematable} in database {event.params['database']}") + return + publications = self._get_publications_from_str( + self.charm.app_peer_data.get("publications") + ) + publication_tables_split = event.params["tables"].split(",") + self.charm.postgresql.create_publication( + event.params["database"], event.params["name"], publication_tables_split + ) + publications[event.params["name"]] = { + "database": event.params["database"], + "tables": publication_tables_split, + } + self._set_publications(publications) + + def _on_list_publications(self, event: ActionEvent): + publications = [ + ( + publication, + str(self._count_publication_connections(publication)), + publication_obj["database"], + ",".join(publication_obj["tables"]), + ) + for publication, publication_obj in self._get_publications_from_str( + self.charm.app_peer_data.get("publications") + ).items() + ] + name_len = max([4, *[len(publication[0]) for publication in publications]]) + database_len = max([8, *[len(publication[2]) for publication in publications]]) + header = ( + f"{'name':<{name_len}s} | active_connections | {'database':<{database_len}s} | tables" + ) + res = [header, "-" * len(header)] + for name, active_connections, database, tables in publications: + res.append( + f"{name:<{name_len}s} | {active_connections:<18s} | 
{database:<{database_len}s} | {tables:s}" + ) + event.set_results({"publications": "\n".join(res)}) + + def _on_remove_publication(self, event: ActionEvent): + if not self.charm.unit.is_leader(): + event.fail("Publications management can be done only on the leader unit") + return + if not self.charm.primary_endpoint: + event.fail("Publication management can be proceeded only with an active primary") + return + if not (publication_name := event.params.get("name")): + event.fail("name parameter is required") + return + publications = self._get_publications_from_str( + self.charm.app_peer_data.get("publications") + ) + if publication_name not in publications: + event.fail("No such publication") + return + if self._count_publication_connections(publication_name): + event.fail("Cannot remove publication while it's in use") + return + self.charm.postgresql.drop_publication( + publications[publication_name]["database"], publication_name + ) + del publications[publication_name] + self._set_publications(publications) + + def _on_subscribe(self, event: ActionEvent): + if not self._subscribe_validation(event): + return + relation = self.model.get_relation(LOGICAL_REPLICATION_RELATION) + subscribing_publication = self._get_publications_from_str( + relation.data[relation.app]["publications"] + )[event.params["name"]] + subscribing_database = subscribing_publication["database"] + subscriptions = self._get_str_list(relation.data[self.model.app].get("subscriptions")) + if not self.charm.postgresql.database_exists(subscribing_database): + event.fail(f"No such database {subscribing_database}") + return + if self.charm.postgresql.subscription_exists(subscribing_database, event.params["name"]): + event.fail( + f"PostgreSQL subscription with conflicting name {event.params['name']} already exists in the database {subscribing_database}" + ) + return + for schematable in subscribing_publication["tables"]: + schematable_split = schematable.split(".") + if not 
self.charm.postgresql.table_exists( + subscribing_database, schematable_split[0], schematable_split[1] + ): + event.fail(f"No such table {schematable} in database {subscribing_database}") + return + if not self.charm.postgresql.is_table_empty( + subscribing_database, schematable_split[0], schematable_split[1] + ): + event.fail( + f"Table {schematable} in database {subscribing_database} should be empty before subscribing on it" + ) + return + subscriptions.append(event.params["name"]) + relation.data[self.model.app]["subscriptions"] = ",".join(subscriptions) + + def _on_list_subscriptions(self, event: ActionEvent): + if not self.charm.unit.is_leader(): + event.fail("Subscriptions management can be done only on the leader unit") + return + if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): + event.fail( + "Subscription management can be done only with an active logical replication connection" + ) + return + publications = self._get_publications_from_str( + relation.data[relation.app].get("publications") + ) + subscriptions = [ + ( + subscription, + publications.get(subscription, {}).get("database"), + ",".join(publications.get(subscription, {}).get("tables", ())), + ) + for subscription in self._get_str_list( + relation.data[self.model.app].get("subscriptions") + ) + ] + name_len = max([4, *[len(subscription[0]) for subscription in subscriptions]]) + database_len = max([8, *[len(subscription[1]) for subscription in subscriptions]]) + header = f"{'name':<{name_len}s} | {'database':<{database_len}s} | tables" + res = [header, "-" * len(header)] + for name, database, tables in subscriptions: + res.append(f"{name:<{name_len}s} | {database:<{database_len}s} | {tables:s}") + event.set_results({"subscriptions": "\n".join(res)}) + + def _on_unsubscribe(self, event: ActionEvent): + if not self.charm.unit.is_leader(): + event.fail("Subscriptions management can be proceeded only on the leader unit") + return + if not (relation := 
self.model.get_relation(LOGICAL_REPLICATION_RELATION)): + event.fail( + "Subscription management can be proceeded only with an active logical replication connection" + ) + return + if not self.charm.primary_endpoint: + event.fail("Subscription management can be proceeded only with an active primary") + return + if not (subscription_name := event.params.get("name")): + event.fail("name parameter is required") + return + subscriptions = self._get_str_list(relation.data[self.model.app].get("subscriptions")) + if subscription_name not in subscriptions: + event.fail("No such subscription") + return + self.charm.postgresql.drop_subscription( + self._get_publications_from_str(relation.data[relation.app]["publications"])[ + subscription_name + ]["database"], + subscription_name, + ) + subscriptions.remove(subscription_name) + relation.data[self.model.app]["subscriptions"] = ",".join(subscriptions) + + # endregion + + def _relation_changed_checks(self, event: RelationChangedEvent) -> bool: + if not self.charm.unit.is_leader(): + return False + if not self.charm.primary_endpoint: + logger.debug( + f"{LOGICAL_REPLICATION_RELATION}: changed event deferred as primary is unavailable right now" + ) + event.defer() + return False + if not event.relation.data[event.app].get("secret-id"): + logger.warning( + f"{LOGICAL_REPLICATION_RELATION}: skipping changed event as there is no secret id in the remote application data" + ) + return False + return True + + def _add_publication_validation(self, event: ActionEvent) -> bool: + if not self.charm.unit.is_leader(): + event.fail("Publications management can be proceeded only on the leader unit") + return False + if not self.charm.primary_endpoint: + event.fail("Publication management can be proceeded only with an active primary") + return False + if not (publication_name := event.params.get("name")): + event.fail("name parameter is required") + return False + if not re.match(r"^[a-zA-Z0-9_]+$", publication_name): + event.fail("name 
should consist of english letters, numbers and underscore") + return False + if not event.params.get("database"): + event.fail("database parameter is required") + return False + if not event.params.get("tables"): + event.fail("tables parameter is required") + return False + if publication_name in self._get_publications_from_str( + self.charm.app_peer_data.get("publications") + ): + event.fail("Such publication already exists") + return False + return True + + def _subscribe_validation(self, event: ActionEvent) -> bool: + if not self.charm.unit.is_leader(): + event.fail("Subscriptions management can be proceeded only on the leader unit") + return False + if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): + event.fail( + "Subscription management can be proceeded only with an active logical replication connection" + ) + return False + if not self.charm.primary_endpoint: + event.fail("Subscription management can be proceeded only with an active primary") + return False + if not (publication_name := event.params.get("name")): + event.fail("name parameter is required") + return False + subscriptions = self._get_str_list(relation.data[self.model.app].get("subscriptions")) + if publication_name in subscriptions: + event.fail("Such subscription already exists") + return False + publications = self._get_publications_from_str( + relation.data[relation.app].get("publications") + ) + subscribing_publication = publications.get(publication_name) + if not subscribing_publication: + event.fail("No such publication offered") + return False + # Check overlaps with already subscribed publications + if any( + any( + publication_table in subscribing_publication["tables"] + for publication_table in publication_obj["tables"] + ) + for (publication, publication_obj) in publications.items() + if publication in subscriptions + and publication_obj["database"] == subscribing_publication["database"] + ): + event.fail("Tables overlap detected with existing 
subscriptions") + return False + return True + + def _get_secret(self) -> Secret: + """Returns logical replication secret. Updates, if content changed.""" + shared_content = { + "logical-replication-user": USER, + "logical-replication-password": self.charm.get_secret(APP_SCOPE, USER_PASSWORD_KEY), + "logical-replication-primary": self.charm.primary_endpoint, + } + try: + # Avoid recreating the secret. + secret = self.charm.model.get_secret(label=SECRET_LABEL) + if not secret.id: + # Workaround for the secret id not being set with model uuid. + secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + if secret.peek_content() != shared_content: + logger.info("Updating outdated secret content") + secret.set_content(shared_content) + return secret + except SecretNotFoundError: + logger.debug("Secret not found, creating a new one") + pass + return self.charm.model.app.add_secret(content=shared_content, label=SECRET_LABEL) + + @staticmethod + def _get_publications_from_str( + publications_str: str | None = None, + ) -> dict[str, dict[str, Any]]: + return json.loads(publications_str or "{}") + + def _set_publications(self, publications: dict[str, dict[str, Any]]): + publications_str = json.dumps(publications) + self.charm.app_peer_data["publications"] = publications_str + for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): + rel.data[self.model.app]["publications"] = publications_str + + def _count_publication_connections(self, publication: str) -> int: + count = 0 + for relation in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): + if publication in self._get_str_list(relation.data[relation.app].get("subscriptions")): + count += 1 + return count + + @staticmethod + def _get_dict_from_str( + replication_slots_str: str | None = None, + ) -> dict[str, str]: + return json.loads(replication_slots_str or "{}") + + @staticmethod + def _get_str_list(list_str: str | None = None) -> list[str]: + return 
list_str.split(",") if list_str else [] diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 63d99c160f..e401b640ae 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -103,6 +103,15 @@ bootstrap: logging_collector: 'on' wal_level: logical shared_preload_libraries: 'timescaledb,pgaudit' + {%- if slots %} + slots: + {%- for slot, database in slots.items() %} + {{slot}}: + database: {{database}} + plugin: pgoutput + type: logical + {%- endfor -%} + {% endif %} {%- if restoring_backup %} method: pgbackrest diff --git a/tests/integration/ha_tests/test_logical_replication.py b/tests/integration/ha_tests/test_logical_replication.py new file mode 100644 index 0000000000..76dd9b92db --- /dev/null +++ b/tests/integration/ha_tests/test_logical_replication.py @@ -0,0 +1,314 @@ +from asyncio import gather + +import psycopg2 +import pytest as pytest +from pytest_operator.plugin import OpsTest + +from integration import markers +from integration.ha_tests.helpers import get_cluster_roles +from integration.helpers import CHARM_BASE, get_leader_unit +from integration.new_relations.helpers import build_connection_string + +DATABASE_APP_NAME = "postgresql" +SECOND_DATABASE_APP_NAME = "postgresql2" +THIRD_DATABASE_APP_NAME = "postgresql3" + +DATA_INTEGRATOR_APP_NAME = "data-integrator" +SECOND_DATA_INTEGRATOR_APP_NAME = "data-integrator2" +THIRD_DATA_INTEGRATOR_APP_NAME = "data-integrator3" +DATA_INTEGRATOR_RELATION = "postgresql" + +DATABASE_APP_CONFIG = {"profile": "testing"} + +TESTING_DATABASE = "testdb" + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_deploy(ops_test: OpsTest, charm): + await gather( + ops_test.model.deploy( + charm, + application_name=DATABASE_APP_NAME, + num_units=3, + base=CHARM_BASE, + config=DATABASE_APP_CONFIG, + ), + ops_test.model.deploy( + charm, + application_name=SECOND_DATABASE_APP_NAME, + num_units=3, + base=CHARM_BASE, + config=DATABASE_APP_CONFIG, + ), + ops_test.model.deploy( + charm, + 
application_name=THIRD_DATABASE_APP_NAME, + num_units=1, + base=CHARM_BASE, + config=DATABASE_APP_CONFIG, + ), + ops_test.model.deploy( + DATA_INTEGRATOR_APP_NAME, + application_name=DATA_INTEGRATOR_APP_NAME, + num_units=1, + channel="latest/edge", + config={"database-name": TESTING_DATABASE}, + ), + ops_test.model.deploy( + DATA_INTEGRATOR_APP_NAME, + application_name=SECOND_DATA_INTEGRATOR_APP_NAME, + num_units=1, + channel="latest/edge", + config={"database-name": TESTING_DATABASE}, + ), + ops_test.model.deploy( + DATA_INTEGRATOR_APP_NAME, + application_name=THIRD_DATA_INTEGRATOR_APP_NAME, + num_units=1, + channel="latest/edge", + config={"database-name": TESTING_DATABASE}, + ), + ) + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, SECOND_DATABASE_APP_NAME, THIRD_DATABASE_APP_NAME], + status="active", + timeout=2500, + # There can be error spikes during PostgreSQL deployment, that are not related to Logical Replication + raise_on_error=False, + ) + async with ops_test.fast_forward(): + await gather( + ops_test.model.integrate(DATABASE_APP_NAME, DATA_INTEGRATOR_APP_NAME), + ops_test.model.integrate(SECOND_DATABASE_APP_NAME, SECOND_DATA_INTEGRATOR_APP_NAME), + ops_test.model.integrate(THIRD_DATABASE_APP_NAME, THIRD_DATA_INTEGRATOR_APP_NAME), + ops_test.model.integrate( + f"{DATABASE_APP_NAME}:logical-replication-offer", + f"{SECOND_DATABASE_APP_NAME}:logical-replication", + ), + ) + await ops_test.model.wait_for_idle(status="active", timeout=500) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_create_testing_data(ops_test: OpsTest): + await _create_test_table(ops_test, DATA_INTEGRATOR_APP_NAME) + await _create_test_table(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME) + await _create_test_table(ops_test, THIRD_DATA_INTEGRATOR_APP_NAME) + await _insert_test_data(ops_test, DATA_INTEGRATOR_APP_NAME, "first") + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_setup_logical_replication(ops_test: OpsTest): + publisher_leader = 
await get_leader_unit(ops_test, DATABASE_APP_NAME) + first_subscriber_leader = await get_leader_unit(ops_test, SECOND_DATABASE_APP_NAME) + second_subscriber_leader = await get_leader_unit(ops_test, THIRD_DATABASE_APP_NAME) + + # Logical replication between first and second database applications is already established in test_deploy + await ops_test.model.integrate( + f"{DATABASE_APP_NAME}:logical-replication-offer", + f"{THIRD_DATABASE_APP_NAME}:logical-replication", + ) + + action = await publisher_leader.run_action( + "add-publication", + name="test_publication", + database=TESTING_DATABASE, + tables="public.test_table", + ) + + await action.wait() + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=500) + + action = await publisher_leader.run_action("list-publications") + await action.wait() + results = action.results.get("publications") + assert results and "test_publication | 0" in results, ( + "publication should be listed in list-publications action" + ) + + action = await first_subscriber_leader.run_action("subscribe", name="test_publication") + await action.wait() + + action = await first_subscriber_leader.run_action("list-subscriptions") + await action.wait() + results = action.results.get("subscriptions") + assert results and "test_publication" in results, ( + "subscription should be listed in list-subscriptions action" + ) + + action = await second_subscriber_leader.run_action("subscribe", name="test_publication") + await action.wait() + + action = await second_subscriber_leader.run_action("list-subscriptions") + await action.wait() + results = action.results.get("subscriptions") + assert results and "test_publication" in results, ( + "subscription should be listed in list-subscriptions action" + ) + + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=500) + + action = await publisher_leader.run_action("list-publications") + await action.wait() + results = 
action.results.get("publications") + assert results and "test_publication | 2" in results, ( + "publication should detect 2 active connections" + ) + + assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "first"), ( + "testing table should be copied to the postgresql2 on logical replication setup" + ) + assert await _check_test_data(ops_test, THIRD_DATA_INTEGRATOR_APP_NAME, "first"), ( + "testing table should be copied to the postgresql3 on logical replication setup" + ) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_logical_replication(ops_test: OpsTest): + await _insert_test_data(ops_test, DATA_INTEGRATOR_APP_NAME, "second") + assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "second"), ( + "logical replication should work with postgresql -> postgresql2" + ) + assert await _check_test_data(ops_test, THIRD_DATA_INTEGRATOR_APP_NAME, "second"), ( + "logical replication should work with postgresql -> postgresql3" + ) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_switchover(ops_test: OpsTest): + publisher_leader = await get_leader_unit(ops_test, DATABASE_APP_NAME) + publisher_roles = await get_cluster_roles(ops_test, publisher_leader.name) + publisher_candidate = ops_test.model.units[publisher_roles["sync_standbys"][0]] + action = await publisher_candidate.run_action("promote-to-primary", scope="unit", force=True) + await action.wait() + + first_subscriber_leader = await get_leader_unit(ops_test, SECOND_DATABASE_APP_NAME) + subscriber_roles = await get_cluster_roles(ops_test, first_subscriber_leader.name) + subscriber_candidate = ops_test.model.units[subscriber_roles["sync_standbys"][0]] + action = await subscriber_candidate.run_action("promote-to-primary", scope="unit", force=True) + await action.wait() + + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=500) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def 
test_logical_replication_after_switchover(ops_test: OpsTest): + await _insert_test_data(ops_test, DATA_INTEGRATOR_APP_NAME, "third") + assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "third"), ( + "logical replication should work with postgresql -> postgresql2" + ) + assert await _check_test_data(ops_test, THIRD_DATA_INTEGRATOR_APP_NAME, "third"), ( + "logical replication should work with postgresql -> postgresql3" + ) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_subscriber_removal(ops_test: OpsTest): + await ops_test.model.remove_application(THIRD_DATA_INTEGRATOR_APP_NAME, block_until_done=True) + await ops_test.model.remove_application(THIRD_DATABASE_APP_NAME, block_until_done=True) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=1000) + connection_string = await build_connection_string( + ops_test, + DATA_INTEGRATOR_APP_NAME, + DATA_INTEGRATOR_RELATION, + database=TESTING_DATABASE, + ) + with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(1) FROM pg_replication_slots where slot_type='logical';") + assert cursor.fetchone()[0] == 1, ( + "unused replication slot should be removed in the publisher cluster" + ) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_logical_replication_after_subscriber_removal(ops_test: OpsTest): + await _insert_test_data(ops_test, DATA_INTEGRATOR_APP_NAME, "fourth") + assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "fourth"), ( + "logical replication should work with postgresql -> postgresql2" + ) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_remove_relation(ops_test: OpsTest): + await ops_test.model.applications[DATABASE_APP_NAME].remove_relation( + f"{DATABASE_APP_NAME}:logical-replication-offer", + f"{SECOND_DATABASE_APP_NAME}:logical-replication", + ) + async with ops_test.fast_forward(): + await 
ops_test.model.wait_for_idle(status="active", timeout=500) + connection_string = await build_connection_string( + ops_test, + SECOND_DATA_INTEGRATOR_APP_NAME, + DATA_INTEGRATOR_RELATION, + database=TESTING_DATABASE, + ) + with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(1) FROM pg_subscription;") + assert cursor.fetchone()[0] == 0, ( + "unused PostgreSQL subscription should be removed in the subscriber cluster" + ) + + +async def _create_test_table(ops_test: OpsTest, data_integrator_app_name: str) -> None: + with ( + psycopg2.connect( + await build_connection_string( + ops_test, + data_integrator_app_name, + DATA_INTEGRATOR_RELATION, + database=TESTING_DATABASE, + ) + ) as connection, + connection.cursor() as cursor, + ): + cursor.execute("CREATE TABLE test_table (test_column text);") + + +async def _insert_test_data(ops_test: OpsTest, data_integrator_app_name: str, data: str) -> None: + with ( + psycopg2.connect( + await build_connection_string( + ops_test, + data_integrator_app_name, + DATA_INTEGRATOR_RELATION, + database=TESTING_DATABASE, + ) + ) as connection, + connection.cursor() as cursor, + ): + cursor.execute( + "INSERT INTO test_table (test_column) VALUES (%s);", + (data,), + ) + + +async def _check_test_data(ops_test: OpsTest, data_integrator_app_name: str, data: str) -> bool: + with ( + psycopg2.connect( + await build_connection_string( + ops_test, + data_integrator_app_name, + DATA_INTEGRATOR_RELATION, + database=TESTING_DATABASE, + ) + ) as connection, + connection.cursor() as cursor, + ): + cursor.execute( + "SELECT EXISTS (SELECT 1 FROM test_table WHERE test_column = %s);", + (data,), + ) + return cursor.fetchone()[0] diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 7de1a502cf..6b9bf314a5 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -1274,6 +1274,7 @@ def test_update_config(harness): patch( 
"charm.PostgresqlOperatorCharm._handle_postgresql_restart_need" ) as _handle_postgresql_restart_need, + patch("charm.Patroni.ensure_slots_controller_by_patroni"), patch("charm.Patroni.bulk_update_parameters_controller_by_patroni"), patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started, patch( @@ -1310,6 +1311,7 @@ def test_update_config(harness): restore_to_latest=False, parameters={"test": "test"}, no_peers=False, + slots=None, ) _handle_postgresql_restart_need.assert_called_once_with(False) assert "tls" not in harness.get_relation_data(rel_id, harness.charm.unit.name) @@ -1334,6 +1336,7 @@ def test_update_config(harness): restore_to_latest=False, parameters={"test": "test"}, no_peers=False, + slots=None, ) _handle_postgresql_restart_need.assert_called_once() assert "tls" not in harness.get_relation_data(