From 7ac4f3f0992566a0f287646644302cf32f61974e Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 29 May 2024 16:50:41 -0300 Subject: [PATCH] Fix failover and set-secret behaviour Signed-off-by: Marcelo Henrique Neppel --- actions.yaml | 2 +- lib/charms/postgresql_k8s/v0/postgresql.py | 26 +-- src/charm.py | 48 ++++- src/cluster.py | 13 +- src/relations/async_replication.py | 173 +++++++++++++----- .../ha_tests/test_async_replication.py | 2 +- 6 files changed, 191 insertions(+), 73 deletions(-) diff --git a/actions.yaml b/actions.yaml index f5daf6944d..02f5c525f7 100644 --- a/actions.yaml +++ b/actions.yaml @@ -27,7 +27,7 @@ pre-upgrade-check: promote-to-primary: description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit. params: - force-promotion: + force: type: boolean description: Force the promotion of a cluster when there is already a primary cluster. restore: diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 8783f76814..ffddc66360 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -36,7 +36,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 26 +LIBPATCH = 27 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -111,20 +111,19 @@ def __init__( self.system_users = system_users def _connect_to_database( - self, database: str = None, connect_to_current_host: bool = False + self, database: str = None, database_host: str = None ) -> psycopg2.extensions.connection: """Creates a connection to the database. Args: database: database to connect to (defaults to the database provided when the object for this class was created). - connect_to_current_host: whether to connect to the current host - instead of the primary host. + database_host: host to connect to instead of the primary host. Returns: psycopg2 connection object. """ - host = self.current_host if connect_to_current_host else self.primary_host + host = database_host if database_host is not None else self.primary_host connection = psycopg2.connect( f"dbname='{database if database else self.database}' user='{self.user}' host='{host}'" f"password='{self.password}' connect_timeout=1" @@ -388,7 +387,7 @@ def get_postgresql_text_search_configs(self) -> Set[str]: Set of PostgreSQL text search configs. """ with self._connect_to_database( - connect_to_current_host=True + database_host=self.current_host ) as connection, connection.cursor() as cursor: cursor.execute("SELECT CONCAT('pg_catalog.', cfgname) FROM pg_ts_config;") text_search_configs = cursor.fetchall() @@ -401,7 +400,7 @@ def get_postgresql_timezones(self) -> Set[str]: Set of PostgreSQL timezones. """ with self._connect_to_database( - connect_to_current_host=True + database_host=self.current_host ) as connection, connection.cursor() as cursor: cursor.execute("SELECT name FROM pg_timezone_names;") timezones = cursor.fetchall() @@ -434,7 +433,7 @@ def is_tls_enabled(self, check_current_host: bool = False) -> bool: """ try: with self._connect_to_database( - connect_to_current_host=check_current_host + database_host=self.current_host if check_current_host else None ) as connection, connection.cursor() as cursor: cursor.execute("SHOW ssl;") return "on" in cursor.fetchone()[0] @@ -502,19 +501,24 @@ def set_up_database(self) -> None: if connection is not None: connection.close() - def update_user_password(self, username: str, password: str) -> None: + def update_user_password( + self, username: str, password: str, database_host: str = None + ) -> None: """Update a user password. Args: username: the user to update the password. password: the new password for the user. + database_host: the host to connect to. Raises: PostgreSQLUpdateUserPasswordError if the password couldn't be changed. """ connection = None try: - with self._connect_to_database() as connection, connection.cursor() as cursor: + with self._connect_to_database( + database_host=database_host + ) as connection, connection.cursor() as cursor: cursor.execute( sql.SQL("ALTER USER {} WITH ENCRYPTED PASSWORD '" + password + "';").format( sql.Identifier(username) @@ -610,7 +614,7 @@ def validate_date_style(self, date_style: str) -> bool: """ try: with self._connect_to_database( - connect_to_current_host=True + database_host=self.current_host ) as connection, connection.cursor() as cursor: cursor.execute( sql.SQL( diff --git a/src/charm.py b/src/charm.py index 99592539af..bcf31ff165 100755 --- a/src/charm.py +++ b/src/charm.py @@ -86,7 +86,11 @@ USER, USER_PASSWORD_KEY, ) -from relations.async_replication import PostgreSQLAsyncReplication +from relations.async_replication import ( + REPLICATION_CONSUMER_RELATION, + REPLICATION_OFFER_RELATION, + PostgreSQLAsyncReplication, +) from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model @@ -1194,15 +1198,42 @@ def _on_set_password(self, event: ActionEvent) -> None: ) return - # Update the password in the PostgreSQL instance. - try: - self.postgresql.update_user_password(username, password) - except PostgreSQLUpdateUserPasswordError as e: - logger.exception(e) + replication_offer_relation = self.model.get_relation(REPLICATION_OFFER_RELATION) + if ( + replication_offer_relation is not None + and not self.async_replication.is_primary_cluster() + ): + # Update the password in the other cluster PostgreSQL primary instance. + other_cluster_endpoints = self.async_replication.get_all_primary_cluster_endpoints() + other_cluster_primary = self._patroni.get_primary( + alternative_endpoints=other_cluster_endpoints + ) + other_cluster_primary_ip = [ + replication_offer_relation.data[unit].get("private-address") + for unit in replication_offer_relation.units + if unit.name.replace("/", "-") == other_cluster_primary + ][0] + try: + self.postgresql.update_user_password( + username, password, database_host=other_cluster_primary_ip + ) + except PostgreSQLUpdateUserPasswordError as e: + logger.exception(e) + event.fail("Failed changing the password.") + return + elif self.model.get_relation(REPLICATION_CONSUMER_RELATION) is not None: event.fail( - "Failed changing the password: Not all members healthy or finished initial sync." + "Failed changing the password: This action can be ran only in the cluster from the offer side." ) return + else: + # Update the password in this cluster PostgreSQL primary instance. + try: + self.postgresql.update_user_password(username, password) + except PostgreSQLUpdateUserPasswordError as e: + logger.exception(e) + event.fail("Failed changing the password.") + return # Update the password in the secret store. self.set_secret(APP_SCOPE, f"{username}-password", password) @@ -1211,9 +1242,6 @@ def _on_set_password(self, event: ActionEvent) -> None: # Other units Patroni configuration will be reloaded in the peer relation changed event. self.update_config() - # Update the password in the async replication data. - self.async_replication.update_async_replication_data() - event.set_results({"password": password}) def _on_update_status(self, _) -> None: diff --git a/src/cluster.py b/src/cluster.py index 4b08ff5a58..29a42e87b5 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -230,11 +230,12 @@ def get_member_status(self, member_name: str) -> str: return member["state"] return "" - def get_primary(self, unit_name_pattern=False) -> str: + def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] = None) -> str: """Get primary instance. Args: unit_name_pattern: whether to convert pod name to unit name + alternative_endpoints: list of alternative endpoints to check for the primary. Returns: primary pod or unit name. @@ -242,7 +243,7 @@ def get_primary(self, unit_name_pattern=False) -> str: # Request info from cluster endpoint (which returns all members of the cluster). for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): with attempt: - url = self._get_alternative_patroni_url(attempt) + url = self._get_alternative_patroni_url(attempt, alternative_endpoints) cluster_status = requests.get( f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", verify=self.verify, @@ -301,12 +302,18 @@ def get_sync_standby_names(self) -> List[str]: sync_standbys.append("/".join(member["name"].rsplit("-", 1))) return sync_standbys - def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str: + def _get_alternative_patroni_url( + self, attempt: AttemptManager, alternative_endpoints: List[str] = None + ) -> str: """Get an alternative REST API URL from another member each time. When the Patroni process is not running in the current unit it's needed to use a URL from another cluster member REST API to do some operations. """ + if alternative_endpoints is not None: + return self._patroni_url.replace( + self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1] + ) attempt_number = attempt.retry_state.attempt_number if attempt_number > 1: url = self._patroni_url diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 1328e55879..3b7ffb9f57 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -35,6 +35,7 @@ RelationChangedEvent, RelationDepartedEvent, Secret, + SecretChangedEvent, SecretNotFoundError, WaitingStatus, ) @@ -54,6 +55,7 @@ READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" REPLICATION_CONSUMER_RELATION = "replication" REPLICATION_OFFER_RELATION = "replication-offer" +SECRET_LABEL = "async-replication-secret" class PostgreSQLAsyncReplication(Object): @@ -106,6 +108,8 @@ def __init__(self, charm): self.charm.on.promote_to_primary_action, self._on_promote_to_primary ) + self.framework.observe(self.charm.on.secret_changed, self._on_secret_changed) + def _can_promote_cluster(self, event: ActionEvent) -> bool: """Check if the cluster can be promoted.""" if not self.charm.is_cluster_initialised: @@ -141,22 +145,7 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool: event.fail("This cluster is already the primary cluster.") return False - # To promote the other cluster if there is already a primary cluster, the action must be called with - # `force-promotion=true`. If not, fail the action telling that the other cluster is already the primary. - if relation.app == primary_cluster: - if not event.params.get("force-promotion"): - event.fail( - f"{relation.app.name} is already the primary cluster. Pass `force-promotion=true` to promote anyway." - ) - return False - else: - logger.warning( - "%s is already the primary cluster. Forcing promotion of %s to primary cluster due to `force-promotion=true`.", - relation.app.name, - self.charm.app.name, - ) - - return True + return self._handle_forceful_promotion(event) def _configure_primary_cluster( self, primary_cluster: Application, event: RelationChangedEvent @@ -164,7 +153,7 @@ def _configure_primary_cluster( """Configure the primary cluster.""" if self.charm.app == primary_cluster: self.charm.update_config() - if self._is_primary_cluster() and self.charm.unit.is_leader(): + if self.is_primary_cluster() and self.charm.unit.is_leader(): self._update_primary_cluster_data() # If this is a standby cluster, remove the information from DCS to make it # a normal cluster. @@ -192,24 +181,10 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: """Configure the standby cluster.""" relation = self._relation if relation.name == REPLICATION_CONSUMER_RELATION: - # Update the secrets between the clusters. - primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") - secret_id = ( - None - if primary_cluster_info is None - else json.loads(primary_cluster_info).get("secret-id") - ) - try: - secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label) - except SecretNotFoundError: + if not self._update_internal_secret(): logger.debug("Secret not found, deferring event") event.defer() return False - credentials = secret.peek_content() - for key, password in credentials.items(): - user = key.split("-password")[0] - self.charm.set_secret(APP_SCOPE, key, password) - logger.debug("Synced %s password", user) system_identifier, error = self.get_system_identifier() if error is not None: raise Exception(error) @@ -221,6 +196,24 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: logger.warning("Please review the backup file %s and handle its removal", filename) return True + def get_all_primary_cluster_endpoints(self) -> List[str]: + """Return all the primary cluster endpoints.""" + relation = self._relation + primary_cluster = self._get_primary_cluster() + # List the primary endpoints only for the standby cluster. + if relation is None or primary_cluster is None or self.charm.app == primary_cluster: + return [] + return [ + relation.data[unit].get("unit-address") + for relation in [ + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), + ] + if relation is not None + for unit in relation.units + if relation.data[unit].get("unit-address") is not None + ] + def _get_highest_promoted_cluster_counter_value(self) -> str: """Return the highest promoted cluster counter.""" promoted_cluster_counter = "0" @@ -289,24 +282,27 @@ def get_primary_cluster_endpoint(self) -> Optional[str]: def _get_secret(self) -> Secret: """Return async replication necessary secrets.""" + app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app") + content = app_secret.peek_content() + + # Filter out unnecessary secrets. + shared_content = dict(filter(lambda x: "password" in x[0], content.items())) + try: # Avoid recreating the secret. - secret = self.charm.model.get_secret(label=self._secret_label) + secret = self.charm.model.get_secret(label=SECRET_LABEL) if not secret.id: # Workaround for the secret id not being set with model uuid. secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + if secret.peek_content() != shared_content: + logger.info("Updating outdated secret content") + secret.set_content(shared_content) return secret except SecretNotFoundError: logger.debug("Secret not found, creating a new one") pass - app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app") - content = app_secret.peek_content() - - # Filter out unnecessary secrets. - shared_content = dict(filter(lambda x: "password" in x[0], content.items())) - - return self.charm.model.app.add_secret(content=shared_content, label=self._secret_label) + return self.charm.model.app.add_secret(content=shared_content, label=SECRET_LABEL) def get_standby_endpoints(self) -> List[str]: """Return the standby endpoints.""" @@ -403,6 +399,31 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: logger.debug("Deferring on_async_relation_changed: database hasn't started yet.") event.defer() + def _handle_forceful_promotion(self, event: ActionEvent) -> bool: + if not event.params.get("force"): + all_primary_cluster_endpoints = self.get_all_primary_cluster_endpoints() + if len(all_primary_cluster_endpoints) > 0: + primary_cluster_reachable = False + try: + primary = self.charm._patroni.get_primary( + alternative_endpoints=all_primary_cluster_endpoints + ) + if primary is not None: + primary_cluster_reachable = True + except RetryError: + pass + if not primary_cluster_reachable: + event.fail( + f"{self._relation.app.name} isn't reachable. Pass `force=true` to promote anyway." + ) + return False + else: + logger.warning( + "Forcing promotion of %s to primary cluster due to `force=true`.", + self.charm.app.name, + ) + return True + def handle_read_only_mode(self) -> None: """Handle read-only mode (standby cluster that lost the relation with the primary cluster).""" promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( @@ -464,7 +485,7 @@ def _is_following_promoted_cluster(self) -> bool: == self._get_highest_promoted_cluster_counter_value() ) - def _is_primary_cluster(self) -> bool: + def is_primary_cluster(self) -> bool: """Return the primary cluster name.""" return self.charm.app == self._get_primary_cluster() @@ -550,6 +571,14 @@ def _on_async_relation_joined(self, _) -> None: def _on_create_replication(self, event: ActionEvent) -> None: """Set up asynchronous replication between two clusters.""" + if self._get_primary_cluster() is not None: + event.fail("There is already a replication set up.") + return + + if self._relation.name == REPLICATION_CONSUMER_RELATION: + event.fail("This action must be run in the cluster where the offer was created.") + return + if not self._handle_replication_change(event): return @@ -561,12 +590,47 @@ def _on_create_replication(self, event: ActionEvent) -> None: def _on_promote_to_primary(self, event: ActionEvent) -> None: """Promote this cluster to the primary cluster.""" + if self._get_primary_cluster() is None: + event.fail( + "No primary cluster found. Run `create-replication` action in the cluster where the offer was created." + ) + return + if not self._handle_replication_change(event): return # Set the status. self.charm.unit.status = MaintenanceStatus("Creating replication...") + def _on_secret_changed(self, event: SecretChangedEvent) -> None: + """Update the internal secret when the relation secret changes.""" + relation = self._relation + if relation is None: + logger.debug("Early exit on_secret_changed: No relation found.") + return + + if ( + relation.name == REPLICATION_OFFER_RELATION + and event.secret.label == f"{PEER}.{self.model.app.name}.app" + ): + logger.info("Internal secret changed, updating relation secret") + secret = self._get_secret() + secret.grant(relation) + primary_cluster_data = { + "endpoint": self._primary_cluster_endpoint, + "secret-id": secret.id, + } + relation.data[self.charm.app]["primary-cluster-data"] = json.dumps( + primary_cluster_data + ) + return + + if relation.name == REPLICATION_CONSUMER_RELATION and event.secret.label == SECRET_LABEL: + logger.info("Relation secret changed, updating internal secret") + if not self._update_internal_secret(): + logger.debug("Secret not found, deferring event") + event.defer() + @property def _primary_cluster_endpoint(self) -> str: """Return the endpoint from one of the sync-standbys, or from the primary if there is no sync-standby.""" @@ -610,11 +674,6 @@ def _relation(self) -> Relation: if relation is not None: return relation - @property - def _secret_label(self) -> str: - """Return the secret label.""" - return f"async-replication-secret-{self._get_highest_promoted_cluster_counter_value()}" - def _stop_database(self, event: RelationChangedEvent) -> bool: """Stop the database.""" if ( @@ -670,9 +729,29 @@ def update_async_replication_data(self) -> None: if relation is None: return relation.data[self.charm.unit].update({"unit-address": self.charm._unit_ip}) - if self._is_primary_cluster() and self.charm.unit.is_leader(): + if self.is_primary_cluster() and self.charm.unit.is_leader(): self._update_primary_cluster_data() + def _update_internal_secret(self) -> bool: + # Update the secrets between the clusters. + relation = self._relation + primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") + secret_id = ( + None + if primary_cluster_info is None + else json.loads(primary_cluster_info).get("secret-id") + ) + try: + secret = self.charm.model.get_secret(id=secret_id, label=SECRET_LABEL) + except SecretNotFoundError: + return False + credentials = secret.peek_content() + for key, password in credentials.items(): + user = key.split("-password")[0] + self.charm.set_secret(APP_SCOPE, key, password) + logger.debug("Synced %s password", user) + return True + def _update_primary_cluster_data( self, promoted_cluster_counter: int = None, system_identifier: str = None ) -> None: diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 52aebf0210..0967f7b5ff 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -244,7 +244,7 @@ async def test_switchover( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model) assert leader_unit is not None, "No leader unit found" logger.info("promoting the second cluster") - run_action = await leader_unit.run_action("promote-to-primary", **{"force-promotion": True}) + run_action = await leader_unit.run_action("promote-to-primary", **{"force": True}) await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0"