diff --git a/actions.yaml b/actions.yaml
index 5d8cc03cf6..e7ad22d388 100644
--- a/actions.yaml
+++ b/actions.yaml
@@ -11,6 +11,13 @@ create-backup:
       Differential backup is a copy only of changed data since the last full backup.
       Incremental backup is a copy only of changed data since the last backup (any type).
       Possible values - full, differential, incremental.
+create-replication:
+  description: Set up asynchronous replication between two clusters.
+  params:
+    name:
+      type: string
+      description: The name of the replication (defaults to 'default').
+      default: default
 get-primary:
   description: Get the unit which is the primary/leader in the replication.
 get-password:
@@ -25,10 +32,10 @@ list-backups:
   description: Lists backups in s3 storage in AWS.
 pre-upgrade-check:
   description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
-promote-cluster:
+promote-to-primary:
   description: Promotes the cluster of choice to a primary cluster. Must be run against the leader unit.
   params:
-    force-promotion:
+    force:
       type: boolean
       description: Force the promotion of a cluster when there is already a primary cluster.
 restore:
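Reviewer note: the renamed actions can be driven the same way the updated integration tests below drive them. A minimal sketch in this repo's pytest-operator/libjuju style (`ops_test` is the pytest-operator fixture; the `get_leader_unit` helper exists in this test suite, but the import path and flow here are assumptions for illustration):

```python
# Minimal sketch of invoking the renamed actions via libjuju
# (assumed helper import path; `ops_test` is the pytest-operator fixture).
from tests.integration.helpers import get_leader_unit


async def replicate_then_switch_over(ops_test, app_name: str):
    leader_unit = await get_leader_unit(ops_test, app_name)

    # New entry point: set up the replication from the offer side.
    run_action = await leader_unit.run_action("create-replication", name="default")
    await run_action.wait()

    # Renamed from promote-cluster / force-promotion.
    run_action = await leader_unit.run_action("promote-to-primary", force=True)
    await run_action.wait()
```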
""" with self._connect_to_database( - connect_to_current_host=True + database_host=self.current_host ) as connection, connection.cursor() as cursor: cursor.execute("SELECT name FROM pg_timezone_names;") timezones = cursor.fetchall() @@ -434,7 +433,7 @@ def is_tls_enabled(self, check_current_host: bool = False) -> bool: """ try: with self._connect_to_database( - connect_to_current_host=check_current_host + database_host=self.current_host if check_current_host else None ) as connection, connection.cursor() as cursor: cursor.execute("SHOW ssl;") return "on" in cursor.fetchone()[0] @@ -502,19 +501,24 @@ def set_up_database(self) -> None: if connection is not None: connection.close() - def update_user_password(self, username: str, password: str) -> None: + def update_user_password( + self, username: str, password: str, database_host: str = None + ) -> None: """Update a user password. Args: username: the user to update the password. password: the new password for the user. + database_host: the host to connect to. Raises: PostgreSQLUpdateUserPasswordError if the password couldn't be changed. """ connection = None try: - with self._connect_to_database() as connection, connection.cursor() as cursor: + with self._connect_to_database( + database_host=database_host + ) as connection, connection.cursor() as cursor: cursor.execute( sql.SQL("ALTER USER {} WITH ENCRYPTED PASSWORD '" + password + "';").format( sql.Identifier(username) @@ -610,7 +614,7 @@ def validate_date_style(self, date_style: str) -> bool: """ try: with self._connect_to_database( - connect_to_current_host=True + database_host=self.current_host ) as connection, connection.cursor() as cursor: cursor.execute( sql.SQL( diff --git a/metadata.yaml b/metadata.yaml index d2a42f663e..50d2c1370c 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -39,8 +39,8 @@ peers: interface: upgrade provides: - async-primary: - interface: async_replication + replication-offer: + interface: postgresql_async limit: 1 optional: true database: @@ -55,8 +55,8 @@ provides: interface: grafana_dashboard requires: - async-replica: - interface: async_replication + replication: + interface: postgresql_async limit: 1 optional: true certificates: diff --git a/src/backups.py b/src/backups.py index a02178bcee..ed78995353 100644 --- a/src/backups.py +++ b/src/backups.py @@ -26,7 +26,7 @@ from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed from constants import BACKUP_TYPE_OVERRIDES, BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER -from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION +from relations.async_replication import REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION logger = logging.getLogger(__name__) @@ -810,8 +810,8 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: logger.info("Checking that the cluster is not replicating data to a standby cluster") for relation in [ - self.model.get_relation(ASYNC_REPLICA_RELATION), - self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), + self.model.get_relation(REPLICATION_OFFER_RELATION), ]: if not relation: continue diff --git a/src/charm.py b/src/charm.py index 4057208f7e..6a679d78ff 100755 --- a/src/charm.py +++ b/src/charm.py @@ -82,7 +82,11 @@ WORKLOAD_OS_USER, ) from patroni import NotReadyError, Patroni, SwitchoverFailedError -from relations.async_replication import PostgreSQLAsyncReplication +from relations.async_replication import ( + REPLICATION_CONSUMER_RELATION, + REPLICATION_OFFER_RELATION, 
diff --git a/metadata.yaml b/metadata.yaml
index d2a42f663e..50d2c1370c 100644
--- a/metadata.yaml
+++ b/metadata.yaml
@@ -39,8 +39,8 @@ peers:
     interface: upgrade

 provides:
-  async-primary:
-    interface: async_replication
+  replication-offer:
+    interface: postgresql_async
     limit: 1
     optional: true
   database:
@@ -55,8 +55,8 @@ provides:
     interface: grafana_dashboard

 requires:
-  async-replica:
-    interface: async_replication
+  replication:
+    interface: postgresql_async
     limit: 1
     optional: true
   certificates:
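Reviewer note: with the endpoints renamed (`async-primary` → `replication-offer`, `async-replica` → `replication`) and the interface renamed to `postgresql_async`, the cross-model wiring reads as below. A sketch in the same pytest-operator style as the integration tests further down (model and app names are placeholders):

```python
# Sketch of wiring two clusters with the renamed endpoints, mirroring the
# integration tests below (app/model names are placeholders).
async def wire_clusters(ops_test, second_model, app_name: str):
    # Offer the replication-offer endpoint from the first model.
    await ops_test.juju(*f"offer {app_name}:replication-offer replication-offer".split())

    # Consume the offer in the second model...
    offer = f"admin/{ops_test.model.info.name}.replication-offer"
    await ops_test.juju(*f"consume -m {second_model.info.name} {offer}".split())

    # ...and relate it to the second cluster's `replication` endpoint.
    await second_model.relate(app_name, "replication-offer")
```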
diff --git a/src/backups.py b/src/backups.py
index a02178bcee..ed78995353 100644
--- a/src/backups.py
+++ b/src/backups.py
@@ -26,7 +26,7 @@
 from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

 from constants import BACKUP_TYPE_OVERRIDES, BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER
-from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION
+from relations.async_replication import REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION

 logger = logging.getLogger(__name__)
@@ -810,8 +810,8 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
         logger.info("Checking that the cluster is not replicating data to a standby cluster")
         for relation in [
-            self.model.get_relation(ASYNC_REPLICA_RELATION),
-            self.model.get_relation(ASYNC_PRIMARY_RELATION),
+            self.model.get_relation(REPLICATION_CONSUMER_RELATION),
+            self.model.get_relation(REPLICATION_OFFER_RELATION),
         ]:
             if not relation:
                 continue
diff --git a/src/charm.py b/src/charm.py
index 4057208f7e..6a679d78ff 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -82,7 +82,11 @@
     WORKLOAD_OS_USER,
 )
 from patroni import NotReadyError, Patroni, SwitchoverFailedError
-from relations.async_replication import PostgreSQLAsyncReplication
+from relations.async_replication import (
+    REPLICATION_CONSUMER_RELATION,
+    REPLICATION_OFFER_RELATION,
+    PostgreSQLAsyncReplication,
+)
 from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
 from relations.postgresql_provider import PostgreSQLProvider
 from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model
@@ -844,7 +848,7 @@ def _set_active_status(self):
             if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
                 self.unit.status = ActiveStatus("Primary")
             elif self.is_standby_leader:
-                self.unit.status = ActiveStatus("Standby Leader")
+                self.unit.status = ActiveStatus("Standby")
             elif self._patroni.member_started:
                 self.unit.status = ActiveStatus()
         except (RetryError, ConnectionError) as e:
@@ -1072,15 +1076,42 @@ def _on_set_password(self, event: ActionEvent) -> None:
             )
             return

-        # Update the password in the PostgreSQL instance.
-        try:
-            self.postgresql.update_user_password(username, password)
-        except PostgreSQLUpdateUserPasswordError as e:
-            logger.exception(e)
+        replication_offer_relation = self.model.get_relation(REPLICATION_OFFER_RELATION)
+        if (
+            replication_offer_relation is not None
+            and not self.async_replication.is_primary_cluster()
+        ):
+            # Update the password in the other cluster PostgreSQL primary instance.
+            other_cluster_endpoints = self.async_replication.get_all_primary_cluster_endpoints()
+            other_cluster_primary = self._patroni.get_primary(
+                alternative_endpoints=other_cluster_endpoints
+            )
+            other_cluster_primary_ip = [
+                replication_offer_relation.data[unit].get("private-address")
+                for unit in replication_offer_relation.units
+                if unit.name.replace("/", "-") == other_cluster_primary
+            ][0]
+            try:
+                self.postgresql.update_user_password(
+                    username, password, database_host=other_cluster_primary_ip
+                )
+            except PostgreSQLUpdateUserPasswordError as e:
+                logger.exception(e)
+                event.fail("Failed changing the password.")
+                return
+        elif self.model.get_relation(REPLICATION_CONSUMER_RELATION) is not None:
             event.fail(
-                "Failed changing the password: Not all members healthy or finished initial sync."
+                "Failed changing the password: This action can be run only in the cluster from the offer side."
             )
             return
+        else:
+            # Update the password in this cluster PostgreSQL primary instance.
+            try:
+                self.postgresql.update_user_password(username, password)
+            except PostgreSQLUpdateUserPasswordError as e:
+                logger.exception(e)
+                event.fail("Failed changing the password.")
+                return

         # Update the password in the secret store.
         self.set_secret(APP_SCOPE, f"{username}-password", password)
@@ -1089,9 +1120,6 @@ def _on_set_password(self, event: ActionEvent) -> None:
         # Other units Patroni configuration will be reloaded in the peer relation changed event.
         self.update_config()

-        # Update the password in the async replication data.
-        self.async_replication.update_async_replication_data()
-
         event.set_results({"password": password})

     def _on_get_primary(self, event: ActionEvent) -> None:
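Reviewer note: `_on_set_password` now has three branches; a condensed decision table (names mirror the hunk above, standalone function for illustration only):

```python
# Condensed decision table for the new _on_set_password routing: a standby
# cluster on the offer side writes the password to the *other* cluster's
# primary; the consumer side refuses; everything else stays local.
def choose_password_target(has_offer_relation: bool, is_primary_cluster: bool,
                           has_consumer_relation: bool) -> str:
    if has_offer_relation and not is_primary_cluster:
        return "other-cluster-primary"  # via update_user_password(database_host=...)
    if has_consumer_relation:
        return "fail"  # action is only valid on the offer side
    return "local-primary"


assert choose_password_target(True, False, False) == "other-cluster-primary"
assert choose_password_target(False, False, True) == "fail"
assert choose_password_target(False, True, False) == "local-primary"
```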
diff --git a/src/patroni.py b/src/patroni.py
index fd467afdf1..a522ff778e 100644
--- a/src/patroni.py
+++ b/src/patroni.py
@@ -88,20 +88,6 @@ def _patroni_url(self) -> str:
         """Patroni REST API URL."""
         return f"{'https' if self._tls_enabled else 'http'}://{self._endpoint}:8008"

-    # def configure_standby_cluster(self, host: str) -> None:
-    #     """Configure this cluster as a standby cluster."""
-    #     requests.patch(
-    #         f"{self._patroni_url}/config",
-    #         verify=self._verify,
-    #         json={
-    #             "standby_cluster": {
-    #                 "create_replica_methods": ["basebackup"],
-    #                 "host": host,
-    #                 "port": 5432,
-    #             }
-    #         },
-    #     )
-
     @property
     def rock_postgresql_version(self) -> Optional[str]:
         """Version of Postgresql installed in the Rock image."""
@@ -112,12 +98,18 @@ def rock_postgresql_version(self) -> Optional[str]:
         snap_meta = container.pull("/meta.charmed-postgresql/snap.yaml")
         return yaml.safe_load(snap_meta)["version"]

-    def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
+    def _get_alternative_patroni_url(
+        self, attempt: AttemptManager, alternative_endpoints: List[str] = None
+    ) -> str:
         """Get an alternative REST API URL from another member each time.

         When the Patroni process is not running in the current unit it's needed
         to use a URL from another cluster member REST API to do some operations.
         """
+        if alternative_endpoints is not None:
+            return self._patroni_url.replace(
+                self._endpoint, alternative_endpoints[attempt.retry_state.attempt_number - 1]
+            )
         if attempt.retry_state.attempt_number > 1:
             url = self._patroni_url.replace(
                 self._endpoint, list(self._endpoints)[attempt.retry_state.attempt_number - 2]
@@ -126,11 +118,12 @@ def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
             url = self._patroni_url
         return url

-    def get_primary(self, unit_name_pattern=False) -> str:
+    def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] = None) -> str:
         """Get primary instance.

         Args:
             unit_name_pattern: whether or not to convert pod name to unit name
+            alternative_endpoints: list of alternative endpoints to check for the primary.

         Returns:
             primary pod or unit name.
@@ -139,8 +132,8 @@ def get_primary(self, unit_name_pattern=False) -> str:
         # Request info from cluster endpoint (which returns all members of the cluster).
         for attempt in Retrying(stop=stop_after_attempt(len(self._endpoints) + 1)):
             with attempt:
-                url = self._get_alternative_patroni_url(attempt)
-                r = requests.get(f"{url}/cluster", verify=self._verify)
+                url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
+                r = requests.get(f"{url}/cluster", verify=self._verify, timeout=5)
                 for member in r.json()["members"]:
                     if member["role"] == "leader":
                         primary = member["name"]
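Reviewer note: `get_primary` can now probe an arbitrary endpoint list (used above to find the other cluster's primary). A condensed sketch of the per-attempt URL selection (standalone function, for illustration only):

```python
# Per-retry endpoint selection used by get_primary: attempt 1 hits this
# unit's own endpoint, later attempts rotate through peers, and an explicit
# alternative_endpoints list (e.g. the other cluster) is indexed directly.
from typing import List, Optional


def pick_endpoint(attempt_number: int, own: str, peers: List[str],
                  alternative_endpoints: Optional[List[str]] = None) -> str:
    if alternative_endpoints is not None:
        return alternative_endpoints[attempt_number - 1]
    return peers[attempt_number - 2] if attempt_number > 1 else own


assert pick_endpoint(1, "pg-0", ["pg-1", "pg-2"]) == "pg-0"
assert pick_endpoint(2, "pg-0", ["pg-1", "pg-2"]) == "pg-1"
assert pick_endpoint(1, "pg-0", [], alternative_endpoints=["other-0"]) == "other-0"
```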
diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py
index f647f169b5..5b917cf646 100644
--- a/src/relations/async_replication.py
+++ b/src/relations/async_replication.py
@@ -24,6 +24,7 @@
 from lightkube.resources.core_v1 import Endpoints, Service
 from ops import (
     ActionEvent,
+    ActiveStatus,
     Application,
     BlockedStatus,
     MaintenanceStatus,
@@ -32,6 +33,7 @@
     RelationChangedEvent,
     RelationDepartedEvent,
     Secret,
+    SecretChangedEvent,
     SecretNotFoundError,
     WaitingStatus,
 )
@@ -50,9 +52,10 @@

 logger = logging.getLogger(__name__)

-ASYNC_PRIMARY_RELATION = "async-primary"
-ASYNC_REPLICA_RELATION = "async-replica"
-READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode"
+READ_ONLY_MODE_BLOCKING_MESSAGE = "Standalone read-only cluster"
+REPLICATION_CONSUMER_RELATION = "replication"
+REPLICATION_OFFER_RELATION = "replication-offer"
+SECRET_LABEL = "async-replication-secret"


 class PostgreSQLAsyncReplication(Object):
@@ -63,36 +66,49 @@ def __init__(self, charm):
         super().__init__(charm, "postgresql")
         self.charm = charm
         self.framework.observe(
-            self.charm.on[ASYNC_PRIMARY_RELATION].relation_joined, self._on_async_relation_joined
+            self.charm.on[REPLICATION_OFFER_RELATION].relation_joined,
+            self._on_async_relation_joined,
         )
         self.framework.observe(
-            self.charm.on[ASYNC_REPLICA_RELATION].relation_joined, self._on_async_relation_joined
+            self.charm.on[REPLICATION_CONSUMER_RELATION].relation_joined,
+            self._on_async_relation_joined,
         )
         self.framework.observe(
-            self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_async_relation_changed
+            self.charm.on[REPLICATION_OFFER_RELATION].relation_changed,
+            self._on_async_relation_changed,
         )
         self.framework.observe(
-            self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_async_relation_changed
+            self.charm.on[REPLICATION_CONSUMER_RELATION].relation_changed,
+            self._on_async_relation_changed,
         )

         # Departure events
         self.framework.observe(
-            self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed,
+            self.charm.on[REPLICATION_OFFER_RELATION].relation_departed,
             self._on_async_relation_departed,
         )
         self.framework.observe(
-            self.charm.on[ASYNC_REPLICA_RELATION].relation_departed,
+            self.charm.on[REPLICATION_CONSUMER_RELATION].relation_departed,
             self._on_async_relation_departed,
         )
         self.framework.observe(
-            self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_async_relation_broken
+            self.charm.on[REPLICATION_OFFER_RELATION].relation_broken,
+            self._on_async_relation_broken,
         )
         self.framework.observe(
-            self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_async_relation_broken
+            self.charm.on[REPLICATION_CONSUMER_RELATION].relation_broken,
+            self._on_async_relation_broken,
         )

         # Actions
-        self.framework.observe(self.charm.on.promote_cluster_action, self._on_promote_cluster)
+        self.framework.observe(
+            self.charm.on.create_replication_action, self._on_create_replication
+        )
+        self.framework.observe(
+            self.charm.on.promote_to_primary_action, self._on_promote_to_primary
+        )
+
+        self.framework.observe(self.charm.on.secret_changed, self._on_secret_changed)

         self.container = self.charm.unit.get_container("postgresql")
@@ -110,13 +126,11 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool:
         if standby_leader is not None:
             try:
                 self.charm._patroni.promote_standby_cluster()
-                if (
-                    self.charm.is_blocked
-                    and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE
-                ):
+                if self.charm.app.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE:
                     self.charm._peers.data[self.charm.app].update({
                         "promoted-cluster-counter": ""
                     })
+                    self._set_app_status()
                     self.charm._set_active_status()
             except (StandbyClusterAlreadyPromotedError, ClusterNotPromotedError) as e:
                 event.fail(str(e))
@@ -131,22 +145,7 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool:
             event.fail("This cluster is already the primary cluster.")
             return False

-        # To promote the other cluster if there is already a primary cluster, the action must be called with
-        # `force-promotion=true`. If not, fail the action telling that the other cluster is already the primary.
-        if relation.app == primary_cluster:
-            if not event.params.get("force-promotion"):
-                event.fail(
-                    f"{relation.app.name} is already the primary cluster. Pass `force-promotion=true` to promote anyway."
-                )
-                return False
-            else:
-                logger.warning(
-                    "%s is already the primary cluster. Forcing promotion of %s to primary cluster due to `force-promotion=true`.",
-                    relation.app.name,
-                    self.charm.app.name,
-                )
-
-        return True
+        return self._handle_forceful_promotion(event)

     def _configure_primary_cluster(
         self, primary_cluster: Application, event: RelationChangedEvent
@@ -154,7 +153,7 @@ def _configure_primary_cluster(
         """Configure the primary cluster."""
         if self.charm.app == primary_cluster:
             self.charm.update_config()
-            if self._is_primary_cluster() and self.charm.unit.is_leader():
+            if self.is_primary_cluster() and self.charm.unit.is_leader():
                 self._update_primary_cluster_data()
             # If this is a standby cluster, remove the information from DCS to make it
             # a normal cluster.
@@ -181,25 +180,11 @@ def _configure_primary_cluster(
     def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool:
         """Configure the standby cluster."""
         relation = self._relation
-        if relation.name == ASYNC_REPLICA_RELATION:
-            # Update the secrets between the clusters.
-            primary_cluster_info = relation.data[relation.app].get("primary-cluster-data")
-            secret_id = (
-                None
-                if primary_cluster_info is None
-                else json.loads(primary_cluster_info).get("secret-id")
-            )
-            try:
-                secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label)
-            except SecretNotFoundError:
+        if relation.name == REPLICATION_CONSUMER_RELATION:
+            if not self._update_internal_secret():
                 logger.debug("Secret not found, deferring event")
                 event.defer()
                 return False
-            credentials = secret.peek_content()
-            for key, password in credentials.items():
-                user = key.split("-password")[0]
-                self.charm.set_secret(APP_SCOPE, key, password)
-                logger.debug("Synced %s password", user)
         system_identifier, error = self.get_system_identifier()
         if error is not None:
             raise Exception(error)
@@ -218,8 +203,8 @@ def _get_highest_promoted_cluster_counter_value(self) -> str:
         """Return the highest promoted cluster counter."""
         promoted_cluster_counter = "0"
         for async_relation in [
-            self.model.get_relation(ASYNC_PRIMARY_RELATION),
-            self.model.get_relation(ASYNC_REPLICA_RELATION),
+            self.model.get_relation(REPLICATION_OFFER_RELATION),
+            self.model.get_relation(REPLICATION_CONSUMER_RELATION),
         ]:
             if async_relation is None:
                 continue
@@ -237,8 +222,8 @@ def _get_primary_cluster(self) -> Optional[Application]:
         primary_cluster = None
         promoted_cluster_counter = "0"
         for async_relation in [
-            self.model.get_relation(ASYNC_PRIMARY_RELATION),
-            self.model.get_relation(ASYNC_REPLICA_RELATION),
+            self.model.get_relation(REPLICATION_OFFER_RELATION),
+            self.model.get_relation(REPLICATION_CONSUMER_RELATION),
         ]:
             if async_relation is None:
                 continue
@@ -264,26 +249,47 @@ def get_primary_cluster_endpoint(self) -> Optional[str]:
             return None
         return json.loads(primary_cluster_data).get("endpoint")

+    def get_all_primary_cluster_endpoints(self) -> List[str]:
+        """Return all the primary cluster endpoints."""
+        relation = self._relation
+        primary_cluster = self._get_primary_cluster()
+        # List the primary endpoints only for the standby cluster.
+        if relation is None or primary_cluster is None or self.charm.app == primary_cluster:
+            return []
+        return [
+            relation.data[unit].get("unit-address")
+            for relation in [
+                self.model.get_relation(REPLICATION_OFFER_RELATION),
+                self.model.get_relation(REPLICATION_CONSUMER_RELATION),
+            ]
+            if relation is not None
+            for unit in relation.units
+            if relation.data[unit].get("unit-address") is not None
+        ]
+
     def _get_secret(self) -> Secret:
         """Return async replication necessary secrets."""
+        app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app")
+        content = app_secret.peek_content()
+
+        # Filter out unnecessary secrets.
+        shared_content = dict(filter(lambda x: "password" in x[0], content.items()))
+
         try:
             # Avoid recreating the secret.
-            secret = self.charm.model.get_secret(label=self._secret_label)
+            secret = self.charm.model.get_secret(label=SECRET_LABEL)
             if not secret.id:
                 # Workaround for the secret id not being set with model uuid.
                 secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}"
+            if secret.peek_content() != shared_content:
+                logger.info("Updating outdated secret content")
+                secret.set_content(shared_content)
             return secret
         except SecretNotFoundError:
             logger.debug("Secret not found, creating a new one")
             pass
-        app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app")
-        content = app_secret.peek_content()
-
-        # Filter out unnecessary secrets.
-        shared_content = dict(filter(lambda x: "password" in x[0], content.items()))
-
-        return self.charm.model.app.add_secret(content=shared_content, label=self._secret_label)
+        return self.charm.model.app.add_secret(content=shared_content, label=SECRET_LABEL)

     def get_standby_endpoints(self) -> List[str]:
         """Return the standby endpoints."""
@@ -295,8 +301,8 @@ def get_standby_endpoints(self) -> List[str]:
         return [
             relation.data[unit].get("unit-address")
             for relation in [
-                self.model.get_relation(ASYNC_PRIMARY_RELATION),
-                self.model.get_relation(ASYNC_REPLICA_RELATION),
+                self.model.get_relation(REPLICATION_OFFER_RELATION),
+                self.model.get_relation(REPLICATION_CONSUMER_RELATION),
             ]
             if relation is not None
             for unit in relation.units
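Reviewer note: the secret label is now the fixed constant `SECRET_LABEL` rather than a counter-suffixed label, so the same secret is reused and its content refreshed in place. A condensed sketch of the get-or-create-and-refresh pattern (simplified; `model` is an ops `Model`):

```python
# Condensed get-or-create-and-refresh pattern now used by _get_secret:
# reuse the secret under the fixed label, and only rewrite its content
# when it drifted from the filtered peer-secret passwords.
from ops import SecretNotFoundError


def get_or_create_shared_secret(model, shared_content: dict, label: str):
    try:
        secret = model.get_secret(label=label)  # avoid recreating the secret
        if secret.peek_content() != shared_content:
            secret.set_content(shared_content)  # refresh outdated content
        return secret
    except SecretNotFoundError:
        return model.app.add_secret(content=shared_content, label=label)
```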
@@ -379,21 +385,74 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None:
                 logger.debug("Deferring on_async_relation_changed: database hasn't started yet.")
                 event.defer()

+    def _handle_forceful_promotion(self, event: ActionEvent) -> bool:
+        if not event.params.get("force"):
+            all_primary_cluster_endpoints = self.get_all_primary_cluster_endpoints()
+            if len(all_primary_cluster_endpoints) > 0:
+                primary_cluster_reachable = False
+                try:
+                    primary = self.charm._patroni.get_primary(
+                        alternative_endpoints=all_primary_cluster_endpoints
+                    )
+                    if primary is not None:
+                        primary_cluster_reachable = True
+                except RetryError:
+                    pass
+                if not primary_cluster_reachable:
+                    event.fail(
+                        f"{self._relation.app.name} isn't reachable. Pass `force=true` to promote anyway."
+                    )
+                    return False
+        else:
+            logger.warning(
+                "Forcing promotion of %s to primary cluster due to `force=true`.",
+                self.charm.app.name,
+            )
+        return True
+
     def handle_read_only_mode(self) -> None:
         """Handle read-only mode (standby cluster that lost the relation with the primary cluster)."""
-        promoted_cluster_counter = self.charm._peers.data[self.charm.app].get(
-            "promoted-cluster-counter", ""
-        )
-        if not self.charm.is_blocked or (
-            promoted_cluster_counter != "0"
-            and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE
-        ):
+        if not self.charm.is_blocked:
             self.charm._set_active_status()
-        if (
-            promoted_cluster_counter == "0"
-            and self.charm.unit.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE
-        ):
-            self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE)
+
+        if self.charm.unit.is_leader():
+            self._set_app_status()
+
+    def _handle_replication_change(self, event: ActionEvent) -> bool:
+        if not self._can_promote_cluster(event):
+            return False
+
+        relation = self._relation
+
+        # Check if all units from the other cluster published their pod IPs in the relation data.
+        # If not, fail the action telling that all units must publish their pod addresses in the
+        # relation data.
+        for unit in relation.units:
+            if "unit-address" not in relation.data[unit]:
+                event.fail(
+                    "All units from the other cluster must publish their pod addresses in the relation data."
+                )
+                return False
+
+        system_identifier, error = self.get_system_identifier()
+        if error is not None:
+            logger.exception(error)
+            event.fail("Failed to get system identifier")
+            return False
+
+        # Increment the current cluster counter in this application side based on the highest counter value.
+        promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value())
+        promoted_cluster_counter += 1
+        logger.debug("Promoted cluster counter: %s", promoted_cluster_counter)
+
+        self._update_primary_cluster_data(promoted_cluster_counter, system_identifier)
+
+        # Emit an async replication changed event for this unit (to promote this cluster before demoting the
+        # other if this one is a standby cluster, which is needed to correctly set up the async replication
+        # when performing a switchover).
+        self._re_emit_async_relation_changed_event()
+
+        return True

     def _is_following_promoted_cluster(self) -> bool:
         """Return True if this unit is following the promoted cluster."""
@@ -404,7 +463,7 @@ def _is_following_promoted_cluster(self) -> bool:
             == self._get_highest_promoted_cluster_counter_value()
         )

-    def _is_primary_cluster(self) -> bool:
+    def is_primary_cluster(self) -> bool:
         """Return the primary cluster name."""
         return self.charm.app == self._get_primary_cluster()

@@ -423,7 +482,7 @@ def _on_async_relation_broken(self, _) -> None:
         if self.charm._patroni.get_standby_leader() is not None:
             if self.charm.unit.is_leader():
                 self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"})
-            self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE)
+                self._set_app_status()
         else:
             if self.charm.unit.is_leader():
                 self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": ""})
@@ -431,6 +490,9 @@ def _on_async_relation_broken(self, _) -> None:

     def _on_async_relation_changed(self, event: RelationChangedEvent) -> None:
         """Update the Patroni configuration if one of the clusters was already promoted."""
+        if self.charm.unit.is_leader():
+            self._set_app_status()
+
         primary_cluster = self._get_primary_cluster()
         logger.debug("Primary cluster: %s", primary_cluster)
         if primary_cluster is None:
@@ -487,44 +549,71 @@ def _on_async_relation_joined(self, _) -> None:
                 "unit-promoted-cluster-counter": highest_promoted_cluster_counter
             })

-    def _on_promote_cluster(self, event: ActionEvent) -> None:
-        """Promote this cluster to the primary cluster."""
-        if not self._can_promote_cluster(event):
+    def _on_create_replication(self, event: ActionEvent) -> None:
+        """Set up asynchronous replication between two clusters."""
+        if self._get_primary_cluster() is not None:
+            event.fail("There is already a replication set up.")
             return

-        relation = self._relation
-
-        # Check if all units from the other cluster published their pod IPs in the relation data.
-        # If not, fail the action telling that all units must publish their pod addresses in the
-        # relation data.
-        for unit in relation.units:
-            if "unit-address" not in relation.data[unit]:
-                event.fail(
-                    "All units from the other cluster must publish their pod addresses in the relation data."
-                )
-                return
+        if self._relation.name == REPLICATION_CONSUMER_RELATION:
+            event.fail("This action must be run in the cluster where the offer was created.")
+            return

-        system_identifier, error = self.get_system_identifier()
-        if error is not None:
-            logger.exception(error)
-            event.fail("Failed to get system identifier")
+        if not self._handle_replication_change(event):
             return

-        # Increment the current cluster counter in this application side based on the highest counter value.
-        promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value())
-        promoted_cluster_counter += 1
-        logger.debug("Promoted cluster counter: %s", promoted_cluster_counter)
+        # Set the replication name in the relation data.
+        self._relation.data[self.charm.app].update({"name": event.params["name"]})

-        self._update_primary_cluster_data(promoted_cluster_counter, system_identifier)
+        # Set the status.
+        self.charm.unit.status = MaintenanceStatus("Creating replication...")

-        # Emit an async replication changed event for this unit (to promote this cluster before demoting the
-        # other if this one is a standby cluster, which is needed to correctly setup the async replication
-        # when performing a switchover).
-        self._re_emit_async_relation_changed_event()
+    def _on_promote_to_primary(self, event: ActionEvent) -> None:
+        """Promote this cluster to the primary cluster."""
+        if (
+            self.charm.app.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE
+            and self._get_primary_cluster() is None
+        ):
+            event.fail(
+                "No primary cluster found. Run `create-replication` action in the cluster where the offer was created."
+            )
+            return
+
+        if not self._handle_replication_change(event):
+            return

         # Set the status.
         self.charm.unit.status = MaintenanceStatus("Promoting cluster...")

+    def _on_secret_changed(self, event: SecretChangedEvent) -> None:
+        """Update the internal secret when the relation secret changes."""
+        relation = self._relation
+        if relation is None:
+            logger.debug("Early exit on_secret_changed: No relation found.")
+            return
+
+        if (
+            relation.name == REPLICATION_OFFER_RELATION
+            and event.secret.label == f"{PEER}.{self.model.app.name}.app"
+        ):
+            logger.info("Internal secret changed, updating relation secret")
+            secret = self._get_secret()
+            secret.grant(relation)
+            primary_cluster_data = {
+                "endpoint": self._primary_cluster_endpoint,
+                "secret-id": secret.id,
+            }
+            relation.data[self.charm.app]["primary-cluster-data"] = json.dumps(
+                primary_cluster_data
+            )
+            return
+
+        if relation.name == REPLICATION_CONSUMER_RELATION and event.secret.label == SECRET_LABEL:
+            logger.info("Relation secret changed, updating internal secret")
+            if not self._update_internal_secret():
+                logger.debug("Secret not found, deferring event")
+                event.defer()
+
     @property
     def _primary_cluster_endpoint(self) -> str:
         """Return the endpoint from one of the sync-standbys, or from the primary if there is no sync-standby."""
@@ -548,8 +637,8 @@ def _re_emit_async_relation_changed_event(self) -> None:
     def _relation(self) -> Relation:
         """Return the relation object."""
         for relation in [
-            self.model.get_relation(ASYNC_PRIMARY_RELATION),
-            self.model.get_relation(ASYNC_REPLICA_RELATION),
+            self.model.get_relation(REPLICATION_OFFER_RELATION),
+            self.model.get_relation(REPLICATION_CONSUMER_RELATION),
         ]:
             if relation is not None:
                 return relation
database.""" @@ -620,9 +720,29 @@ def update_async_replication_data(self) -> None: if relation is None: return relation.data[self.charm.unit].update({"unit-address": self._get_unit_ip()}) - if self._is_primary_cluster() and self.charm.unit.is_leader(): + if self.is_primary_cluster() and self.charm.unit.is_leader(): self._update_primary_cluster_data() + def _update_internal_secret(self) -> bool: + # Update the secrets between the clusters. + relation = self._relation + primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") + secret_id = ( + None + if primary_cluster_info is None + else json.loads(primary_cluster_info).get("secret-id") + ) + try: + secret = self.charm.model.get_secret(id=secret_id, label=SECRET_LABEL) + except SecretNotFoundError: + return False + credentials = secret.peek_content() + for key, password in credentials.items(): + user = key.split("-password")[0] + self.charm.set_secret(APP_SCOPE, key, password) + logger.debug("Synced %s password", user) + return True + def _update_primary_cluster_data( self, promoted_cluster_counter: int = None, system_identifier: str = None ) -> None: @@ -638,7 +758,7 @@ def _update_primary_cluster_data( primary_cluster_data = {"endpoint": self._primary_cluster_endpoint} # Retrieve the secrets that will be shared between the clusters. - if async_relation.name == ASYNC_PRIMARY_RELATION: + if async_relation.name == REPLICATION_OFFER_RELATION: secret = self._get_secret() secret.grant(async_relation) primary_cluster_data["secret-id"] = secret.id diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index c10611f797..bb8d9a3ab5 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -140,10 +140,10 @@ async def test_async_replication( logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) - first_offer_command = f"offer {DATABASE_APP_NAME}:async-primary async-primary" + first_offer_command = f"offer {DATABASE_APP_NAME}:replication-offer replication-offer" await ops_test.juju(*first_offer_command.split()) first_consume_command = ( - f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-primary" + f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication-offer" ) await ops_test.juju(*first_consume_command.split()) @@ -157,7 +157,7 @@ async def test_async_replication( ), ) - await second_model.relate(DATABASE_APP_NAME, "async-primary") + await second_model.relate(DATABASE_APP_NAME, "replication-offer") async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( @@ -177,7 +177,7 @@ async def test_async_replication( leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) assert leader_unit is not None, "No leader unit found" logger.info("promoting the first cluster") - run_action = await leader_unit.run_action("promote-cluster") + run_action = await leader_unit.run_action("create-replication") await run_action.wait() assert (run_action.results.get("return-code", None) == 0) or ( run_action.results.get("Code", None) == "0" @@ -212,10 +212,10 @@ async def test_switchover( second_model_continuous_writes, ): """Test switching over to the second cluster.""" - second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica" + second_offer_command = f"offer {DATABASE_APP_NAME}:replication replication" await ops_test.juju(*second_offer_command.split()) 
diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py
index c10611f797..bb8d9a3ab5 100644
--- a/tests/integration/ha_tests/test_async_replication.py
+++ b/tests/integration/ha_tests/test_async_replication.py
@@ -140,10 +140,10 @@ async def test_async_replication(
     logger.info("checking whether writes are increasing")
     await are_writes_increasing(ops_test)

-    first_offer_command = f"offer {DATABASE_APP_NAME}:async-primary async-primary"
+    first_offer_command = f"offer {DATABASE_APP_NAME}:replication-offer replication-offer"
     await ops_test.juju(*first_offer_command.split())
     first_consume_command = (
-        f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-primary"
+        f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication-offer"
     )
     await ops_test.juju(*first_consume_command.split())
@@ -157,7 +157,7 @@ async def test_async_replication(
         ),
     )

-    await second_model.relate(DATABASE_APP_NAME, "async-primary")
+    await second_model.relate(DATABASE_APP_NAME, "replication-offer")

     async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
         await gather(
@@ -177,7 +177,7 @@ async def test_async_replication(
     leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME)
     assert leader_unit is not None, "No leader unit found"
     logger.info("promoting the first cluster")
-    run_action = await leader_unit.run_action("promote-cluster")
+    run_action = await leader_unit.run_action("create-replication")
    await run_action.wait()
     assert (run_action.results.get("return-code", None) == 0) or (
         run_action.results.get("Code", None) == "0"
@@ -212,10 +212,10 @@ async def test_switchover(
     second_model_continuous_writes,
 ):
     """Test switching over to the second cluster."""
-    second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica"
+    second_offer_command = f"offer {DATABASE_APP_NAME}:replication replication"
     await ops_test.juju(*second_offer_command.split())
     second_consume_command = (
-        f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-replica"
+        f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication"
     )
     await ops_test.juju(*second_consume_command.split())
@@ -234,7 +234,7 @@ async def test_switchover(
     leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model)
     assert leader_unit is not None, "No leader unit found"
     logger.info("promoting the second cluster")
-    run_action = await leader_unit.run_action("promote-cluster", **{"force-promotion": True})
+    run_action = await leader_unit.run_action("promote-to-primary", **{"force": True})
     await run_action.wait()
     assert (run_action.results.get("return-code", None) == 0) or (
         run_action.results.get("Code", None) == "0"
@@ -269,16 +269,16 @@ async def test_promote_standby(
     """Test promoting the standby cluster."""
     logger.info("breaking the relation")
     await second_model.applications[DATABASE_APP_NAME].remove_relation(
-        "async-replica", "async-primary"
+        "replication", "replication-offer"
     )
-    wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model)
+    wait_for_relation_removed_between(ops_test, "replication-offer", "replication", second_model)
     async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
         await gather(
             first_model.wait_for_idle(
-                apps=[DATABASE_APP_NAME],
-                status="blocked",
-                idle_period=IDLE_PERIOD,
-                timeout=TIMEOUT,
+                apps=[DATABASE_APP_NAME], idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            first_model.block_until(
+                lambda: first_model.applications[DATABASE_APP_NAME].status == "blocked",
             ),
             second_model.wait_for_idle(
                 apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
@@ -290,7 +290,7 @@ async def test_promote_standby(
     leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME)
     assert leader_unit is not None, "No leader unit found"
     logger.info("promoting the first cluster")
-    run_action = await leader_unit.run_action("promote-cluster")
+    run_action = await leader_unit.run_action("promote-to-primary")
     await run_action.wait()
     assert (run_action.results.get("return-code", None) == 0) or (
         run_action.results.get("Code", None) == "0"
@@ -346,7 +346,7 @@ async def test_reestablish_relation(
     await are_writes_increasing(ops_test)

     logger.info("reestablishing the relation")
-    await second_model.relate(DATABASE_APP_NAME, "async-primary")
+    await second_model.relate(DATABASE_APP_NAME, "replication-offer")
     async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
         await gather(
             first_model.wait_for_idle(
@@ -365,7 +365,7 @@ async def test_reestablish_relation(
     leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME)
     assert leader_unit is not None, "No leader unit found"
     logger.info("promoting the first cluster")
-    run_action = await leader_unit.run_action("promote-cluster")
+    run_action = await leader_unit.run_action("create-replication")
     await run_action.wait()
     assert (run_action.results.get("return-code", None) == 0) or (
         run_action.results.get("Code", None) == "0"
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index 29fe3047f2..3cc396f151 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -761,10 +761,10 @@ async def wait_for_idle_on_blocked(
     unit = ops_test.model.units.get(f"{database_app_name}/{unit_number}")
     await asyncio.gather(
         ops_test.model.wait_for_idle(apps=[other_app_name], status="active"),
-        ops_test.model.wait_for_idle(
-            apps=[database_app_name], status="blocked", raise_on_blocked=False
+        ops_test.model.block_until(
+            lambda: unit.workload_status == "blocked"
+            and unit.workload_status_message == status_message
         ),
-        ops_test.model.block_until(lambda: unit.workload_status_message == status_message),
     )
diff --git a/tests/integration/new_relations/test_new_relations.py b/tests/integration/new_relations/test_new_relations.py
index 4a41a089c1..8f0fd7ebe4 100644
--- a/tests/integration/new_relations/test_new_relations.py
+++ b/tests/integration/new_relations/test_new_relations.py
@@ -590,8 +590,13 @@ async def test_discourse(ops_test: OpsTest):
     # Deploy Discourse and Redis.
     await gather(
         ops_test.model.deploy(DISCOURSE_APP_NAME, application_name=DISCOURSE_APP_NAME),
+        # Revision 28 is being used due to https://github.com/canonical/redis-k8s-operator/issues/87.
         ops_test.model.deploy(
-            REDIS_APP_NAME, application_name=REDIS_APP_NAME, channel="latest/edge"
+            REDIS_APP_NAME,
+            application_name=REDIS_APP_NAME,
+            channel="latest/edge",
+            revision=28,
+            series="jammy",
         ),
     )
diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py
index 5031bf4efd..9fe542b0cd 100644
--- a/tests/integration/test_config.py
+++ b/tests/integration/test_config.py
@@ -92,11 +92,16 @@ async def test_config_parameters(ops_test: OpsTest) -> None:
             logger.info(k)
             charm_config[k] = v[0]
             await ops_test.model.applications[DATABASE_APP_NAME].set_config(charm_config)
-            await ops_test.model.wait_for_idle(
-                apps=[DATABASE_APP_NAME], status="blocked", timeout=100
+            await ops_test.model.block_until(
+                lambda: ops_test.model.units[f"{DATABASE_APP_NAME}/0"].workload_status
+                == "blocked",
+                timeout=100,
             )
             assert "Configuration Error" in leader_unit.workload_status_message
             charm_config[k] = v[1]

     await ops_test.model.applications[DATABASE_APP_NAME].set_config(charm_config)
-    await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=100)
+    await ops_test.model.block_until(
+        lambda: ops_test.model.units[f"{DATABASE_APP_NAME}/0"].workload_status == "active",
+        timeout=100,
+    )
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index 3e033a1b6b..0b0e9d198c 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -1491,9 +1491,7 @@ def test_set_active_status(harness):
                     harness.charm.unit.status.message,
                     "Primary"
                     if values[0] == harness.charm.unit.name
-                    else (
-                        "Standby Leader" if values[1] else ("" if values[2] else "fake status")
-                    ),
+                    else ("Standby" if values[1] else ("" if values[2] else "fake status")),
                 )
             else:
                 _get_primary.side_effect = values[0]
diff --git a/tests/unit/test_patroni.py b/tests/unit/test_patroni.py
index ddad83f6ab..b013c0a9ff 100644
--- a/tests/unit/test_patroni.py
+++ b/tests/unit/test_patroni.py
@@ -62,13 +62,17 @@ def test_get_primary(harness, patroni):
     # Test returning pod name.
     primary = patroni.get_primary()
     tc.assertEqual(primary, "postgresql-k8s-1")
-    _get.assert_called_once_with("http://postgresql-k8s-0:8008/cluster", verify=True)
+    _get.assert_called_once_with(
+        "http://postgresql-k8s-0:8008/cluster", verify=True, timeout=5
+    )

     # Test returning unit name.
     _get.reset_mock()
     primary = patroni.get_primary(unit_name_pattern=True)
     tc.assertEqual(primary, "postgresql-k8s/1")
-    _get.assert_called_once_with("http://postgresql-k8s-0:8008/cluster", verify=True)
+    _get.assert_called_once_with(
+        "http://postgresql-k8s-0:8008/cluster", verify=True, timeout=5
+    )


 def test_is_creating_backup(harness, patroni):
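Reviewer note: the test changes above follow from the status move: blocked/standby is now reported through the application status, while `wait_for_idle(status=...)` inspects unit workload status, so the tests switch to `block_until` with an explicit predicate. The general pattern, as a sketch (placeholder names; `model` is a libjuju `Model`):

```python
# General pattern the updated tests use: wait for units to settle, then
# assert the expected app status with an explicit predicate instead of
# relying on wait_for_idle(status=...) alone (placeholder app name).
async def wait_for_app_blocked(model, app_name: str, timeout: int = 600):
    await model.wait_for_idle(apps=[app_name], timeout=timeout)
    await model.block_until(
        lambda: model.applications[app_name].status == "blocked",
        timeout=timeout,
    )
```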