From 1a1c2d4f4b3e1b016a8f685e93be3c7b98e1a4d5 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 18 Jun 2024 08:16:40 -0300 Subject: [PATCH] [DPE-4257] Async replication UX Improvements (#481) * Syncing the UX with MySQL Signed-off-by: Marcelo Henrique Neppel * Fix failover and set-secret behaviour Signed-off-by: Marcelo Henrique Neppel * Improve statuses Signed-off-by: Marcelo Henrique Neppel * Fix app status set Signed-off-by: Marcelo Henrique Neppel * Fix model switch Signed-off-by: Marcelo Henrique Neppel * Fix config integration test Signed-off-by: Marcelo Henrique Neppel * Fix backups integration test Signed-off-by: Marcelo Henrique Neppel --------- Signed-off-by: Marcelo Henrique Neppel --- actions.yaml | 11 +- metadata.yaml | 8 +- src/charm.py | 50 ++- src/cluster.py | 13 +- src/relations/async_replication.py | 346 ++++++++++++------ .../ha_tests/test_async_replication.py | 32 +- tests/integration/helpers.py | 6 +- tests/integration/test_config.py | 11 +- tests/unit/test_charm.py | 4 +- 9 files changed, 323 insertions(+), 158 deletions(-) diff --git a/actions.yaml b/actions.yaml index 4051f1edd4..6e3ced3d8b 100644 --- a/actions.yaml +++ b/actions.yaml @@ -11,6 +11,13 @@ create-backup: Differential backup is a copy only of changed data since the last full backup. Incremental backup is a copy only of changed data since the last backup (any type). Possible values - full, differential, incremental. +create-replication: + description: Set up asynchronous replication between two clusters. + params: + name: + type: string + description: The name of the replication (defaults to 'default'). + default: default get-primary: description: Get the unit which is the primary/leader in the replication. get-password: @@ -25,10 +32,10 @@ list-backups: description: Lists backups in s3 storage. pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. -promote-cluster: +promote-to-primary: description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit. params: - force-promotion: + force: type: boolean description: Force the promotion of a cluster when there is already a primary cluster. restore: diff --git a/metadata.yaml b/metadata.yaml index 0c409ccafe..850712c8b7 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -26,8 +26,8 @@ peers: interface: upgrade provides: - async-primary: - interface: async_replication + replication-offer: + interface: postgresql_async limit: 1 optional: true database: @@ -41,8 +41,8 @@ provides: limit: 1 requires: - async-replica: - interface: async_replication + replication: + interface: postgresql_async limit: 1 optional: true certificates: diff --git a/src/charm.py b/src/charm.py index aa01bab368..ce525bed36 100755 --- a/src/charm.py +++ b/src/charm.py @@ -90,7 +90,11 @@ USER, USER_PASSWORD_KEY, ) -from relations.async_replication import PostgreSQLAsyncReplication +from relations.async_replication import ( + REPLICATION_CONSUMER_RELATION, + REPLICATION_OFFER_RELATION, + PostgreSQLAsyncReplication, +) from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model @@ -1222,15 +1226,42 @@ def _on_set_password(self, event: ActionEvent) -> None: ) return - # Update the password in the PostgreSQL instance. 
- try: - self.postgresql.update_user_password(username, password) - except PostgreSQLUpdateUserPasswordError as e: - logger.exception(e) + replication_offer_relation = self.model.get_relation(REPLICATION_OFFER_RELATION) + if ( + replication_offer_relation is not None + and not self.async_replication.is_primary_cluster() + ): + # Update the password in the other cluster PostgreSQL primary instance. + other_cluster_endpoints = self.async_replication.get_all_primary_cluster_endpoints() + other_cluster_primary = self._patroni.get_primary( + alternative_endpoints=other_cluster_endpoints + ) + other_cluster_primary_ip = [ + replication_offer_relation.data[unit].get("private-address") + for unit in replication_offer_relation.units + if unit.name.replace("/", "-") == other_cluster_primary + ][0] + try: + self.postgresql.update_user_password( + username, password, database_host=other_cluster_primary_ip + ) + except PostgreSQLUpdateUserPasswordError as e: + logger.exception(e) + event.fail("Failed changing the password.") + return + elif self.model.get_relation(REPLICATION_CONSUMER_RELATION) is not None: event.fail( - "Failed changing the password: Not all members healthy or finished initial sync." + "Failed changing the password: This action can be ran only in the cluster from the offer side." ) return + else: + # Update the password in this cluster PostgreSQL primary instance. + try: + self.postgresql.update_user_password(username, password) + except PostgreSQLUpdateUserPasswordError as e: + logger.exception(e) + event.fail("Failed changing the password.") + return # Update the password in the secret store. self.set_secret(APP_SCOPE, f"{username}-password", password) @@ -1239,9 +1270,6 @@ def _on_set_password(self, event: ActionEvent) -> None: # Other units Patroni configuration will be reloaded in the peer relation changed event. self.update_config() - # Update the password in the async replication data. - self.async_replication.update_async_replication_data() - event.set_results({"password": password}) def _on_update_status(self, _) -> None: @@ -1357,7 +1385,7 @@ def _set_primary_status_message(self) -> None: if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name: self.unit.status = ActiveStatus("Primary") elif self.is_standby_leader: - self.unit.status = ActiveStatus("Standby Leader") + self.unit.status = ActiveStatus("Standby") elif self._patroni.member_started: self.unit.status = ActiveStatus() except (RetryError, ConnectionError) as e: diff --git a/src/cluster.py b/src/cluster.py index 4b08ff5a58..29a42e87b5 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -230,11 +230,12 @@ def get_member_status(self, member_name: str) -> str: return member["state"] return "" - def get_primary(self, unit_name_pattern=False) -> str: + def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] = None) -> str: """Get primary instance. Args: unit_name_pattern: whether to convert pod name to unit name + alternative_endpoints: list of alternative endpoints to check for the primary. Returns: primary pod or unit name. @@ -242,7 +243,7 @@ def get_primary(self, unit_name_pattern=False) -> str: # Request info from cluster endpoint (which returns all members of the cluster). 
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): with attempt: - url = self._get_alternative_patroni_url(attempt) + url = self._get_alternative_patroni_url(attempt, alternative_endpoints) cluster_status = requests.get( f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", verify=self.verify, @@ -301,12 +302,18 @@ def get_sync_standby_names(self) -> List[str]: sync_standbys.append("/".join(member["name"].rsplit("-", 1))) return sync_standbys - def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str: + def _get_alternative_patroni_url( + self, attempt: AttemptManager, alternative_endpoints: List[str] = None + ) -> str: """Get an alternative REST API URL from another member each time. When the Patroni process is not running in the current unit it's needed to use a URL from another cluster member REST API to do some operations. """ + if alternative_endpoints is not None: + return self._patroni_url.replace( + self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1] + ) attempt_number = attempt.retry_state.attempt_number if attempt_number > 1: url = self._patroni_url diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 320beeb487..e2adab6d8e 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -27,6 +27,7 @@ from ops import ( ActionEvent, + ActiveStatus, Application, BlockedStatus, MaintenanceStatus, @@ -35,6 +36,7 @@ RelationChangedEvent, RelationDepartedEvent, Secret, + SecretChangedEvent, SecretNotFoundError, WaitingStatus, ) @@ -51,9 +53,10 @@ logger = logging.getLogger(__name__) -ASYNC_PRIMARY_RELATION = "async-primary" -ASYNC_REPLICA_RELATION = "async-replica" -READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" +READ_ONLY_MODE_BLOCKING_MESSAGE = "Standalone read-only cluster" +REPLICATION_CONSUMER_RELATION = "replication" +REPLICATION_OFFER_RELATION = "replication-offer" +SECRET_LABEL = "async-replication-secret" class PostgreSQLAsyncReplication(Object): @@ -64,36 +67,49 @@ def __init__(self, charm): super().__init__(charm, "postgresql") self.charm = charm self.framework.observe( - self.charm.on[ASYNC_PRIMARY_RELATION].relation_joined, self._on_async_relation_joined + self.charm.on[REPLICATION_OFFER_RELATION].relation_joined, + self._on_async_relation_joined, ) self.framework.observe( - self.charm.on[ASYNC_REPLICA_RELATION].relation_joined, self._on_async_relation_joined + self.charm.on[REPLICATION_CONSUMER_RELATION].relation_joined, + self._on_async_relation_joined, ) self.framework.observe( - self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_async_relation_changed + self.charm.on[REPLICATION_OFFER_RELATION].relation_changed, + self._on_async_relation_changed, ) self.framework.observe( - self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_async_relation_changed + self.charm.on[REPLICATION_CONSUMER_RELATION].relation_changed, + self._on_async_relation_changed, ) # Departure events self.framework.observe( - self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, + self.charm.on[REPLICATION_OFFER_RELATION].relation_departed, self._on_async_relation_departed, ) self.framework.observe( - self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, + self.charm.on[REPLICATION_CONSUMER_RELATION].relation_departed, self._on_async_relation_departed, ) self.framework.observe( - self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_async_relation_broken + self.charm.on[REPLICATION_OFFER_RELATION].relation_broken, + 
self._on_async_relation_broken, ) self.framework.observe( - self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_async_relation_broken + self.charm.on[REPLICATION_CONSUMER_RELATION].relation_broken, + self._on_async_relation_broken, ) # Actions - self.framework.observe(self.charm.on.promote_cluster_action, self._on_promote_cluster) + self.framework.observe( + self.charm.on.create_replication_action, self._on_create_replication + ) + self.framework.observe( + self.charm.on.promote_to_primary_action, self._on_promote_to_primary + ) + + self.framework.observe(self.charm.on.secret_changed, self._on_secret_changed) def _can_promote_cluster(self, event: ActionEvent) -> bool: """Check if the cluster can be promoted.""" @@ -109,13 +125,11 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool: if standby_leader is not None: try: self.charm._patroni.promote_standby_cluster() - if ( - self.charm.is_blocked - and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE - ): + if self.charm.app.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE: self.charm._peers.data[self.charm.app].update({ "promoted-cluster-counter": "" }) + self._set_app_status() self.charm._set_primary_status_message() except (StandbyClusterAlreadyPromotedError, ClusterNotPromotedError) as e: event.fail(str(e)) @@ -130,22 +144,7 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool: event.fail("This cluster is already the primary cluster.") return False - # To promote the other cluster if there is already a primary cluster, the action must be called with - # `force-promotion=true`. If not, fail the action telling that the other cluster is already the primary. - if relation.app == primary_cluster: - if not event.params.get("force-promotion"): - event.fail( - f"{relation.app.name} is already the primary cluster. Pass `force-promotion=true` to promote anyway." - ) - return False - else: - logger.warning( - "%s is already the primary cluster. Forcing promotion of %s to primary cluster due to `force-promotion=true`.", - relation.app.name, - self.charm.app.name, - ) - - return True + return self._handle_forceful_promotion(event) def _configure_primary_cluster( self, primary_cluster: Application, event: RelationChangedEvent @@ -153,7 +152,7 @@ def _configure_primary_cluster( """Configure the primary cluster.""" if self.charm.app == primary_cluster: self.charm.update_config() - if self._is_primary_cluster() and self.charm.unit.is_leader(): + if self.is_primary_cluster() and self.charm.unit.is_leader(): self._update_primary_cluster_data() # If this is a standby cluster, remove the information from DCS to make it # a normal cluster. @@ -180,25 +179,11 @@ def _configure_primary_cluster( def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: """Configure the standby cluster.""" relation = self._relation - if relation.name == ASYNC_REPLICA_RELATION: - # Update the secrets between the clusters. 
- primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") - secret_id = ( - None - if primary_cluster_info is None - else json.loads(primary_cluster_info).get("secret-id") - ) - try: - secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label) - except SecretNotFoundError: + if relation.name == REPLICATION_CONSUMER_RELATION: + if not self._update_internal_secret(): logger.debug("Secret not found, deferring event") event.defer() return False - credentials = secret.peek_content() - for key, password in credentials.items(): - user = key.split("-password")[0] - self.charm.set_secret(APP_SCOPE, key, password) - logger.debug("Synced %s password", user) system_identifier, error = self.get_system_identifier() if error is not None: raise Exception(error) @@ -210,12 +195,30 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: logger.warning("Please review the backup file %s and handle its removal", filename) return True + def get_all_primary_cluster_endpoints(self) -> List[str]: + """Return all the primary cluster endpoints.""" + relation = self._relation + primary_cluster = self._get_primary_cluster() + # List the primary endpoints only for the standby cluster. + if relation is None or primary_cluster is None or self.charm.app == primary_cluster: + return [] + return [ + relation.data[unit].get("unit-address") + for relation in [ + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), + ] + if relation is not None + for unit in relation.units + if relation.data[unit].get("unit-address") is not None + ] + def _get_highest_promoted_cluster_counter_value(self) -> str: """Return the highest promoted cluster counter.""" promoted_cluster_counter = "0" for async_relation in [ - self.model.get_relation(ASYNC_PRIMARY_RELATION), - self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), ]: if async_relation is None: continue @@ -249,8 +252,8 @@ def _get_primary_cluster(self) -> Optional[Application]: primary_cluster = None promoted_cluster_counter = "0" for async_relation in [ - self.model.get_relation(ASYNC_PRIMARY_RELATION), - self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), ]: if async_relation is None: continue @@ -278,24 +281,27 @@ def get_primary_cluster_endpoint(self) -> Optional[str]: def _get_secret(self) -> Secret: """Return async replication necessary secrets.""" + app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app") + content = app_secret.peek_content() + + # Filter out unnecessary secrets. + shared_content = dict(filter(lambda x: "password" in x[0], content.items())) + try: # Avoid recreating the secret. - secret = self.charm.model.get_secret(label=self._secret_label) + secret = self.charm.model.get_secret(label=SECRET_LABEL) if not secret.id: # Workaround for the secret id not being set with model uuid. 
secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + if secret.peek_content() != shared_content: + logger.info("Updating outdated secret content") + secret.set_content(shared_content) return secret except SecretNotFoundError: logger.debug("Secret not found, creating a new one") pass - app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app") - content = app_secret.peek_content() - - # Filter out unnecessary secrets. - shared_content = dict(filter(lambda x: "password" in x[0], content.items())) - - return self.charm.model.app.add_secret(content=shared_content, label=self._secret_label) + return self.charm.model.app.add_secret(content=shared_content, label=SECRET_LABEL) def get_standby_endpoints(self) -> List[str]: """Return the standby endpoints.""" @@ -307,8 +313,8 @@ def get_standby_endpoints(self) -> List[str]: return [ relation.data[unit].get("unit-address") for relation in [ - self.model.get_relation(ASYNC_PRIMARY_RELATION), - self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), ] if relation is not None for unit in relation.units @@ -392,21 +398,74 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: logger.debug("Deferring on_async_relation_changed: database hasn't started yet.") event.defer() + def _handle_forceful_promotion(self, event: ActionEvent) -> bool: + if not event.params.get("force"): + all_primary_cluster_endpoints = self.get_all_primary_cluster_endpoints() + if len(all_primary_cluster_endpoints) > 0: + primary_cluster_reachable = False + try: + primary = self.charm._patroni.get_primary( + alternative_endpoints=all_primary_cluster_endpoints + ) + if primary is not None: + primary_cluster_reachable = True + except RetryError: + pass + if not primary_cluster_reachable: + event.fail( + f"{self._relation.app.name} isn't reachable. Pass `force=true` to promote anyway." + ) + return False + else: + logger.warning( + "Forcing promotion of %s to primary cluster due to `force=true`.", + self.charm.app.name, + ) + return True + def handle_read_only_mode(self) -> None: """Handle read-only mode (standby cluster that lost the relation with the primary cluster).""" - promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( - "promoted-cluster-counter", "" - ) - if not self.charm.is_blocked or ( - promoted_cluster_counter != "0" - and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE - ): + if not self.charm.is_blocked: self.charm._set_primary_status_message() - if ( - promoted_cluster_counter == "0" - and self.charm.unit.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE - ): - self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + + if self.charm.unit.is_leader(): + self._set_app_status() + + def _handle_replication_change(self, event: ActionEvent) -> bool: + if not self._can_promote_cluster(event): + return False + + relation = self._relation + + # Check if all units from the other cluster published their pod IPs in the relation data. + # If not, fail the action telling that all units must publish their pod addresses in the + # relation data. + for unit in relation.units: + if "unit-address" not in relation.data[unit]: + event.fail( + "All units from the other cluster must publish their pod addresses in the relation data." 
+ ) + return False + + system_identifier, error = self.get_system_identifier() + if error is not None: + logger.exception(error) + event.fail("Failed to get system identifier") + return False + + # Increment the current cluster counter in this application side based on the highest counter value. + promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) + promoted_cluster_counter += 1 + logger.debug("Promoted cluster counter: %s", promoted_cluster_counter) + + self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) + + # Emit an async replication changed event for this unit (to promote this cluster before demoting the + # other if this one is a standby cluster, which is needed to correctly set up the async replication + # when performing a switchover). + self._re_emit_async_relation_changed_event() + + return True def _is_following_promoted_cluster(self) -> bool: """Return True if this unit is following the promoted cluster.""" @@ -417,7 +476,7 @@ def _is_following_promoted_cluster(self) -> bool: == self._get_highest_promoted_cluster_counter_value() ) - def _is_primary_cluster(self) -> bool: + def is_primary_cluster(self) -> bool: """Return the primary cluster name.""" return self.charm.app == self._get_primary_cluster() @@ -436,7 +495,7 @@ def _on_async_relation_broken(self, _) -> None: if self.charm._patroni.get_standby_leader() is not None: if self.charm.unit.is_leader(): self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"}) - self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + self._set_app_status() else: if self.charm.unit.is_leader(): self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": ""}) @@ -444,6 +503,9 @@ def _on_async_relation_broken(self, _) -> None: def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: """Update the Patroni configuration if one of the clusters was already promoted.""" + if self.charm.unit.is_leader(): + self._set_app_status() + primary_cluster = self._get_primary_cluster() logger.debug("Primary cluster: %s", primary_cluster) if primary_cluster is None: @@ -501,43 +563,70 @@ def _on_async_relation_joined(self, _) -> None: "unit-promoted-cluster-counter": highest_promoted_cluster_counter }) - def _on_promote_cluster(self, event: ActionEvent) -> None: - """Promote this cluster to the primary cluster.""" - if not self._can_promote_cluster(event): + def _on_create_replication(self, event: ActionEvent) -> None: + """Set up asynchronous replication between two clusters.""" + if self._get_primary_cluster() is not None: + event.fail("There is already a replication set up.") return - relation = self._relation - - # Check if all units from the other cluster published their pod IPs in the relation data. - # If not, fail the action telling that all units must publish their pod addresses in the - # relation data. - for unit in relation.units: - if "unit-address" not in relation.data[unit]: - event.fail( - "All units from the other cluster must publish their pod addresses in the relation data." 
- ) - return + if self._relation.name == REPLICATION_CONSUMER_RELATION: + event.fail("This action must be run in the cluster where the offer was created.") + return - system_identifier, error = self.get_system_identifier() - if error is not None: - logger.exception(error) - event.fail("Failed to get system identifier") + if not self._handle_replication_change(event): return - # Increment the current cluster counter in this application side based on the highest counter value. - promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) - promoted_cluster_counter += 1 - logger.debug("Promoted cluster counter: %s", promoted_cluster_counter) + # Set the replication name in the relation data. + self._relation.data[self.charm.app].update({"name": event.params["name"]}) - self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) + # Set the status. + self.charm.unit.status = MaintenanceStatus("Creating replication...") - # Emit an async replication changed event for this unit (to promote this cluster before demoting the - # other if this one is a standby cluster, which is needed to correctly setup the async replication - # when performing a switchover). - self._re_emit_async_relation_changed_event() + def _on_promote_to_primary(self, event: ActionEvent) -> None: + """Promote this cluster to the primary cluster.""" + if ( + self.charm.app.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE + and self._get_primary_cluster() is None + ): + event.fail( + "No primary cluster found. Run `create-replication` action in the cluster where the offer was created." + ) + return + + if not self._handle_replication_change(event): + return # Set the status. - self.charm.unit.status = MaintenanceStatus("Promoting cluster...") + self.charm.unit.status = MaintenanceStatus("Creating replication...") + + def _on_secret_changed(self, event: SecretChangedEvent) -> None: + """Update the internal secret when the relation secret changes.""" + relation = self._relation + if relation is None: + logger.debug("Early exit on_secret_changed: No relation found.") + return + + if ( + relation.name == REPLICATION_OFFER_RELATION + and event.secret.label == f"{PEER}.{self.model.app.name}.app" + ): + logger.info("Internal secret changed, updating relation secret") + secret = self._get_secret() + secret.grant(relation) + primary_cluster_data = { + "endpoint": self._primary_cluster_endpoint, + "secret-id": secret.id, + } + relation.data[self.charm.app]["primary-cluster-data"] = json.dumps( + primary_cluster_data + ) + return + + if relation.name == REPLICATION_CONSUMER_RELATION and event.secret.label == SECRET_LABEL: + logger.info("Relation secret changed, updating internal secret") + if not self._update_internal_secret(): + logger.debug("Secret not found, deferring event") + event.defer() @property def _primary_cluster_endpoint(self) -> str: @@ -576,16 +665,27 @@ def _reinitialise_pgdata(self) -> None: def _relation(self) -> Relation: """Return the relation object.""" for relation in [ - self.model.get_relation(ASYNC_PRIMARY_RELATION), - self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), ]: if relation is not None: return relation - @property - def _secret_label(self) -> str: - """Return the secret label.""" - return f"async-replication-secret-{self._get_highest_promoted_cluster_counter_value()}" + def _set_app_status(self) -> None: + """Set the app status.""" + if 
self.charm._peers.data[self.charm.app].get("promoted-cluster-counter") == "0": + self.charm.app.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + return + if self._relation is None: + self.charm.app.status = ActiveStatus() + return + primary_cluster = self._get_primary_cluster() + if primary_cluster is None: + self.charm.app.status = ActiveStatus() + else: + self.charm.app.status = ActiveStatus( + "Primary" if self.charm.app == primary_cluster else "Standby" + ) def _stop_database(self, event: RelationChangedEvent) -> bool: """Stop the database.""" @@ -642,9 +742,29 @@ def update_async_replication_data(self) -> None: if relation is None: return relation.data[self.charm.unit].update({"unit-address": self.charm._unit_ip}) - if self._is_primary_cluster() and self.charm.unit.is_leader(): + if self.is_primary_cluster() and self.charm.unit.is_leader(): self._update_primary_cluster_data() + def _update_internal_secret(self) -> bool: + # Update the secrets between the clusters. + relation = self._relation + primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") + secret_id = ( + None + if primary_cluster_info is None + else json.loads(primary_cluster_info).get("secret-id") + ) + try: + secret = self.charm.model.get_secret(id=secret_id, label=SECRET_LABEL) + except SecretNotFoundError: + return False + credentials = secret.peek_content() + for key, password in credentials.items(): + user = key.split("-password")[0] + self.charm.set_secret(APP_SCOPE, key, password) + logger.debug("Synced %s password", user) + return True + def _update_primary_cluster_data( self, promoted_cluster_counter: int = None, system_identifier: str = None ) -> None: @@ -661,7 +781,7 @@ def _update_primary_cluster_data( primary_cluster_data = {"endpoint": self._primary_cluster_endpoint} # Retrieve the secrets that will be shared between the clusters. 
- if async_relation.name == ASYNC_PRIMARY_RELATION: + if async_relation.name == REPLICATION_OFFER_RELATION: secret = self._get_secret() secret.grant(async_relation) primary_cluster_data["secret-id"] = secret.id diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 0ed9a27913..96bdb3afa4 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -156,10 +156,10 @@ async def test_async_replication( logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) - first_offer_command = f"offer {DATABASE_APP_NAME}:async-primary async-primary" + first_offer_command = f"offer {DATABASE_APP_NAME}:replication-offer replication-offer" await ops_test.juju(*first_offer_command.split()) first_consume_command = ( - f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-primary" + f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication-offer" ) await ops_test.juju(*first_consume_command.split()) @@ -173,7 +173,7 @@ async def test_async_replication( ), ) - await second_model.relate(DATABASE_APP_NAME, "async-primary") + await second_model.relate(DATABASE_APP_NAME, "replication-offer") async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( @@ -193,7 +193,7 @@ async def test_async_replication( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) assert leader_unit is not None, "No leader unit found" logger.info("promoting the first cluster") - run_action = await leader_unit.run_action("promote-cluster") + run_action = await leader_unit.run_action("create-replication") await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" @@ -228,10 +228,10 @@ async def test_switchover( second_model_continuous_writes, ): """Test switching over to the second cluster.""" - second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica" + second_offer_command = f"offer {DATABASE_APP_NAME}:replication replication" await ops_test.juju(*second_offer_command.split()) second_consume_command = ( - f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-replica" + f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication" ) await ops_test.juju(*second_consume_command.split()) @@ -250,7 +250,7 @@ async def test_switchover( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model) assert leader_unit is not None, "No leader unit found" logger.info("promoting the second cluster") - run_action = await leader_unit.run_action("promote-cluster", **{"force-promotion": True}) + run_action = await leader_unit.run_action("promote-to-primary", **{"force": True}) await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" @@ -288,16 +288,16 @@ async def test_promote_standby( "database", f"{APPLICATION_NAME}:first-database" ) await second_model.applications[DATABASE_APP_NAME].remove_relation( - "async-replica", "async-primary" + "replication", "replication-offer" ) - wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) + wait_for_relation_removed_between(ops_test, "replication-offer", "replication", second_model) async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await 
gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], - status="blocked", - idle_period=IDLE_PERIOD, - timeout=TIMEOUT, + apps=[DATABASE_APP_NAME], idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + first_model.block_until( + lambda: first_model.applications[DATABASE_APP_NAME].status == "blocked", ), second_model.wait_for_idle( apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT @@ -308,7 +308,7 @@ async def test_promote_standby( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) assert leader_unit is not None, "No leader unit found" logger.info("promoting the first cluster") - run_action = await leader_unit.run_action("promote-cluster") + run_action = await leader_unit.run_action("promote-to-primary") await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" @@ -365,7 +365,7 @@ async def test_reestablish_relation( await are_writes_increasing(ops_test) logger.info("reestablishing the relation") - await second_model.relate(DATABASE_APP_NAME, "async-primary") + await second_model.relate(DATABASE_APP_NAME, "replication-offer") async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( @@ -384,7 +384,7 @@ async def test_reestablish_relation( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) assert leader_unit is not None, "No leader unit found" logger.info("promoting the first cluster") - run_action = await leader_unit.run_action("promote-cluster") + run_action = await leader_unit.run_action("create-replication") await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 2fe0b2d086..8a2ad24aa3 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -1036,10 +1036,10 @@ async def wait_for_idle_on_blocked( unit = ops_test.model.units.get(f"{database_app_name}/{unit_number}") await asyncio.gather( ops_test.model.wait_for_idle(apps=[other_app_name], status="active"), - ops_test.model.wait_for_idle( - apps=[database_app_name], status="blocked", raise_on_blocked=False + ops_test.model.block_until( + lambda: unit.workload_status == "blocked" + and unit.workload_status_message == status_message ), - ops_test.model.block_until(lambda: unit.workload_status_message == status_message), ) diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py index c75731053d..25878e8253 100644 --- a/tests/integration/test_config.py +++ b/tests/integration/test_config.py @@ -96,11 +96,16 @@ async def test_config_parameters(ops_test: OpsTest) -> None: logger.info(k) charm_config[k] = v[0] await ops_test.model.applications[DATABASE_APP_NAME].set_config(charm_config) - await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="blocked", timeout=100 + await ops_test.model.block_until( + lambda: ops_test.model.units[f"{DATABASE_APP_NAME}/0"].workload_status + == "blocked", + timeout=100, ) assert "Configuration Error" in leader_unit.workload_status_message charm_config[k] = v[1] await ops_test.model.applications[DATABASE_APP_NAME].set_config(charm_config) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=100) + await ops_test.model.block_until( + lambda: ops_test.model.units[f"{DATABASE_APP_NAME}/0"].workload_status == "active", + timeout=100, + ) diff --git 
a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 7ab182447f..a0792250d8 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -2299,9 +2299,7 @@ def test_set_active_status(self, _get_primary, _is_standby_leader, _member_start self.charm.unit.status.message, "Primary" if values[0] == self.charm.unit.name - else ( - "Standby Leader" if values[1] else ("" if values[2] else "fake status") - ), + else ("Standby" if values[1] else ("" if values[2] else "fake status")), ) else: _get_primary.side_effect = values[0]
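Usage sketch for the renamed interface (a sketch based on the integration tests above): the `async-primary`/`async-replica` endpoints become `replication-offer`/`replication`, and the old `promote-cluster` action is split into `create-replication` (run on the offer side to set up replication) and `promote-to-primary` (run on the standby side to switch roles). The model names `model1`/`model2`, the application name `db`, and the Juju 3.x `juju run` action syntax below are assumptions for illustration only:

    # In the model that will host the primary cluster, publish the offer:
    juju offer db:replication-offer replication-offer
    # In the other model, consume the offer and integrate the standby cluster:
    juju consume -m model2 admin/model1.replication-offer
    juju integrate -m model2 db:replication replication-offer
    # On the offer side, start replication from the leader unit:
    juju run db/leader create-replication
    # Later, to switch roles, promote the standby side's leader; force=true skips
    # the reachability check against the current primary cluster:
    juju run -m model2 db/leader promote-to-primary force=true

Without `force=true`, `promote-to-primary` only proceeds when the current primary cluster is reachable, matching the behaviour added in `_handle_forceful_promotion`.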