From 3209fc155b7688b9b29e4510017aea6a64ffa431 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 27 May 2024 17:09:11 -0300 Subject: [PATCH 1/7] Syncing the UX with MySQL Signed-off-by: Marcelo Henrique Neppel --- actions.yaml | 9 +- metadata.yaml | 8 +- src/relations/async_replication.py | 130 +++++++++++------- .../ha_tests/test_async_replication.py | 24 ++-- 4 files changed, 103 insertions(+), 68 deletions(-) diff --git a/actions.yaml b/actions.yaml index 7364321fa7..f5daf6944d 100644 --- a/actions.yaml +++ b/actions.yaml @@ -3,6 +3,13 @@ create-backup: description: Creates a backup to s3 storage. +create-replication: + description: Set up asynchronous replication between two clusters. + params: + name: + type: string + description: The name of the replication (defaults to 'default'). + default: default get-primary: description: Get the unit which is the primary/leader in the replication. get-password: @@ -17,7 +24,7 @@ list-backups: description: Lists backups in s3 storage. pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. -promote-cluster: +promote-to-primary: description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit. params: force-promotion: diff --git a/metadata.yaml b/metadata.yaml index cd29daf381..57addc1e9b 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -26,8 +26,8 @@ peers: interface: upgrade provides: - async-primary: - interface: async_replication + replication-offer: + interface: postgresql_async limit: 1 optional: true database: @@ -41,8 +41,8 @@ provides: limit: 1 requires: - async-replica: - interface: async_replication + replication: + interface: postgresql_async limit: 1 optional: true certificates: diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 1e4df96c88..1328e55879 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -51,9 +51,9 @@ logger = logging.getLogger(__name__) -ASYNC_PRIMARY_RELATION = "async-primary" -ASYNC_REPLICA_RELATION = "async-replica" READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" +REPLICATION_CONSUMER_RELATION = "replication" +REPLICATION_OFFER_RELATION = "replication-offer" class PostgreSQLAsyncReplication(Object): @@ -64,36 +64,47 @@ def __init__(self, charm): super().__init__(charm, "postgresql") self.charm = charm self.framework.observe( - self.charm.on[ASYNC_PRIMARY_RELATION].relation_joined, self._on_async_relation_joined + self.charm.on[REPLICATION_OFFER_RELATION].relation_joined, + self._on_async_relation_joined, ) self.framework.observe( - self.charm.on[ASYNC_REPLICA_RELATION].relation_joined, self._on_async_relation_joined + self.charm.on[REPLICATION_CONSUMER_RELATION].relation_joined, + self._on_async_relation_joined, ) self.framework.observe( - self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_async_relation_changed + self.charm.on[REPLICATION_OFFER_RELATION].relation_changed, + self._on_async_relation_changed, ) self.framework.observe( - self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_async_relation_changed + self.charm.on[REPLICATION_CONSUMER_RELATION].relation_changed, + self._on_async_relation_changed, ) # Departure events self.framework.observe( - self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, + self.charm.on[REPLICATION_OFFER_RELATION].relation_departed, self._on_async_relation_departed, ) self.framework.observe( - self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, + self.charm.on[REPLICATION_CONSUMER_RELATION].relation_departed, self._on_async_relation_departed, ) self.framework.observe( - self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_async_relation_broken + self.charm.on[REPLICATION_OFFER_RELATION].relation_broken, + self._on_async_relation_broken, ) self.framework.observe( - self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_async_relation_broken + self.charm.on[REPLICATION_CONSUMER_RELATION].relation_broken, + self._on_async_relation_broken, ) # Actions - self.framework.observe(self.charm.on.promote_cluster_action, self._on_promote_cluster) + self.framework.observe( + self.charm.on.create_replication_action, self._on_create_replication + ) + self.framework.observe( + self.charm.on.promote_to_primary_action, self._on_promote_to_primary + ) def _can_promote_cluster(self, event: ActionEvent) -> bool: """Check if the cluster can be promoted.""" @@ -180,7 +191,7 @@ def _configure_primary_cluster( def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: """Configure the standby cluster.""" relation = self._relation - if relation.name == ASYNC_REPLICA_RELATION: + if relation.name == REPLICATION_CONSUMER_RELATION: # Update the secrets between the clusters. primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") secret_id = ( @@ -214,8 +225,8 @@ def _get_highest_promoted_cluster_counter_value(self) -> str: """Return the highest promoted cluster counter.""" promoted_cluster_counter = "0" for async_relation in [ - self.model.get_relation(ASYNC_PRIMARY_RELATION), - self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), ]: if async_relation is None: continue @@ -249,8 +260,8 @@ def _get_primary_cluster(self) -> Optional[Application]: primary_cluster = None promoted_cluster_counter = "0" for async_relation in [ - self.model.get_relation(ASYNC_PRIMARY_RELATION), - self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), ]: if async_relation is None: continue @@ -307,8 +318,8 @@ def get_standby_endpoints(self) -> List[str]: return [ relation.data[unit].get("unit-address") for relation in [ - self.model.get_relation(ASYNC_PRIMARY_RELATION), - self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), ] if relation is not None for unit in relation.units @@ -408,6 +419,42 @@ def handle_read_only_mode(self) -> None: ): self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + def _handle_replication_change(self, event: ActionEvent) -> bool: + if not self._can_promote_cluster(event): + return False + + relation = self._relation + + # Check if all units from the other cluster published their pod IPs in the relation data. + # If not, fail the action telling that all units must publish their pod addresses in the + # relation data. + for unit in relation.units: + if "unit-address" not in relation.data[unit]: + event.fail( + "All units from the other cluster must publish their pod addresses in the relation data." + ) + return False + + system_identifier, error = self.get_system_identifier() + if error is not None: + logger.exception(error) + event.fail("Failed to get system identifier") + return False + + # Increment the current cluster counter in this application side based on the highest counter value. + promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) + promoted_cluster_counter += 1 + logger.debug("Promoted cluster counter: %s", promoted_cluster_counter) + + self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) + + # Emit an async replication changed event for this unit (to promote this cluster before demoting the + # other if this one is a standby cluster, which is needed to correctly set up the async replication + # when performing a switchover). + self._re_emit_async_relation_changed_event() + + return True + def _is_following_promoted_cluster(self) -> bool: """Return True if this unit is following the promoted cluster.""" if self._get_primary_cluster() is None: @@ -501,43 +548,24 @@ def _on_async_relation_joined(self, _) -> None: "unit-promoted-cluster-counter": highest_promoted_cluster_counter }) - def _on_promote_cluster(self, event: ActionEvent) -> None: - """Promote this cluster to the primary cluster.""" - if not self._can_promote_cluster(event): + def _on_create_replication(self, event: ActionEvent) -> None: + """Set up asynchronous replication between two clusters.""" + if not self._handle_replication_change(event): return - relation = self._relation + # Set the replication name in the relation data. + self._relation.data[self.charm.app].update({"name": event.params["name"]}) - # Check if all units from the other cluster published their pod IPs in the relation data. - # If not, fail the action telling that all units must publish their pod addresses in the - # relation data. - for unit in relation.units: - if "unit-address" not in relation.data[unit]: - event.fail( - "All units from the other cluster must publish their pod addresses in the relation data." - ) - return + # Set the status. + self.charm.unit.status = MaintenanceStatus("Creating replication...") - system_identifier, error = self.get_system_identifier() - if error is not None: - logger.exception(error) - event.fail("Failed to get system identifier") + def _on_promote_to_primary(self, event: ActionEvent) -> None: + """Promote this cluster to the primary cluster.""" + if not self._handle_replication_change(event): return - # Increment the current cluster counter in this application side based on the highest counter value. - promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) - promoted_cluster_counter += 1 - logger.debug("Promoted cluster counter: %s", promoted_cluster_counter) - - self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) - - # Emit an async replication changed event for this unit (to promote this cluster before demoting the - # other if this one is a standby cluster, which is needed to correctly setup the async replication - # when performing a switchover). - self._re_emit_async_relation_changed_event() - # Set the status. - self.charm.unit.status = MaintenanceStatus("Promoting cluster...") + self.charm.unit.status = MaintenanceStatus("Creating replication...") @property def _primary_cluster_endpoint(self) -> str: @@ -576,8 +604,8 @@ def _reinitialise_pgdata(self) -> None: def _relation(self) -> Relation: """Return the relation object.""" for relation in [ - self.model.get_relation(ASYNC_PRIMARY_RELATION), - self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), ]: if relation is not None: return relation @@ -661,7 +689,7 @@ def _update_primary_cluster_data( primary_cluster_data = {"endpoint": self._primary_cluster_endpoint} # Retrieve the secrets that will be shared between the clusters. - if async_relation.name == ASYNC_PRIMARY_RELATION: + if async_relation.name == REPLICATION_OFFER_RELATION: secret = self._get_secret() secret.grant(async_relation) primary_cluster_data["secret-id"] = secret.id diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 5f0c27dea8..52aebf0210 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -150,10 +150,10 @@ async def test_async_replication( logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) - first_offer_command = f"offer {DATABASE_APP_NAME}:async-primary async-primary" + first_offer_command = f"offer {DATABASE_APP_NAME}:replication-offer replication-offer" await ops_test.juju(*first_offer_command.split()) first_consume_command = ( - f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-primary" + f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication-offer" ) await ops_test.juju(*first_consume_command.split()) @@ -167,7 +167,7 @@ async def test_async_replication( ), ) - await second_model.relate(DATABASE_APP_NAME, "async-primary") + await second_model.relate(DATABASE_APP_NAME, "replication-offer") async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( @@ -187,7 +187,7 @@ async def test_async_replication( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) assert leader_unit is not None, "No leader unit found" logger.info("promoting the first cluster") - run_action = await leader_unit.run_action("promote-cluster") + run_action = await leader_unit.run_action("create-replication") await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" @@ -222,10 +222,10 @@ async def test_switchover( second_model_continuous_writes, ): """Test switching over to the second cluster.""" - second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica" + second_offer_command = f"offer {DATABASE_APP_NAME}:replication replication" await ops_test.juju(*second_offer_command.split()) second_consume_command = ( - f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-replica" + f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication" ) await ops_test.juju(*second_consume_command.split()) @@ -244,7 +244,7 @@ async def test_switchover( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model) assert leader_unit is not None, "No leader unit found" logger.info("promoting the second cluster") - run_action = await leader_unit.run_action("promote-cluster", **{"force-promotion": True}) + run_action = await leader_unit.run_action("promote-to-primary", **{"force-promotion": True}) await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" @@ -282,9 +282,9 @@ async def test_promote_standby( "database", f"{APPLICATION_NAME}:first-database" ) await second_model.applications[DATABASE_APP_NAME].remove_relation( - "async-replica", "async-primary" + "replication", "replication-offer" ) - wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) + wait_for_relation_removed_between(ops_test, "replication-offer", "replication", second_model) async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( @@ -302,7 +302,7 @@ async def test_promote_standby( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) assert leader_unit is not None, "No leader unit found" logger.info("promoting the first cluster") - run_action = await leader_unit.run_action("promote-cluster") + run_action = await leader_unit.run_action("promote-to-primary") await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" @@ -359,7 +359,7 @@ async def test_reestablish_relation( await are_writes_increasing(ops_test) logger.info("reestablishing the relation") - await second_model.relate(DATABASE_APP_NAME, "async-primary") + await second_model.relate(DATABASE_APP_NAME, "replication-offer") async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( @@ -378,7 +378,7 @@ async def test_reestablish_relation( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) assert leader_unit is not None, "No leader unit found" logger.info("promoting the first cluster") - run_action = await leader_unit.run_action("promote-cluster") + run_action = await leader_unit.run_action("promote-to-primary") await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" From 7ac4f3f0992566a0f287646644302cf32f61974e Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 29 May 2024 16:50:41 -0300 Subject: [PATCH 2/7] Fix failover and set-secret behaviour Signed-off-by: Marcelo Henrique Neppel --- actions.yaml | 2 +- lib/charms/postgresql_k8s/v0/postgresql.py | 26 +-- src/charm.py | 48 ++++- src/cluster.py | 13 +- src/relations/async_replication.py | 173 +++++++++++++----- .../ha_tests/test_async_replication.py | 2 +- 6 files changed, 191 insertions(+), 73 deletions(-) diff --git a/actions.yaml b/actions.yaml index f5daf6944d..02f5c525f7 100644 --- a/actions.yaml +++ b/actions.yaml @@ -27,7 +27,7 @@ pre-upgrade-check: promote-to-primary: description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit. params: - force-promotion: + force: type: boolean description: Force the promotion of a cluster when there is already a primary cluster. restore: diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 8783f76814..ffddc66360 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -36,7 +36,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 26 +LIBPATCH = 27 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -111,20 +111,19 @@ def __init__( self.system_users = system_users def _connect_to_database( - self, database: str = None, connect_to_current_host: bool = False + self, database: str = None, database_host: str = None ) -> psycopg2.extensions.connection: """Creates a connection to the database. Args: database: database to connect to (defaults to the database provided when the object for this class was created). - connect_to_current_host: whether to connect to the current host - instead of the primary host. + database_host: host to connect to instead of the primary host. Returns: psycopg2 connection object. """ - host = self.current_host if connect_to_current_host else self.primary_host + host = database_host if database_host is not None else self.primary_host connection = psycopg2.connect( f"dbname='{database if database else self.database}' user='{self.user}' host='{host}'" f"password='{self.password}' connect_timeout=1" @@ -388,7 +387,7 @@ def get_postgresql_text_search_configs(self) -> Set[str]: Set of PostgreSQL text search configs. """ with self._connect_to_database( - connect_to_current_host=True + database_host=self.current_host ) as connection, connection.cursor() as cursor: cursor.execute("SELECT CONCAT('pg_catalog.', cfgname) FROM pg_ts_config;") text_search_configs = cursor.fetchall() @@ -401,7 +400,7 @@ def get_postgresql_timezones(self) -> Set[str]: Set of PostgreSQL timezones. """ with self._connect_to_database( - connect_to_current_host=True + database_host=self.current_host ) as connection, connection.cursor() as cursor: cursor.execute("SELECT name FROM pg_timezone_names;") timezones = cursor.fetchall() @@ -434,7 +433,7 @@ def is_tls_enabled(self, check_current_host: bool = False) -> bool: """ try: with self._connect_to_database( - connect_to_current_host=check_current_host + database_host=self.current_host if check_current_host else None ) as connection, connection.cursor() as cursor: cursor.execute("SHOW ssl;") return "on" in cursor.fetchone()[0] @@ -502,19 +501,24 @@ def set_up_database(self) -> None: if connection is not None: connection.close() - def update_user_password(self, username: str, password: str) -> None: + def update_user_password( + self, username: str, password: str, database_host: str = None + ) -> None: """Update a user password. Args: username: the user to update the password. password: the new password for the user. + database_host: the host to connect to. Raises: PostgreSQLUpdateUserPasswordError if the password couldn't be changed. """ connection = None try: - with self._connect_to_database() as connection, connection.cursor() as cursor: + with self._connect_to_database( + database_host=database_host + ) as connection, connection.cursor() as cursor: cursor.execute( sql.SQL("ALTER USER {} WITH ENCRYPTED PASSWORD '" + password + "';").format( sql.Identifier(username) @@ -610,7 +614,7 @@ def validate_date_style(self, date_style: str) -> bool: """ try: with self._connect_to_database( - connect_to_current_host=True + database_host=self.current_host ) as connection, connection.cursor() as cursor: cursor.execute( sql.SQL( diff --git a/src/charm.py b/src/charm.py index 99592539af..bcf31ff165 100755 --- a/src/charm.py +++ b/src/charm.py @@ -86,7 +86,11 @@ USER, USER_PASSWORD_KEY, ) -from relations.async_replication import PostgreSQLAsyncReplication +from relations.async_replication import ( + REPLICATION_CONSUMER_RELATION, + REPLICATION_OFFER_RELATION, + PostgreSQLAsyncReplication, +) from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model @@ -1194,15 +1198,42 @@ def _on_set_password(self, event: ActionEvent) -> None: ) return - # Update the password in the PostgreSQL instance. - try: - self.postgresql.update_user_password(username, password) - except PostgreSQLUpdateUserPasswordError as e: - logger.exception(e) + replication_offer_relation = self.model.get_relation(REPLICATION_OFFER_RELATION) + if ( + replication_offer_relation is not None + and not self.async_replication.is_primary_cluster() + ): + # Update the password in the other cluster PostgreSQL primary instance. + other_cluster_endpoints = self.async_replication.get_all_primary_cluster_endpoints() + other_cluster_primary = self._patroni.get_primary( + alternative_endpoints=other_cluster_endpoints + ) + other_cluster_primary_ip = [ + replication_offer_relation.data[unit].get("private-address") + for unit in replication_offer_relation.units + if unit.name.replace("/", "-") == other_cluster_primary + ][0] + try: + self.postgresql.update_user_password( + username, password, database_host=other_cluster_primary_ip + ) + except PostgreSQLUpdateUserPasswordError as e: + logger.exception(e) + event.fail("Failed changing the password.") + return + elif self.model.get_relation(REPLICATION_CONSUMER_RELATION) is not None: event.fail( - "Failed changing the password: Not all members healthy or finished initial sync." + "Failed changing the password: This action can be ran only in the cluster from the offer side." ) return + else: + # Update the password in this cluster PostgreSQL primary instance. + try: + self.postgresql.update_user_password(username, password) + except PostgreSQLUpdateUserPasswordError as e: + logger.exception(e) + event.fail("Failed changing the password.") + return # Update the password in the secret store. self.set_secret(APP_SCOPE, f"{username}-password", password) @@ -1211,9 +1242,6 @@ def _on_set_password(self, event: ActionEvent) -> None: # Other units Patroni configuration will be reloaded in the peer relation changed event. self.update_config() - # Update the password in the async replication data. - self.async_replication.update_async_replication_data() - event.set_results({"password": password}) def _on_update_status(self, _) -> None: diff --git a/src/cluster.py b/src/cluster.py index 4b08ff5a58..29a42e87b5 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -230,11 +230,12 @@ def get_member_status(self, member_name: str) -> str: return member["state"] return "" - def get_primary(self, unit_name_pattern=False) -> str: + def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] = None) -> str: """Get primary instance. Args: unit_name_pattern: whether to convert pod name to unit name + alternative_endpoints: list of alternative endpoints to check for the primary. Returns: primary pod or unit name. @@ -242,7 +243,7 @@ def get_primary(self, unit_name_pattern=False) -> str: # Request info from cluster endpoint (which returns all members of the cluster). for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): with attempt: - url = self._get_alternative_patroni_url(attempt) + url = self._get_alternative_patroni_url(attempt, alternative_endpoints) cluster_status = requests.get( f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", verify=self.verify, @@ -301,12 +302,18 @@ def get_sync_standby_names(self) -> List[str]: sync_standbys.append("/".join(member["name"].rsplit("-", 1))) return sync_standbys - def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str: + def _get_alternative_patroni_url( + self, attempt: AttemptManager, alternative_endpoints: List[str] = None + ) -> str: """Get an alternative REST API URL from another member each time. When the Patroni process is not running in the current unit it's needed to use a URL from another cluster member REST API to do some operations. """ + if alternative_endpoints is not None: + return self._patroni_url.replace( + self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1] + ) attempt_number = attempt.retry_state.attempt_number if attempt_number > 1: url = self._patroni_url diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 1328e55879..3b7ffb9f57 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -35,6 +35,7 @@ RelationChangedEvent, RelationDepartedEvent, Secret, + SecretChangedEvent, SecretNotFoundError, WaitingStatus, ) @@ -54,6 +55,7 @@ READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" REPLICATION_CONSUMER_RELATION = "replication" REPLICATION_OFFER_RELATION = "replication-offer" +SECRET_LABEL = "async-replication-secret" class PostgreSQLAsyncReplication(Object): @@ -106,6 +108,8 @@ def __init__(self, charm): self.charm.on.promote_to_primary_action, self._on_promote_to_primary ) + self.framework.observe(self.charm.on.secret_changed, self._on_secret_changed) + def _can_promote_cluster(self, event: ActionEvent) -> bool: """Check if the cluster can be promoted.""" if not self.charm.is_cluster_initialised: @@ -141,22 +145,7 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool: event.fail("This cluster is already the primary cluster.") return False - # To promote the other cluster if there is already a primary cluster, the action must be called with - # `force-promotion=true`. If not, fail the action telling that the other cluster is already the primary. - if relation.app == primary_cluster: - if not event.params.get("force-promotion"): - event.fail( - f"{relation.app.name} is already the primary cluster. Pass `force-promotion=true` to promote anyway." - ) - return False - else: - logger.warning( - "%s is already the primary cluster. Forcing promotion of %s to primary cluster due to `force-promotion=true`.", - relation.app.name, - self.charm.app.name, - ) - - return True + return self._handle_forceful_promotion(event) def _configure_primary_cluster( self, primary_cluster: Application, event: RelationChangedEvent @@ -164,7 +153,7 @@ def _configure_primary_cluster( """Configure the primary cluster.""" if self.charm.app == primary_cluster: self.charm.update_config() - if self._is_primary_cluster() and self.charm.unit.is_leader(): + if self.is_primary_cluster() and self.charm.unit.is_leader(): self._update_primary_cluster_data() # If this is a standby cluster, remove the information from DCS to make it # a normal cluster. @@ -192,24 +181,10 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: """Configure the standby cluster.""" relation = self._relation if relation.name == REPLICATION_CONSUMER_RELATION: - # Update the secrets between the clusters. - primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") - secret_id = ( - None - if primary_cluster_info is None - else json.loads(primary_cluster_info).get("secret-id") - ) - try: - secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label) - except SecretNotFoundError: + if not self._update_internal_secret(): logger.debug("Secret not found, deferring event") event.defer() return False - credentials = secret.peek_content() - for key, password in credentials.items(): - user = key.split("-password")[0] - self.charm.set_secret(APP_SCOPE, key, password) - logger.debug("Synced %s password", user) system_identifier, error = self.get_system_identifier() if error is not None: raise Exception(error) @@ -221,6 +196,24 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: logger.warning("Please review the backup file %s and handle its removal", filename) return True + def get_all_primary_cluster_endpoints(self) -> List[str]: + """Return all the primary cluster endpoints.""" + relation = self._relation + primary_cluster = self._get_primary_cluster() + # List the primary endpoints only for the standby cluster. + if relation is None or primary_cluster is None or self.charm.app == primary_cluster: + return [] + return [ + relation.data[unit].get("unit-address") + for relation in [ + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), + ] + if relation is not None + for unit in relation.units + if relation.data[unit].get("unit-address") is not None + ] + def _get_highest_promoted_cluster_counter_value(self) -> str: """Return the highest promoted cluster counter.""" promoted_cluster_counter = "0" @@ -289,24 +282,27 @@ def get_primary_cluster_endpoint(self) -> Optional[str]: def _get_secret(self) -> Secret: """Return async replication necessary secrets.""" + app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app") + content = app_secret.peek_content() + + # Filter out unnecessary secrets. + shared_content = dict(filter(lambda x: "password" in x[0], content.items())) + try: # Avoid recreating the secret. - secret = self.charm.model.get_secret(label=self._secret_label) + secret = self.charm.model.get_secret(label=SECRET_LABEL) if not secret.id: # Workaround for the secret id not being set with model uuid. secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + if secret.peek_content() != shared_content: + logger.info("Updating outdated secret content") + secret.set_content(shared_content) return secret except SecretNotFoundError: logger.debug("Secret not found, creating a new one") pass - app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app") - content = app_secret.peek_content() - - # Filter out unnecessary secrets. - shared_content = dict(filter(lambda x: "password" in x[0], content.items())) - - return self.charm.model.app.add_secret(content=shared_content, label=self._secret_label) + return self.charm.model.app.add_secret(content=shared_content, label=SECRET_LABEL) def get_standby_endpoints(self) -> List[str]: """Return the standby endpoints.""" @@ -403,6 +399,31 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: logger.debug("Deferring on_async_relation_changed: database hasn't started yet.") event.defer() + def _handle_forceful_promotion(self, event: ActionEvent) -> bool: + if not event.params.get("force"): + all_primary_cluster_endpoints = self.get_all_primary_cluster_endpoints() + if len(all_primary_cluster_endpoints) > 0: + primary_cluster_reachable = False + try: + primary = self.charm._patroni.get_primary( + alternative_endpoints=all_primary_cluster_endpoints + ) + if primary is not None: + primary_cluster_reachable = True + except RetryError: + pass + if not primary_cluster_reachable: + event.fail( + f"{self._relation.app.name} isn't reachable. Pass `force=true` to promote anyway." + ) + return False + else: + logger.warning( + "Forcing promotion of %s to primary cluster due to `force=true`.", + self.charm.app.name, + ) + return True + def handle_read_only_mode(self) -> None: """Handle read-only mode (standby cluster that lost the relation with the primary cluster).""" promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( @@ -464,7 +485,7 @@ def _is_following_promoted_cluster(self) -> bool: == self._get_highest_promoted_cluster_counter_value() ) - def _is_primary_cluster(self) -> bool: + def is_primary_cluster(self) -> bool: """Return the primary cluster name.""" return self.charm.app == self._get_primary_cluster() @@ -550,6 +571,14 @@ def _on_async_relation_joined(self, _) -> None: def _on_create_replication(self, event: ActionEvent) -> None: """Set up asynchronous replication between two clusters.""" + if self._get_primary_cluster() is not None: + event.fail("There is already a replication set up.") + return + + if self._relation.name == REPLICATION_CONSUMER_RELATION: + event.fail("This action must be run in the cluster where the offer was created.") + return + if not self._handle_replication_change(event): return @@ -561,12 +590,47 @@ def _on_create_replication(self, event: ActionEvent) -> None: def _on_promote_to_primary(self, event: ActionEvent) -> None: """Promote this cluster to the primary cluster.""" + if self._get_primary_cluster() is None: + event.fail( + "No primary cluster found. Run `create-replication` action in the cluster where the offer was created." + ) + return + if not self._handle_replication_change(event): return # Set the status. self.charm.unit.status = MaintenanceStatus("Creating replication...") + def _on_secret_changed(self, event: SecretChangedEvent) -> None: + """Update the internal secret when the relation secret changes.""" + relation = self._relation + if relation is None: + logger.debug("Early exit on_secret_changed: No relation found.") + return + + if ( + relation.name == REPLICATION_OFFER_RELATION + and event.secret.label == f"{PEER}.{self.model.app.name}.app" + ): + logger.info("Internal secret changed, updating relation secret") + secret = self._get_secret() + secret.grant(relation) + primary_cluster_data = { + "endpoint": self._primary_cluster_endpoint, + "secret-id": secret.id, + } + relation.data[self.charm.app]["primary-cluster-data"] = json.dumps( + primary_cluster_data + ) + return + + if relation.name == REPLICATION_CONSUMER_RELATION and event.secret.label == SECRET_LABEL: + logger.info("Relation secret changed, updating internal secret") + if not self._update_internal_secret(): + logger.debug("Secret not found, deferring event") + event.defer() + @property def _primary_cluster_endpoint(self) -> str: """Return the endpoint from one of the sync-standbys, or from the primary if there is no sync-standby.""" @@ -610,11 +674,6 @@ def _relation(self) -> Relation: if relation is not None: return relation - @property - def _secret_label(self) -> str: - """Return the secret label.""" - return f"async-replication-secret-{self._get_highest_promoted_cluster_counter_value()}" - def _stop_database(self, event: RelationChangedEvent) -> bool: """Stop the database.""" if ( @@ -670,9 +729,29 @@ def update_async_replication_data(self) -> None: if relation is None: return relation.data[self.charm.unit].update({"unit-address": self.charm._unit_ip}) - if self._is_primary_cluster() and self.charm.unit.is_leader(): + if self.is_primary_cluster() and self.charm.unit.is_leader(): self._update_primary_cluster_data() + def _update_internal_secret(self) -> bool: + # Update the secrets between the clusters. + relation = self._relation + primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") + secret_id = ( + None + if primary_cluster_info is None + else json.loads(primary_cluster_info).get("secret-id") + ) + try: + secret = self.charm.model.get_secret(id=secret_id, label=SECRET_LABEL) + except SecretNotFoundError: + return False + credentials = secret.peek_content() + for key, password in credentials.items(): + user = key.split("-password")[0] + self.charm.set_secret(APP_SCOPE, key, password) + logger.debug("Synced %s password", user) + return True + def _update_primary_cluster_data( self, promoted_cluster_counter: int = None, system_identifier: str = None ) -> None: diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 52aebf0210..0967f7b5ff 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -244,7 +244,7 @@ async def test_switchover( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model) assert leader_unit is not None, "No leader unit found" logger.info("promoting the second cluster") - run_action = await leader_unit.run_action("promote-to-primary", **{"force-promotion": True}) + run_action = await leader_unit.run_action("promote-to-primary", **{"force": True}) await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" From 02a584ce5116e4f9e0f2b718fccb2277669d221f Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 31 May 2024 15:01:57 -0300 Subject: [PATCH 3/7] Improve statuses Signed-off-by: Marcelo Henrique Neppel --- src/charm.py | 2 +- src/relations/async_replication.py | 51 ++++++++++++------- .../ha_tests/test_async_replication.py | 10 ++-- tests/unit/test_charm.py | 4 +- 4 files changed, 39 insertions(+), 28 deletions(-) diff --git a/src/charm.py b/src/charm.py index bcf31ff165..c8203426e9 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1357,7 +1357,7 @@ def _set_primary_status_message(self) -> None: if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name: self.unit.status = ActiveStatus("Primary") elif self.is_standby_leader: - self.unit.status = ActiveStatus("Standby Leader") + self.unit.status = ActiveStatus("Standby") elif self._patroni.member_started: self.unit.status = ActiveStatus() except (RetryError, ConnectionError) as e: diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 3b7ffb9f57..072a1e51df 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -27,6 +27,7 @@ from ops import ( ActionEvent, + ActiveStatus, Application, BlockedStatus, MaintenanceStatus, @@ -52,7 +53,7 @@ logger = logging.getLogger(__name__) -READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" +READ_ONLY_MODE_BLOCKING_MESSAGE = "Standalone read-only cluster" REPLICATION_CONSUMER_RELATION = "replication" REPLICATION_OFFER_RELATION = "replication-offer" SECRET_LABEL = "async-replication-secret" @@ -124,13 +125,11 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool: if standby_leader is not None: try: self.charm._patroni.promote_standby_cluster() - if ( - self.charm.is_blocked - and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE - ): + if self.charm.app.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE: self.charm._peers.data[self.charm.app].update({ "promoted-cluster-counter": "" }) + self._set_app_status() self.charm._set_primary_status_message() except (StandbyClusterAlreadyPromotedError, ClusterNotPromotedError) as e: event.fail(str(e)) @@ -426,19 +425,11 @@ def _handle_forceful_promotion(self, event: ActionEvent) -> bool: def handle_read_only_mode(self) -> None: """Handle read-only mode (standby cluster that lost the relation with the primary cluster).""" - promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( - "promoted-cluster-counter", "" - ) - if not self.charm.is_blocked or ( - promoted_cluster_counter != "0" - and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE - ): + if not self.charm.is_blocked: self.charm._set_primary_status_message() - if ( - promoted_cluster_counter == "0" - and self.charm.unit.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE - ): - self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + + if self.charm.unit.is_leader(): + self._set_app_status() def _handle_replication_change(self, event: ActionEvent) -> bool: if not self._can_promote_cluster(event): @@ -504,7 +495,7 @@ def _on_async_relation_broken(self, _) -> None: if self.charm._patroni.get_standby_leader() is not None: if self.charm.unit.is_leader(): self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"}) - self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + self._set_app_status() else: if self.charm.unit.is_leader(): self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": ""}) @@ -512,6 +503,9 @@ def _on_async_relation_broken(self, _) -> None: def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: """Update the Patroni configuration if one of the clusters was already promoted.""" + if self.charm.unit.is_leader(): + self._set_app_status() + primary_cluster = self._get_primary_cluster() logger.debug("Primary cluster: %s", primary_cluster) if primary_cluster is None: @@ -590,7 +584,10 @@ def _on_create_replication(self, event: ActionEvent) -> None: def _on_promote_to_primary(self, event: ActionEvent) -> None: """Promote this cluster to the primary cluster.""" - if self._get_primary_cluster() is None: + if ( + self.charm.app.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE + and self._get_primary_cluster() is None + ): event.fail( "No primary cluster found. Run `create-replication` action in the cluster where the offer was created." ) @@ -674,6 +671,22 @@ def _relation(self) -> Relation: if relation is not None: return relation + def _set_app_status(self) -> None: + """Set the app status.""" + if self.charm._peers.data[self.charm.app].get("promoted-cluster-counter") == "0": + self.charm.app.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + return + if self._relation is None: + self.charm.app.status = ActiveStatus() + return + primary_cluster = self._get_primary_cluster() + if primary_cluster is None: + self.charm.app.status = ActiveStatus() + else: + self.charm.app.status = ActiveStatus( + "Primary" if self.charm.app == primary_cluster else "Standby" + ) + def _stop_database(self, event: RelationChangedEvent) -> bool: """Stop the database.""" if ( diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 0967f7b5ff..c5344d5a4d 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -288,10 +288,10 @@ async def test_promote_standby( async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], - status="blocked", - idle_period=IDLE_PERIOD, - timeout=TIMEOUT, + apps=[DATABASE_APP_NAME], idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + first_model.block_until( + lambda: first_model.applications[DATABASE_APP_NAME].status == "blocked", ), second_model.wait_for_idle( apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT @@ -378,7 +378,7 @@ async def test_reestablish_relation( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) assert leader_unit is not None, "No leader unit found" logger.info("promoting the first cluster") - run_action = await leader_unit.run_action("promote-to-primary") + run_action = await leader_unit.run_action("create-replication") await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 48907eb4d0..c88181015d 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -2295,9 +2295,7 @@ def test_set_active_status(self, _get_primary, _is_standby_leader, _member_start self.charm.unit.status.message, "Primary" if values[0] == self.charm.unit.name - else ( - "Standby Leader" if values[1] else ("" if values[2] else "fake status") - ), + else ("Standby" if values[1] else ("" if values[2] else "fake status")), ) else: _get_primary.side_effect = values[0] From 1c000c5cddbc106489d15f403ef5234d7fe3716c Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 31 May 2024 16:50:58 -0300 Subject: [PATCH 4/7] Fix app status set Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 072a1e51df..006966ddc7 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -495,7 +495,7 @@ def _on_async_relation_broken(self, _) -> None: if self.charm._patroni.get_standby_leader() is not None: if self.charm.unit.is_leader(): self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"}) - self._set_app_status() + self._set_app_status() else: if self.charm.unit.is_leader(): self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": ""}) From ba84e96ed1d3fa98d9fa7d0e38543a06677b00b5 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 31 May 2024 16:52:52 -0300 Subject: [PATCH 5/7] Fix model switch Signed-off-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/test_async_replication.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 7b3de97680..96bdb3afa4 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -75,6 +75,7 @@ async def second_model(ops_test: OpsTest, first_model, request) -> Model: subprocess.run( ["juju", "set-model-constraints", f"arch={architecture.architecture}"], check=True ) + subprocess.run(["juju", "switch", first_model.info.name], check=True) second_model = Model() await second_model.connect(model_name=second_model_name) yield second_model From 8afc8030697ad5e5bca78a4769da940051966f89 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 10 Jun 2024 15:34:42 -0300 Subject: [PATCH 6/7] Fix config integration test Signed-off-by: Marcelo Henrique Neppel --- tests/integration/test_config.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py index c75731053d..25878e8253 100644 --- a/tests/integration/test_config.py +++ b/tests/integration/test_config.py @@ -96,11 +96,16 @@ async def test_config_parameters(ops_test: OpsTest) -> None: logger.info(k) charm_config[k] = v[0] await ops_test.model.applications[DATABASE_APP_NAME].set_config(charm_config) - await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="blocked", timeout=100 + await ops_test.model.block_until( + lambda: ops_test.model.units[f"{DATABASE_APP_NAME}/0"].workload_status + == "blocked", + timeout=100, ) assert "Configuration Error" in leader_unit.workload_status_message charm_config[k] = v[1] await ops_test.model.applications[DATABASE_APP_NAME].set_config(charm_config) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=100) + await ops_test.model.block_until( + lambda: ops_test.model.units[f"{DATABASE_APP_NAME}/0"].workload_status == "active", + timeout=100, + ) From 936b115314863bf5267b351fcde576e5aa41a702 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Jun 2024 17:54:35 -0300 Subject: [PATCH 7/7] Fix backups integration test Signed-off-by: Marcelo Henrique Neppel --- tests/integration/helpers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 2fe0b2d086..8a2ad24aa3 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -1036,10 +1036,10 @@ async def wait_for_idle_on_blocked( unit = ops_test.model.units.get(f"{database_app_name}/{unit_number}") await asyncio.gather( ops_test.model.wait_for_idle(apps=[other_app_name], status="active"), - ops_test.model.wait_for_idle( - apps=[database_app_name], status="blocked", raise_on_blocked=False + ops_test.model.block_until( + lambda: unit.workload_status == "blocked" + and unit.workload_status_message == status_message ), - ops_test.model.block_until(lambda: unit.workload_status_message == status_message), )