From ea1dedd352481f150f172f97e6dc10fa7daee824 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Apr 2024 20:44:38 -0300 Subject: [PATCH 1/9] Add async replication implementation Signed-off-by: Marcelo Henrique Neppel --- actions.yaml | 6 + lib/charms/postgresql_k8s/v0/postgresql.py | 45 +- metadata.yaml | 8 + poetry.lock | 3 +- src/charm.py | 50 +- src/cluster.py | 78 ++- src/relations/async_replication.py | 667 +++++++++++++++++++++ templates/patroni.yml.j2 | 14 +- 8 files changed, 832 insertions(+), 39 deletions(-) create mode 100644 src/relations/async_replication.py diff --git a/actions.yaml b/actions.yaml index 44231ad4a9..7364321fa7 100644 --- a/actions.yaml +++ b/actions.yaml @@ -17,6 +17,12 @@ list-backups: description: Lists backups in s3 storage. pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. +promote-cluster: + description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit. + params: + force-promotion: + type: boolean + description: Force the promotion of a cluster when there is already a primary cluster. restore: description: Restore a database backup using pgBackRest. S3 credentials are retrieved from a relation with the S3 integrator charm. diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 574e157780..8783f76814 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -18,6 +18,7 @@ Any charm using this library should import the `psycopg2` or `psycopg2-binary` dependency. """ + import logging from collections import OrderedDict from typing import Dict, List, Optional, Set, Tuple @@ -35,7 +36,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 24 +LIBPATCH = 26 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -358,9 +359,7 @@ def _generate_database_privileges_statements( statements.append( """UPDATE pg_catalog.pg_largeobject_metadata SET lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}') -WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format( - user, self.user - ) +WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format(user, self.user) ) else: for schema in schemas: @@ -477,11 +476,11 @@ def set_up_database(self) -> None: """Set up postgres database with the right permissions.""" connection = None try: - self.create_user( - "admin", - extra_user_roles="pg_read_all_data,pg_write_all_data", - ) with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';") + if cursor.fetchone() is not None: + return + # Allow access to the postgres database only to the system users. cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;") cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;") @@ -491,6 +490,10 @@ def set_up_database(self) -> None: sql.Identifier(user) ) ) + self.create_user( + "admin", + extra_user_roles="pg_read_all_data,pg_write_all_data", + ) cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;") except psycopg2.Error as e: logger.error(f"Failed to set up databases: {e}") @@ -562,18 +565,16 @@ def build_postgresql_parameters( parameters = {} for config, value in config_options.items(): # Filter config option not related to PostgreSQL parameters. 
- if not config.startswith( - ( - "durability", - "instance", - "logging", - "memory", - "optimizer", - "request", - "response", - "vacuum", - ) - ): + if not config.startswith(( + "durability", + "instance", + "logging", + "memory", + "optimizer", + "request", + "response", + "vacuum", + )): continue parameter = "_".join(config.split("_")[1:]) if parameter in ["date_style", "time_zone"]: @@ -594,8 +595,8 @@ def build_postgresql_parameters( # and the remaining as cache memory. shared_buffers = int(available_memory * 0.25) effective_cache_size = int(available_memory - shared_buffers) - parameters.setdefault("shared_buffers", f"{int(shared_buffers/10**6)}MB") - parameters.update({"effective_cache_size": f"{int(effective_cache_size/10**6)}MB"}) + parameters.setdefault("shared_buffers", f"{int(shared_buffers / 10**6)}MB") + parameters.update({"effective_cache_size": f"{int(effective_cache_size / 10**6)}MB"}) else: # Return default parameters.setdefault("shared_buffers", "128MB") diff --git a/metadata.yaml b/metadata.yaml index b4bd1a4331..cd29daf381 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -26,6 +26,10 @@ peers: interface: upgrade provides: + async-primary: + interface: async_replication + limit: 1 + optional: true database: interface: postgresql_client db: @@ -37,6 +41,10 @@ provides: limit: 1 requires: + async-replica: + interface: async_replication + limit: 1 + optional: true certificates: interface: tls-certificates limit: 1 diff --git a/poetry.lock b/poetry.lock index bf12d55ec2..5bdd3c7982 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "allure-pytest" @@ -1573,7 +1573,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, diff --git a/src/charm.py b/src/charm.py index 9e4f198941..4b82491fbf 100755 --- a/src/charm.py +++ b/src/charm.py @@ -86,6 +86,7 @@ USER, USER_PASSWORD_KEY, ) +from relations.async_replication import PostgreSQLAsyncReplication from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model @@ -179,6 +180,7 @@ def __init__(self, *args): self.legacy_db_admin_relation = DbProvides(self, admin=True) self.backup = PostgreSQLBackups(self, "s3-parameters") self.tls = PostgreSQLTLS(self, PEER) + self.async_replication = PostgreSQLAsyncReplication(self) 
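        # The async-primary/async-replica endpoints and the promote-cluster action introduced in
        # this patch are driven from the Juju CLI. A rough, illustrative sketch of the operator
        # workflow (the application names db1/db2 are placeholders and the exact command syntax
        # depends on the Juju version in use):
        #
        #   juju integrate db1:async-primary db2:async-replica
        #   juju run db1/leader promote-cluster              # pass force-promotion=true to
        #                                                    # override an existing primary cluster
        #
        # As noted in actions.yaml, the action must be run against the leader unit of the cluster
        # being promoted.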
self.restart_manager = RollingOpsManager( charm=self, relation="restart", callback=self._restart ) @@ -321,6 +323,8 @@ def primary_endpoint(self) -> Optional[str]: for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): with attempt: primary = self._patroni.get_primary() + if primary is None and (standby_leader := self._patroni.get_standby_leader()): + primary = standby_leader primary_endpoint = self._patroni.get_member_ip(primary) # Force a retry if there is no primary or the member that was # returned is not in the list of the current cluster members @@ -420,6 +424,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE) return + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + def _on_pgdata_storage_detaching(self, _) -> None: # Change the primary if it's the unit that is being removed. try: @@ -513,9 +520,13 @@ def _on_peer_relation_changed(self, event: HookEvent): # Restart the workload if it's stuck on the starting state after a timeline divergence # due to a backup that was restored. - if not self.is_primary and ( - self._patroni.member_replication_lag == "unknown" - or int(self._patroni.member_replication_lag) > 1000 + if ( + not self.is_primary + and not self.is_standby_leader + and ( + self._patroni.member_replication_lag == "unknown" + or int(self._patroni.member_replication_lag) > 1000 + ) ): self._patroni.reinitialize_postgresql() logger.debug("Deferring on_peer_relation_changed: reinitialising replica") @@ -551,8 +562,7 @@ def _update_new_unit_status(self) -> None: # a failed switchover, so wait until the primary is elected. if self.primary_endpoint: self._update_relation_endpoints() - if not self.is_blocked: - self.unit.status = ActiveStatus() + self.async_replication.handle_read_only_mode() else: self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE) @@ -688,6 +698,7 @@ def _hosts(self) -> set: def _patroni(self) -> Patroni: """Returns an instance of the Patroni object.""" return Patroni( + self, self._unit_ip, self.cluster_name, self._member_name, @@ -704,6 +715,11 @@ def is_primary(self) -> bool: """Return whether this unit is the primary instance.""" return self.unit.name == self._patroni.get_primary(unit_name_pattern=True) + @property + def is_standby_leader(self) -> bool: + """Return whether this unit is the standby leader instance.""" + return self.unit.name == self._patroni.get_standby_leader(unit_name_pattern=True) + @property def is_tls_enabled(self) -> bool: """Return whether TLS is enabled.""" @@ -902,6 +918,9 @@ def _on_config_changed(self, event) -> None: if self.is_blocked and "Configuration Error" in self.unit.status.message: self.unit.status = ActiveStatus() + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + if not self.unit.is_leader(): return @@ -929,6 +948,9 @@ def enable_disable_extensions(self, database: str = None) -> None: Args: database: optional database where to enable/disable the extension. 
""" + if self._patroni.get_primary() is None: + logger.debug("Early exit enable_disable_extensions: standby cluster") + return spi_module = ["refint", "autoinc", "insert_username", "moddatetime"] plugins_exception = {"uuid_ossp": '"uuid-ossp"'} original_status = self.unit.status @@ -1188,6 +1210,9 @@ def _on_set_password(self, event: ActionEvent) -> None: # Other units Patroni configuration will be reloaded in the peer relation changed event. self.update_config() + # Update the password in the async replication data. + self.async_replication.update_async_replication_data() + event.set_results({"password": password}) def _on_update_status(self, _) -> None: @@ -1225,6 +1250,9 @@ def _on_update_status(self, _) -> None: if self._handle_workload_failures(): return + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + self._set_primary_status_message() # Restart topology observer if it is gone @@ -1270,8 +1298,16 @@ def _handle_workload_failures(self) -> bool: a bool indicating whether the charm performed any action. """ # Restart the workload if it's stuck on the starting state after a restart. + try: + is_primary = self.is_primary + is_standby_leader = self.is_standby_leader + except RetryError: + return False + if ( - not self._patroni.member_started + not is_primary + and not is_standby_leader + and not self._patroni.member_started and "postgresql_restarted" in self._peers.data[self.unit] and self._patroni.member_replication_lag == "unknown" ): @@ -1291,6 +1327,8 @@ def _set_primary_status_message(self) -> None: try: if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name: self.unit.status = ActiveStatus("Primary") + elif self.is_standby_leader: + self.unit.status = ActiveStatus("Standby Leader") elif self._patroni.member_started: self.unit.status = ActiveStatus() except (RetryError, ConnectionError) as e: diff --git a/src/cluster.py b/src/cluster.py index ea234f5f95..203a237d46 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -47,10 +47,22 @@ RUNNING_STATES = ["running", "streaming"] +class ClusterNotPromotedError(Exception): + """Raised when a cluster is not promoted.""" + + class NotReadyError(Exception): """Raised when not all cluster members healthy or finished initial sync.""" +class EndpointNotReadyError(Exception): + """Raised when an endpoint is not ready.""" + + +class StandbyClusterAlreadyPromotedError(Exception): + """Raised when a standby cluster is already promoted.""" + + class RemoveRaftMemberFailedError(Exception): """Raised when a remove raft member failed for some reason.""" @@ -68,6 +80,7 @@ class Patroni: def __init__( self, + charm, unit_ip: str, cluster_name: str, member_name: str, @@ -81,6 +94,7 @@ def __init__( """Initialize the Patroni class. Args: + charm: PostgreSQL charm instance. unit_ip: IP address of the current unit cluster_name: name of the cluster member_name: name of the member inside the cluster @@ -91,6 +105,7 @@ def __init__( rewind_password: password for the user used on rewinds tls_enabled: whether TLS is enabled """ + self.charm = charm self.unit_ip = unit_ip self.cluster_name = cluster_name self.member_name = member_name @@ -241,6 +256,38 @@ def get_primary(self, unit_name_pattern=False) -> str: primary = "/".join(primary.rsplit("-", 1)) return primary + def get_standby_leader( + self, unit_name_pattern=False, check_whether_is_running: bool = False + ) -> str: + """Get standby leader instance. 
+ + Args: + unit_name_pattern: whether to convert pod name to unit name + check_whether_is_running: whether to check if the standby leader is running + + Returns: + standby leader pod or unit name. + """ + # Request info from cluster endpoint (which returns all members of the cluster). + for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): + with attempt: + url = self._get_alternative_patroni_url(attempt) + cluster_status = requests.get( + f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", + verify=self.verify, + timeout=API_REQUEST_TIMEOUT, + ) + for member in cluster_status.json()["members"]: + if member["role"] == "standby_leader": + if check_whether_is_running and member["state"] not in RUNNING_STATES: + logger.warning(f"standby leader {member['name']} is not running") + continue + standby_leader = member["name"] + if unit_name_pattern: + # Change the last dash to / in order to match unit name pattern. + standby_leader = "/".join(standby_leader.rsplit("-", 1)) + return standby_leader + def get_sync_standby_names(self) -> List[str]: """Get the list of sync standby unit names.""" sync_standbys = [] @@ -296,16 +343,19 @@ def are_all_members_ready(self) -> bool: except RetryError: return False - # Check if all members are running and one of them is a leader (primary), - # because sometimes there may exist (for some period of time) only - # replicas after a failed switchover. + # Check if all members are running and one of them is a leader (primary) or + # a standby leader, because sometimes there may exist (for some period of time) + # only replicas after a failed switchover. return all( member["state"] in RUNNING_STATES for member in cluster_status.json()["members"] - ) and any(member["role"] == "leader" for member in cluster_status.json()["members"]) + ) and any( + member["role"] in ["leader", "standby_leader"] + for member in cluster_status.json()["members"] + ) def get_patroni_health(self) -> Dict[str, str]: """Gets, retires and parses the Patroni health endpoint.""" - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + for attempt in Retrying(stop=stop_after_delay(90), wait=wait_fixed(3)): with attempt: r = requests.get( f"{self._patroni_url}/health", @@ -423,6 +473,19 @@ def is_member_isolated(self) -> bool: return len(cluster_status.json()["members"]) == 0 + def promote_standby_cluster(self) -> None: + """Promote a standby cluster to be a regular cluster.""" + config_response = requests.get(f"{self._patroni_url}/config", verify=self.verify) + if "standby_cluster" not in config_response.json(): + raise StandbyClusterAlreadyPromotedError("standby cluster is already promoted") + requests.patch( + f"{self._patroni_url}/config", verify=self.verify, json={"standby_cluster": None} + ) + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if self.get_primary() is None: + raise ClusterNotPromotedError("cluster not promoted") + def render_file(self, path: str, content: str, mode: int) -> None: """Write a content rendered from a template to a file. 
@@ -475,6 +538,7 @@ def render_patroni_yml_file( data_path=POSTGRESQL_DATA_PATH, enable_tls=enable_tls, member_name=self.member_name, + partner_addrs=self.charm.async_replication.get_partner_addresses(), peers_ips=self.peers_ips, pgbackrest_configuration_file=PGBACKREST_CONFIGURATION_FILE, scope=self.cluster_name, @@ -492,8 +556,10 @@ def render_patroni_yml_file( version=self.get_postgresql_version().split(".")[0], minority_count=self.planned_units // 2, pg_parameters=parameters, + primary_cluster_endpoint=self.charm.async_replication.get_primary_cluster_endpoint(), + extra_replication_endpoints=self.charm.async_replication.get_standby_endpoints(), ) - self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) + self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o644) def start_patroni(self) -> bool: """Start Patroni service using snap. diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py new file mode 100644 index 0000000000..288217cf16 --- /dev/null +++ b/src/relations/async_replication.py @@ -0,0 +1,667 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Async Replication implementation.""" + +import json +import logging +import os +import pwd +import shutil +from pathlib import Path +from subprocess import PIPE, run +from typing import List, Optional, Tuple + +from ops import ( + ActionEvent, + Application, + BlockedStatus, + MaintenanceStatus, + Object, + Relation, + RelationChangedEvent, + RelationDepartedEvent, + Secret, + SecretNotFoundError, + WaitingStatus, +) +from tenacity import RetryError, Retrying, stop_after_attempt, stop_after_delay, wait_fixed + +from cluster import ClusterNotPromotedError, NotReadyError, StandbyClusterAlreadyPromotedError +from constants import ( + APP_SCOPE, + PATRONI_CONF_PATH, + POSTGRESQL_DATA_PATH, +) + +logger = logging.getLogger(__name__) + + +ASYNC_PRIMARY_RELATION = "async-primary" +ASYNC_REPLICA_RELATION = "async-replica" +INCOMPATIBLE_CLUSTER_VERSIONS_BLOCKING_MESSAGE = ( + "Incompatible cluster versions - cannot enable async replication" +) +READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" + + +class PostgreSQLAsyncReplication(Object): + """Defines the async-replication management logic.""" + + def __init__(self, charm): + """Constructor.""" + super().__init__(charm, "postgresql") + self.charm = charm + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_joined, self._on_async_relation_joined + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_joined, self._on_async_relation_joined + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_async_relation_changed + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_async_relation_changed + ) + + # Departure events + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, + self._on_async_relation_departed, + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, + self._on_async_relation_departed, + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_async_relation_broken + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_async_relation_broken + ) + + # Actions + self.framework.observe(self.charm.on.promote_cluster_action, self._on_promote_cluster) + + def _can_promote_cluster(self, event: ActionEvent) -> bool: + 
"""Check if the cluster can be promoted.""" + if not self.charm.is_cluster_initialised: + event.fail("Cluster not initialised yet.") + return False + + # Check if there is a relation. If not, see if there is a standby leader. If so promote it to leader. If not, + # fail the action telling that there is no relation and no standby leader. + relation = self._relation + if relation is None: + standby_leader = self.charm._patroni.get_standby_leader() + if standby_leader is not None: + try: + self.charm._patroni.promote_standby_cluster() + if ( + self.charm.is_blocked + and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm._peers.data[self.charm.app].update({ + "promoted-cluster-counter": "" + }) + self.charm._set_primary_status_message() + except (StandbyClusterAlreadyPromotedError, ClusterNotPromotedError) as e: + event.fail(str(e)) + return False + event.fail("No relation and no standby leader found.") + return False + + # Check if this cluster is already the primary cluster. If so, fail the action telling that it's already + # the primary cluster. + primary_cluster = self._get_primary_cluster() + if self.charm.app == primary_cluster: + event.fail("This cluster is already the primary cluster.") + return False + + if relation.app == primary_cluster: + if not event.params.get("force-promotion"): + event.fail( + f"{relation.app.name} is already the primary cluster. Pass `force-promotion=true` to promote anyway." + ) + return False + else: + logger.warning( + "%s is already the primary cluster. Forcing promotion of %s to primary cluster due to `force-promotion=true`.", + relation.app.name, + self.charm.app.name, + ) + + return True + + def _configure_primary_cluster( + self, primary_cluster: Application, event: RelationChangedEvent + ) -> bool: + """Configure the primary cluster.""" + if self.charm.app == primary_cluster: + self.charm.update_config() + if self._is_primary_cluster() and self.charm.unit.is_leader(): + self._update_primary_cluster_data() + if self.charm._patroni.get_standby_leader() is not None: + self.charm._patroni.promote_standby_cluster() + try: + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if not self.charm.is_primary: + raise ClusterNotPromotedError() + except RetryError: + logger.debug( + "Deferring on_async_relation_changed: standby cluster not promoted yet." 
+ ) + event.defer() + return True + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() + }) + self.charm._set_primary_status_message() + return True + return False + + def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: + """Configure the standby cluster.""" + relation = self._relation + if relation.name == ASYNC_REPLICA_RELATION: + primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") + secret_id = ( + None + if primary_cluster_info is None + else json.loads(primary_cluster_info).get("secret-id") + ) + try: + secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label) + except SecretNotFoundError: + logger.warning("Secret not found, deferring event") + logger.debug("Secret not found, deferring event") + event.defer() + return False + credentials = secret.peek_content() + logger.warning("Credentials: %s", credentials) + for key, password in credentials.items(): + user = key.split("-password")[0] + self.charm.set_secret(APP_SCOPE, key, password) + logger.warning("Synced %s password to %s", user, password) + logger.debug("Synced %s password", user) + return True + + def _get_highest_promoted_cluster_counter_value(self) -> str: + """Return the highest promoted cluster counter.""" + promoted_cluster_counter = "0" + for async_relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if async_relation is None: + continue + for databag in [ + async_relation.data[async_relation.app], + self.charm._peers.data[self.charm.app], + ]: + relation_promoted_cluster_counter = databag.get("promoted-cluster-counter", "0") + if int(relation_promoted_cluster_counter) > int(promoted_cluster_counter): + promoted_cluster_counter = relation_promoted_cluster_counter + return promoted_cluster_counter + + def get_partner_addresses(self) -> List[str]: + """Return the partner addresses.""" + primary_cluster = self._get_primary_cluster() + if ( + primary_cluster is None + or self.charm.app == primary_cluster + or not self.charm.unit.is_leader() + or self.charm._peers.data[self.charm.unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + ): + logger.warning(f"Partner addresses: {self.charm._peer_members_ips}") + return self.charm._peer_members_ips + + logger.warning("Partner addresses: []") + return [] + + def _get_primary_cluster(self) -> Optional[Application]: + """Return the primary cluster.""" + primary_cluster = None + promoted_cluster_counter = "0" + for async_relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if async_relation is None: + continue + for app, relation_data in { + async_relation.app: async_relation.data, + self.charm.app: self.charm._peers.data, + }.items(): + databag = relation_data[app] + relation_promoted_cluster_counter = databag.get("promoted-cluster-counter", "0") + if relation_promoted_cluster_counter > promoted_cluster_counter: + promoted_cluster_counter = relation_promoted_cluster_counter + primary_cluster = app + return primary_cluster + + def get_primary_cluster_endpoint(self) -> Optional[str]: + """Return the primary cluster endpoint.""" + primary_cluster = self._get_primary_cluster() + if primary_cluster is None or self.charm.app == primary_cluster: + return None + relation = self._relation + primary_cluster_data = 
relation.data[relation.app].get("primary-cluster-data") + if primary_cluster_data is None: + return None + return json.loads(primary_cluster_data).get("endpoint") + + def _get_secret(self) -> Secret: + """Return async replication necessary secrets.""" + try: + # Avoid recreating the secret. + secret = self.charm.model.get_secret(label=self._secret_label) + if not secret.id: + # Workaround for the secret id not being set with model uuid. + secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + return secret + except SecretNotFoundError: + logger.warning("Secret not found, creating a new one") + pass + + app_secret = self.charm.model.get_secret(label=f"{self.model.app.name}.app") + content = app_secret.peek_content() + + # Filter out unnecessary secrets. + shared_content = dict(filter(lambda x: "password" in x[0], content.items())) + + return self.charm.model.app.add_secret(content=shared_content, label=self._secret_label) + + def get_standby_endpoints(self) -> List[str]: + """Return the standby endpoints.""" + relation = self._relation + primary_cluster = self._get_primary_cluster() + # List the standby endpoints only for the primary cluster. + if relation is None or primary_cluster is None or self.charm.app != primary_cluster: + return [] + return [ + relation.data[unit].get("unit-address") + for relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ] + if relation is not None + for unit in relation.units + if relation.data[unit].get("unit-address") is not None + ] + + def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: + """Returns the PostgreSQL system identifier from this instance.""" + + def demote(): + pw_record = pwd.getpwnam("snap_daemon") + + def result(): + os.setgid(pw_record.pw_gid) + os.setuid(pw_record.pw_uid) + + return result + + process = run( + [ + f'/snap/charmed-postgresql/current/usr/lib/postgresql/{self.charm._patroni.get_postgresql_version().split(".")[0]}/bin/pg_controldata', + POSTGRESQL_DATA_PATH, + ], + stdout=PIPE, + stderr=PIPE, + preexec_fn=demote(), + ) + if process.returncode != 0: + return None, process.stderr.decode() + system_identifier = [ + line + for line in process.stdout.decode().splitlines() + if "Database system identifier" in line + ][0].split(" ")[-1] + return system_identifier, None + + def _handle_database_start(self, event: RelationChangedEvent) -> None: + """Handle the database start in the standby cluster.""" + try: + if self.charm._patroni.member_started: + self.charm._peers.data[self.charm.unit].update({"stopped": ""}) + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() + }) + + if self.charm.unit.is_leader(): + self.charm.update_config() + if all( + self.charm._peers.data[unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + for unit in {*self.charm._peers.units, self.charm.unit} + ): + self.charm._peers.data[self.charm.app].update({ + "cluster_initialised": "True" + }) + elif self._is_following_promoted_cluster(): + self.charm.unit.status = WaitingStatus( + "Waiting for the database to be started in all units" + ) + event.defer() + return + + self.charm._set_primary_status_message() + elif not self.charm.unit.is_leader(): + raise NotReadyError() + else: + self.charm.unit.status = WaitingStatus( + "Still starting the database in the standby leader" + ) + event.defer() + except NotReadyError: + 
self.charm.unit.status = WaitingStatus("Waiting for the database to start") + logger.debug("Deferring on_async_relation_changed: database hasn't started yet.") + event.defer() + + def handle_read_only_mode(self) -> None: + """Handle read-only mode.""" + promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( + "promoted-cluster-counter", "" + ) + if not self.charm.is_blocked or ( + promoted_cluster_counter != "0" + and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm._set_primary_status_message() + if ( + promoted_cluster_counter == "0" + and self.charm.unit.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + + def _is_following_promoted_cluster(self) -> bool: + """Return True if this cluster is following the promoted cluster.""" + if self._get_primary_cluster() is None: + return False + return ( + self.charm._peers.data[self.charm.unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + ) + + def _is_primary_cluster(self) -> bool: + """Return the primary cluster name.""" + return self.charm.app == self._get_primary_cluster() + + def _on_async_relation_broken(self, _) -> None: + if "departing" in self.charm._peers.data[self.charm.unit]: + logger.debug("Early exit on_async_relation_broken: Skipping departing unit.") + return + + self.charm._peers.data[self.charm.unit].update({ + "stopped": "", + "unit-promoted-cluster-counter": "", + }) + + if self.charm._patroni.get_standby_leader() is not None: + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"}) + self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + else: + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": ""}) + self.charm.update_config() + + def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: + """Update the Patroni configuration if one of the clusters was already promoted.""" + primary_cluster = self._get_primary_cluster() + logger.warning("Primary cluster: %s", primary_cluster) + if primary_cluster is None: + logger.debug("Early exit on_async_relation_changed: No primary cluster found.") + return + + if self._configure_primary_cluster(primary_cluster, event): + return + + # Return if this is a new unit. + if not self.charm.unit.is_leader() and self._is_following_promoted_cluster(): + logger.debug("Early exit on_async_relation_changed: following promoted cluster.") + return + + if not self._stop_database(event): + return + + if not all( + "stopped" in self.charm._peers.data[unit] + or self.charm._peers.data[unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + for unit in {*self.charm._peers.units, self.charm.unit} + ): + self.charm.unit.status = WaitingStatus( + "Waiting for the database to be stopped in all units" + ) + logger.debug("Deferring on_async_relation_changed: not all units stopped.") + event.defer() + return + + if self._wait_for_standby_leader(event): + return + + # Update the asynchronous replication configuration and start the database. 
+ self.charm.update_config() + if not self.charm._patroni.start_patroni(): + raise Exception("Failed to start patroni service.") + + self._handle_database_start(event) + + def _on_async_relation_departed(self, event: RelationDepartedEvent) -> None: + """Set a flag to avoid setting a wrong status message on relation broken event handler.""" + # This is needed because of https://bugs.launchpad.net/juju/+bug/1979811. + if event.departing_unit == self.charm.unit: + self.charm._peers.data[self.charm.unit].update({"departing": "True"}) + + def _on_async_relation_joined(self, _) -> None: + """Publish this unit address in the relation data.""" + self._relation.data[self.charm.unit].update({"unit-address": self.charm._unit_ip}) + + # Set the counter for new units. + highest_promoted_cluster_counter = self._get_highest_promoted_cluster_counter_value() + if highest_promoted_cluster_counter != "0": + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": highest_promoted_cluster_counter + }) + + def _on_promote_cluster(self, event: ActionEvent) -> None: + """Promote this cluster to the primary cluster.""" + if not self._can_promote_cluster(event): + return + + relation = self._relation + + # Check if all units from the other cluster published their pod IPs in the relation data. + # If not, fail the action telling that all units must publish their pod addresses in the + # relation data. + for unit in relation.units: + if "unit-address" not in relation.data[unit]: + event.fail( + "All units from the other cluster must publish their pod addresses in the relation data." + ) + return + + system_identifier, error = self.get_system_identifier() + if error is not None: + logger.exception(error) + event.fail("Failed to get system identifier") + return + + # Increment the current cluster counter in this application side based on the highest counter value. + promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) + promoted_cluster_counter += 1 + logger.warning("Promoted cluster counter: %s", promoted_cluster_counter) + + self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) + + # Emit an async replication changed event for this unit (to promote this cluster before demoting the + # other if this one is a standby cluster, which is needed to correctly setup the async replication + # when performing a switchover). + self._re_emit_async_relation_changed_event() + + # Set the status. 
+ self.charm.unit.status = MaintenanceStatus("Promoting cluster...") + + @property + def _primary_cluster_endpoint(self) -> str: + """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" + sync_standby_names = self.charm._patroni.get_sync_standby_names() + if len(sync_standby_names) > 0: + unit = self.model.get_unit(sync_standby_names[0]) + return self.charm._get_unit_ip(unit) + else: + return self.charm._get_unit_ip(self.charm.unit) + + def _re_emit_async_relation_changed_event(self) -> None: + """Re-emit the async relation changed event.""" + relation = self._relation + getattr(self.charm.on, f'{relation.name.replace("-", "_")}_relation_changed').emit( + relation, + app=relation.app, + unit=[unit for unit in relation.units if unit.app == relation.app][0], + ) + + def _reinitialise_pgdata(self) -> None: + """Reinitialise the pgdata folder.""" + try: + path = Path(POSTGRESQL_DATA_PATH) + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove contents of the data directory with error: {str(e)}" + ) + os.mkdir(POSTGRESQL_DATA_PATH) + os.chmod(POSTGRESQL_DATA_PATH, 0o750) + self.charm._patroni._change_owner(POSTGRESQL_DATA_PATH) + + @property + def _relation(self) -> Relation: + """Return the relation object.""" + for relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if relation is not None: + return relation + + @property + def _secret_label(self) -> str: + """Return the secret label.""" + return f"async-replication-secret-{self._get_highest_promoted_cluster_counter_value()}" + + def _stop_database(self, event: RelationChangedEvent) -> bool: + """Stop the database.""" + if ( + "stopped" not in self.charm._peers.data[self.charm.unit] + and not self._is_following_promoted_cluster() + ): + if not self.charm.unit.is_leader() and not os.path.exists(POSTGRESQL_DATA_PATH): + logger.debug("Early exit on_async_relation_changed: following promoted cluster.") + return False + + try: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") + except RetryError: + logger.debug("Deferring on_async_relation_changed: patroni hasn't stopped yet.") + event.defer() + return False + + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"cluster_initialised": ""}) + if not self._configure_standby_cluster(event): + return False + + logger.info("Removing and recreating pgdata folder") + self._reinitialise_pgdata() + + # Remove previous cluster information to make it possible to initialise a new cluster. + logger.info("Removing previous cluster information") + try: + path = Path(f"{PATRONI_CONF_PATH}/raft") + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove previous cluster information with error: {str(e)}" + ) + + self.charm._peers.data[self.charm.unit].update({"stopped": "True"}) + + return True + + def update_async_replication_data(self) -> None: + """Updates the async-replication data, if the unit is the leader. + + This is used to update the standby units with the new primary information. + If the unit is not the leader, then the data is removed from its databag. 
+ """ + relation = self._relation + if relation is None: + return + relation.data[self.charm.unit].update({"unit-address": self.charm._unit_ip}) + if self._is_primary_cluster() and self.charm.unit.is_leader(): + self._update_primary_cluster_data() + + def _update_primary_cluster_data( + self, promoted_cluster_counter: int = None, system_identifier: str = None + ) -> None: + """Update the primary cluster data.""" + async_relation = self._relation + + if promoted_cluster_counter is not None: + for relation in [async_relation, self.charm._peers]: + relation.data[self.charm.app].update({ + "promoted-cluster-counter": str(promoted_cluster_counter) + }) + + # Update the data in the relation. + primary_cluster_data = { + "endpoint": self._primary_cluster_endpoint, + "postgresql-version": self.charm._patroni.get_postgresql_version(), + } + + # Retrieve the secrets that will be shared between the clusters. + if async_relation.name == ASYNC_PRIMARY_RELATION: + secret = self._get_secret() + secret.grant(async_relation) + primary_cluster_data["secret-id"] = secret.id + + if system_identifier is not None: + primary_cluster_data["system-id"] = system_identifier + + async_relation.data[self.charm.app]["primary-cluster-data"] = json.dumps( + primary_cluster_data + ) + + def _wait_for_standby_leader(self, event: RelationChangedEvent) -> bool: + """Wait for the standby leader to be up and running.""" + try: + standby_leader = self.charm._patroni.get_standby_leader(check_whether_is_running=True) + except RetryError: + standby_leader = None + logger.warning("Standby leader: %s", standby_leader) + if not self.charm.unit.is_leader() and standby_leader is None: + if self.charm._patroni.is_member_isolated: + self.charm._patroni.restart_patroni() + self.charm.unit.status = WaitingStatus("Restarting Patroni to rejoin the cluster") + logger.debug( + "Deferring on_async_relation_changed: restarting Patroni to rejoin the cluster." + ) + event.defer() + return True + self.charm.unit.status = WaitingStatus( + "Waiting for the standby leader start the database" + ) + logger.debug("Deferring on_async_relation_changed: standby leader hasn't started yet.") + event.defer() + return True + return False diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 0732ec7156..1a12292807 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -37,11 +37,11 @@ ctl: raft: data_dir: {{ conf_path }}/raft self_addr: '{{ self_ip }}:2222' - {% if peers_ips -%} + {% if partner_addrs -%} partner_addrs: {% endif -%} - {% for peer_ip in peers_ips -%} - - {{ peer_ip }}:2222 + {% for partner_addr in partner_addrs -%} + - {{ partner_addr }}:2222 {% endfor %} bootstrap: @@ -103,6 +103,11 @@ bootstrap: command: pgbackrest {{ pgbackrest_configuration_file }} --stanza={{ restore_stanza }} --pg1-path={{ data_path }} --set={{ backup_id }} --type=immediate --target-action=promote restore no_params: True keep_existing_recovery_conf: True + {% elif primary_cluster_endpoint %} + standby_cluster: + host: {{ primary_cluster_endpoint }} + port: 5432 + create_replica_methods: ["basebackup"] {% else %} initdb: - encoding: UTF8 @@ -146,6 +151,9 @@ postgresql: {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 # Allow replications connections from other cluster members. 
+ {%- for endpoint in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}/32 md5 + {%- endfor %} {%- for peer_ip in peers_ips %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ peer_ip }}/0 md5 {% endfor %} From f03a281cbeca4fb2b9398815b9736ac67935f0e8 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Apr 2024 21:41:28 -0300 Subject: [PATCH 2/9] Backup standby pgdata folder Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 288217cf16..cb2adf55bc 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -8,6 +8,7 @@ import os import pwd import shutil +from datetime import datetime from pathlib import Path from subprocess import PIPE, run from typing import List, Optional, Tuple @@ -186,6 +187,17 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: self.charm.set_secret(APP_SCOPE, key, password) logger.warning("Synced %s password to %s", user, password) logger.debug("Synced %s password", user) + system_identifier, error = self.get_system_identifier() + if error is not None: + raise Exception(error) + if system_identifier != relation.data[relation.app].get("system-id"): + # Store current data in a ZIP file, clean folder and generate configuration. + logger.info("Creating backup of pgdata folder") + filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz" + self.container.exec( + f"tar -zcf {filename} {POSTGRESQL_DATA_PATH}".split() + ).wait_output() + logger.warning("Please review the backup file %s and handle its removal", filename) return True def _get_highest_promoted_cluster_counter_value(self) -> str: From 041228e5152859da282bccb3bfa56c8947d06269 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Apr 2024 23:48:28 -0300 Subject: [PATCH 3/9] Fix OS call Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index cb2adf55bc..5b980bb47e 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -8,6 +8,7 @@ import os import pwd import shutil +import subprocess from datetime import datetime from pathlib import Path from subprocess import PIPE, run @@ -194,9 +195,7 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: # Store current data in a ZIP file, clean folder and generate configuration. 
logger.info("Creating backup of pgdata folder") filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz" - self.container.exec( - f"tar -zcf {filename} {POSTGRESQL_DATA_PATH}".split() - ).wait_output() + subprocess.check_call(f"tar -zcf {filename} {POSTGRESQL_DATA_PATH}".split()) logger.warning("Please review the backup file %s and handle its removal", filename) return True From 500ce2cb94ba3af39a40893cb6f87ec134923916 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 23 Apr 2024 00:53:14 -0300 Subject: [PATCH 4/9] Fix unit tests Signed-off-by: Marcelo Henrique Neppel --- tests/unit/test_charm.py | 115 +++++++++++++++++++++++++++++-------- tests/unit/test_cluster.py | 19 +++++- 2 files changed, 107 insertions(+), 27 deletions(-) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 07d11ae441..aa1e815b86 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -328,9 +328,11 @@ def test_on_config_changed(harness): def test_check_extension_dependencies(harness): - with patch("subprocess.check_output", return_value=b"C"), patch.object( - PostgresqlOperatorCharm, "postgresql", Mock() - ): + with patch("charm.Patroni.get_primary") as _get_primary, patch( + "subprocess.check_output", return_value=b"C" + ), patch.object(PostgresqlOperatorCharm, "postgresql", Mock()): + _get_primary.return_value = harness.charm.unit + # Test when plugins dependencies exception is not caused config = { "plugin_address_standardizer_enable": False, @@ -356,9 +358,13 @@ def test_check_extension_dependencies(harness): def test_enable_disable_extensions(harness, caplog): - with patch("subprocess.check_output", return_value=b"C"), patch.object( - PostgresqlOperatorCharm, "postgresql", Mock() - ) as postgresql_mock: + with patch("charm.Patroni.get_primary") as _get_primary, patch( + "charm.PostgresqlOperatorCharm._unit_ip" + ), patch("charm.PostgresqlOperatorCharm._patroni"), patch( + "subprocess.check_output", return_value=b"C" + ), patch.object(PostgresqlOperatorCharm, "postgresql", Mock()) as postgresql_mock: + _get_primary.return_value = harness.charm.unit + # Test when all extensions install/uninstall succeed. postgresql_mock.enable_disable_extension.side_effect = None with caplog.at_level(logging.ERROR): @@ -843,6 +849,12 @@ def test_on_update_status(harness): "charm.Patroni.member_replication_lag", new_callable=PropertyMock ) as _member_replication_lag, patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started, + patch( + "charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock + ) as _is_standby_leader, + patch( + "charm.PostgresqlOperatorCharm.is_primary", new_callable=PropertyMock + ) as _is_primary, patch( "charm.PostgresqlOperatorCharm._update_relation_endpoints" ) as _update_relation_endpoints, @@ -875,6 +887,8 @@ def test_on_update_status(harness): # Test the reinitialisation of the replica when its lag is unknown # after a restart. 
_set_primary_status_message.reset_mock() + _is_primary.return_value = False + _is_standby_leader.return_value = False _member_started.return_value = False _is_member_isolated.return_value = False _member_replication_lag.return_value = "unknown" @@ -1362,9 +1376,7 @@ def test_validate_config_options(harness): def test_on_peer_relation_changed(harness): with ( patch("charm.snap.SnapCache"), - patch( - "charm.PostgresqlOperatorCharm._update_relation_endpoints" - ) as _update_relation_endpoints, + patch("charm.PostgresqlOperatorCharm._update_new_unit_status") as _update_new_unit_status, patch( "charm.PostgresqlOperatorCharm.primary_endpoint", new_callable=PropertyMock ) as _primary_endpoint, @@ -1377,6 +1389,7 @@ def test_on_peer_relation_changed(harness): patch( "charm.Patroni.member_replication_lag", new_callable=PropertyMock ) as _member_replication_lag, + patch("charm.PostgresqlOperatorCharm.is_standby_leader") as _is_standby_leader, patch("charm.PostgresqlOperatorCharm.is_primary") as _is_primary, patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started, patch("charm.Patroni.start_patroni") as _start_patroni, @@ -1425,20 +1438,19 @@ def test_on_peer_relation_changed(harness): _update_member_ip.assert_called_once() _update_config.assert_called_once() _start_patroni.assert_called_once() - _update_relation_endpoints.assert_called_once() - assert isinstance(harness.model.unit.status, ActiveStatus) + _update_new_unit_status.assert_called_once() # Test when the cluster member updates its IP. _update_member_ip.reset_mock() _update_config.reset_mock() _start_patroni.reset_mock() - _update_relation_endpoints.reset_mock() _update_member_ip.return_value = True + _update_new_unit_status.reset_mock() harness.charm._on_peer_relation_changed(mock_event) _update_member_ip.assert_called_once() _update_config.assert_not_called() _start_patroni.assert_not_called() - _update_relation_endpoints.assert_not_called() + _update_new_unit_status.assert_not_called() # Test when the unit fails to update the Patroni configuration. _update_member_ip.return_value = False @@ -1446,7 +1458,7 @@ def test_on_peer_relation_changed(harness): harness.charm._on_peer_relation_changed(mock_event) _update_config.assert_called_once() _start_patroni.assert_not_called() - _update_relation_endpoints.assert_not_called() + _update_new_unit_status.assert_not_called() assert isinstance(harness.model.unit.status, BlockedStatus) # Test when Patroni hasn't started yet in the unit. 
@@ -1454,7 +1466,7 @@ def test_on_peer_relation_changed(harness): _member_started.return_value = False harness.charm._on_peer_relation_changed(mock_event) _start_patroni.assert_called_once() - _update_relation_endpoints.assert_not_called() + _update_new_unit_status.assert_not_called() assert isinstance(harness.model.unit.status, WaitingStatus) # Test when Patroni has already started but this is a replica with a @@ -1466,6 +1478,7 @@ def test_on_peer_relation_changed(harness): _check_stanza.reset_mock() _start_stop_pgbackrest_service.reset_mock() _is_primary.return_value = values[0] + _is_standby_leader.return_value = values[0] _member_replication_lag.return_value = values[1] harness.charm.unit.status = ActiveStatus() harness.charm.on.database_peers_relation_changed.emit(relation) @@ -2213,6 +2226,9 @@ def test_on_peer_relation_departed(harness): def test_update_new_unit_status(harness): with ( + patch( + "relations.async_replication.PostgreSQLAsyncReplication.handle_read_only_mode" + ) as handle_read_only_mode, patch( "charm.PostgresqlOperatorCharm._update_relation_endpoints" ) as _update_relation_endpoints, @@ -2220,23 +2236,72 @@ def test_update_new_unit_status(harness): "charm.PostgresqlOperatorCharm.primary_endpoint", new_callable=PropertyMock ) as _primary_endpoint, ): - # Test when the charm is blocked. + # Test when the primary endpoint is reachable. _primary_endpoint.return_value = "endpoint" - harness.charm.unit.status = BlockedStatus("fake blocked status") + harness.charm.unit.status = MaintenanceStatus("fake status") harness.charm._update_new_unit_status() _update_relation_endpoints.assert_called_once() - assert isinstance(harness.charm.unit.status, BlockedStatus) - - # Test when the charm is not blocked. - _update_relation_endpoints.reset_mock() - harness.charm.unit.status = WaitingStatus() - harness.charm._update_new_unit_status() - _update_relation_endpoints.assert_called_once() - assert isinstance(harness.charm.unit.status, ActiveStatus) + handle_read_only_mode.assert_called_once() + assert not isinstance(harness.charm.unit.status, WaitingStatus) # Test when the primary endpoint is not reachable yet. 
_update_relation_endpoints.reset_mock() + handle_read_only_mode.reset_mock() _primary_endpoint.return_value = None harness.charm._update_new_unit_status() _update_relation_endpoints.assert_not_called() + handle_read_only_mode.assert_not_called() assert isinstance(harness.charm.unit.status, WaitingStatus) + + @patch("charm.Patroni.member_started", new_callable=PropertyMock) + @patch("charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock) + @patch("charm.Patroni.get_primary") + def test_set_active_status(self, _get_primary, _is_standby_leader, _member_started): + for values in itertools.product( + [ + RetryError(last_attempt=1), + ConnectionError, + self.charm.unit.name, + f"{self.charm.app.name}/2", + ], + [ + RetryError(last_attempt=1), + ConnectionError, + True, + False, + ], + [True, False], + ): + self.charm.unit.status = MaintenanceStatus("fake status") + _member_started.return_value = values[2] + if isinstance(values[0], str): + _get_primary.side_effect = None + _get_primary.return_value = values[0] + if values[0] != self.charm.unit.name and not isinstance(values[1], bool): + _is_standby_leader.side_effect = values[1] + _is_standby_leader.return_value = None + self.charm._set_active_status() + self.assertIsInstance(self.charm.unit.status, MaintenanceStatus) + else: + _is_standby_leader.side_effect = None + _is_standby_leader.return_value = values[1] + self.charm._set_active_status() + self.assertIsInstance( + self.charm.unit.status, + ActiveStatus + if values[0] == self.charm.unit.name or values[1] or values[2] + else MaintenanceStatus, + ) + self.assertEqual( + self.charm.unit.status.message, + "Primary" + if values[0] == self.charm.unit.name + else ( + "Standby Leader" if values[1] else ("" if values[2] else "fake status") + ), + ) + else: + _get_primary.side_effect = values[0] + _get_primary.return_value = None + self.charm._set_active_status() + self.assertIsInstance(self.charm.unit.status, MaintenanceStatus) diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 24afe390af..64b6bff459 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -8,8 +8,10 @@ import tenacity as tenacity from charms.operator_libs_linux.v2 import snap from jinja2 import Template +from ops.testing import Harness from tenacity import stop_after_delay +from charm import PostgresqlOperatorCharm from cluster import Patroni from constants import ( PATRONI_CONF_PATH, @@ -47,9 +49,15 @@ def json(self): class TestCluster(unittest.TestCase): def setUp(self): # Setup a cluster. + self.harness = Harness(PostgresqlOperatorCharm) + self.addCleanup(self.harness.cleanup) + self.harness.begin() + self.charm = self.harness.charm + self.peers_ips = {"2.2.2.2", "3.3.3.3"} self.patroni = Patroni( + self.charm, "1.1.1.1", "postgresql", "postgresql-0", @@ -225,10 +233,16 @@ def test_render_file(self, _temp_file, _pwnam, _chown, _chmod): # Ensure the file is chown'd correctly. _chown.assert_called_with(filename, uid=35, gid=35) + @patch( + "relations.async_replication.PostgreSQLAsyncReplication.get_partner_addresses", + return_value=["2.2.2.2", "3.3.3.3"], + ) @patch("charm.Patroni.get_postgresql_version") @patch("charm.Patroni.render_file") @patch("charm.Patroni._create_directory") - def test_render_patroni_yml_file(self, _, _render_file, _get_postgresql_version): + def test_render_patroni_yml_file( + self, _, _render_file, _get_postgresql_version, _get_partner_addresses + ): _get_postgresql_version.return_value = "14.7" # Define variables to render in the template. 
@@ -248,6 +262,7 @@ def test_render_patroni_yml_file(self, _, _render_file, _get_postgresql_version) log_path=PATRONI_LOGS_PATH, postgresql_log_path=POSTGRESQL_LOGS_PATH, member_name=member_name, + partner_addrs=["2.2.2.2", "3.3.3.3"], peers_ips=self.peers_ips, scope=scope, self_ip=self.patroni.unit_ip, @@ -275,7 +290,7 @@ def test_render_patroni_yml_file(self, _, _render_file, _get_postgresql_version) _render_file.assert_called_once_with( "/var/snap/charmed-postgresql/current/etc/patroni/patroni.yaml", expected_content, - 0o600, + 0o644, ) @patch("charm.snap.SnapCache") From e41b18b9dc0240642febaf2cf26141b91ec9c762 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 23 Apr 2024 01:04:11 -0300 Subject: [PATCH 5/9] Improve comments and logs Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 58 ++++++++++++++++++------------ 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 5b980bb47e..f2324839bb 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -1,7 +1,18 @@ # Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -"""Async Replication implementation.""" +"""Async Replication implementation. + +The highest "promoted-cluster-counter" value is used to determine the primary cluster. +The application in any side of the relation which has the highest value in its application +relation databag is considered the primary cluster. + +The "unit-promoted-cluster-counter" field in the unit relation databag is used to determine +if the unit is following the promoted cluster. If the value is the same as the highest value +in the application relation databag, then the unit is following the promoted cluster. +Otherwise, it's needed to restart the database in the unit to follow the promoted cluster +if the unit is from the standby cluster (the one that was not promoted). +""" import json import logging @@ -41,9 +52,6 @@ ASYNC_PRIMARY_RELATION = "async-primary" ASYNC_REPLICA_RELATION = "async-replica" -INCOMPATIBLE_CLUSTER_VERSIONS_BLOCKING_MESSAGE = ( - "Incompatible cluster versions - cannot enable async replication" -) READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" @@ -121,6 +129,8 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool: event.fail("This cluster is already the primary cluster.") return False + # To promote the other cluster if there is already a primary cluster, the action must be called with + # `force-promotion=true`. If not, fail the action telling that the other cluster is already the primary. if relation.app == primary_cluster: if not event.params.get("force-promotion"): event.fail( @@ -144,6 +154,8 @@ def _configure_primary_cluster( self.charm.update_config() if self._is_primary_cluster() and self.charm.unit.is_leader(): self._update_primary_cluster_data() + # If this is a standby cluster, remove the information from DCS to make it + # a normal cluster. if self.charm._patroni.get_standby_leader() is not None: self.charm._patroni.promote_standby_cluster() try: @@ -168,6 +180,7 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: """Configure the standby cluster.""" relation = self._relation if relation.name == ASYNC_REPLICA_RELATION: + # Update the secrets between the clusters. 
primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") secret_id = ( None @@ -177,22 +190,19 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: try: secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label) except SecretNotFoundError: - logger.warning("Secret not found, deferring event") logger.debug("Secret not found, deferring event") event.defer() return False credentials = secret.peek_content() - logger.warning("Credentials: %s", credentials) for key, password in credentials.items(): user = key.split("-password")[0] self.charm.set_secret(APP_SCOPE, key, password) - logger.warning("Synced %s password to %s", user, password) logger.debug("Synced %s password", user) system_identifier, error = self.get_system_identifier() if error is not None: raise Exception(error) if system_identifier != relation.data[relation.app].get("system-id"): - # Store current data in a ZIP file, clean folder and generate configuration. + # Store current data in a tar.gz file. logger.info("Creating backup of pgdata folder") filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz" subprocess.check_call(f"tar -zcf {filename} {POSTGRESQL_DATA_PATH}".split()) @@ -227,10 +237,10 @@ def get_partner_addresses(self) -> List[str]: or self.charm._peers.data[self.charm.unit].get("unit-promoted-cluster-counter") == self._get_highest_promoted_cluster_counter_value() ): - logger.warning(f"Partner addresses: {self.charm._peer_members_ips}") + logger.debug(f"Partner addresses: {self.charm._peer_members_ips}") return self.charm._peer_members_ips - logger.warning("Partner addresses: []") + logger.debug("Partner addresses: []") return [] def _get_primary_cluster(self) -> Optional[Application]: @@ -275,7 +285,7 @@ def _get_secret(self) -> Secret: secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" return secret except SecretNotFoundError: - logger.warning("Secret not found, creating a new one") + logger.debug("Secret not found, creating a new one") pass app_secret = self.charm.model.get_secret(label=f"{self.model.app.name}.app") @@ -338,12 +348,16 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: """Handle the database start in the standby cluster.""" try: if self.charm._patroni.member_started: + # If the database is started, update the databag in a way the unit is marked as configured + # for async replication. self.charm._peers.data[self.charm.unit].update({"stopped": ""}) self.charm._peers.data[self.charm.unit].update({ "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() }) if self.charm.unit.is_leader(): + # If this unit is the leader, check if all units are ready before making the cluster + # active again (including the health checks from the update status hook). 
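The standby-cluster setup above archives the existing pgdata directory whenever the two clusters report different PostgreSQL system identifiers. One way such an identifier can be read is from pg_controldata output; the sketch below assumes that approach and an illustrative data-directory path, and is not the charm's get_system_identifier implementation:

import subprocess


def read_system_identifier(data_dir: str = "/var/snap/charmed-postgresql/common/var/lib/postgresql/data") -> str:
    # "Database system identifier" uniquely identifies the cluster that
    # initialised a given data directory, so differing values mean the
    # standby's data cannot simply be reused for replication.
    output = subprocess.check_output(["pg_controldata", data_dir], text=True)
    for line in output.splitlines():
        if line.startswith("Database system identifier"):
            return line.split(":", 1)[1].strip()
    raise RuntimeError("system identifier not found in pg_controldata output")
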
self.charm.update_config() if all( self.charm._peers.data[unit].get("unit-promoted-cluster-counter") @@ -374,7 +388,7 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: event.defer() def handle_read_only_mode(self) -> None: - """Handle read-only mode.""" + """Handle read-only mode (standby cluster that lost the relation with the primary cluster).""" promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( "promoted-cluster-counter", "" ) @@ -390,7 +404,7 @@ def handle_read_only_mode(self) -> None: self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) def _is_following_promoted_cluster(self) -> bool: - """Return True if this cluster is following the promoted cluster.""" + """Return True if this unit is following the promoted cluster.""" if self._get_primary_cluster() is None: return False return ( @@ -412,6 +426,8 @@ def _on_async_relation_broken(self, _) -> None: "unit-promoted-cluster-counter": "", }) + # If this is the standby cluster, set 0 in the "promoted-cluster-counter" field to set + # the cluster in read-only mode message also in the other units. if self.charm._patroni.get_standby_leader() is not None: if self.charm.unit.is_leader(): self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"}) @@ -424,7 +440,7 @@ def _on_async_relation_broken(self, _) -> None: def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: """Update the Patroni configuration if one of the clusters was already promoted.""" primary_cluster = self._get_primary_cluster() - logger.warning("Primary cluster: %s", primary_cluster) + logger.debug("Primary cluster: %s", primary_cluster) if primary_cluster is None: logger.debug("Early exit on_async_relation_changed: No primary cluster found.") return @@ -506,7 +522,7 @@ def _on_promote_cluster(self, event: ActionEvent) -> None: # Increment the current cluster counter in this application side based on the highest counter value. promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) promoted_cluster_counter += 1 - logger.warning("Promoted cluster counter: %s", promoted_cluster_counter) + logger.debug("Promoted cluster counter: %s", promoted_cluster_counter) self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) @@ -520,7 +536,7 @@ def _on_promote_cluster(self, event: ActionEvent) -> None: @property def _primary_cluster_endpoint(self) -> str: - """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" + """Return the endpoint from one of the sync-standbys, or from the primary if there is no sync-standby.""" sync_standby_names = self.charm._patroni.get_sync_standby_names() if len(sync_standby_names) > 0: unit = self.model.get_unit(sync_standby_names[0]) @@ -587,10 +603,13 @@ def _stop_database(self, event: RelationChangedEvent) -> bool: return False if self.charm.unit.is_leader(): + # Remove the "cluster_initialised" flag to avoid self-healing in the update status hook. self.charm._peers.data[self.charm.app].update({"cluster_initialised": ""}) if not self._configure_standby_cluster(event): return False + # Remove and recreate the pgdata folder to enable replication of the data from the + # primary cluster. logger.info("Removing and recreating pgdata folder") self._reinitialise_pgdata() @@ -613,7 +632,6 @@ def update_async_replication_data(self) -> None: """Updates the async-replication data, if the unit is the leader. This is used to update the standby units with the new primary information. 
- If the unit is not the leader, then the data is removed from its databag. """ relation = self._relation if relation is None: @@ -635,10 +653,7 @@ def _update_primary_cluster_data( }) # Update the data in the relation. - primary_cluster_data = { - "endpoint": self._primary_cluster_endpoint, - "postgresql-version": self.charm._patroni.get_postgresql_version(), - } + primary_cluster_data = {"endpoint": self._primary_cluster_endpoint} # Retrieve the secrets that will be shared between the clusters. if async_relation.name == ASYNC_PRIMARY_RELATION: @@ -659,7 +674,6 @@ def _wait_for_standby_leader(self, event: RelationChangedEvent) -> bool: standby_leader = self.charm._patroni.get_standby_leader(check_whether_is_running=True) except RetryError: standby_leader = None - logger.warning("Standby leader: %s", standby_leader) if not self.charm.unit.is_leader() and standby_leader is None: if self.charm._patroni.is_member_isolated: self.charm._patroni.restart_patroni() From bbd5976285a1d0072cba4f6854d531c85b3c2594 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 29 Apr 2024 16:50:36 -0300 Subject: [PATCH 6/9] Revert permission change Signed-off-by: Marcelo Henrique Neppel --- src/cluster.py | 2 +- tests/unit/test_cluster.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cluster.py b/src/cluster.py index 203a237d46..80da9b9ddf 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -559,7 +559,7 @@ def render_patroni_yml_file( primary_cluster_endpoint=self.charm.async_replication.get_primary_cluster_endpoint(), extra_replication_endpoints=self.charm.async_replication.get_standby_endpoints(), ) - self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o644) + self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) def start_patroni(self) -> bool: """Start Patroni service using snap. diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 6a861da776..e94f3bb2bd 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -318,7 +318,7 @@ def test_render_patroni_yml_file(peers_ips, patroni): _render_file.assert_called_once_with( "/var/snap/charmed-postgresql/current/etc/patroni/patroni.yaml", expected_content, - 0o644, + 0o600, ) From e918e4dfb7410cd9d4d48b5c8f1bc93198c0918a Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 2 May 2024 08:55:43 -0300 Subject: [PATCH 7/9] Add optional type hint Signed-off-by: Marcelo Henrique Neppel --- src/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.py b/src/cluster.py index 80da9b9ddf..4b08ff5a58 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -258,7 +258,7 @@ def get_primary(self, unit_name_pattern=False) -> str: def get_standby_leader( self, unit_name_pattern=False, check_whether_is_running: bool = False - ) -> str: + ) -> Optional[str]: """Get standby leader instance. Args: From 1fd2b77bd466c0ddeb4de7cf337ce9d8208d78e3 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 2 May 2024 15:36:39 -0300 Subject: [PATCH 8/9] Add relation name to secret label and revert poetry.lock Signed-off-by: Marcelo Henrique Neppel --- poetry.lock | 3 ++- src/relations/async_replication.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/poetry.lock b/poetry.lock index 001281218c..0d36435368 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. 
+# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "allure-pytest" @@ -1573,6 +1573,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index f2324839bb..83f4017c84 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -44,6 +44,7 @@ from constants import ( APP_SCOPE, PATRONI_CONF_PATH, + PEER, POSTGRESQL_DATA_PATH, ) @@ -288,7 +289,7 @@ def _get_secret(self) -> Secret: logger.debug("Secret not found, creating a new one") pass - app_secret = self.charm.model.get_secret(label=f"{self.model.app.name}.app") + app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app") content = app_secret.peek_content() # Filter out unnecessary secrets. From 0f2ba76aeacb39fe9cfcab035190db0d38582710 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 2 May 2024 16:33:19 -0300 Subject: [PATCH 9/9] Reload Patroni configuration when member is not ready yet Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 83f4017c84..1e4df96c88 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -377,6 +377,10 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: self.charm._set_primary_status_message() elif not self.charm.unit.is_leader(): + try: + self.charm._patroni.reload_patroni_configuration() + except RetryError: + pass raise NotReadyError() else: self.charm.unit.status = WaitingStatus(
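The last patch retries reload_patroni_configuration before raising NotReadyError for a unit that has not finished starting. Reloading usually amounts to a single call against the local Patroni REST API; a minimal sketch assuming plain HTTP and the default API port (the charm may use TLS and a different address):

import requests


def reload_patroni_configuration(unit_ip: str = "1.1.1.1", api_port: int = 8008) -> None:
    # Patroni re-reads patroni.yaml and applies any pending changes when it
    # receives POST /reload on its REST API.
    response = requests.post(f"http://{unit_ip}:{api_port}/reload", timeout=10)
    response.raise_for_status()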