From 5e851ea7a9ea06360b07ebf35622f68ac580628a Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Thu, 27 Feb 2025 23:24:55 +0200 Subject: [PATCH 01/19] Logical replication initial draft. --- actions.yaml | 24 +++ metadata.yaml | 7 + src/charm.py | 2 + src/relations/logical_replication.py | 216 +++++++++++++++++++++++++++ 4 files changed, 249 insertions(+) create mode 100644 src/relations/logical_replication.py diff --git a/actions.yaml b/actions.yaml index 7069c4668e..8a3a7fd076 100644 --- a/actions.yaml +++ b/actions.yaml @@ -1,6 +1,16 @@ # Copyright 2021 Canonical Ltd. # See LICENSE file for licensing details. +# TODO: add descriptions for logical replication actions + +add-publication: + params: + name: + type: string + database: + type: string + tables: + type: string create-backup: description: Creates a backup to s3 storage. params: @@ -30,6 +40,8 @@ get-password: Possible values - operator, replication, rewind, patroni. list-backups: description: Lists backups in s3 storage. +list-publications: +list-subscriptions: pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. promote-to-primary: @@ -43,6 +55,10 @@ promote-to-primary: force: type: boolean description: Force the promotion of a cluster when there is already a primary cluster. +remove-publication: + params: + name: + type: string restore: description: Restore a database backup using pgBackRest. S3 credentials are retrieved from a relation with the S3 integrator charm. @@ -70,3 +86,11 @@ set-tls-private-key: private-key: type: string description: The content of private key for communications with clients. Content will be auto-generated if this option is not specified. 
+subscribe: + params: + name: + type: string +unsubscribe: + params: + name: + type: string \ No newline at end of file diff --git a/metadata.yaml b/metadata.yaml index 94cb47ec89..76e9721929 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -30,6 +30,9 @@ provides: interface: postgresql_async limit: 1 optional: true + logical-replication-offer: + interface: postgresql_logical_replication + optional: true database: interface: postgresql_client db: @@ -45,6 +48,10 @@ requires: interface: postgresql_async limit: 1 optional: true + logical-replication: + interface: postgresql_logical_replication + limit: 1 + optional: true certificates: interface: tls-certificates limit: 1 diff --git a/src/charm.py b/src/charm.py index 68dcef9999..c593af00cd 100755 --- a/src/charm.py +++ b/src/charm.py @@ -104,6 +104,7 @@ PostgreSQLAsyncReplication, ) from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides +from relations.logical_replication import PostgreSQLLogicalReplication from relations.postgresql_provider import PostgreSQLProvider from rotate_logs import RotateLogs from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model @@ -204,6 +205,7 @@ def __init__(self, *args): self.backup = PostgreSQLBackups(self, "s3-parameters") self.tls = PostgreSQLTLS(self, PEER) self.async_replication = PostgreSQLAsyncReplication(self) + self.logical_replication = PostgreSQLLogicalReplication(self) self.restart_manager = RollingOpsManager( charm=self, relation="restart", callback=self._restart ) diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py new file mode 100644 index 0000000000..7a543279a5 --- /dev/null +++ b/src/relations/logical_replication.py @@ -0,0 +1,216 @@ +import json +import logging + +from ops import ActionEvent, Object, Relation, RelationChangedEvent, RelationJoinedEvent + +from constants import APP_SCOPE, REPLICATION_PASSWORD_KEY, REPLICATION_USER + +logger = logging.getLogger(__name__) + 
+LOGICAL_REPLICATION_OFFER_RELATION = "logical-replication-offer" +LOGICAL_REPLICATION_RELATION = "logical-replication" + +class PostgreSQLLogicalReplication(Object): + def __init__(self, charm): + super().__init__(charm, "postgresql_logical_replication") + self.charm = charm + # Relations + self.charm.framework.observe(self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_joined, + self._on_offer_relation_joined) + self.charm.framework.observe(self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_changed, + self._on_offer_relation_changed) + # Actions + self.charm.framework.observe(self.charm.on.add_publication_action, self._on_add_publication) + self.charm.framework.observe(self.charm.on.list_publications_action, self._on_list_publications) + self.charm.framework.observe(self.charm.on.remove_publication_action, self._on_remove_publication) + self.charm.framework.observe(self.charm.on.subscribe_action, self._on_subscribe) + self.charm.framework.observe(self.charm.on.list_subscriptions_action, self._on_list_subscriptions) + self.charm.framework.observe(self.charm.on.unsubscribe_action, self._on_unsubscribe) + +#region Relations + + def _on_offer_relation_joined(self, event: RelationJoinedEvent): + if not self.charm.unit.is_leader(): + return + + if not self.charm.primary_endpoint: + event.defer() + logger.debug(f"{LOGICAL_REPLICATION_OFFER_RELATION}: joined event deferred as primary is unavailable right now") + return + + # TODO: add primary change check + # TODO: replication-user-secret + event.relation.data[self.model.app].update({ + "publications": self.charm.app_peer_data.get("publications", ""), + "replication-user": REPLICATION_USER, + "replication-user-secret": self.charm.get_secret(APP_SCOPE, REPLICATION_PASSWORD_KEY), + "primary": self.charm.primary_endpoint + }) + + def _on_offer_relation_changed(self, event: RelationChangedEvent): + if not self.charm.unit.is_leader(): + return + + subscriptions_str = 
event.relation.data[event.app].get("subscriptions", "") + subscriptions = subscriptions_str.split(",") if subscriptions_str else () + replication_slots = self._get_relation_replication_slots(event.relation) + + for subscription in subscriptions: + if subscription not in replication_slots: + # TODO: validation on publication existence + self._add_replication_slot(event.relation, subscription) + + for publication in replication_slots: + if publication not in subscriptions: + self._remove_replication_slot(event.relation, publication) + +#endregion + +#region Actions + + def _on_add_publication(self, event: ActionEvent): + if not self.charm.unit.is_leader(): + event.fail("Publications management can be done only on the leader unit") + return + if not (publication_name := event.params.get("name")): + event.fail("name parameter is required") + return + if not (publication_db := event.params.get("database")): + event.fail("database parameter is required") + return + if not (publication_tables := event.params.get("tables")): + event.fail("tables parameter is required") + return + publications = self._get_publications_from_str(self.charm.app_peer_data.get("publications")) + if publication_name in publications: + event.fail("Such publication already exists") + return + # TODO: check on schema existence + publications[publication_name] = { + "database": publication_db, + "tables": publication_tables.split(",") + } + self._set_publications(publications) + + def _on_list_publications(self, event: ActionEvent): + # TODO: table formatting + if not self.charm.unit.is_leader(): + event.fail("Publications management can be done only on the leader unit") + return + event.set_results({ + "publications": self.charm.app_peer_data.get("publications", "{}") + }) + + def _on_remove_publication(self, event: ActionEvent): + if not self.charm.unit.is_leader(): + event.fail("Publications management can be done only on the leader unit") + return + if not (publication_name := 
event.params.get("name")): + event.fail("name parameter is required") + return + # TODO: validate to delete only unused publications + publications = self._get_publications_from_str(self.charm.app_peer_data.get("publications")) + if publication_name not in publications: + event.fail("No such publication") + return + del publications[publication_name] + self._set_publications(publications) + + def _on_subscribe(self, event: ActionEvent): + if not self.charm.unit.is_leader(): + event.fail("Subscriptions management can be done only on the leader unit") + return + if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): + event.fail("Subscription management can be done only with an active logical replication connection") + return + if not (publication_name := event.params.get("name")): + event.fail("name parameter is required") + return + subscriptions = self._get_str_list(relation.data[self.model.app].get("subscriptions")) + if publication_name in subscriptions: + event.fail("Such subscription already exists") + return + publications = self._get_publications_from_str(relation.data[relation.app].get("publications")) + # TODO: validation on overlaps and existing scheme + if publication_name not in publications: + event.fail("No such publication offered") + return + subscriptions.append(publication_name) + relation.data[self.model.app]["subscriptions"] = ",".join(subscriptions) + + def _on_list_subscriptions(self, event: ActionEvent): + if not self.charm.unit.is_leader(): + event.fail("Subscriptions management can be done only on the leader unit") + return + if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): + event.fail("Subscription management can be done only with an active logical replication connection") + return + # TODO: table formatting + event.set_results({ + "subscriptions": relation.data[self.model.app].get("subscriptions", "") + }) + + def _on_unsubscribe(self, event: ActionEvent): + if not 
self.charm.unit.is_leader(): + event.fail("Subscriptions management can be done only on the leader unit") + return + if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): + event.fail("Subscription management can be done only with an active logical replication connection") + return + if not (subscription_name := event.params.get("name")): + event.fail("name parameter is required") + return + subscriptions = self._get_str_list(relation.data[self.model.app].get("subscriptions")) + if subscription_name not in subscriptions: + event.fail("No such subscription") + return + relation.data[self.model.app]["subscriptions"] = ",".join([ + x + for x in self._get_str_list(relation.data[self.model.app].get("subscriptions")) + if x != subscription_name + ]) + # TODO: unsubscribe + +#endregion + + @staticmethod + def _get_publications_from_str(publications_str: str | None) -> dict[str, dict[str, any]]: + return json.loads(publications_str or "{}") + + def _set_publications(self, publications: dict[str, dict[str, any]]): + publications_str = json.dumps(publications) + self.charm.app_peer_data["publications"] = publications_str + for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): + rel.data[self.model.app]["publications"] = publications_str + + def _get_relation_replication_slots(self, relation: Relation) -> dict[str, str]: + return json.loads(relation.data[self.model.app].get("replication-slots", "{}")) + + @staticmethod + def _get_str_list(list_str: str | None) -> list[str]: + return list_str.split(",") if list_str else [] + + def _add_replication_slot(self, relation: Relation, publication: str): + # TODO: overwrite check + relation_replication_slots = self._get_relation_replication_slots(relation) + global_replication_slots = self._get_str_list(self.charm.app_peer_data.get("replication-slots")) + + # TODO: replication slot random name + new_replication_slot_name = publication + + 
global_replication_slots.append(new_replication_slot_name) + self.charm.app_peer_data["replication-slots"] = ",".join(global_replication_slots) + relation_replication_slots[publication] = new_replication_slot_name + relation.data[self.model.app]["replication-slots"] = json.dumps(relation_replication_slots) + + # TODO: patroni config update + + def _remove_replication_slot(self, relation: Relation, publication: str): + relation_replication_slots = self._get_relation_replication_slots(relation) + global_replication_slots = self._get_str_list(self.charm.app_peer_data.get("replication-slots")) + replication_slot_name = relation_replication_slots[publication] + global_replication_slots.remove(replication_slot_name) + self.charm.app_peer_data["replication-slots"] = ",".join(global_replication_slots) + del relation_replication_slots[publication] + relation.data[self.model.app]["replication-slots"] = json.dumps(relation_replication_slots) + # TODO: patroni config update From 7cadec0c5d9a8357e076b3ee3d1d4ef8d267813a Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Sat, 1 Mar 2025 19:34:13 +0200 Subject: [PATCH 02/19] Logical replication work. 
--- lib/charms/postgresql_k8s/v0/postgresql.py | 47 ++++++++- src/charm.py | 1 + src/cluster.py | 3 + src/relations/logical_replication.py | 110 +++++++++++++-------- templates/patroni.yml.j2 | 9 ++ 5 files changed, 127 insertions(+), 43 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index e395d6892f..0bf028d733 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -35,7 +35,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 43 +LIBPATCH = 44 # Groups to distinguish database permissions PERMISSIONS_GROUP_ADMIN = "admin" @@ -100,6 +100,15 @@ class PostgreSQLListUsersError(Exception): class PostgreSQLUpdateUserPasswordError(Exception): """Exception raised when updating a user password fails.""" +class PostgreSQLDatabaseExistsError(Exception): + """Exception raised during database existence check.""" + +class PostgreSQLTableExistsError(Exception): + """Exception raised during table existence check.""" + +class PostgreSQLIsTableEmptyError(Exception): + """Exception raised during table emptiness check.""" + class PostgreSQL: """Class to encapsulate all operations related to interacting with PostgreSQL instance.""" @@ -632,6 +641,42 @@ def is_restart_pending(self) -> bool: if connection: connection.close() + def database_exists(self, db: str) -> bool: + """Check whether specified database exists.""" + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute( + SQL("SELECT datname FROM pg_database WHERE datname={};").format(Literal(db)) + ) + return cursor.fetchone() is not None + except psycopg2.Error as e: + logger.error(f"Failed to check Postgresql database existence: {e}") + raise PostgreSQLDatabaseExistsError() from e + + def table_exists(self, db: str | None, schema: str, table: str) -> bool: + """Check whether 
specified table in database exists.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("SELECT tablename FROM pg_tables WHERE schemaname={} AND tablename={};").format(Literal(schema), Literal(table)) + ) + return cursor.fetchone() is not None + except psycopg2.Error as e: + logger.error(f"Failed to check Postgresql table existence: {e}") + raise PostgreSQLTableExistsError() from e + + def is_table_empty(self, db: str | None, schema: str, table: str) -> bool: + """Check whether table is empty.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("SELECT COUNT(1) FROM {};").format(Identifier(schema, table)) + ) + return cursor.fetchone()[0] == 0 + except psycopg2.Error as e: + logger.error(f"Failed to check whether table is empty: {e}") + raise PostgreSQLIsTableEmptyError() from e + @staticmethod def build_postgresql_parameters( config_options: dict, available_memory: int, limit_memory: Optional[int] = None diff --git a/src/charm.py b/src/charm.py index c593af00cd..41320565a7 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1894,6 +1894,7 @@ def update_config(self, is_creating_backup: bool = False, no_peers: bool = False restore_stanza=self.app_peer_data.get("restore-stanza"), parameters=pg_parameters, no_peers=no_peers, + slots=json.loads(self.app_peer_data["replication-slots"]) if "replication-slots" in self.app_peer_data else None ) if no_peers: return True diff --git a/src/cluster.py b/src/cluster.py index b321a4cac4..fa2c8335e0 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -620,6 +620,7 @@ def render_patroni_yml_file( restore_to_latest: bool = False, parameters: dict[str, str] | None = None, no_peers: bool = False, + slots: dict[str, str] | None = None, ) -> None: """Render the Patroni configuration file. 
@@ -636,6 +637,7 @@ def render_patroni_yml_file( restore_to_latest: restore all the WAL transaction logs from the stanza. parameters: PostgreSQL parameters to be added to the postgresql.conf file. no_peers: Don't include peers. + slots: replication slots (keys) with assigned database name (values). """ # Open the template patroni.yml file. with open("templates/patroni.yml.j2") as file: @@ -678,6 +680,7 @@ def render_patroni_yml_file( extra_replication_endpoints=self.charm.async_replication.get_standby_endpoints(), raft_password=self.raft_password, patroni_password=self.patroni_password, + slots=slots, ) self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index 7a543279a5..f094d88c70 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -1,7 +1,7 @@ import json import logging -from ops import ActionEvent, Object, Relation, RelationChangedEvent, RelationJoinedEvent +from ops import ActionEvent, Object, RelationChangedEvent, RelationJoinedEvent from constants import APP_SCOPE, REPLICATION_PASSWORD_KEY, REPLICATION_USER @@ -53,16 +53,31 @@ def _on_offer_relation_changed(self, event: RelationChangedEvent): subscriptions_str = event.relation.data[event.app].get("subscriptions", "") subscriptions = subscriptions_str.split(",") if subscriptions_str else () - replication_slots = self._get_relation_replication_slots(event.relation) - - for subscription in subscriptions: - if subscription not in replication_slots: - # TODO: validation on publication existence - self._add_replication_slot(event.relation, subscription) - - for publication in replication_slots: + publications = self._get_publications_from_str(self.charm.app_peer_data.get("publications")) + relation_replication_slots = self._get_replication_slots_from_str( + event.relation.data[self.model.app].get("replication-slots")) + global_replication_slots = 
self._get_replication_slots_from_str( + self.charm.app_peer_data.get("replication-slots")) + + for publication in subscriptions: + if publication not in publications: + logger.error(f"Logical Replication: requested subscription for non-existing publication {publication}") + continue + if publication not in relation_replication_slots: + replication_slot_name = f"{event.relation.id}_{publication}" + global_replication_slots[replication_slot_name] = publications[publication]["database"] + self.charm.app_peer_data["replication-slots"] = json.dumps(global_replication_slots) + relation_replication_slots[publication] = replication_slot_name + event.relation.data[self.model.app]["replication-slots"] = json.dumps(relation_replication_slots) + for publication in relation_replication_slots: if publication not in subscriptions: - self._remove_replication_slot(event.relation, publication) + replication_slot_name = relation_replication_slots[publication] + del global_replication_slots[replication_slot_name] + self.charm.app_peer_data["replication-slots"] = json.dumps(global_replication_slots) + del relation_replication_slots[publication] + event.relation.data[self.model.app]["replication-slots"] = json.dumps(relation_replication_slots) + + self.charm.update_config() #endregion @@ -85,7 +100,17 @@ def _on_add_publication(self, event: ActionEvent): if publication_name in publications: event.fail("Such publication already exists") return - # TODO: check on schema existence + if not self.charm.postgresql.database_exists(publication_db): + event.fail(f"No such database {publication_db}") + return + for schematable in publication_tables.split(","): + schematable_split = schematable.split(".") + if len(schematable_split) != 2: + event.fail("All tables should be in schema.table format") + return + if not self.charm.postgresql.table_exists(db=publication_db, schema=schematable_split[0], table=schematable_split[1]): + event.fail(f"No such table {schematable} in database {publication_db}") 
+ return publications[publication_name] = { "database": publication_db, "tables": publication_tables.split(",") @@ -131,10 +156,35 @@ def _on_subscribe(self, event: ActionEvent): event.fail("Such subscription already exists") return publications = self._get_publications_from_str(relation.data[relation.app].get("publications")) - # TODO: validation on overlaps and existing scheme - if publication_name not in publications: + subscribing_publication = publications.get(publication_name) + if not subscribing_publication: event.fail("No such publication offered") return + subscribing_database = subscribing_publication["database"] + + if any( + any( + publication_table in subscribing_publication["tables"] + for publication_table in publication_obj["tables"] + ) + for (publication, publication_obj) in publications.items() + if publication in subscriptions and publication_obj["database"] == subscribing_database + ): + event.fail("Tables overlap detected with existing subscriptions") + return + + if not self.charm.postgresql.database_exists(subscribing_database): + event.fail(f"No such database {subscribing_database}") + return + for schematable in subscribing_publication["tables"]: + schematable_split = schematable.split(".") + if not self.charm.postgresql.table_exists(db=subscribing_database, schema=schematable_split[0], table=schematable_split[1]): + event.fail(f"No such table {schematable} in database {subscribing_database}") + return + if not self.charm.postgresql.is_table_empty(db=subscribing_database, schema=schematable_split[0], table=schematable_split[1]): + event.fail(f"Table {schematable} in database {subscribing_database} should be empty before subscribing on it") + return + subscriptions.append(publication_name) relation.data[self.model.app]["subscriptions"] = ",".join(subscriptions) @@ -174,7 +224,7 @@ def _on_unsubscribe(self, event: ActionEvent): #endregion @staticmethod - def _get_publications_from_str(publications_str: str | None) -> dict[str, dict[str, 
any]]: + def _get_publications_from_str(publications_str: str | None = None) -> dict[str, dict[str, any]]: return json.loads(publications_str or "{}") def _set_publications(self, publications: dict[str, dict[str, any]]): @@ -183,34 +233,10 @@ def _set_publications(self, publications: dict[str, dict[str, any]]): for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): rel.data[self.model.app]["publications"] = publications_str - def _get_relation_replication_slots(self, relation: Relation) -> dict[str, str]: - return json.loads(relation.data[self.model.app].get("replication-slots", "{}")) + @staticmethod + def _get_replication_slots_from_str(replication_slots_str: str | None = None) -> dict[str, str]: + return json.loads(replication_slots_str or "{}") @staticmethod - def _get_str_list(list_str: str | None) -> list[str]: + def _get_str_list(list_str: str | None = None) -> list[str]: return list_str.split(",") if list_str else [] - - def _add_replication_slot(self, relation: Relation, publication: str): - # TODO: overwrite check - relation_replication_slots = self._get_relation_replication_slots(relation) - global_replication_slots = self._get_str_list(self.charm.app_peer_data.get("replication-slots")) - - # TODO: replication slot random name - new_replication_slot_name = publication - - global_replication_slots.append(new_replication_slot_name) - self.charm.app_peer_data["replication-slots"] = ",".join(global_replication_slots) - relation_replication_slots[publication] = new_replication_slot_name - relation.data[self.model.app]["replication-slots"] = json.dumps(relation_replication_slots) - - # TODO: patroni config update - - def _remove_replication_slot(self, relation: Relation, publication: str): - relation_replication_slots = self._get_relation_replication_slots(relation) - global_replication_slots = self._get_str_list(self.charm.app_peer_data.get("replication-slots")) - replication_slot_name = relation_replication_slots[publication] - 
global_replication_slots.remove(replication_slot_name) - self.charm.app_peer_data["replication-slots"] = ",".join(global_replication_slots) - del relation_replication_slots[publication] - relation.data[self.model.app]["replication-slots"] = json.dumps(relation_replication_slots) - # TODO: patroni config update diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 36d4f3ff5f..ef2dd8b9cd 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -108,6 +108,15 @@ bootstrap: {{key}}: {{value}} {%- endfor -%} {% endif %} + {%- if slots %} + slots: + {%- for slot, database in slots.items() %} + {{slot}}: + database: {{database}} + plugin: pgoutput + type: logical + {%- endfor -%} + {% endif %} {%- if restoring_backup %} method: pgbackrest From 8160307e57f86aac071ff16e53d811ff9c1ea863 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Mon, 3 Mar 2025 18:20:45 +0200 Subject: [PATCH 03/19] Logical replication work. --- lib/charms/postgresql_k8s/v0/postgresql.py | 55 +++++++++++++++++++++- src/charm.py | 3 ++ src/cluster.py | 16 +++++++ src/relations/logical_replication.py | 53 ++++++++++++++++----- 4 files changed, 113 insertions(+), 14 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 0bf028d733..776f66355f 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -109,6 +109,15 @@ class PostgreSQLTableExistsError(Exception): class PostgreSQLIsTableEmptyError(Exception): """Exception raised during table emptiness check.""" +class PostgreSQLCreatePublicationError(Exception): + """Exception raised when creating PostgreSQL publication.""" + +class PostgreSQLSubscriptionExistsError(Exception): + """Exception raised during subscription existence check.""" + +class PostgreSQLCreateSubscriptionError(Exception): + """Exception raised when creating PostgreSQL subscription.""" + class PostgreSQL: """Class to encapsulate all 
operations related to interacting with PostgreSQL instance.""" @@ -653,7 +662,7 @@ def database_exists(self, db: str) -> bool: logger.error(f"Failed to check Postgresql database existence: {e}") raise PostgreSQLDatabaseExistsError() from e - def table_exists(self, db: str | None, schema: str, table: str) -> bool: + def table_exists(self, table: str, schema: str, db: str | None = None) -> bool: """Check whether specified table in database exists.""" try: with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: @@ -665,7 +674,7 @@ def table_exists(self, db: str | None, schema: str, table: str) -> bool: logger.error(f"Failed to check Postgresql table existence: {e}") raise PostgreSQLTableExistsError() from e - def is_table_empty(self, db: str | None, schema: str, table: str) -> bool: + def is_table_empty(self, table: str, schema: str, db: str | None = None) -> bool: """Check whether table is empty.""" try: with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: @@ -677,6 +686,48 @@ def is_table_empty(self, db: str | None, schema: str, table: str) -> bool: logger.error(f"Failed to check whether table is empty: {e}") raise PostgreSQLIsTableEmptyError() from e + def create_publication(self, publication: str, schematables: list[str], db: str | None = None): + """Create PostgreSQL publication.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("CREATE PUBLICATION {} FOR TABLE {};").format( + Identifier(publication), + SQL(",").join(Identifier(schematable.split(".")[0], schematable.split(".")[1]) for schematable in schematables) + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to create Postgresql publication: {e}") + raise PostgreSQLCreatePublicationError() from e + + def subscription_exists(self, subscription: str, db: str | None = None) -> bool: + """Check whether specified subscription in database exists.""" + try: + 
with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("SELECT subname FROM pg_subscription WHERE subname={};").format(Literal(subscription)) + ) + return cursor.fetchone() is not None + except psycopg2.Error as e: + logger.error(f"Failed to check Postgresql subscription existence: {e}") + raise PostgreSQLSubscriptionExistsError() from e + + def create_subscription(self, subscription: str, host: str, db: str, user: str, password: str, replication_slot: str): + """Create PostgreSQL subscription.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("CREATE SUBSCRIPTION {} CONNECTION {} PUBLICATION {} WITH (copy_data=true,create_slot=false,enabled=true,slot_name={});").format( + Identifier(subscription), + Literal(f"host={host} dbname={db} user={user} password={password}"), + Identifier(subscription), + Identifier(replication_slot) + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to create Postgresql subscription: {e}") + raise PostgreSQLCreateSubscriptionError() from e + @staticmethod def build_postgresql_parameters( config_options: dict, available_memory: int, limit_memory: Optional[int] = None diff --git a/src/charm.py b/src/charm.py index 41320565a7..6c66b5268c 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1930,6 +1930,9 @@ def update_config(self, is_creating_backup: bool = False, no_peers: bool = False "max_prepared_transactions": self.config.memory_max_prepared_transactions, }) + # TODO: better implementation. 
+ self._patroni.update_slots_controller_by_patroni(json.loads(self.app_peer_data["replication-slots"]) if "replication-slots" in self.app_peer_data else {}) + self._handle_postgresql_restart_need(enable_tls) # Restart the monitoring service if the password was rotated diff --git a/src/cluster.py b/src/cluster.py index fa2c8335e0..c8cf2c8e5a 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -992,6 +992,22 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: dict[str, Any timeout=PATRONI_TIMEOUT, ) + def update_slots_controller_by_patroni(self, slots: dict[str, str]) -> None: + requests.patch( + f"{self._patroni_url}/config", + verify=self.verify, + json={"slots": { + slot: { + "database": database, + "plugin": "pgoutput", + "type": "logical", + } + for slot, database in slots.items() + }}, + auth=self._patroni_auth, + timeout=PATRONI_TIMEOUT, + ) + @property def _synchronous_node_count(self) -> int: planned_units = self.charm.app.planned_units() diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index f094d88c70..49673e2706 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -3,7 +3,11 @@ from ops import ActionEvent, Object, RelationChangedEvent, RelationJoinedEvent -from constants import APP_SCOPE, REPLICATION_PASSWORD_KEY, REPLICATION_USER +from constants import ( + APP_SCOPE, + USER, + USER_PASSWORD_KEY, +) logger = logging.getLogger(__name__) @@ -19,6 +23,8 @@ def __init__(self, charm): self._on_offer_relation_joined) self.charm.framework.observe(self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_changed, self._on_offer_relation_changed) + self.charm.framework.observe(self.charm.on[LOGICAL_REPLICATION_RELATION].relation_changed, + self._on_relation_changed) # Actions self.charm.framework.observe(self.charm.on.add_publication_action, self._on_add_publication) self.charm.framework.observe(self.charm.on.list_publications_action, self._on_list_publications) 
@@ -42,8 +48,10 @@ def _on_offer_relation_joined(self, event: RelationJoinedEvent): # TODO: replication-user-secret event.relation.data[self.model.app].update({ "publications": self.charm.app_peer_data.get("publications", ""), - "replication-user": REPLICATION_USER, - "replication-user-secret": self.charm.get_secret(APP_SCOPE, REPLICATION_PASSWORD_KEY), + # "replication-user": REPLICATION_USER, + # "replication-user-secret": self.charm.get_secret(APP_SCOPE, REPLICATION_PASSWORD_KEY), + "replication-user": USER, + "replication-user-secret": self.charm.get_secret(APP_SCOPE, USER_PASSWORD_KEY), "primary": self.charm.primary_endpoint }) @@ -66,19 +74,38 @@ def _on_offer_relation_changed(self, event: RelationChangedEvent): if publication not in relation_replication_slots: replication_slot_name = f"{event.relation.id}_{publication}" global_replication_slots[replication_slot_name] = publications[publication]["database"] - self.charm.app_peer_data["replication-slots"] = json.dumps(global_replication_slots) relation_replication_slots[publication] = replication_slot_name - event.relation.data[self.model.app]["replication-slots"] = json.dumps(relation_replication_slots) + deleting_replication_slots = [] for publication in relation_replication_slots: if publication not in subscriptions: - replication_slot_name = relation_replication_slots[publication] - del global_replication_slots[replication_slot_name] - self.charm.app_peer_data["replication-slots"] = json.dumps(global_replication_slots) - del relation_replication_slots[publication] - event.relation.data[self.model.app]["replication-slots"] = json.dumps(relation_replication_slots) + deleting_replication_slots.append(publication) + del global_replication_slots[relation_replication_slots[publication]] + for replication_slot in deleting_replication_slots: + del relation_replication_slots[replication_slot] + self.charm.app_peer_data["replication-slots"] = json.dumps(global_replication_slots) + 
event.relation.data[self.model.app]["replication-slots"] = json.dumps(relation_replication_slots) self.charm.update_config() + def _on_relation_changed(self, event: RelationChangedEvent): + subscriptions_str = event.relation.data[self.model.app].get("subscriptions", "") + subscriptions = subscriptions_str.split(",") if subscriptions_str else () + publications = self._get_publications_from_str(event.relation.data[event.app].get("publications")) + relation_replication_slots = self._get_replication_slots_from_str( + event.relation.data[event.app].get("replication-slots")) + + primary = event.relation.data[event.app].get("primary") + replication_user = event.relation.data[event.app].get("replication-user") + replication_password = event.relation.data[event.app].get("replication-user-secret") + if not primary or not replication_user or not replication_password: + logger.warning("Logical Replication: skipping relation changed event as there is no primary, replication-user and replication-secret data") + return + + for subscription in subscriptions: + db = publications[subscription]["database"] + if subscription in relation_replication_slots and not self.charm.postgresql.subscription_exists(subscription, db): + self.charm.postgresql.create_subscription(subscription, primary, db, replication_user, replication_password, relation_replication_slots[subscription]) + #endregion #region Actions @@ -103,7 +130,8 @@ def _on_add_publication(self, event: ActionEvent): if not self.charm.postgresql.database_exists(publication_db): event.fail(f"No such database {publication_db}") return - for schematable in publication_tables.split(","): + publication_tables_split = publication_tables.split(",") + for schematable in publication_tables_split: schematable_split = schematable.split(".") if len(schematable_split) != 2: event.fail("All tables should be in schema.table format") @@ -111,9 +139,10 @@ def _on_add_publication(self, event: ActionEvent): if not 
self.charm.postgresql.table_exists(db=publication_db, schema=schematable_split[0], table=schematable_split[1]): event.fail(f"No such table {schematable} in database {publication_db}") return + self.charm.postgresql.create_publication(publication_name, publication_tables_split, publication_db) publications[publication_name] = { "database": publication_db, - "tables": publication_tables.split(",") + "tables": publication_tables_split } self._set_publications(publications) From 2e7e62062cb7a06ae452437b40e8ca2ecaea4cc0 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Tue, 4 Mar 2025 21:52:44 +0200 Subject: [PATCH 04/19] Logical replication work. Format & lint. --- lib/charms/postgresql_k8s/v0/postgresql.py | 44 ++- src/charm.py | 10 +- src/cluster.py | 17 +- src/relations/logical_replication.py | 302 +++++++++++++-------- 4 files changed, 251 insertions(+), 122 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 776f66355f..22122de8a1 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -112,12 +112,18 @@ class PostgreSQLIsTableEmptyError(Exception): class PostgreSQLCreatePublicationError(Exception): """Exception raised when creating PostgreSQL publication.""" +class PostgreSQLDropPublicationError(Exception): + """Exception raised when dropping PostgreSQL publication.""" + class PostgreSQLSubscriptionExistsError(Exception): """Exception raised during subscription existence check.""" class PostgreSQLCreateSubscriptionError(Exception): """Exception raised when creating PostgreSQL subscription.""" +class PostgreSQLDropSubscriptionError(Exception): + """Exception raised when dropping PostgreSQL subscription.""" + class PostgreSQL: """Class to encapsulate all operations related to interacting with PostgreSQL instance.""" @@ -662,7 +668,7 @@ def database_exists(self, db: str) -> bool: logger.error(f"Failed to check Postgresql database existence: 
{e}") raise PostgreSQLDatabaseExistsError() from e - def table_exists(self, table: str, schema: str, db: str | None = None) -> bool: + def table_exists(self, db: str, schema: str, table: str) -> bool: """Check whether specified table in database exists.""" try: with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: @@ -674,7 +680,7 @@ def table_exists(self, table: str, schema: str, db: str | None = None) -> bool: logger.error(f"Failed to check Postgresql table existence: {e}") raise PostgreSQLTableExistsError() from e - def is_table_empty(self, table: str, schema: str, db: str | None = None) -> bool: + def is_table_empty(self, db: str, schema: str, table: str) -> bool: """Check whether table is empty.""" try: with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: @@ -686,13 +692,13 @@ def is_table_empty(self, table: str, schema: str, db: str | None = None) -> bool logger.error(f"Failed to check whether table is empty: {e}") raise PostgreSQLIsTableEmptyError() from e - def create_publication(self, publication: str, schematables: list[str], db: str | None = None): + def create_publication(self, db: str, name: str, schematables: list[str]) -> None: """Create PostgreSQL publication.""" try: with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: cursor.execute( SQL("CREATE PUBLICATION {} FOR TABLE {};").format( - Identifier(publication), + Identifier(name), SQL(",").join(Identifier(schematable.split(".")[0], schematable.split(".")[1]) for schematable in schematables) ) ) @@ -700,7 +706,20 @@ def create_publication(self, publication: str, schematables: list[str], db: str logger.error(f"Failed to create Postgresql publication: {e}") raise PostgreSQLCreatePublicationError() from e - def subscription_exists(self, subscription: str, db: str | None = None) -> bool: + def drop_publication(self, db: str, publication: str) -> None: + """Drop PostgreSQL publication.""" + try: 
+ with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("DROP PUBLICATION IF EXISTS {};").format( + Identifier(publication), + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to drop Postgresql publication: {e}") + raise PostgreSQLDropPublicationError() from e + + def subscription_exists(self, db: str, subscription: str) -> bool: """Check whether specified subscription in database exists.""" try: with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: @@ -712,7 +731,7 @@ def subscription_exists(self, subscription: str, db: str | None = None) -> bool: logger.error(f"Failed to check Postgresql subscription existence: {e}") raise PostgreSQLSubscriptionExistsError() from e - def create_subscription(self, subscription: str, host: str, db: str, user: str, password: str, replication_slot: str): + def create_subscription(self, subscription: str, host: str, db: str, user: str, password: str, replication_slot: str) -> None: """Create PostgreSQL subscription.""" try: with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: @@ -728,6 +747,19 @@ def create_subscription(self, subscription: str, host: str, db: str, user: str, logger.error(f"Failed to create Postgresql subscription: {e}") raise PostgreSQLCreateSubscriptionError() from e + def drop_subscription(self, db: str, subscription: str) -> None: + """Drop PostgreSQL subscription.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("DROP SUBSCRIPTION IF EXISTS {};").format( + Identifier(subscription), + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to drop Postgresql subscription: {e}") + raise PostgreSQLDropSubscriptionError() from e + @staticmethod def build_postgresql_parameters( config_options: dict, available_memory: int, limit_memory: Optional[int] = None diff --git a/src/charm.py b/src/charm.py 
index 6c66b5268c..e654dd52bb 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1894,7 +1894,9 @@ def update_config(self, is_creating_backup: bool = False, no_peers: bool = False restore_stanza=self.app_peer_data.get("restore-stanza"), parameters=pg_parameters, no_peers=no_peers, - slots=json.loads(self.app_peer_data["replication-slots"]) if "replication-slots" in self.app_peer_data else None + slots=json.loads(self.app_peer_data["replication-slots"]) + if "replication-slots" in self.app_peer_data + else None, ) if no_peers: return True @@ -1931,7 +1933,11 @@ def update_config(self, is_creating_backup: bool = False, no_peers: bool = False }) # TODO: better implementation. - self._patroni.update_slots_controller_by_patroni(json.loads(self.app_peer_data["replication-slots"]) if "replication-slots" in self.app_peer_data else {}) + self._patroni.update_slots_controller_by_patroni( + json.loads(self.app_peer_data["replication-slots"]) + if "replication-slots" in self.app_peer_data + else {} + ) self._handle_postgresql_restart_need(enable_tls) diff --git a/src/cluster.py b/src/cluster.py index c8cf2c8e5a..8115a0cff3 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -993,17 +993,20 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: dict[str, Any ) def update_slots_controller_by_patroni(self, slots: dict[str, str]) -> None: + """TODO: add proper management and deletion of replication slots.""" requests.patch( f"{self._patroni_url}/config", verify=self.verify, - json={"slots": { - slot: { - "database": database, - "plugin": "pgoutput", - "type": "logical", + json={ + "slots": { + slot: { + "database": database, + "plugin": "pgoutput", + "type": "logical", + } + for slot, database in slots.items() } - for slot, database in slots.items() - }}, + }, auth=self._patroni_auth, timeout=PATRONI_TIMEOUT, ) diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index 49673e2706..95deee0415 100644 --- 
a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -1,3 +1,11 @@ +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Logical Replication implementation. + +TODO: add description after specification is accepted. +""" + import json import logging @@ -14,26 +22,42 @@ LOGICAL_REPLICATION_OFFER_RELATION = "logical-replication-offer" LOGICAL_REPLICATION_RELATION = "logical-replication" + class PostgreSQLLogicalReplication(Object): + """Defines the logical-replication logic.""" + def __init__(self, charm): super().__init__(charm, "postgresql_logical_replication") self.charm = charm # Relations - self.charm.framework.observe(self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_joined, - self._on_offer_relation_joined) - self.charm.framework.observe(self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_changed, - self._on_offer_relation_changed) - self.charm.framework.observe(self.charm.on[LOGICAL_REPLICATION_RELATION].relation_changed, - self._on_relation_changed) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_joined, + self._on_offer_relation_joined, + ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_changed, + self._on_offer_relation_changed, + ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_RELATION].relation_changed, self._on_relation_changed + ) # Actions - self.charm.framework.observe(self.charm.on.add_publication_action, self._on_add_publication) - self.charm.framework.observe(self.charm.on.list_publications_action, self._on_list_publications) - self.charm.framework.observe(self.charm.on.remove_publication_action, self._on_remove_publication) + self.charm.framework.observe( + self.charm.on.add_publication_action, self._on_add_publication + ) + self.charm.framework.observe( + self.charm.on.list_publications_action, self._on_list_publications + ) + self.charm.framework.observe( + 
self.charm.on.remove_publication_action, self._on_remove_publication + ) self.charm.framework.observe(self.charm.on.subscribe_action, self._on_subscribe) - self.charm.framework.observe(self.charm.on.list_subscriptions_action, self._on_list_subscriptions) + self.charm.framework.observe( + self.charm.on.list_subscriptions_action, self._on_list_subscriptions + ) self.charm.framework.observe(self.charm.on.unsubscribe_action, self._on_unsubscribe) -#region Relations + # region Relations def _on_offer_relation_joined(self, event: RelationJoinedEvent): if not self.charm.unit.is_leader(): @@ -41,7 +65,9 @@ def _on_offer_relation_joined(self, event: RelationJoinedEvent): if not self.charm.primary_endpoint: event.defer() - logger.debug(f"{LOGICAL_REPLICATION_OFFER_RELATION}: joined event deferred as primary is unavailable right now") + logger.debug( + f"{LOGICAL_REPLICATION_OFFER_RELATION}: joined event deferred as primary is unavailable right now" + ) return # TODO: add primary change check @@ -52,7 +78,7 @@ def _on_offer_relation_joined(self, event: RelationJoinedEvent): # "replication-user-secret": self.charm.get_secret(APP_SCOPE, REPLICATION_PASSWORD_KEY), "replication-user": USER, "replication-user-secret": self.charm.get_secret(APP_SCOPE, USER_PASSWORD_KEY), - "primary": self.charm.primary_endpoint + "primary": self.charm.primary_endpoint, }) def _on_offer_relation_changed(self, event: RelationChangedEvent): @@ -61,19 +87,27 @@ def _on_offer_relation_changed(self, event: RelationChangedEvent): subscriptions_str = event.relation.data[event.app].get("subscriptions", "") subscriptions = subscriptions_str.split(",") if subscriptions_str else () - publications = self._get_publications_from_str(self.charm.app_peer_data.get("publications")) + publications = self._get_publications_from_str( + self.charm.app_peer_data.get("publications") + ) relation_replication_slots = self._get_replication_slots_from_str( - event.relation.data[self.model.app].get("replication-slots")) + 
event.relation.data[self.model.app].get("replication-slots") + ) global_replication_slots = self._get_replication_slots_from_str( - self.charm.app_peer_data.get("replication-slots")) + self.charm.app_peer_data.get("replication-slots") + ) for publication in subscriptions: if publication not in publications: - logger.error(f"Logical Replication: requested subscription for non-existing publication {publication}") + logger.error( + f"Logical Replication: requested subscription for non-existing publication {publication}" + ) continue if publication not in relation_replication_slots: replication_slot_name = f"{event.relation.id}_{publication}" - global_replication_slots[replication_slot_name] = publications[publication]["database"] + global_replication_slots[replication_slot_name] = publications[publication][ + "database" + ] relation_replication_slots[publication] = replication_slot_name deleting_replication_slots = [] for publication in relation_replication_slots: @@ -84,65 +118,62 @@ def _on_offer_relation_changed(self, event: RelationChangedEvent): del relation_replication_slots[replication_slot] self.charm.app_peer_data["replication-slots"] = json.dumps(global_replication_slots) - event.relation.data[self.model.app]["replication-slots"] = json.dumps(relation_replication_slots) + event.relation.data[self.model.app]["replication-slots"] = json.dumps( + relation_replication_slots + ) self.charm.update_config() def _on_relation_changed(self, event: RelationChangedEvent): subscriptions_str = event.relation.data[self.model.app].get("subscriptions", "") subscriptions = subscriptions_str.split(",") if subscriptions_str else () - publications = self._get_publications_from_str(event.relation.data[event.app].get("publications")) + publications = self._get_publications_from_str( + event.relation.data[event.app].get("publications") + ) relation_replication_slots = self._get_replication_slots_from_str( - event.relation.data[event.app].get("replication-slots")) + 
event.relation.data[event.app].get("replication-slots") + ) primary = event.relation.data[event.app].get("primary") replication_user = event.relation.data[event.app].get("replication-user") replication_password = event.relation.data[event.app].get("replication-user-secret") if not primary or not replication_user or not replication_password: - logger.warning("Logical Replication: skipping relation changed event as there is no primary, replication-user and replication-secret data") + logger.warning( + "Logical Replication: skipping relation changed event as there is no primary, replication-user and replication-secret data" + ) return for subscription in subscriptions: db = publications[subscription]["database"] - if subscription in relation_replication_slots and not self.charm.postgresql.subscription_exists(subscription, db): - self.charm.postgresql.create_subscription(subscription, primary, db, replication_user, replication_password, relation_replication_slots[subscription]) - -#endregion - -#region Actions + if ( + subscription in relation_replication_slots + and not self.charm.postgresql.subscription_exists(db, subscription) + ): + self.charm.postgresql.create_subscription( + subscription, + primary, + db, + replication_user, + replication_password, + relation_replication_slots[subscription], + ) + + # endregion + + # region Actions def _on_add_publication(self, event: ActionEvent): - if not self.charm.unit.is_leader(): - event.fail("Publications management can be done only on the leader unit") - return - if not (publication_name := event.params.get("name")): - event.fail("name parameter is required") - return - if not (publication_db := event.params.get("database")): - event.fail("database parameter is required") - return - if not (publication_tables := event.params.get("tables")): - event.fail("tables parameter is required") - return - publications = self._get_publications_from_str(self.charm.app_peer_data.get("publications")) - if publication_name in 
publications: - event.fail("Such publication already exists") - return - if not self.charm.postgresql.database_exists(publication_db): - event.fail(f"No such database {publication_db}") + if not self._add_publication_validation(event): return - publication_tables_split = publication_tables.split(",") - for schematable in publication_tables_split: - schematable_split = schematable.split(".") - if len(schematable_split) != 2: - event.fail("All tables should be in schema.table format") - return - if not self.charm.postgresql.table_exists(db=publication_db, schema=schematable_split[0], table=schematable_split[1]): - event.fail(f"No such table {schematable} in database {publication_db}") - return - self.charm.postgresql.create_publication(publication_name, publication_tables_split, publication_db) - publications[publication_name] = { - "database": publication_db, - "tables": publication_tables_split + publications = self._get_publications_from_str( + self.charm.app_peer_data.get("publications") + ) + publication_tables_split = event.params["tables"].split(",") + self.charm.postgresql.create_publication( + event.params["database"], event.params["name"], publication_tables_split + ) + publications[event.params["name"]] = { + "database": event.params["database"], + "tables": publication_tables_split, } self._set_publications(publications) @@ -151,9 +182,7 @@ def _on_list_publications(self, event: ActionEvent): if not self.charm.unit.is_leader(): event.fail("Publications management can be done only on the leader unit") return - event.set_results({ - "publications": self.charm.app_peer_data.get("publications", "{}") - }) + event.set_results({"publications": self.charm.app_peer_data.get("publications", "{}")}) def _on_remove_publication(self, event: ActionEvent): if not self.charm.unit.is_leader(): @@ -163,58 +192,43 @@ def _on_remove_publication(self, event: ActionEvent): event.fail("name parameter is required") return # TODO: validate to delete only unused publications - 
publications = self._get_publications_from_str(self.charm.app_peer_data.get("publications")) + publications = self._get_publications_from_str( + self.charm.app_peer_data.get("publications") + ) if publication_name not in publications: event.fail("No such publication") return del publications[publication_name] self._set_publications(publications) + # TODO: drop publication def _on_subscribe(self, event: ActionEvent): - if not self.charm.unit.is_leader(): - event.fail("Subscriptions management can be done only on the leader unit") - return - if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): - event.fail("Subscription management can be done only with an active logical replication connection") - return - if not (publication_name := event.params.get("name")): - event.fail("name parameter is required") - return - subscriptions = self._get_str_list(relation.data[self.model.app].get("subscriptions")) - if publication_name in subscriptions: - event.fail("Such subscription already exists") - return - publications = self._get_publications_from_str(relation.data[relation.app].get("publications")) - subscribing_publication = publications.get(publication_name) - if not subscribing_publication: - event.fail("No such publication offered") + if not self._subscribe_validation(event): return + relation = self.model.get_relation(LOGICAL_REPLICATION_RELATION) + subscribing_publication = self._get_publications_from_str( + relation.data[relation.app]["publications"] + )[event.params["name"]] subscribing_database = subscribing_publication["database"] - - if any( - any( - publication_table in subscribing_publication["tables"] - for publication_table in publication_obj["tables"] - ) - for (publication, publication_obj) in publications.items() - if publication in subscriptions and publication_obj["database"] == subscribing_database - ): - event.fail("Tables overlap detected with existing subscriptions") - return - + subscriptions = 
self._get_str_list(relation.data[self.model.app].get("subscriptions")) if not self.charm.postgresql.database_exists(subscribing_database): event.fail(f"No such database {subscribing_database}") return for schematable in subscribing_publication["tables"]: schematable_split = schematable.split(".") - if not self.charm.postgresql.table_exists(db=subscribing_database, schema=schematable_split[0], table=schematable_split[1]): + if not self.charm.postgresql.table_exists( + subscribing_database, schematable_split[0], schematable_split[1] + ): event.fail(f"No such table {schematable} in database {subscribing_database}") return - if not self.charm.postgresql.is_table_empty(db=subscribing_database, schema=schematable_split[0], table=schematable_split[1]): - event.fail(f"Table {schematable} in database {subscribing_database} should be empty before subscribing on it") + if not self.charm.postgresql.is_table_empty( + subscribing_database, schematable_split[0], schematable_split[1] + ): + event.fail( + f"Table {schematable} in database {subscribing_database} should be empty before subscribing on it" + ) return - - subscriptions.append(publication_name) + subscriptions.append(event.params["name"]) relation.data[self.model.app]["subscriptions"] = ",".join(subscriptions) def _on_list_subscriptions(self, event: ActionEvent): @@ -222,7 +236,9 @@ def _on_list_subscriptions(self, event: ActionEvent): event.fail("Subscriptions management can be done only on the leader unit") return if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): - event.fail("Subscription management can be done only with an active logical replication connection") + event.fail( + "Subscription management can be done only with an active logical replication connection" + ) return # TODO: table formatting event.set_results({ @@ -234,7 +250,9 @@ def _on_unsubscribe(self, event: ActionEvent): event.fail("Subscriptions management can be done only on the leader unit") return if not (relation := 
self.model.get_relation(LOGICAL_REPLICATION_RELATION)): - event.fail("Subscription management can be done only with an active logical replication connection") + event.fail( + "Subscription management can be done only with an active logical replication connection" + ) return if not (subscription_name := event.params.get("name")): event.fail("name parameter is required") @@ -243,17 +261,85 @@ def _on_unsubscribe(self, event: ActionEvent): if subscription_name not in subscriptions: event.fail("No such subscription") return - relation.data[self.model.app]["subscriptions"] = ",".join([ - x - for x in self._get_str_list(relation.data[self.model.app].get("subscriptions")) - if x != subscription_name - ]) + subscriptions.remove(subscription_name) + relation.data[self.model.app]["subscriptions"] = ",".join(subscriptions) # TODO: unsubscribe -#endregion + # endregion + + def _add_publication_validation(self, event: ActionEvent) -> bool: + if not self.charm.unit.is_leader(): + event.fail("Publications management can be done only on the leader unit") + return False + if not (publication_name := event.params.get("name")): + event.fail("name parameter is required") + return False + if not (publication_db := event.params.get("database")): + event.fail("database parameter is required") + return False + if not (publication_tables := event.params.get("tables")): + event.fail("tables parameter is required") + return False + if publication_name in self._get_publications_from_str( + self.charm.app_peer_data.get("publications") + ): + event.fail("Such publication already exists") + return False + if not self.charm.postgresql.database_exists(publication_db): + event.fail(f"No such database {publication_db}") + return False + for schematable in publication_tables.split(","): + if len(schematable_split := schematable.split(".")) != 2: + event.fail("All tables should be in schema.table format") + return False + if not self.charm.postgresql.table_exists( + publication_db, 
schematable_split[0], schematable_split[1] + ): + event.fail(f"No such table {schematable} in database {publication_db}") + return False + return True + + def _subscribe_validation(self, event: ActionEvent) -> bool: + if not self.charm.unit.is_leader(): + event.fail("Subscriptions management can be done only on the leader unit") + return False + if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): + event.fail( + "Subscription management can be done only with an active logical replication connection" + ) + return False + if not (publication_name := event.params.get("name")): + event.fail("name parameter is required") + return False + subscriptions = self._get_str_list(relation.data[self.model.app].get("subscriptions")) + if publication_name in subscriptions: + event.fail("Such subscription already exists") + return False + publications = self._get_publications_from_str( + relation.data[relation.app].get("publications") + ) + subscribing_publication = publications.get(publication_name) + if not subscribing_publication: + event.fail("No such publication offered") + return False + # Check overlaps with already subscribed publications + if any( + any( + publication_table in subscribing_publication["tables"] + for publication_table in publication_obj["tables"] + ) + for (publication, publication_obj) in publications.items() + if publication in subscriptions + and publication_obj["database"] == subscribing_publication["database"] + ): + event.fail("Tables overlap detected with existing subscriptions") + return False + return True @staticmethod - def _get_publications_from_str(publications_str: str | None = None) -> dict[str, dict[str, any]]: + def _get_publications_from_str( + publications_str: str | None = None, + ) -> dict[str, dict[str, any]]: return json.loads(publications_str or "{}") def _set_publications(self, publications: dict[str, dict[str, any]]): @@ -263,7 +349,9 @@ def _set_publications(self, publications: dict[str, dict[str, any]]): 
rel.data[self.model.app]["publications"] = publications_str @staticmethod - def _get_replication_slots_from_str(replication_slots_str: str | None = None) -> dict[str, str]: + def _get_replication_slots_from_str( + replication_slots_str: str | None = None, + ) -> dict[str, str]: return json.loads(replication_slots_str or "{}") @staticmethod From 50479e0fb5ae825666703b690e28ab67613b30df Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Wed, 5 Mar 2025 11:51:23 +0200 Subject: [PATCH 05/19] Logical Replication: cleanup of unused subscriptions, publications and replication slots. --- lib/charms/postgresql_k8s/v0/postgresql.py | 8 ++++- src/charm.py | 17 +++++------ src/cluster.py | 34 ++++++++++++++-------- src/relations/logical_replication.py | 22 ++++++++++++-- 4 files changed, 56 insertions(+), 25 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 22122de8a1..3f0ec4eed9 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -749,8 +749,11 @@ def create_subscription(self, subscription: str, host: str, db: str, user: str, def drop_subscription(self, db: str, subscription: str) -> None: """Drop PostgreSQL subscription.""" + connection = None try: - with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + connection = self._connect_to_database(database=db) + connection.autocommit = True + with connection.cursor() as cursor: cursor.execute( SQL("DROP SUBSCRIPTION IF EXISTS {};").format( Identifier(subscription), @@ -759,6 +762,9 @@ def drop_subscription(self, db: str, subscription: str) -> None: except psycopg2.Error as e: logger.error(f"Failed to drop Postgresql subscription: {e}") raise PostgreSQLDropSubscriptionError() from e + finally: + if connection is not None: + connection.close() @staticmethod def build_postgresql_parameters( diff --git a/src/charm.py b/src/charm.py index e654dd52bb..b1c92e4db8 100755 
--- a/src/charm.py +++ b/src/charm.py @@ -1881,6 +1881,12 @@ def update_config(self, is_creating_backup: bool = False, no_peers: bool = False self.model.config, self.get_available_memory(), limit_memory ) + replication_slots_json = ( + json.loads(self.app_peer_data["replication-slots"]) + if "replication-slots" in self.app_peer_data + else None + ) + # Update and reload configuration based on TLS files availability. self._patroni.render_patroni_yml_file( connectivity=self.unit_peer_data.get("connectivity", "on") == "on", @@ -1894,9 +1900,7 @@ def update_config(self, is_creating_backup: bool = False, no_peers: bool = False restore_stanza=self.app_peer_data.get("restore-stanza"), parameters=pg_parameters, no_peers=no_peers, - slots=json.loads(self.app_peer_data["replication-slots"]) - if "replication-slots" in self.app_peer_data - else None, + slots=replication_slots_json, ) if no_peers: return True @@ -1932,12 +1936,7 @@ def update_config(self, is_creating_backup: bool = False, no_peers: bool = False "max_prepared_transactions": self.config.memory_max_prepared_transactions, }) - # TODO: better implementation. 
- self._patroni.update_slots_controller_by_patroni( - json.loads(self.app_peer_data["replication-slots"]) - if "replication-slots" in self.app_peer_data - else {} - ) + self._patroni.ensure_slots_controller_by_patroni(replication_slots_json or {}) self._handle_postgresql_restart_need(enable_tls) diff --git a/src/cluster.py b/src/cluster.py index 8115a0cff3..eb294dd1fa 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -992,21 +992,31 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: dict[str, Any timeout=PATRONI_TIMEOUT, ) - def update_slots_controller_by_patroni(self, slots: dict[str, str]) -> None: - """TODO: add proper management and deletion of replication slots.""" + def ensure_slots_controller_by_patroni(self, slots: dict[str, str]) -> None: + """Synchronises slots controlled by Patroni with the provided state by removing unneeded slots and creating new ones. + + Args: + slots: dictionary of slots in the {slot: database} format. + """ + current_config = requests.get( + f"{self._patroni_url}/config", + verify=self.verify, + timeout=API_REQUEST_TIMEOUT, + auth=self._patroni_auth, + ) + slots_patch: dict[str, dict[str, str] | None] = { + slot: None for slot in current_config.json().get("slots", ()) + } + for slot, database in slots.items(): + slots_patch[slot] = { + "database": database, + "plugin": "pgoutput", + "type": "logical", + } requests.patch( f"{self._patroni_url}/config", verify=self.verify, - json={ - "slots": { - slot: { - "database": database, - "plugin": "pgoutput", - "type": "logical", - } - for slot, database in slots.items() - } - }, + json={"slots": slots_patch}, auth=self._patroni_auth, timeout=PATRONI_TIMEOUT, ) diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index 95deee0415..9383383519 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -191,16 +191,20 @@ def _on_remove_publication(self, event: ActionEvent): if not (publication_name := 
event.params.get("name")): event.fail("name parameter is required") return - # TODO: validate to delete only unused publications publications = self._get_publications_from_str( self.charm.app_peer_data.get("publications") ) if publication_name not in publications: event.fail("No such publication") return + if self._count_publication_connections(publication_name): + event.fail("Cannot remove publication while it's in use") + return + self.charm.postgresql.drop_publication( + publications[publication_name]["database"], publication_name + ) del publications[publication_name] self._set_publications(publications) - # TODO: drop publication def _on_subscribe(self, event: ActionEvent): if not self._subscribe_validation(event): @@ -261,9 +265,14 @@ def _on_unsubscribe(self, event: ActionEvent): if subscription_name not in subscriptions: event.fail("No such subscription") return + self.charm.postgresql.drop_subscription( + self._get_publications_from_str(relation.data[relation.app]["publications"])[ + subscription_name + ]["database"], + subscription_name, + ) subscriptions.remove(subscription_name) relation.data[self.model.app]["subscriptions"] = ",".join(subscriptions) - # TODO: unsubscribe # endregion @@ -348,6 +357,13 @@ def _set_publications(self, publications: dict[str, dict[str, any]]): for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): rel.data[self.model.app]["publications"] = publications_str + def _count_publication_connections(self, publication: str) -> int: + count = 0 + for relation in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): + if publication in self._get_str_list(relation.data[relation.app].get("subscriptions")): + count += 1 + return count + @staticmethod def _get_replication_slots_from_str( replication_slots_str: str | None = None, From 0e741637b043979d69eaeeaf0200bb02f1b68ddc Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Wed, 12 Mar 2025 14:01:14 +0200 Subject: [PATCH 06/19] Logical Replication: 
replication slots cleanup, validation improvement. --- lib/charms/postgresql_k8s/v0/postgresql.py | 30 ++-- src/relations/logical_replication.py | 193 ++++++++++++++++----- 2 files changed, 166 insertions(+), 57 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 3f0ec4eed9..982372914a 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -115,12 +115,12 @@ class PostgreSQLCreatePublicationError(Exception): class PostgreSQLDropPublicationError(Exception): """Exception raised when dropping PostgreSQL publication.""" -class PostgreSQLSubscriptionExistsError(Exception): - """Exception raised during subscription existence check.""" - class PostgreSQLCreateSubscriptionError(Exception): """Exception raised when creating PostgreSQL subscription.""" +class PostgreSQLSubscriptionExistsError(Exception): + """Exception raised during subscription existence check.""" + class PostgreSQLDropSubscriptionError(Exception): """Exception raised when dropping PostgreSQL subscription.""" @@ -719,18 +719,6 @@ def drop_publication(self, db: str, publication: str) -> None: logger.error(f"Failed to drop Postgresql publication: {e}") raise PostgreSQLDropPublicationError() from e - def subscription_exists(self, db: str, subscription: str) -> bool: - """Check whether specified subscription in database exists.""" - try: - with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: - cursor.execute( - SQL("SELECT subname FROM pg_subscription WHERE subname={};").format(Literal(subscription)) - ) - return cursor.fetchone() is not None - except psycopg2.Error as e: - logger.error(f"Failed to check Postgresql subscription existence: {e}") - raise PostgreSQLSubscriptionExistsError() from e - def create_subscription(self, subscription: str, host: str, db: str, user: str, password: str, replication_slot: str) -> None: """Create PostgreSQL 
subscription.""" try: @@ -747,6 +735,18 @@ def create_subscription(self, subscription: str, host: str, db: str, user: str, logger.error(f"Failed to create Postgresql subscription: {e}") raise PostgreSQLCreateSubscriptionError() from e + def subscription_exists(self, db: str, subscription: str) -> bool: + """Check whether specified subscription in database exists.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("SELECT subname FROM pg_subscription WHERE subname={};").format(Literal(subscription)) + ) + return cursor.fetchone() is not None + except psycopg2.Error as e: + logger.error(f"Failed to check Postgresql subscription existence: {e}") + raise PostgreSQLSubscriptionExistsError() from e + def drop_subscription(self, db: str, subscription: str) -> None: """Drop PostgreSQL subscription.""" connection = None diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index 9383383519..8dd2a32239 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -9,7 +9,14 @@ import json import logging -from ops import ActionEvent, Object, RelationChangedEvent, RelationJoinedEvent +from ops import ( + ActionEvent, + Object, + RelationBrokenEvent, + RelationChangedEvent, + RelationDepartedEvent, + RelationJoinedEvent, +) from constants import ( APP_SCOPE, @@ -38,9 +45,20 @@ def __init__(self, charm): self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_changed, self._on_offer_relation_changed, ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_broken, + self._on_offer_relation_broken, + ) self.charm.framework.observe( self.charm.on[LOGICAL_REPLICATION_RELATION].relation_changed, self._on_relation_changed ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_RELATION].relation_departed, + self._on_relation_departed, + ) + self.charm.framework.observe( + 
self.charm.on[LOGICAL_REPLICATION_RELATION].relation_broken, self._on_relation_broken + ) # Actions self.charm.framework.observe( self.charm.on.add_publication_action, self._on_add_publication @@ -62,7 +80,6 @@ def __init__(self, charm): def _on_offer_relation_joined(self, event: RelationJoinedEvent): if not self.charm.unit.is_leader(): return - if not self.charm.primary_endpoint: event.defer() logger.debug( @@ -123,47 +140,119 @@ def _on_offer_relation_changed(self, event: RelationChangedEvent): ) self.charm.update_config() + def _on_offer_relation_departed(self, event: RelationDepartedEvent): + if event.departing_unit == self.charm.unit and self.charm._peers is not None: + self.charm._peers.data[self.charm.unit].update({"departing": "True"}) + + def _on_offer_relation_broken(self, event: RelationBrokenEvent): + if not self.charm._peers or self.charm.is_unit_departing: + logger.debug( + f"{LOGICAL_REPLICATION_OFFER_RELATION}: skipping departing unit in broken event" + ) + return + if not self.charm.unit.is_leader(): + return + + global_replication_slots = self._get_replication_slots_from_str( + self.charm.app_peer_data.get("replication-slots") + ) + if len(global_replication_slots) == 0: + return + + used_replication_slots = [] + for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): + if rel == event.relation: + continue + used_replication_slots += [ + v + for k, v in self._get_replication_slots_from_str( + rel.data[self.model.app].get("replication-slots") + ).items() + ] + + deleting_replication_slots = [ + k for k, v in global_replication_slots.items() if k not in used_replication_slots + ] + for deleting_replication_slot in deleting_replication_slots: + global_replication_slots.pop(deleting_replication_slot, None) + self.charm.app_peer_data["replication-slots"] = json.dumps(global_replication_slots) + self.charm.update_config() + def _on_relation_changed(self, event: RelationChangedEvent): - subscriptions_str = 
event.relation.data[self.model.app].get("subscriptions", "") - subscriptions = subscriptions_str.split(",") if subscriptions_str else () + if not self._relation_changed_checks(event): + return publications = self._get_publications_from_str( event.relation.data[event.app].get("publications") ) - relation_replication_slots = self._get_replication_slots_from_str( + replication_slots = self._get_replication_slots_from_str( event.relation.data[event.app].get("replication-slots") ) - - primary = event.relation.data[event.app].get("primary") - replication_user = event.relation.data[event.app].get("replication-user") - replication_password = event.relation.data[event.app].get("replication-user-secret") - if not primary or not replication_user or not replication_password: - logger.warning( - "Logical Replication: skipping relation changed event as there is no primary, replication-user and replication-secret data" - ) - return - - for subscription in subscriptions: + for subscription in self._get_str_list( + event.relation.data[self.model.app].get("subscriptions") + ): db = publications[subscription]["database"] - if ( - subscription in relation_replication_slots - and not self.charm.postgresql.subscription_exists(db, subscription) + if subscription in replication_slots and not self.charm.postgresql.subscription_exists( + db, subscription ): self.charm.postgresql.create_subscription( subscription, - primary, + event.relation.data[event.app]["primary"], db, - replication_user, - replication_password, - relation_replication_slots[subscription], + event.relation.data[event.app]["replication-user"], + event.relation.data[event.app]["replication-user-secret"], + replication_slots[subscription], ) + def _on_relation_departed(self, event: RelationDepartedEvent): + if event.departing_unit == self.charm.unit and self.charm._peers is not None: + self.charm._peers.data[self.charm.unit].update({"departing": "True"}) + + def _on_relation_broken(self, event: RelationBrokenEvent): + if not 
self.charm._peers or self.charm.is_unit_departing: + logger.debug( + f"{LOGICAL_REPLICATION_RELATION}: skipping departing unit in broken event" + ) + return + if not self.charm.unit.is_leader(): + return + if not self.charm.primary_endpoint: + logger.debug( + f"{LOGICAL_REPLICATION_RELATION}: broken event deferred as primary is unavailable right now" + ) + event.defer() + return False + + publications = self._get_publications_from_str( + event.relation.data[event.app].get("publications") + ) + # TODO: global subscriptions + for subscription in self._get_str_list( + event.relation.data[self.model.app].get("subscriptions") + ): + self.charm.postgresql.drop_subscription( + publications[subscription]["database"], subscription + ) + # endregion # region Actions def _on_add_publication(self, event: ActionEvent): + # TODO: check on max replication slots if not self._add_publication_validation(event): return + if not self.charm.postgresql.database_exists(event.params["database"]): + event.fail(f"No such database {event.params['database']}") + return + for schematable in event.params["tables"].split(","): + if len(schematable_split := schematable.split(".")) != 2: + event.fail("All tables should be in schema.table format") + return + if not self.charm.postgresql.table_exists( + event.params["database"], schematable_split[0], schematable_split[1] + ): + event.fail(f"No such table {schematable} in database {event.params['database']}") + return publications = self._get_publications_from_str( self.charm.app_peer_data.get("publications") ) @@ -188,6 +277,9 @@ def _on_remove_publication(self, event: ActionEvent): if not self.charm.unit.is_leader(): event.fail("Publications management can be done only on the leader unit") return + if not self.charm.primary_endpoint: + event.fail("Publication management can be proceeded only with an active primary") + return False if not (publication_name := event.params.get("name")): event.fail("name parameter is required") return @@ -251,13 
+343,16 @@ def _on_list_subscriptions(self, event: ActionEvent): def _on_unsubscribe(self, event: ActionEvent): if not self.charm.unit.is_leader(): - event.fail("Subscriptions management can be done only on the leader unit") + event.fail("Subscriptions management can be proceeded only on the leader unit") return if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): event.fail( - "Subscription management can be done only with an active logical replication connection" + "Subscription management can be proceeded only with an active logical replication connection" ) return + if not self.charm.primary_endpoint: + event.fail("Subscription management can be proceeded only with an active primary") + return False if not (subscription_name := event.params.get("name")): event.fail("name parameter is required") return @@ -276,17 +371,40 @@ def _on_unsubscribe(self, event: ActionEvent): # endregion + def _relation_changed_checks(self, event: RelationChangedEvent) -> bool: + if not self.charm.unit.is_leader(): + return False + if not self.charm.primary_endpoint: + logger.debug( + f"{LOGICAL_REPLICATION_RELATION}: changed event deferred as primary is unavailable right now" + ) + event.defer() + return False + if ( + not event.relation.data[event.app].get("primary") + or not event.relation.data[event.app].get("replication-user") + or not event.relation.data[event.app].get("replication-user-secret") + ): + logger.warning( + f"{LOGICAL_REPLICATION_RELATION}: skipping changed event as there is no primary, replication-user or replication-user-secret in the remote application data" + ) + return False + return True + def _add_publication_validation(self, event: ActionEvent) -> bool: if not self.charm.unit.is_leader(): - event.fail("Publications management can be done only on the leader unit") + event.fail("Publications management can be proceeded only on the leader unit") + return False + if not self.charm.primary_endpoint: + event.fail("Publication management can 
be proceeded only with an active primary") return False if not (publication_name := event.params.get("name")): event.fail("name parameter is required") return False - if not (publication_db := event.params.get("database")): + if not event.params.get("database"): event.fail("database parameter is required") return False - if not (publication_tables := event.params.get("tables")): + if not event.params.get("tables"): event.fail("tables parameter is required") return False if publication_name in self._get_publications_from_str( @@ -294,29 +412,20 @@ def _add_publication_validation(self, event: ActionEvent) -> bool: ): event.fail("Such publication already exists") return False - if not self.charm.postgresql.database_exists(publication_db): - event.fail(f"No such database {publication_db}") - return False - for schematable in publication_tables.split(","): - if len(schematable_split := schematable.split(".")) != 2: - event.fail("All tables should be in schema.table format") - return False - if not self.charm.postgresql.table_exists( - publication_db, schematable_split[0], schematable_split[1] - ): - event.fail(f"No such table {schematable} in database {publication_db}") - return False return True def _subscribe_validation(self, event: ActionEvent) -> bool: if not self.charm.unit.is_leader(): - event.fail("Subscriptions management can be done only on the leader unit") + event.fail("Subscriptions management can be proceeded only on the leader unit") return False if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): event.fail( - "Subscription management can be done only with an active logical replication connection" + "Subscription management can be proceeded only with an active logical replication connection" ) return False + if not self.charm.primary_endpoint: + event.fail("Subscription management can be proceeded only with an active primary") + return False if not (publication_name := event.params.get("name")): event.fail("name parameter is 
required") return False From f776307e00cb165a8c3c1aacc0ad6477b3938891 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Mon, 17 Mar 2025 12:04:10 +0200 Subject: [PATCH 07/19] Logical Replication: improve cleanup; restrict restore action during logical replication; add check on conflicting subscription name. --- lib/charms/postgresql_k8s/v0/postgresql.py | 20 ++++++---- src/backups.py | 9 +++++ src/relations/logical_replication.py | 44 +++++++++++----------- 3 files changed, 44 insertions(+), 29 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index aef9148052..98e826f76c 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -748,22 +748,26 @@ def subscription_exists(self, db: str, subscription: str) -> bool: def drop_subscription(self, db: str, subscription: str) -> None: """Drop PostgreSQL subscription.""" - connection = None try: - connection = self._connect_to_database(database=db) - connection.autocommit = True - with connection.cursor() as cursor: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("ALTER SUBSCRIPTION {} DISABLE;").format( + Identifier(subscription), + ) + ) cursor.execute( - SQL("DROP SUBSCRIPTION IF EXISTS {};").format( + SQL("ALTER SUBSCRIPTION {} SET (slot_name=NONE);").format( + Identifier(subscription), + ) + ) + cursor.execute( + SQL("DROP SUBSCRIPTION {};").format( Identifier(subscription), ) ) except psycopg2.Error as e: logger.error(f"Failed to drop Postgresql subscription: {e}") raise PostgreSQLDropSubscriptionError() from e - finally: - if connection is not None: - connection.close() @staticmethod def build_postgresql_parameters( diff --git a/src/backups.py b/src/backups.py index 903ba028f6..6dfe4ff9a9 100644 --- a/src/backups.py +++ b/src/backups.py @@ -44,6 +44,7 @@ PGBACKREST_LOGS_PATH, POSTGRESQL_DATA_PATH, ) +from 
relations.logical_replication import LOGICAL_REPLICATION_RELATION logger = logging.getLogger(__name__) @@ -1218,6 +1219,14 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: event.fail(error_message) return False + if self.model.get_relation(LOGICAL_REPLICATION_RELATION): + error_message = ( + "Cannot proceed with restore with an active logical-replication connection" + ) + logger.error(f"Restore failed: {error_message}") + event.fail(error_message) + return False + return True def _render_pgbackrest_conf_file(self) -> bool: diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index 8dd2a32239..1a3c878236 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -107,10 +107,10 @@ def _on_offer_relation_changed(self, event: RelationChangedEvent): publications = self._get_publications_from_str( self.charm.app_peer_data.get("publications") ) - relation_replication_slots = self._get_replication_slots_from_str( + relation_replication_slots = self._get_dict_from_str( event.relation.data[self.model.app].get("replication-slots") ) - global_replication_slots = self._get_replication_slots_from_str( + global_replication_slots = self._get_dict_from_str( self.charm.app_peer_data.get("replication-slots") ) @@ -126,13 +126,10 @@ def _on_offer_relation_changed(self, event: RelationChangedEvent): "database" ] relation_replication_slots[publication] = replication_slot_name - deleting_replication_slots = [] - for publication in relation_replication_slots: + for publication in relation_replication_slots.copy(): if publication not in subscriptions: - deleting_replication_slots.append(publication) del global_replication_slots[relation_replication_slots[publication]] - for replication_slot in deleting_replication_slots: - del relation_replication_slots[replication_slot] + del relation_replication_slots[publication] self.charm.app_peer_data["replication-slots"] = json.dumps(global_replication_slots) 
event.relation.data[self.model.app]["replication-slots"] = json.dumps( @@ -153,7 +150,7 @@ def _on_offer_relation_broken(self, event: RelationBrokenEvent): if not self.charm.unit.is_leader(): return - global_replication_slots = self._get_replication_slots_from_str( + global_replication_slots = self._get_dict_from_str( self.charm.app_peer_data.get("replication-slots") ) if len(global_replication_slots) == 0: @@ -165,7 +162,7 @@ def _on_offer_relation_broken(self, event: RelationBrokenEvent): continue used_replication_slots += [ v - for k, v in self._get_replication_slots_from_str( + for k, v in self._get_dict_from_str( rel.data[self.model.app].get("replication-slots") ).items() ] @@ -184,9 +181,12 @@ def _on_relation_changed(self, event: RelationChangedEvent): publications = self._get_publications_from_str( event.relation.data[event.app].get("publications") ) - replication_slots = self._get_replication_slots_from_str( + replication_slots = self._get_dict_from_str( event.relation.data[event.app].get("replication-slots") ) + global_subscriptions = self._get_dict_from_str( + self.charm.app_peer_data.get("subscriptions") + ) for subscription in self._get_str_list( event.relation.data[self.model.app].get("subscriptions") ): @@ -202,6 +202,8 @@ def _on_relation_changed(self, event: RelationChangedEvent): event.relation.data[event.app]["replication-user-secret"], replication_slots[subscription], ) + global_subscriptions[subscription] = db + self.charm.app_peer_data["subscriptions"] = json.dumps(global_subscriptions) def _on_relation_departed(self, event: RelationDepartedEvent): if event.departing_unit == self.charm.unit and self.charm._peers is not None: @@ -222,16 +224,11 @@ def _on_relation_broken(self, event: RelationBrokenEvent): event.defer() return False - publications = self._get_publications_from_str( - event.relation.data[event.app].get("publications") - ) - # TODO: global subscriptions - for subscription in self._get_str_list( - 
event.relation.data[self.model.app].get("subscriptions") - ): - self.charm.postgresql.drop_subscription( - publications[subscription]["database"], subscription - ) + subscriptions = self._get_dict_from_str(self.charm.app_peer_data.get("subscriptions")) + for subscription, db in subscriptions.copy().items(): + self.charm.postgresql.drop_subscription(db, subscription) + del subscriptions[subscription] + self.charm.app_peer_data["subscriptions"] = json.dumps(subscriptions) # endregion @@ -310,6 +307,11 @@ def _on_subscribe(self, event: ActionEvent): if not self.charm.postgresql.database_exists(subscribing_database): event.fail(f"No such database {subscribing_database}") return + if self.charm.postgresql.subscription_exists(subscribing_database, event.params["name"]): + event.fail( + f"PostgreSQL subscription with conflicting name {event.params['name']} already exists in the database {subscribing_database}" + ) + return for schematable in subscribing_publication["tables"]: schematable_split = schematable.split(".") if not self.charm.postgresql.table_exists( @@ -474,7 +476,7 @@ def _count_publication_connections(self, publication: str) -> int: return count @staticmethod - def _get_replication_slots_from_str( + def _get_dict_from_str( replication_slots_str: str | None = None, ) -> dict[str, str]: return json.loads(replication_slots_str or "{}") From 43f42a5e18c99f7b3a290bab3376a6827e146c71 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Mon, 17 Mar 2025 23:34:15 +0200 Subject: [PATCH 08/19] Logical Replication: initial primary change detection. 
--- src/relations/logical_replication.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index 1a3c878236..f16deffe56 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -18,6 +18,7 @@ RelationJoinedEvent, ) +from cluster_topology_observer import ClusterTopologyChangeEvent from constants import ( APP_SCOPE, USER, @@ -59,6 +60,9 @@ def __init__(self, charm): self.charm.framework.observe( self.charm.on[LOGICAL_REPLICATION_RELATION].relation_broken, self._on_relation_broken ) + self.charm.framework.observe( + self.charm.on.cluster_topology_change, self._on_cluster_topology_change + ) # Actions self.charm.framework.observe( self.charm.on.add_publication_action, self._on_add_publication @@ -230,6 +234,15 @@ def _on_relation_broken(self, event: RelationBrokenEvent): del subscriptions[subscription] self.charm.app_peer_data["subscriptions"] = json.dumps(subscriptions) + def _on_cluster_topology_change(self, event: ClusterTopologyChangeEvent): + if not self.charm.unit.is_leader(): + return + if not self.charm.primary_endpoint: + return + primary = self.charm.primary_endpoint + for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): + rel.data[self.model.app]["primary"] = primary + # endregion # region Actions From 7fb4f6fba04dc9b1893e5a00eadbb2ba2f000aea Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Wed, 19 Mar 2025 15:09:44 +0200 Subject: [PATCH 09/19] Logical Replication: add primary switchover and secrets change propagation; fix relations bugs. 
--- lib/charms/postgresql_k8s/v0/postgresql.py | 17 +++ src/relations/logical_replication.py | 122 +++++++++++++++++---- 2 files changed, 116 insertions(+), 23 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 98e826f76c..b8582275f8 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -121,6 +121,9 @@ class PostgreSQLCreateSubscriptionError(Exception): class PostgreSQLSubscriptionExistsError(Exception): """Exception raised during subscription existence check.""" +class PostgreSQLUpdateSubscriptionError(Exception): + """Exception raised when updating PostgreSQL subscription.""" + class PostgreSQLDropSubscriptionError(Exception): """Exception raised when dropping PostgreSQL subscription.""" @@ -746,6 +749,20 @@ def subscription_exists(self, db: str, subscription: str) -> bool: logger.error(f"Failed to check Postgresql subscription existence: {e}") raise PostgreSQLSubscriptionExistsError() from e + def update_subscription(self, db: str, subscription: str, host: str, user: str, password: str): + """Update PostgreSQL subscription connection details.""" + try: + with self._connect_to_database(database=db) as connection, connection.cursor() as cursor: + cursor.execute( + SQL("ALTER SUBSCRIPTION {} CONNECTION {}").format( + Identifier(subscription), + Literal(f"host={host} dbname={db} user={user} password={password}"), + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to update Postgresql subscription: {e}") + raise PostgreSQLUpdateSubscriptionError() from e + def drop_subscription(self, db: str, subscription: str) -> None: """Drop PostgreSQL subscription.""" try: diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index f16deffe56..d5d5259a69 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -11,16 +11,21 @@ from ops import ( ActionEvent, + 
LeaderElectedEvent, Object, RelationBrokenEvent, RelationChangedEvent, RelationDepartedEvent, RelationJoinedEvent, + Secret, + SecretChangedEvent, + SecretNotFoundError, ) from cluster_topology_observer import ClusterTopologyChangeEvent from constants import ( APP_SCOPE, + PEER, USER, USER_PASSWORD_KEY, ) @@ -29,6 +34,7 @@ LOGICAL_REPLICATION_OFFER_RELATION = "logical-replication-offer" LOGICAL_REPLICATION_RELATION = "logical-replication" +SECRET_LABEL = "logical-replication-secret" # noqa: S105 class PostgreSQLLogicalReplication(Object): @@ -46,6 +52,10 @@ def __init__(self, charm): self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_changed, self._on_offer_relation_changed, ) + self.charm.framework.observe( + self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_departed, + self._on_offer_relation_departed, + ) self.charm.framework.observe( self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_broken, self._on_offer_relation_broken, @@ -60,9 +70,14 @@ def __init__(self, charm): self.charm.framework.observe( self.charm.on[LOGICAL_REPLICATION_RELATION].relation_broken, self._on_relation_broken ) + # Events self.charm.framework.observe( self.charm.on.cluster_topology_change, self._on_cluster_topology_change ) + self.charm.framework.observe( + self.charm.on.leader_elected, self._on_cluster_topology_change + ) + self.framework.observe(self.charm.on.secret_changed, self._on_secret_changed) # Actions self.charm.framework.observe( self.charm.on.add_publication_action, self._on_add_publication @@ -91,15 +106,11 @@ def _on_offer_relation_joined(self, event: RelationJoinedEvent): ) return - # TODO: add primary change check - # TODO: replication-user-secret + secret = self._get_secret() + secret.grant(event.relation) event.relation.data[self.model.app].update({ "publications": self.charm.app_peer_data.get("publications", ""), - # "replication-user": REPLICATION_USER, - # "replication-user-secret": self.charm.get_secret(APP_SCOPE, REPLICATION_PASSWORD_KEY), - 
"replication-user": USER, - "replication-user-secret": self.charm.get_secret(APP_SCOPE, USER_PASSWORD_KEY), - "primary": self.charm.primary_endpoint, + "secret-id": secret.id, }) def _on_offer_relation_changed(self, event: RelationChangedEvent): @@ -143,7 +154,7 @@ def _on_offer_relation_changed(self, event: RelationChangedEvent): def _on_offer_relation_departed(self, event: RelationDepartedEvent): if event.departing_unit == self.charm.unit and self.charm._peers is not None: - self.charm._peers.data[self.charm.unit].update({"departing": "True"}) + self.charm.unit_peer_data.update({"departing": "True"}) def _on_offer_relation_broken(self, event: RelationBrokenEvent): if not self.charm._peers or self.charm.is_unit_departing: @@ -162,7 +173,7 @@ def _on_offer_relation_broken(self, event: RelationBrokenEvent): used_replication_slots = [] for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): - if rel == event.relation: + if rel.id == event.relation.id: continue used_replication_slots += [ v @@ -182,6 +193,9 @@ def _on_offer_relation_broken(self, event: RelationBrokenEvent): def _on_relation_changed(self, event: RelationChangedEvent): if not self._relation_changed_checks(event): return + secret_content = self.model.get_secret( + id=event.relation.data[event.app]["secret-id"], label=SECRET_LABEL + ).get_content(refresh=True) publications = self._get_publications_from_str( event.relation.data[event.app].get("publications") ) @@ -200,10 +214,10 @@ def _on_relation_changed(self, event: RelationChangedEvent): ): self.charm.postgresql.create_subscription( subscription, - event.relation.data[event.app]["primary"], + secret_content["logical-replication-primary"], db, - event.relation.data[event.app]["replication-user"], - event.relation.data[event.app]["replication-user-secret"], + secret_content["logical-replication-user"], + secret_content["logical-replication-password"], replication_slots[subscription], ) global_subscriptions[subscription] = db @@ 
-211,7 +225,7 @@ def _on_relation_changed(self, event: RelationChangedEvent): def _on_relation_departed(self, event: RelationDepartedEvent): if event.departing_unit == self.charm.unit and self.charm._peers is not None: - self.charm._peers.data[self.charm.unit].update({"departing": "True"}) + self.charm.unit_peer_data.update({"departing": "True"}) def _on_relation_broken(self, event: RelationBrokenEvent): if not self.charm._peers or self.charm.is_unit_departing: @@ -234,14 +248,58 @@ def _on_relation_broken(self, event: RelationBrokenEvent): del subscriptions[subscription] self.charm.app_peer_data["subscriptions"] = json.dumps(subscriptions) - def _on_cluster_topology_change(self, event: ClusterTopologyChangeEvent): + # endregion + + # region Events + + def _on_cluster_topology_change(self, event: ClusterTopologyChangeEvent | LeaderElectedEvent): if not self.charm.unit.is_leader(): return + if not len(self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ())): + return if not self.charm.primary_endpoint: + event.defer() return - primary = self.charm.primary_endpoint - for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): - rel.data[self.model.app]["primary"] = primary + self._get_secret() + + def _on_secret_changed(self, event: SecretChangedEvent): + if not self.charm.unit.is_leader(): + return + if not self.charm.primary_endpoint: + event.defer() + return + + if ( + len(self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ())) + and event.secret.label == f"{PEER}.{self.model.app.name}.app" + ): + logger.info("Internal secret changed, updating logical replication secret") + self._get_secret() + + if ( + relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION) + ) and event.secret.label == SECRET_LABEL: + logger.info("Logical replication secret changed, updating subscriptions") + secret_content = self.model.get_secret( + id=relation.data[relation.app]["secret-id"], label=SECRET_LABEL + ).get_content(refresh=True) + 
replication_slots = self._get_dict_from_str( + relation.data[relation.app].get("replication-slots") + ) + publications = self._get_publications_from_str( + relation.data[relation.app].get("publications") + ) + for subscription in self._get_str_list( + relation.data[self.model.app].get("subscriptions") + ): + if subscription in replication_slots: + self.charm.postgresql.update_subscription( + publications[subscription]["database"], + subscription, + secret_content["logical-replication-primary"], + secret_content["logical-replication-user"], + secret_content["logical-replication-password"], + ) # endregion @@ -395,13 +453,9 @@ def _relation_changed_checks(self, event: RelationChangedEvent) -> bool: ) event.defer() return False - if ( - not event.relation.data[event.app].get("primary") - or not event.relation.data[event.app].get("replication-user") - or not event.relation.data[event.app].get("replication-user-secret") - ): + if not event.relation.data[event.app].get("secret-id"): logger.warning( - f"{LOGICAL_REPLICATION_RELATION}: skipping changed event as there is no primary, replication-user or replication-user-secret in the remote application data" + f"{LOGICAL_REPLICATION_RELATION}: skipping changed event as there is no secret id in the remote application data" ) return False return True @@ -469,6 +523,28 @@ def _subscribe_validation(self, event: ActionEvent) -> bool: return False return True + def _get_secret(self) -> Secret: + """Returns logical replication secret. Updates, if content changed.""" + shared_content = { + "logical-replication-user": USER, + "logical-replication-password": self.charm.get_secret(APP_SCOPE, USER_PASSWORD_KEY), + "logical-replication-primary": self.charm.primary_endpoint, + } + try: + # Avoid recreating the secret. + secret = self.charm.model.get_secret(label=SECRET_LABEL) + if not secret.id: + # Workaround for the secret id not being set with model uuid. 
+ secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + if secret.peek_content() != shared_content: + logger.info("Updating outdated secret content") + secret.set_content(shared_content) + return secret + except SecretNotFoundError: + logger.debug("Secret not found, creating a new one") + pass + return self.charm.model.app.add_secret(content=shared_content, label=SECRET_LABEL) + @staticmethod def _get_publications_from_str( publications_str: str | None = None, From 1ccc21afafeb8d5daadf1479fc53ce4017beee12 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Fri, 21 Mar 2025 16:18:48 +0200 Subject: [PATCH 10/19] Logical Replication: format tables for publications and subscriptions lists. --- src/relations/logical_replication.py | 53 ++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 7 deletions(-) diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index d5d5259a69..f8602d9791 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -8,6 +8,7 @@ import json import logging +import re from ops import ( ActionEvent, @@ -306,7 +307,6 @@ def _on_secret_changed(self, event: SecretChangedEvent): # region Actions def _on_add_publication(self, event: ActionEvent): - # TODO: check on max replication slots if not self._add_publication_validation(event): return if not self.charm.postgresql.database_exists(event.params["database"]): @@ -335,11 +335,31 @@ def _on_add_publication(self, event: ActionEvent): self._set_publications(publications) def _on_list_publications(self, event: ActionEvent): - # TODO: table formatting if not self.charm.unit.is_leader(): event.fail("Publications management can be done only on the leader unit") return - event.set_results({"publications": self.charm.app_peer_data.get("publications", "{}")}) + publications = [ + ( + publication, + str(self._count_publication_connections(publication)), + publication_obj["database"], + 
",".join(publication_obj["tables"]), + ) + for publication, publication_obj in self._get_publications_from_str( + self.charm.app_peer_data.get("publications") + ).items() + ] + name_len = max([4, *[len(publication[0]) for publication in publications]]) + database_len = max([8, *[len(publication[2]) for publication in publications]]) + header = ( + f"{'name':<{name_len}s} | active_connections | {'database':<{database_len}s} | tables" + ) + res = [header, "-" * len(header)] + for name, active_connections, database, tables in publications: + res.append( + f"{name:<{name_len}s} | {active_connections:<18s} | {database:<{database_len}s} | {tables:s}" + ) + event.set_results({"publications": "\n".join(res)}) def _on_remove_publication(self, event: ActionEvent): if not self.charm.unit.is_leader(): @@ -409,10 +429,26 @@ def _on_list_subscriptions(self, event: ActionEvent): "Subscription management can be done only with an active logical replication connection" ) return - # TODO: table formatting - event.set_results({ - "subscriptions": relation.data[self.model.app].get("subscriptions", "") - }) + publications = self._get_publications_from_str( + relation.data[relation.app].get("publications") + ) + subscriptions = [ + ( + subscription, + publications.get(subscription, {}).get("database"), + ",".join(publications.get(subscription, {}).get("tables", ())), + ) + for subscription in self._get_str_list( + relation.data[self.model.app].get("subscriptions") + ) + ] + name_len = max([4, *[len(subscription[0]) for subscription in subscriptions]]) + database_len = max([8, *[len(subscription[1]) for subscription in subscriptions]]) + header = f"{'name':<{name_len}s} | {'database':<{database_len}s} | tables" + res = [header, "-" * len(header)] + for name, database, tables in subscriptions: + res.append(f"{name:<{name_len}s} | {database:<{database_len}s} | {tables:s}") + event.set_results({"subscriptions": "\n".join(res)}) def _on_unsubscribe(self, event: ActionEvent): if not 
self.charm.unit.is_leader(): @@ -470,6 +506,9 @@ def _add_publication_validation(self, event: ActionEvent) -> bool: if not (publication_name := event.params.get("name")): event.fail("name parameter is required") return False + if not re.match(r"^[a-zA-Z0-9_]+$", publication_name): + event.fail("name should consist of english letters, numbers and underscore") + return False if not event.params.get("database"): event.fail("database parameter is required") return False From 6d996382796c7df091959a9bac0640f9868f12cf Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Fri, 21 Mar 2025 16:43:19 +0200 Subject: [PATCH 11/19] Sync postgresql lib from K8s. --- lib/charms/postgresql_k8s/v0/postgresql.py | 148 +++++++++++++++++- .../postgresql_k8s/v0/postgresql_tls.py | 87 ++++++++-- 2 files changed, 222 insertions(+), 13 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index b8582275f8..28d8c59b9d 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -35,7 +35,19 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 46 +LIBPATCH = 48 + +# Groups to distinguish HBA access +ACCESS_GROUP_IDENTITY = "identity_access" +ACCESS_GROUP_INTERNAL = "internal_access" +ACCESS_GROUP_RELATION = "relation_access" + +# List of access groups to filter role assignments by +ACCESS_GROUPS = [ + ACCESS_GROUP_IDENTITY, + ACCESS_GROUP_INTERNAL, + ACCESS_GROUP_RELATION, +] # Groups to distinguish database permissions PERMISSIONS_GROUP_ADMIN = "admin" @@ -57,10 +69,18 @@ logger = logging.getLogger(__name__) +class PostgreSQLAssignGroupError(Exception): + """Exception raised when assigning to a group fails.""" + + class PostgreSQLCreateDatabaseError(Exception): """Exception raised when creating a database fails.""" +class PostgreSQLCreateGroupError(Exception): + """Exception raised when creating 
a group fails.""" + + class PostgreSQLCreateUserError(Exception): """Exception raised when creating a user fails.""" @@ -93,6 +113,10 @@ class PostgreSQLGetPostgreSQLVersionError(Exception): """Exception raised when retrieving PostgreSQL version fails.""" +class PostgreSQLListGroupsError(Exception): + """Exception raised when retrieving PostgreSQL groups list fails.""" + + class PostgreSQLListUsersError(Exception): """Exception raised when retrieving PostgreSQL users list fails.""" @@ -187,6 +211,24 @@ def _connect_to_database( connection.autocommit = True return connection + def create_access_groups(self) -> None: + """Create access groups to distinguish HBA authentication methods.""" + connection = None + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + for group in ACCESS_GROUPS: + cursor.execute( + SQL("CREATE ROLE {} NOLOGIN;").format( + Identifier(group), + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to create access groups: {e}") + raise PostgreSQLCreateGroupError() from e + finally: + if connection is not None: + connection.close() + def create_database( self, database: str, @@ -348,6 +390,50 @@ def delete_user(self, user: str) -> None: logger.error(f"Failed to delete user: {e}") raise PostgreSQLDeleteUserError() from e + def grant_internal_access_group_memberships(self) -> None: + """Grant membership to the internal access-group to existing internal users.""" + connection = None + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + for user in self.system_users: + cursor.execute( + SQL("GRANT {} TO {};").format( + Identifier(ACCESS_GROUP_INTERNAL), + Identifier(user), + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to grant internal access group memberships: {e}") + raise PostgreSQLAssignGroupError() from e + finally: + if connection is not None: + connection.close() + + def grant_relation_access_group_memberships(self) -> None: + """Grant membership to 
the relation access-group to existing relation users.""" + rel_users = self.list_users_from_relation() + if not rel_users: + return + + connection = None + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + rel_groups = SQL(",").join(Identifier(group) for group in [ACCESS_GROUP_RELATION]) + rel_users = SQL(",").join(Identifier(user) for user in rel_users) + + cursor.execute( + SQL("GRANT {groups} TO {users};").format( + groups=rel_groups, + users=rel_users, + ) + ) + except psycopg2.Error as e: + logger.error(f"Failed to grant relation access group memberships: {e}") + raise PostgreSQLAssignGroupError() from e + finally: + if connection is not None: + connection.close() + def enable_disable_extensions( self, extensions: Dict[str, bool], database: Optional[str] = None ) -> None: @@ -510,6 +596,19 @@ def get_postgresql_timezones(self) -> Set[str]: timezones = cursor.fetchall() return {timezone[0] for timezone in timezones} + def get_postgresql_default_table_access_methods(self) -> Set[str]: + """Returns the PostgreSQL available table access methods. + + Returns: + Set of PostgreSQL table access methods. + """ + with self._connect_to_database( + database_host=self.current_host + ) as connection, connection.cursor() as cursor: + cursor.execute("SELECT amname FROM pg_am WHERE amtype = 't';") + access_methods = cursor.fetchall() + return {access_method[0] for access_method in access_methods} + def get_postgresql_version(self, current_host=True) -> str: """Returns the PostgreSQL version. @@ -548,6 +647,26 @@ def is_tls_enabled(self, check_current_host: bool = False) -> bool: # Connection errors happen when PostgreSQL has not started yet. return False + def list_access_groups(self) -> Set[str]: + """Returns the list of PostgreSQL database access groups. + + Returns: + List of PostgreSQL database access groups. 
+ """ + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute( + "SELECT groname FROM pg_catalog.pg_group WHERE groname LIKE '%_access';" + ) + access_groups = cursor.fetchall() + return {group[0] for group in access_groups} + except psycopg2.Error as e: + logger.error(f"Failed to list PostgreSQL database access groups: {e}") + raise PostgreSQLListGroupsError() from e + finally: + if connection is not None: + connection.close() + def list_users(self) -> Set[str]: """Returns the list of PostgreSQL database users. @@ -562,6 +681,29 @@ def list_users(self) -> Set[str]: except psycopg2.Error as e: logger.error(f"Failed to list PostgreSQL database users: {e}") raise PostgreSQLListUsersError() from e + finally: + if connection is not None: + connection.close() + + def list_users_from_relation(self) -> Set[str]: + """Returns the list of PostgreSQL database users that were created by a relation. + + Returns: + List of PostgreSQL database users. + """ + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute( + "SELECT usename FROM pg_catalog.pg_user WHERE usename LIKE 'relation_id_%';" + ) + usernames = cursor.fetchall() + return {username[0] for username in usernames} + except psycopg2.Error as e: + logger.error(f"Failed to list PostgreSQL database users: {e}") + raise PostgreSQLListUsersError() from e + finally: + if connection is not None: + connection.close() def list_valid_privileges_and_roles(self) -> Tuple[Set[str], Set[str]]: """Returns two sets with valid privileges and roles. @@ -808,6 +950,8 @@ def build_postgresql_parameters( for config, value in config_options.items(): # Filter config option not related to PostgreSQL parameters. 
if not config.startswith(( + "connection", + "cpu", "durability", "instance", "logging", @@ -815,6 +959,8 @@ def build_postgresql_parameters( "optimizer", "request", "response", + "session", + "storage", "vacuum", )): continue diff --git a/lib/charms/postgresql_k8s/v0/postgresql_tls.py b/lib/charms/postgresql_k8s/v0/postgresql_tls.py index f55543e0cb..2aeaa52af6 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql_tls.py +++ b/lib/charms/postgresql_k8s/v0/postgresql_tls.py @@ -6,8 +6,9 @@ This class handles certificate request and renewal through the interaction with the TLS Certificates Operator. -This library needs that https://charmhub.io/tls-certificates-interface/libraries/tls_certificates -library is imported to work. +This library needs that the following libraries are imported to work: +- https://charmhub.io/certificate-transfer-interface/libraries/certificate_transfer +- https://charmhub.io/tls-certificates-interface/libraries/tls_certificates It also needs the following methods in the charm class: — get_hostname_by_unit: to retrieve the DNS hostname of the unit. @@ -24,6 +25,15 @@ import socket from typing import List, Optional +from charms.certificate_transfer_interface.v0.certificate_transfer import ( + CertificateAvailableEvent as CertificateAddedEvent, +) +from charms.certificate_transfer_interface.v0.certificate_transfer import ( + CertificateRemovedEvent as CertificateRemovedEvent, +) +from charms.certificate_transfer_interface.v0.certificate_transfer import ( + CertificateTransferRequires, +) from charms.tls_certificates_interface.v2.tls_certificates import ( CertificateAvailableEvent, CertificateExpiringEvent, @@ -45,11 +55,12 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version. 
-LIBPATCH = 13 +LIBPATCH = 14 logger = logging.getLogger(__name__) SCOPE = "unit" -TLS_RELATION = "certificates" +TLS_CREATION_RELATION = "certificates" +TLS_TRANSFER_RELATION = "receive-ca-cert" class PostgreSQLTLS(Object): @@ -63,18 +74,29 @@ def __init__( self.charm = charm self.peer_relation = peer_relation self.additional_dns_names = additional_dns_names or [] - self.certs = TLSCertificatesRequiresV2(self.charm, TLS_RELATION) + self.certs_creation = TLSCertificatesRequiresV2(self.charm, TLS_CREATION_RELATION) + self.certs_transfer = CertificateTransferRequires(self.charm, TLS_TRANSFER_RELATION) self.framework.observe( self.charm.on.set_tls_private_key_action, self._on_set_tls_private_key ) self.framework.observe( - self.charm.on[TLS_RELATION].relation_joined, self._on_tls_relation_joined + self.charm.on[TLS_CREATION_RELATION].relation_joined, self._on_tls_relation_joined + ) + self.framework.observe( + self.charm.on[TLS_CREATION_RELATION].relation_broken, self._on_tls_relation_broken + ) + self.framework.observe( + self.certs_creation.on.certificate_available, self._on_certificate_available + ) + self.framework.observe( + self.certs_creation.on.certificate_expiring, self._on_certificate_expiring ) self.framework.observe( - self.charm.on[TLS_RELATION].relation_broken, self._on_tls_relation_broken + self.certs_transfer.on.certificate_available, self._on_certificate_added + ) + self.framework.observe( + self.certs_transfer.on.certificate_removed, self._on_certificate_removed ) - self.framework.observe(self.certs.on.certificate_available, self._on_certificate_available) - self.framework.observe(self.certs.on.certificate_expiring, self._on_certificate_expiring) def _on_set_tls_private_key(self, event: ActionEvent) -> None: """Set the TLS private key, which will be used for requesting the certificate.""" @@ -93,8 +115,8 @@ def _request_certificate(self, param: Optional[str]): self.charm.set_secret(SCOPE, "key", key.decode("utf-8")) self.charm.set_secret(SCOPE, 
"csr", csr.decode("utf-8")) - if self.charm.model.get_relation(TLS_RELATION): - self.certs.request_certificate_creation(certificate_signing_request=csr) + if self.charm.model.get_relation(TLS_CREATION_RELATION): + self.certs_creation.request_certificate_creation(certificate_signing_request=csr) @staticmethod def _parse_tls_file(raw_content: str) -> bytes: @@ -117,6 +139,7 @@ def _on_tls_relation_broken(self, event: RelationBrokenEvent) -> None: self.charm.set_secret(SCOPE, "ca", None) self.charm.set_secret(SCOPE, "cert", None) self.charm.set_secret(SCOPE, "chain", None) + if not self.charm.update_config(): logger.debug("Cannot update config at this moment") event.defer() @@ -163,12 +186,52 @@ def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: subject=self.charm.get_hostname_by_unit(self.charm.unit.name), **self._get_sans(), ) - self.certs.request_certificate_renewal( + self.certs_creation.request_certificate_renewal( old_certificate_signing_request=old_csr, new_certificate_signing_request=new_csr, ) self.charm.set_secret(SCOPE, "csr", new_csr.decode("utf-8")) + def _on_certificate_added(self, event: CertificateAddedEvent) -> None: + """Enable TLS when TLS certificate is added.""" + relation = self.charm.model.get_relation(TLS_TRANSFER_RELATION, event.relation_id) + if relation is None: + logger.error("Relationship not established anymore.") + return + + secret_name = f"ca-{relation.app.name}" + self.charm.set_secret(SCOPE, secret_name, event.ca) + + try: + if not self.charm.push_ca_file_into_workload(secret_name): + logger.debug("Cannot push TLS certificates at this moment") + event.defer() + return + except (PebbleConnectionError, PathError, ProtocolError, RetryError) as e: + logger.error("Cannot push TLS certificates: %r", e) + event.defer() + return + + def _on_certificate_removed(self, event: CertificateRemovedEvent) -> None: + """Disable TLS when TLS certificate is removed.""" + relation = 
self.charm.model.get_relation(TLS_TRANSFER_RELATION, event.relation_id) + if relation is None: + logger.error("Relationship not established anymore.") + return + + secret_name = f"ca-{relation.app.name}" + self.charm.set_secret(SCOPE, secret_name, None) + + try: + if not self.charm.clean_ca_file_from_workload(secret_name): + logger.debug("Cannot clean CA certificates at this moment") + event.defer() + return + except (PebbleConnectionError, PathError, ProtocolError, RetryError) as e: + logger.error("Cannot clean CA certificates: %r", e) + event.defer() + return + def _get_sans(self) -> dict: """Create a list of Subject Alternative Names for a PostgreSQL unit. From 5ce7eac7892fc4905c8702456b8acc9f1e00f0c7 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Fri, 21 Mar 2025 22:28:57 +0200 Subject: [PATCH 12/19] Revert postgresql_tls lib. --- .../postgresql_k8s/v0/postgresql_tls.py | 87 +++---------------- 1 file changed, 12 insertions(+), 75 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql_tls.py b/lib/charms/postgresql_k8s/v0/postgresql_tls.py index 2aeaa52af6..f55543e0cb 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql_tls.py +++ b/lib/charms/postgresql_k8s/v0/postgresql_tls.py @@ -6,9 +6,8 @@ This class handles certificate request and renewal through the interaction with the TLS Certificates Operator. -This library needs that the following libraries are imported to work: -- https://charmhub.io/certificate-transfer-interface/libraries/certificate_transfer -- https://charmhub.io/tls-certificates-interface/libraries/tls_certificates +This library needs that https://charmhub.io/tls-certificates-interface/libraries/tls_certificates +library is imported to work. It also needs the following methods in the charm class: — get_hostname_by_unit: to retrieve the DNS hostname of the unit. 
@@ -25,15 +24,6 @@ import socket from typing import List, Optional -from charms.certificate_transfer_interface.v0.certificate_transfer import ( - CertificateAvailableEvent as CertificateAddedEvent, -) -from charms.certificate_transfer_interface.v0.certificate_transfer import ( - CertificateRemovedEvent as CertificateRemovedEvent, -) -from charms.certificate_transfer_interface.v0.certificate_transfer import ( - CertificateTransferRequires, -) from charms.tls_certificates_interface.v2.tls_certificates import ( CertificateAvailableEvent, CertificateExpiringEvent, @@ -55,12 +45,11 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version. -LIBPATCH = 14 +LIBPATCH = 13 logger = logging.getLogger(__name__) SCOPE = "unit" -TLS_CREATION_RELATION = "certificates" -TLS_TRANSFER_RELATION = "receive-ca-cert" +TLS_RELATION = "certificates" class PostgreSQLTLS(Object): @@ -74,29 +63,18 @@ def __init__( self.charm = charm self.peer_relation = peer_relation self.additional_dns_names = additional_dns_names or [] - self.certs_creation = TLSCertificatesRequiresV2(self.charm, TLS_CREATION_RELATION) - self.certs_transfer = CertificateTransferRequires(self.charm, TLS_TRANSFER_RELATION) + self.certs = TLSCertificatesRequiresV2(self.charm, TLS_RELATION) self.framework.observe( self.charm.on.set_tls_private_key_action, self._on_set_tls_private_key ) self.framework.observe( - self.charm.on[TLS_CREATION_RELATION].relation_joined, self._on_tls_relation_joined - ) - self.framework.observe( - self.charm.on[TLS_CREATION_RELATION].relation_broken, self._on_tls_relation_broken - ) - self.framework.observe( - self.certs_creation.on.certificate_available, self._on_certificate_available - ) - self.framework.observe( - self.certs_creation.on.certificate_expiring, self._on_certificate_expiring + self.charm.on[TLS_RELATION].relation_joined, self._on_tls_relation_joined ) self.framework.observe( - 
self.certs_transfer.on.certificate_available, self._on_certificate_added - ) - self.framework.observe( - self.certs_transfer.on.certificate_removed, self._on_certificate_removed + self.charm.on[TLS_RELATION].relation_broken, self._on_tls_relation_broken ) + self.framework.observe(self.certs.on.certificate_available, self._on_certificate_available) + self.framework.observe(self.certs.on.certificate_expiring, self._on_certificate_expiring) def _on_set_tls_private_key(self, event: ActionEvent) -> None: """Set the TLS private key, which will be used for requesting the certificate.""" @@ -115,8 +93,8 @@ def _request_certificate(self, param: Optional[str]): self.charm.set_secret(SCOPE, "key", key.decode("utf-8")) self.charm.set_secret(SCOPE, "csr", csr.decode("utf-8")) - if self.charm.model.get_relation(TLS_CREATION_RELATION): - self.certs_creation.request_certificate_creation(certificate_signing_request=csr) + if self.charm.model.get_relation(TLS_RELATION): + self.certs.request_certificate_creation(certificate_signing_request=csr) @staticmethod def _parse_tls_file(raw_content: str) -> bytes: @@ -139,7 +117,6 @@ def _on_tls_relation_broken(self, event: RelationBrokenEvent) -> None: self.charm.set_secret(SCOPE, "ca", None) self.charm.set_secret(SCOPE, "cert", None) self.charm.set_secret(SCOPE, "chain", None) - if not self.charm.update_config(): logger.debug("Cannot update config at this moment") event.defer() @@ -186,52 +163,12 @@ def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: subject=self.charm.get_hostname_by_unit(self.charm.unit.name), **self._get_sans(), ) - self.certs_creation.request_certificate_renewal( + self.certs.request_certificate_renewal( old_certificate_signing_request=old_csr, new_certificate_signing_request=new_csr, ) self.charm.set_secret(SCOPE, "csr", new_csr.decode("utf-8")) - def _on_certificate_added(self, event: CertificateAddedEvent) -> None: - """Enable TLS when TLS certificate is added.""" - relation = 
self.charm.model.get_relation(TLS_TRANSFER_RELATION, event.relation_id) - if relation is None: - logger.error("Relationship not established anymore.") - return - - secret_name = f"ca-{relation.app.name}" - self.charm.set_secret(SCOPE, secret_name, event.ca) - - try: - if not self.charm.push_ca_file_into_workload(secret_name): - logger.debug("Cannot push TLS certificates at this moment") - event.defer() - return - except (PebbleConnectionError, PathError, ProtocolError, RetryError) as e: - logger.error("Cannot push TLS certificates: %r", e) - event.defer() - return - - def _on_certificate_removed(self, event: CertificateRemovedEvent) -> None: - """Disable TLS when TLS certificate is removed.""" - relation = self.charm.model.get_relation(TLS_TRANSFER_RELATION, event.relation_id) - if relation is None: - logger.error("Relationship not established anymore.") - return - - secret_name = f"ca-{relation.app.name}" - self.charm.set_secret(SCOPE, secret_name, None) - - try: - if not self.charm.clean_ca_file_from_workload(secret_name): - logger.debug("Cannot clean CA certificates at this moment") - event.defer() - return - except (PebbleConnectionError, PathError, ProtocolError, RetryError) as e: - logger.error("Cannot clean CA certificates: %r", e) - event.defer() - return - def _get_sans(self) -> dict: """Create a list of Subject Alternative Names for a PostgreSQL unit. From 1b7f5a44a1a8855c131b212d703efdcdc0084d60 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Fri, 21 Mar 2025 23:39:04 +0200 Subject: [PATCH 13/19] Logical Replication: add logical replication offer relation to pre restore check. Add missing async replication checks in pre restore checks. 
--- src/backups.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/backups.py b/src/backups.py index 6dfe4ff9a9..70439fa631 100644 --- a/src/backups.py +++ b/src/backups.py @@ -44,7 +44,11 @@ PGBACKREST_LOGS_PATH, POSTGRESQL_DATA_PATH, ) -from relations.logical_replication import LOGICAL_REPLICATION_RELATION +from relations.async_replication import REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION +from relations.logical_replication import ( + LOGICAL_REPLICATION_OFFER_RELATION, + LOGICAL_REPLICATION_RELATION, +) logger = logging.getLogger(__name__) @@ -1212,6 +1216,18 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: event.fail(error_message) return False + logger.info("Checking that the cluster is not replicating data to a standby cluster") + for relation in [ + self.model.get_relation(REPLICATION_CONSUMER_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), + ]: + if not relation: + continue + error_message = "Unit cannot restore backup as the cluster is replicating data to a standby cluster" + logger.error(f"Restore failed: {error_message}") + event.fail(error_message) + return False + logger.info("Checking that this unit was already elected the leader unit") if not self.charm.unit.is_leader(): error_message = "Unit cannot restore backup as it was not elected the leader unit yet" @@ -1219,9 +1235,11 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: event.fail(error_message) return False - if self.model.get_relation(LOGICAL_REPLICATION_RELATION): + if self.model.get_relation(LOGICAL_REPLICATION_RELATION) or len( + self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()) + ): error_message = ( - "Cannot proceed with restore with an active logical-replication connection" + "Cannot proceed with restore with an active logical replication connection" ) logger.error(f"Restore failed: {error_message}") event.fail(error_message) From 
fda26fc2f2ed34984a894f73a148ea509432b93b Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Sat, 22 Mar 2025 00:11:58 +0200 Subject: [PATCH 14/19] Logical Replication: remove leader restriction for the list-publications action. --- src/relations/logical_replication.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index f8602d9791..dd8702f028 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -335,9 +335,6 @@ def _on_add_publication(self, event: ActionEvent): self._set_publications(publications) def _on_list_publications(self, event: ActionEvent): - if not self.charm.unit.is_leader(): - event.fail("Publications management can be done only on the leader unit") - return publications = [ ( publication, From 8a22f6ad0900fb8056580623b6630bb2e869233d Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Mon, 24 Mar 2025 11:35:35 +0200 Subject: [PATCH 15/19] Fix unit tests. 
--- tests/unit/test_charm.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 7de1a502cf..6b9bf314a5 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -1274,6 +1274,7 @@ def test_update_config(harness): patch( "charm.PostgresqlOperatorCharm._handle_postgresql_restart_need" ) as _handle_postgresql_restart_need, + patch("charm.Patroni.ensure_slots_controller_by_patroni"), patch("charm.Patroni.bulk_update_parameters_controller_by_patroni"), patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started, patch( @@ -1310,6 +1311,7 @@ def test_update_config(harness): restore_to_latest=False, parameters={"test": "test"}, no_peers=False, + slots=None, ) _handle_postgresql_restart_need.assert_called_once_with(False) assert "tls" not in harness.get_relation_data(rel_id, harness.charm.unit.name) @@ -1334,6 +1336,7 @@ def test_update_config(harness): restore_to_latest=False, parameters={"test": "test"}, no_peers=False, + slots=None, ) _handle_postgresql_restart_need.assert_called_once() assert "tls" not in harness.get_relation_data( From 5b80d6de8c89094bb5ceae102605e80a47ed7bd5 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Mon, 24 Mar 2025 19:42:24 +0200 Subject: [PATCH 16/19] Add logical replication integration test. 
--- .../ha_tests/test_logical_replication.py | 314 ++++++++++++++++++ 1 file changed, 314 insertions(+) create mode 100644 tests/integration/ha_tests/test_logical_replication.py diff --git a/tests/integration/ha_tests/test_logical_replication.py b/tests/integration/ha_tests/test_logical_replication.py new file mode 100644 index 0000000000..48598c2700 --- /dev/null +++ b/tests/integration/ha_tests/test_logical_replication.py @@ -0,0 +1,314 @@ +from asyncio import gather + +import psycopg2 +import pytest as pytest +from pytest_operator.plugin import OpsTest + +from integration import markers +from integration.ha_tests.helpers import get_cluster_roles +from integration.helpers import CHARM_BASE, get_leader_unit +from integration.new_relations.helpers import build_connection_string + +DATABASE_APP_NAME = "postgresql" +SECOND_DATABASE_APP_NAME = "postgresql2" +THIRD_DATABASE_APP_NAME = "postgresql3" + +DATA_INTEGRATOR_APP_NAME = "data-integrator" +SECOND_DATA_INTEGRATOR_APP_NAME = "data-integrator2" +THIRD_DATA_INTEGRATOR_APP_NAME = "data-integrator3" +DATA_INTEGRATOR_RELATION = "postgresql" + +DATABASE_APP_CONFIG = {"profile": "testing"} + +TESTING_DATABASE = "testdb" + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_deploy(ops_test: OpsTest, charm): + await gather( + ops_test.model.deploy( + charm, + application_name=DATABASE_APP_NAME, + num_units=3, + base=CHARM_BASE, + config=DATABASE_APP_CONFIG, + ), + ops_test.model.deploy( + charm, + application_name=SECOND_DATABASE_APP_NAME, + num_units=3, + base=CHARM_BASE, + config=DATABASE_APP_CONFIG, + ), + ops_test.model.deploy( + charm, + application_name=THIRD_DATABASE_APP_NAME, + num_units=1, + base=CHARM_BASE, + config=DATABASE_APP_CONFIG, + ), + ops_test.model.deploy( + DATA_INTEGRATOR_APP_NAME, + application_name=DATA_INTEGRATOR_APP_NAME, + num_units=1, + channel="latest/edge", + config={"database-name": TESTING_DATABASE}, + ), + ops_test.model.deploy( + DATA_INTEGRATOR_APP_NAME, + 
application_name=SECOND_DATA_INTEGRATOR_APP_NAME, + num_units=1, + channel="latest/edge", + config={"database-name": TESTING_DATABASE}, + ), + ops_test.model.deploy( + DATA_INTEGRATOR_APP_NAME, + application_name=THIRD_DATA_INTEGRATOR_APP_NAME, + num_units=1, + channel="latest/edge", + config={"database-name": TESTING_DATABASE}, + ), + ) + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, SECOND_DATABASE_APP_NAME, THIRD_DATABASE_APP_NAME], + status="active", + timeout=2500, + # There can be error spikes during PostgreSQL deployment that are not related to Logical Replication + raise_on_error=False, + ) + async with ops_test.fast_forward(): + await gather( + ops_test.model.integrate(DATABASE_APP_NAME, DATA_INTEGRATOR_APP_NAME), + ops_test.model.integrate(SECOND_DATABASE_APP_NAME, SECOND_DATA_INTEGRATOR_APP_NAME), + ops_test.model.integrate(THIRD_DATABASE_APP_NAME, THIRD_DATA_INTEGRATOR_APP_NAME), + ops_test.model.integrate( + f"{DATABASE_APP_NAME}:logical-replication-offer", + f"{SECOND_DATABASE_APP_NAME}:logical-replication", + ), + ) + await ops_test.model.wait_for_idle(status="active", timeout=500) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_create_testing_data(ops_test: OpsTest): + await _create_test_table(ops_test, DATA_INTEGRATOR_APP_NAME) + await _create_test_table(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME) + await _create_test_table(ops_test, THIRD_DATA_INTEGRATOR_APP_NAME) + await _insert_test_data(ops_test, DATA_INTEGRATOR_APP_NAME, "first") + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_setup_logical_replication(ops_test: OpsTest): + publisher_leader = await get_leader_unit(ops_test, DATABASE_APP_NAME) + first_subscriber_leader = await get_leader_unit(ops_test, SECOND_DATABASE_APP_NAME) + second_subscriber_leader = await get_leader_unit(ops_test, THIRD_DATABASE_APP_NAME) + + # Logical replication between first and second database applications is already established in test_deploy + await
ops_test.model.integrate( + f"{DATABASE_APP_NAME}:logical-replication-offer", + f"{THIRD_DATABASE_APP_NAME}:logical-replication", + ) + + action = await publisher_leader.run_action( + "add-publication", + name="test_publication", + database=TESTING_DATABASE, + tables="public.test_table", + ) + + await action.wait() + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=500) + + action = await publisher_leader.run_action("list-publications") + await action.wait() + results = action.results.get("publications") + assert results and "test_publication | 0" in results, ( + "publication should be listed in list-publications action" + ) + + action = await first_subscriber_leader.run_action("subscribe", name="test_publication") + await action.wait() + + action = await first_subscriber_leader.run_action("list-subscriptions") + await action.wait() + results = action.results.get("subscriptions") + assert results and "test_publication" in results, ( + "subscription should be listed in list-subscriptions action" + ) + + action = await second_subscriber_leader.run_action("subscribe", name="test_publication") + await action.wait() + + action = await second_subscriber_leader.run_action("list-subscriptions") + await action.wait() + results = action.results.get("subscriptions") + assert results and "test_publication" in results, ( + "subscription should be listed in list-subscriptions action" + ) + + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=500) + + action = await publisher_leader.run_action("list-publications") + await action.wait() + results = action.results.get("publications") + assert results and "test_publication | 2" in results, ( + "publication should detect 2 active connections" + ) + + assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "first"), ( + "testing table should be copied to the postgresql2 on logical replication setup" + ) + assert await 
_check_test_data(ops_test, THIRD_DATA_INTEGRATOR_APP_NAME, "first"), ( + "testing table should be copied to the postgresql3 on logical replication setup" + ) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_logical_replication(ops_test: OpsTest): + await _insert_test_data(ops_test, DATA_INTEGRATOR_APP_NAME, "second") + assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "second"), ( + "logical replication should work with postgresql -> postgresql2" + ) + assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "second"), ( + "logical replication should work with postgresql -> postgresql3" + ) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_switchover(ops_test: OpsTest): + publisher_leader = await get_leader_unit(ops_test, DATABASE_APP_NAME) + publisher_roles = await get_cluster_roles(ops_test, publisher_leader.name) + publisher_candidate = ops_test.model.units[publisher_roles["sync_standbys"][0]] + action = await publisher_candidate.run_action("promote-to-primary", scope="unit", force=True) + await action.wait() + + first_subscriber_leader = await get_leader_unit(ops_test, SECOND_DATABASE_APP_NAME) + subscriber_roles = await get_cluster_roles(ops_test, first_subscriber_leader.name) + subscriber_candidate = ops_test.model.units[subscriber_roles["sync_standbys"][0]] + action = await subscriber_candidate.run_action("promote-to-primary", scope="unit", force=True) + await action.wait() + + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=500) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_logical_replication_after_switchover(ops_test: OpsTest): + await _insert_test_data(ops_test, DATA_INTEGRATOR_APP_NAME, "third") + assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "third"), ( + "logical replication should work with postgresql -> postgresql2" + ) + assert await _check_test_data(ops_test, 
SECOND_DATA_INTEGRATOR_APP_NAME, "third"), ( + "logical replication should work with postgresql -> postgresql3" + ) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_subscriber_removal(ops_test: OpsTest): + await ops_test.model.remove_application(THIRD_DATA_INTEGRATOR_APP_NAME, block_until_done=True) + await ops_test.model.remove_application(THIRD_DATABASE_APP_NAME, block_until_done=True) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=1000) + connection_string = await build_connection_string( + ops_test, + DATA_INTEGRATOR_APP_NAME, + DATA_INTEGRATOR_RELATION, + database=TESTING_DATABASE, + ) + with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(1) FROM pg_replication_slots where slot_type='logical';") + assert cursor.fetchone()[0] == 1, ( + "unused replication slot should be removed in the publisher cluster" + ) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_logical_replication_after_subscriber_removal(ops_test: OpsTest): + await _insert_test_data(ops_test, DATA_INTEGRATOR_APP_NAME, "fourth") + assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "fourth"), ( + "logical replication should work with postgresql -> postgresql2" + ) + + +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_remove_relation(ops_test: OpsTest): + await ops_test.model.applications[DATABASE_APP_NAME].remove_relation( + f"{DATABASE_APP_NAME}:logical-replication-offer", + f"{SECOND_DATABASE_APP_NAME}:logical-replication", + ) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=500) + connection_string = await build_connection_string( + ops_test, + SECOND_DATA_INTEGRATOR_APP_NAME, + DATA_INTEGRATOR_RELATION, + database=TESTING_DATABASE, + ) + with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(1) FROM 
pg_subscription;") + assert cursor.fetchone()[0] == 0, ( + "unused PostgreSQL subscription should be removed in the subscriber cluster" + ) + + +async def _create_test_table(ops_test: OpsTest, data_integrator_app_name: str) -> None: + with ( + psycopg2.connect( + await build_connection_string( + ops_test, + data_integrator_app_name, + DATA_INTEGRATOR_RELATION, + database=TESTING_DATABASE, + ) + ) as connection, + connection.cursor() as cursor, + ): + cursor.execute("CREATE TABLE test_table (test_column text);") + + +async def _insert_test_data(ops_test: OpsTest, data_integrator_app_name: str, data: str) -> None: + with ( + psycopg2.connect( + await build_connection_string( + ops_test, + data_integrator_app_name, + DATA_INTEGRATOR_RELATION, + database=TESTING_DATABASE, + ) + ) as connection, + connection.cursor() as cursor, + ): + cursor.execute( + "INSERT INTO test_table (test_column) VALUES (%s);", + (data,), + ) + + +async def _check_test_data(ops_test: OpsTest, data_integrator_app_name: str, data: str) -> bool: + with ( + psycopg2.connect( + await build_connection_string( + ops_test, + data_integrator_app_name, + DATA_INTEGRATOR_RELATION, + database=TESTING_DATABASE, + ) + ) as connection, + connection.cursor() as cursor, + ): + cursor.execute( + "SELECT EXISTS (SELECT 1 FROM test_table WHERE test_column = %s);", + (data,), + ) + return cursor.fetchone()[0] From ad41afe7c3d84cb35637a2aca24c09c05bce36ea Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Tue, 25 Mar 2025 23:26:45 +0200 Subject: [PATCH 17/19] Update tests/integration/ha_tests/test_logical_replication.py Co-authored-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/test_logical_replication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/test_logical_replication.py b/tests/integration/ha_tests/test_logical_replication.py index 48598c2700..6e03ab9f94 100644 --- a/tests/integration/ha_tests/test_logical_replication.py +++ 
b/tests/integration/ha_tests/test_logical_replication.py @@ -175,7 +175,7 @@ async def test_logical_replication(ops_test: OpsTest): assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "second"), ( "logical replication should work with postgresql -> postgresql2" ) - assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "second"), ( + assert await _check_test_data(ops_test, THIRD_DATA_INTEGRATOR_APP_NAME, "second"), ( "logical replication should work with postgresql -> postgresql3" ) From 601c2816e5f914937243d2b6dbe7515ffd5e5217 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Tue, 25 Mar 2025 23:26:58 +0200 Subject: [PATCH 18/19] Update tests/integration/ha_tests/test_logical_replication.py Co-authored-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/test_logical_replication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/test_logical_replication.py b/tests/integration/ha_tests/test_logical_replication.py index 6e03ab9f94..76dd9b92db 100644 --- a/tests/integration/ha_tests/test_logical_replication.py +++ b/tests/integration/ha_tests/test_logical_replication.py @@ -206,7 +206,7 @@ async def test_logical_replication_after_switchover(ops_test: OpsTest): assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "third"), ( "logical replication should work with postgresql -> postgresql2" ) - assert await _check_test_data(ops_test, SECOND_DATA_INTEGRATOR_APP_NAME, "third"), ( + assert await _check_test_data(ops_test, THIRD_DATA_INTEGRATOR_APP_NAME, "third"), ( "logical replication should work with postgresql -> postgresql3" ) From 62087a32a8d2e705d8d175bc72b46bd8b4d50b64 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Tue, 8 Apr 2025 14:06:37 +0300 Subject: [PATCH 19/19] Lint fix. 
--- src/relations/logical_replication.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index dd8702f028..dd7868d4bd 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -9,6 +9,9 @@ import json import logging import re +from typing import ( + Any, +) from ops import ( ActionEvent, @@ -176,15 +179,12 @@ def _on_offer_relation_broken(self, event: RelationBrokenEvent): for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): if rel.id == event.relation.id: continue - used_replication_slots += [ - v - for k, v in self._get_dict_from_str( - rel.data[self.model.app].get("replication-slots") - ).items() - ] + used_replication_slots += self._get_dict_from_str( + rel.data[self.model.app].get("replication-slots") + ).values() deleting_replication_slots = [ - k for k, v in global_replication_slots.items() if k not in used_replication_slots + k for k in global_replication_slots if k not in used_replication_slots ] for deleting_replication_slot in deleting_replication_slots: global_replication_slots.pop(deleting_replication_slot, None) @@ -584,10 +584,10 @@ def _get_secret(self) -> Secret: @staticmethod def _get_publications_from_str( publications_str: str | None = None, - ) -> dict[str, dict[str, any]]: + ) -> dict[str, dict[str, Any]]: return json.loads(publications_str or "{}") - def _set_publications(self, publications: dict[str, dict[str, any]]): + def _set_publications(self, publications: dict[str, dict[str, Any]]): publications_str = json.dumps(publications) self.charm.app_peer_data["publications"] = publications_str for rel in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()):