diff --git a/actions.yaml b/actions.yaml index 03a90556cc..ba46f1108b 100644 --- a/actions.yaml +++ b/actions.yaml @@ -17,6 +17,12 @@ list-backups: description: Lists backups in s3 storage in AWS. pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. +promote-cluster: + description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit. + params: + force-promotion: + type: boolean + description: Force the promotion of a cluster when there is already a primary cluster. restore: description: Restore a database backup using pgBackRest. S3 credentials are retrieved from a relation with the S3 integrator charm. diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index b0c30b5d5e..8783f76814 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -36,7 +36,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 25 +LIBPATCH = 26 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -476,11 +476,11 @@ def set_up_database(self) -> None: """Set up postgres database with the right permissions.""" connection = None try: - self.create_user( - "admin", - extra_user_roles="pg_read_all_data,pg_write_all_data", - ) with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';") + if cursor.fetchone() is not None: + return + # Allow access to the postgres database only to the system users. cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;") cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;") @@ -490,6 +490,10 @@ def set_up_database(self) -> None: sql.Identifier(user) ) ) + self.create_user( + "admin", + extra_user_roles="pg_read_all_data,pg_write_all_data", + ) cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;") except psycopg2.Error as e: logger.error(f"Failed to set up databases: {e}") diff --git a/metadata.yaml b/metadata.yaml index 41bd0aa19f..5f429a40b2 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -39,6 +39,10 @@ peers: interface: upgrade provides: + async-primary: + interface: async_replication + limit: 1 + optional: true database: interface: postgresql_client db: @@ -51,6 +55,10 @@ provides: interface: grafana_dashboard requires: + async-replica: + interface: async_replication + limit: 1 + optional: true certificates: interface: tls-certificates limit: 1 diff --git a/src/backups.py b/src/backups.py index 3bf9ebe8c7..952658e736 100644 --- a/src/backups.py +++ b/src/backups.py @@ -26,6 +26,7 @@ from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed from constants import BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER +from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION logger = logging.getLogger(__name__) @@ -717,6 +718,18 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: event.fail(error_message) return False + logger.info("Checking that the cluster is not replicating data to a standby cluster") + for relation in [ + self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(ASYNC_PRIMARY_RELATION), + ]: + if not relation: + continue + error_message = "Unit cannot restore backup as the cluster is replicating data to a standby cluster" + logger.error(f"Restore failed: {error_message}") + event.fail(error_message) + return False + logger.info("Checking that this unit was already elected the leader unit") if not self.charm.unit.is_leader(): error_message = "Unit cannot restore backup as it was not elected the leader unit yet" diff --git a/src/charm.py b/src/charm.py index a76462baa2..09ba21a1b0 100755 --- a/src/charm.py +++ b/src/charm.py @@ -81,6 +81,7 @@ WORKLOAD_OS_USER, ) from patroni import NotReadyError, Patroni +from relations.async_replication import PostgreSQLAsyncReplication from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model @@ -163,6 +164,7 @@ def __init__(self, *args): self.legacy_db_admin_relation = DbProvides(self, admin=True) self.backup = PostgreSQLBackups(self, "s3-parameters") self.tls = PostgreSQLTLS(self, PEER, [self.primary_endpoint, self.replicas_endpoint]) + self.async_replication = PostgreSQLAsyncReplication(self) self.restart_manager = RollingOpsManager( charm=self, relation="restart", callback=self._restart ) @@ -351,6 +353,18 @@ def _get_endpoints_to_remove(self) -> List[str]: endpoints_to_remove = list(set(old) - set(current)) return endpoints_to_remove + def get_unit_ip(self, unit: Unit) -> Optional[str]: + """Get the IP address of a specific unit.""" + # Check if host is current host. + if unit == self.unit: + return str(self.model.get_binding(PEER).network.bind_address) + # Check if host is a peer. + elif unit in self._peers.data: + return str(self._peers.data[unit].get("private-address")) + # Return None if the unit is not a peer neither the current unit. + else: + return None + def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: """The leader removes the departing units from the list of cluster members.""" # Allow leader to update endpoints if it isn't leaving. @@ -368,6 +382,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: self.postgresql_client_relation.update_read_only_endpoint() self._remove_from_endpoints(endpoints_to_remove) + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + def _on_peer_relation_changed(self, event: HookEvent) -> None: """Reconfigure cluster members.""" # The cluster must be initialized first in the leader unit @@ -412,9 +429,13 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None: # Restart the workload if it's stuck on the starting state after a timeline divergence # due to a backup that was restored. - if not self.is_primary and ( - self._patroni.member_replication_lag == "unknown" - or int(self._patroni.member_replication_lag) > 1000 + if ( + not self.is_primary + and not self.is_standby_leader + and ( + self._patroni.member_replication_lag == "unknown" + or int(self._patroni.member_replication_lag) > 1000 + ) ): self._patroni.reinitialize_postgresql() logger.debug("Deferring on_peer_relation_changed: reinitialising replica") @@ -440,8 +461,7 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None: else: self.unit_peer_data.pop("start-tls-server", None) - if not self.is_blocked: - self._set_active_status() + self.async_replication.handle_read_only_mode() def _on_config_changed(self, event) -> None: """Handle configuration changes, like enabling plugins.""" @@ -471,6 +491,9 @@ def _on_config_changed(self, event) -> None: if self.is_blocked and "Configuration Error" in self.unit.status.message: self._set_active_status() + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + if not self.unit.is_leader(): return @@ -498,6 +521,9 @@ def enable_disable_extensions(self, database: str = None) -> None: Args: database: optional database where to enable/disable the extension. """ + if self._patroni.get_primary() is None: + logger.debug("Early exit enable_disable_extensions: standby cluster") + return spi_module = ["refint", "autoinc", "insert_username", "moddatetime"] plugins_exception = {"uuid_ossp": '"uuid-ossp"'} original_status = self.unit.status @@ -641,6 +667,25 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: self._add_to_endpoints(self._endpoint) self._cleanup_old_cluster_resources() + + if not self.fix_leader_annotation(): + return + + # Create resources and add labels needed for replication. + try: + self._create_services() + except ApiError: + logger.exception("failed to create k8s services") + self.unit.status = BlockedStatus("failed to create k8s services") + return + + # Remove departing units when the leader changes. + self._remove_from_endpoints(self._get_endpoints_to_remove()) + + self._add_members(event) + + def fix_leader_annotation(self) -> bool: + """Fix the leader annotation if it's missing.""" client = Client() try: endpoint = client.get(Endpoints, name=self.cluster_name, namespace=self._namespace) @@ -657,23 +702,11 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: except ApiError as e: if e.status.code == 403: self.on_deployed_without_trust() - return + return False # Ignore the error only when the resource doesn't exist. if e.status.code != 404: raise e - - # Create resources and add labels needed for replication. - try: - self._create_services() - except ApiError: - logger.exception("failed to create k8s services") - self.unit.status = BlockedStatus("failed to create k8s services") - return - - # Remove departing units when the leader changes. - self._remove_from_endpoints(self._get_endpoints_to_remove()) - - self._add_members(event) + return True def _create_pgdata(self, container: Container): """Create the PostgreSQL data directory.""" @@ -747,6 +780,8 @@ def _set_active_status(self): try: if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name: self.unit.status = ActiveStatus("Primary") + elif self.is_standby_leader: + self.unit.status = ActiveStatus("Standby Leader") elif self._patroni.member_started: self.unit.status = ActiveStatus() except (RetryError, ConnectionError) as e: @@ -819,6 +854,9 @@ def _on_upgrade_charm(self, _) -> None: self.unit.status = BlockedStatus(f"failed to patch pod with error {e}") return + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + def _patch_pod_labels(self, member: str) -> None: """Add labels required for replication to the current pod. @@ -988,6 +1026,9 @@ def _on_set_password(self, event: ActionEvent) -> None: # Other units Patroni configuration will be reloaded in the peer relation changed event. self.update_config() + # Update the password in the async replication data. + self.async_replication.update_async_replication_data() + event.set_results({"password": password}) def _on_get_primary(self, event: ActionEvent) -> None: @@ -1109,6 +1150,9 @@ def _on_update_status(self, _) -> None: if self._handle_processes_failures(): return + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + self._set_active_status() def _handle_processes_failures(self) -> bool: @@ -1131,8 +1175,15 @@ def _handle_processes_failures(self) -> bool: return False return True + try: + is_primary = self.is_primary + is_standby_leader = self.is_standby_leader + except RetryError: + return False + if ( - not self.is_primary + not is_primary + and not is_standby_leader and self._patroni.member_started and not self._patroni.member_streaming ): @@ -1170,6 +1221,11 @@ def is_primary(self) -> bool: """Return whether this unit is the primary instance.""" return self._unit == self._patroni.get_primary(unit_name_pattern=True) + @property + def is_standby_leader(self) -> bool: + """Return whether this unit is the standby leader instance.""" + return self._unit == self._patroni.get_standby_leader(unit_name_pattern=True) + @property def is_tls_enabled(self) -> bool: """Return whether TLS is enabled.""" diff --git a/src/constants.py b/src/constants.py index 0cdf7b26ec..af32248a60 100644 --- a/src/constants.py +++ b/src/constants.py @@ -20,6 +20,7 @@ WORKLOAD_OS_GROUP = "postgres" WORKLOAD_OS_USER = "postgres" METRICS_PORT = "9187" +POSTGRESQL_DATA_PATH = "/var/lib/postgresql/data/pgdata" POSTGRES_LOG_FILES = [ "/var/log/pgbackrest/*", "/var/log/postgresql/patroni.log", diff --git a/src/patroni.py b/src/patroni.py index 7c5f6d434e..3295841c64 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -30,6 +30,10 @@ logger = logging.getLogger(__name__) +class ClusterNotPromotedError(Exception): + """Raised when a cluster is not promoted.""" + + class NotReadyError(Exception): """Raised when not all cluster members healthy or finished initial sync.""" @@ -38,6 +42,10 @@ class EndpointNotReadyError(Exception): """Raised when an endpoint is not ready.""" +class StandbyClusterAlreadyPromotedError(Exception): + """Raised when a standby cluster is already promoted.""" + + class SwitchoverFailedError(Exception): """Raised when a switchover failed for some reason.""" @@ -79,6 +87,20 @@ def _patroni_url(self) -> str: """Patroni REST API URL.""" return f"{'https' if self._tls_enabled else 'http'}://{self._endpoint}:8008" + # def configure_standby_cluster(self, host: str) -> None: + # """Configure this cluster as a standby cluster.""" + # requests.patch( + # f"{self._patroni_url}/config", + # verify=self._verify, + # json={ + # "standby_cluster": { + # "create_replica_methods": ["basebackup"], + # "host": host, + # "port": 5432, + # } + # }, + # ) + @property def rock_postgresql_version(self) -> Optional[str]: """Version of Postgresql installed in the Rock image.""" @@ -127,6 +149,36 @@ def get_primary(self, unit_name_pattern=False) -> str: break return primary + def get_standby_leader( + self, unit_name_pattern=False, check_whether_is_running: bool = False + ) -> Optional[str]: + """Get standby leader instance. + + Args: + unit_name_pattern: whether to convert pod name to unit name + check_whether_is_running: whether to check if the standby leader is running + + Returns: + standby leader pod or unit name. + """ + standby_leader = None + # Request info from cluster endpoint (which returns all members of the cluster). + for attempt in Retrying(stop=stop_after_attempt(len(self._endpoints) + 1)): + with attempt: + url = self._get_alternative_patroni_url(attempt) + r = requests.get(f"{url}/cluster", verify=self._verify) + for member in r.json()["members"]: + if member["role"] == "standby_leader": + if check_whether_is_running and member["state"] not in RUNNING_STATES: + logger.warning(f"standby leader {member['name']} is not running") + continue + standby_leader = member["name"] + if unit_name_pattern: + # Change the last dash to / in order to match unit name pattern. + standby_leader = "/".join(standby_leader.rsplit("-", 1)) + break + return standby_leader + def get_sync_standby_names(self) -> List[str]: """Get the list of sync standby unit names.""" sync_standbys = [] @@ -260,7 +312,7 @@ def member_started(self) -> bool: allow server time to start up. """ try: - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + for attempt in Retrying(stop=stop_after_delay(90), wait=wait_fixed(3)): with attempt: r = requests.get(f"{self._patroni_url}/health", verify=self._verify) except RetryError: @@ -310,6 +362,19 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: Dict[str, Any json={"postgresql": {"parameters": parameters}}, ) + def promote_standby_cluster(self) -> None: + """Promote a standby cluster to be a regular cluster.""" + config_response = requests.get(f"{self._patroni_url}/config", verify=self._verify) + if "standby_cluster" not in config_response.json(): + raise StandbyClusterAlreadyPromotedError("standby cluster is already promoted") + requests.patch( + f"{self._patroni_url}/config", verify=self._verify, json={"standby_cluster": None} + ) + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if self.get_primary() is None: + raise ClusterNotPromotedError("cluster not promoted") + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def reinitialize_postgresql(self) -> None: """Reinitialize PostgreSQL.""" @@ -386,6 +451,8 @@ def render_patroni_yml_file( minority_count=self._members_count // 2, version=self.rock_postgresql_version.split(".")[0], pg_parameters=parameters, + primary_cluster_endpoint=self._charm.async_replication.get_primary_cluster_endpoint(), + extra_replication_endpoints=self._charm.async_replication.get_standby_endpoints(), ) self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py new file mode 100644 index 0000000000..0748f94017 --- /dev/null +++ b/src/relations/async_replication.py @@ -0,0 +1,665 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Async Replication implementation. + +The highest "promoted-cluster-counter" value is used to determine the primary cluster. +The application in any side of the relation which has the highest value in its application +relation databag is considered the primary cluster. + +The "unit-promoted-cluster-counter" field in the unit relation databag is used to determine +if the unit is following the promoted cluster. If the value is the same as the highest value +in the application relation databag, then the unit is following the promoted cluster. +Otherwise, it's needed to restart the database in the unit to follow the promoted cluster +if the unit is from the standby cluster (the one that was not promoted). +""" + +import itertools +import json +import logging +from datetime import datetime +from typing import List, Optional, Tuple + +from lightkube import ApiError, Client +from lightkube.resources.core_v1 import Endpoints, Service +from ops import ( + ActionEvent, + Application, + BlockedStatus, + MaintenanceStatus, + Object, + Relation, + RelationChangedEvent, + RelationDepartedEvent, + Secret, + SecretNotFoundError, + WaitingStatus, +) +from ops.pebble import ChangeError +from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed + +from constants import ( + APP_SCOPE, + POSTGRESQL_DATA_PATH, + WORKLOAD_OS_GROUP, + WORKLOAD_OS_USER, +) +from patroni import ClusterNotPromotedError, NotReadyError, StandbyClusterAlreadyPromotedError + +logger = logging.getLogger(__name__) + + +ASYNC_PRIMARY_RELATION = "async-primary" +ASYNC_REPLICA_RELATION = "async-replica" +READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" + + +class PostgreSQLAsyncReplication(Object): + """Defines the async-replication management logic.""" + + def __init__(self, charm): + """Constructor.""" + super().__init__(charm, "postgresql") + self.charm = charm + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_joined, self._on_async_relation_joined + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_joined, self._on_async_relation_joined + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_async_relation_changed + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_async_relation_changed + ) + + # Departure events + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, + self._on_async_relation_departed, + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, + self._on_async_relation_departed, + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_async_relation_broken + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_async_relation_broken + ) + + # Actions + self.framework.observe(self.charm.on.promote_cluster_action, self._on_promote_cluster) + + self.container = self.charm.unit.get_container("postgresql") + + def _can_promote_cluster(self, event: ActionEvent) -> bool: + """Check if the cluster can be promoted.""" + if not self.charm.is_cluster_initialised: + event.fail("Cluster not initialised yet.") + return False + + # Check if there is a relation. If not, see if there is a standby leader. If so promote it to leader. If not, + # fail the action telling that there is no relation and no standby leader. + relation = self._relation + if relation is None: + standby_leader = self.charm._patroni.get_standby_leader() + if standby_leader is not None: + try: + self.charm._patroni.promote_standby_cluster() + if ( + self.charm.is_blocked + and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm._peers.data[self.charm.app].update({ + "promoted-cluster-counter": "" + }) + self.charm._set_active_status() + except (StandbyClusterAlreadyPromotedError, ClusterNotPromotedError) as e: + event.fail(str(e)) + return False + event.fail("No relation and no standby leader found.") + return False + + # Check if this cluster is already the primary cluster. If so, fail the action telling that it's already + # the primary cluster. + primary_cluster = self._get_primary_cluster() + if self.charm.app == primary_cluster: + event.fail("This cluster is already the primary cluster.") + return False + + # To promote the other cluster if there is already a primary cluster, the action must be called with + # `force-promotion=true`. If not, fail the action telling that the other cluster is already the primary. + if relation.app == primary_cluster: + if not event.params.get("force-promotion"): + event.fail( + f"{relation.app.name} is already the primary cluster. Pass `force-promotion=true` to promote anyway." + ) + return False + else: + logger.warning( + "%s is already the primary cluster. Forcing promotion of %s to primary cluster due to `force-promotion=true`.", + relation.app.name, + self.charm.app.name, + ) + + return True + + def _configure_primary_cluster( + self, primary_cluster: Application, event: RelationChangedEvent + ) -> bool: + """Configure the primary cluster.""" + if self.charm.app == primary_cluster: + self.charm.update_config() + if self._is_primary_cluster() and self.charm.unit.is_leader(): + self._update_primary_cluster_data() + # If this is a standby cluster, remove the information from DCS to make it + # a normal cluster. + if self.charm._patroni.get_standby_leader() is not None: + self.charm._patroni.promote_standby_cluster() + try: + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if not self.charm.is_primary: + raise ClusterNotPromotedError() + except RetryError: + logger.debug( + "Deferring on_async_relation_changed: standby cluster not promoted yet." + ) + event.defer() + return True + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() + }) + self.charm._set_active_status() + return True + return False + + def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: + """Configure the standby cluster.""" + relation = self._relation + if relation.name == ASYNC_REPLICA_RELATION: + # Update the secrets between the clusters. + primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") + secret_id = ( + None + if primary_cluster_info is None + else json.loads(primary_cluster_info).get("secret-id") + ) + try: + secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label) + except SecretNotFoundError: + logger.debug("Secret not found, deferring event") + event.defer() + return False + credentials = secret.peek_content() + for key, password in credentials.items(): + user = key.split("-password")[0] + self.charm.set_secret(APP_SCOPE, key, password) + logger.debug("Synced %s password", user) + system_identifier, error = self.get_system_identifier() + if error is not None: + raise Exception(error) + if system_identifier != relation.data[relation.app].get("system-id"): + # Store current data in a tar.gz file. + logger.info("Creating backup of pgdata folder") + filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz" + self.container.exec( + f"tar -zcf {filename} {POSTGRESQL_DATA_PATH}".split() + ).wait_output() + logger.warning("Please review the backup file %s and handle its removal", filename) + self._remove_previous_cluster_information() + return True + + def _get_highest_promoted_cluster_counter_value(self) -> str: + """Return the highest promoted cluster counter.""" + promoted_cluster_counter = "0" + for async_relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if async_relation is None: + continue + for databag in [ + async_relation.data[async_relation.app], + self.charm._peers.data[self.charm.app], + ]: + relation_promoted_cluster_counter = databag.get("promoted-cluster-counter", "0") + if int(relation_promoted_cluster_counter) > int(promoted_cluster_counter): + promoted_cluster_counter = relation_promoted_cluster_counter + return promoted_cluster_counter + + def _get_primary_cluster(self) -> Optional[Application]: + """Return the primary cluster.""" + primary_cluster = None + promoted_cluster_counter = "0" + for async_relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if async_relation is None: + continue + for app, relation_data in { + async_relation.app: async_relation.data, + self.charm.app: self.charm._peers.data, + }.items(): + databag = relation_data[app] + relation_promoted_cluster_counter = databag.get("promoted-cluster-counter", "0") + if relation_promoted_cluster_counter > promoted_cluster_counter: + promoted_cluster_counter = relation_promoted_cluster_counter + primary_cluster = app + return primary_cluster + + def get_primary_cluster_endpoint(self) -> Optional[str]: + """Return the primary cluster endpoint.""" + primary_cluster = self._get_primary_cluster() + if primary_cluster is None or self.charm.app == primary_cluster: + return None + relation = self._relation + primary_cluster_data = relation.data[relation.app].get("primary-cluster-data") + if primary_cluster_data is None: + return None + return json.loads(primary_cluster_data).get("endpoint") + + def _get_secret(self) -> Secret: + """Return async replication necessary secrets.""" + try: + # Avoid recreating the secret. + secret = self.charm.model.get_secret(label=self._secret_label) + if not secret.id: + # Workaround for the secret id not being set with model uuid. + secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + return secret + except SecretNotFoundError: + logger.debug("Secret not found, creating a new one") + pass + + app_secret = self.charm.model.get_secret(label=f"{self.model.app.name}.app") + content = app_secret.peek_content() + + # Filter out unnecessary secrets. + shared_content = dict(filter(lambda x: "password" in x[0], content.items())) + + return self.charm.model.app.add_secret(content=shared_content, label=self._secret_label) + + def get_standby_endpoints(self) -> List[str]: + """Return the standby endpoints.""" + relation = self._relation + primary_cluster = self._get_primary_cluster() + # List the standby endpoints only for the primary cluster. + if relation is None or primary_cluster is None or self.charm.app != primary_cluster: + return [] + return [ + relation.data[unit].get("unit-address") + for relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ] + if relation is not None + for unit in relation.units + if relation.data[unit].get("unit-address") is not None + ] + + def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: + """Returns the PostgreSQL system identifier from this instance.""" + try: + system_identifier, error = self.container.exec( + [ + f'/usr/lib/postgresql/{self.charm._patroni.rock_postgresql_version.split(".")[0]}/bin/pg_controldata', + POSTGRESQL_DATA_PATH, + ], + user=WORKLOAD_OS_USER, + group=WORKLOAD_OS_GROUP, + ).wait_output() + except ChangeError as e: + return None, str(e) + if error != "": + return None, error + system_identifier = [ + line for line in system_identifier.splitlines() if "Database system identifier" in line + ][0].split(" ")[-1] + return system_identifier, None + + def _get_unit_ip(self) -> str: + """Reads some files to quickly figure out its own pod IP. + + It should work for any Ubuntu-based image + """ + with open("/etc/hosts") as f: + hosts = f.read() + with open("/etc/hostname") as f: + hostname = f.read().replace("\n", "") + line = [ln for ln in hosts.split("\n") if ln.find(hostname) >= 0][0] + return line.split("\t")[0] + + def _handle_database_start(self, event: RelationChangedEvent) -> None: + """Handle the database start in the standby cluster.""" + try: + if self.charm._patroni.member_started: + # If the database is started, update the databag in a way the unit is marked as configured + # for async replication. + self.charm._peers.data[self.charm.unit].update({"stopped": ""}) + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() + }) + + if self.charm.unit.is_leader(): + # If this unit is the leader, check if all units are ready before making the cluster + # active again (including the health checks from the update status hook). + if all( + self.charm._peers.data[unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + for unit in {*self.charm._peers.units, self.charm.unit} + ): + self.charm._peers.data[self.charm.app].update({ + "cluster_initialised": "True" + }) + elif self._is_following_promoted_cluster(): + self.charm.unit.status = WaitingStatus( + "Waiting for the database to be started in all units" + ) + event.defer() + return + + self.charm._set_active_status() + elif not self.charm.unit.is_leader(): + raise NotReadyError() + else: + # If the standby leader fails to start, fix the leader annotation and defer the event. + self.charm.fix_leader_annotation() + self.charm.unit.status = WaitingStatus( + "Still starting the database in the standby leader" + ) + event.defer() + except NotReadyError: + self.charm.unit.status = WaitingStatus("Waiting for the database to start") + logger.debug("Deferring on_async_relation_changed: database hasn't started yet.") + event.defer() + + def handle_read_only_mode(self) -> None: + """Handle read-only mode (standby cluster that lost the relation with the primary cluster).""" + promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( + "promoted-cluster-counter", "" + ) + if not self.charm.is_blocked or ( + promoted_cluster_counter != "0" + and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm._set_active_status() + if ( + promoted_cluster_counter == "0" + and self.charm.unit.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + + def _is_following_promoted_cluster(self) -> bool: + """Return True if this unit is following the promoted cluster.""" + if self._get_primary_cluster() is None: + return False + return ( + self.charm._peers.data[self.charm.unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + ) + + def _is_primary_cluster(self) -> bool: + """Return the primary cluster name.""" + return self.charm.app == self._get_primary_cluster() + + def _on_async_relation_broken(self, _) -> None: + if "departing" in self.charm._peers.data[self.charm.unit]: + logger.debug("Early exit on_async_relation_broken: Skipping departing unit.") + return + + self.charm._peers.data[self.charm.unit].update({ + "stopped": "", + "unit-promoted-cluster-counter": "", + }) + + # If this is the standby cluster, set 0 in the "promoted-cluster-counter" field to set + # the cluster in read-only mode message also in the other units. + if self.charm._patroni.get_standby_leader() is not None: + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"}) + self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + else: + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": ""}) + self.charm.update_config() + + def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: + """Update the Patroni configuration if one of the clusters was already promoted.""" + primary_cluster = self._get_primary_cluster() + logger.debug("Primary cluster: %s", primary_cluster) + if primary_cluster is None: + logger.debug("Early exit on_async_relation_changed: No primary cluster found.") + return + + if self._configure_primary_cluster(primary_cluster, event): + return + + # Return if this is a new unit. + if not self.charm.unit.is_leader() and self._is_following_promoted_cluster(): + logger.debug("Early exit on_async_relation_changed: following promoted cluster.") + return + + if not self._stop_database(event): + return + + if not all( + "stopped" in self.charm._peers.data[unit] + or self.charm._peers.data[unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + for unit in {*self.charm._peers.units, self.charm.unit} + ): + self.charm.unit.status = WaitingStatus( + "Waiting for the database to be stopped in all units" + ) + logger.debug("Deferring on_async_relation_changed: not all units stopped.") + event.defer() + return + + if self._wait_for_standby_leader(event): + return + + # Update the asynchronous replication configuration and start the database. + self.charm.update_config() + self.container.start(self.charm._postgresql_service) + + self._handle_database_start(event) + + def _on_async_relation_departed(self, event: RelationDepartedEvent) -> None: + """Set a flag to avoid setting a wrong status message on relation broken event handler.""" + # This is needed because of https://bugs.launchpad.net/juju/+bug/1979811. + if event.departing_unit == self.charm.unit: + self.charm._peers.data[self.charm.unit].update({"departing": "True"}) + + def _on_async_relation_joined(self, _) -> None: + """Publish this unit address in the relation data.""" + self._relation.data[self.charm.unit].update({"unit-address": self._get_unit_ip()}) + + # Set the counter for new units. + highest_promoted_cluster_counter = self._get_highest_promoted_cluster_counter_value() + if highest_promoted_cluster_counter != "0": + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": highest_promoted_cluster_counter + }) + + def _on_promote_cluster(self, event: ActionEvent) -> None: + """Promote this cluster to the primary cluster.""" + if not self._can_promote_cluster(event): + return + + relation = self._relation + + # Check if all units from the other cluster published their pod IPs in the relation data. + # If not, fail the action telling that all units must publish their pod addresses in the + # relation data. + for unit in relation.units: + if "unit-address" not in relation.data[unit]: + event.fail( + "All units from the other cluster must publish their pod addresses in the relation data." + ) + return + + system_identifier, error = self.get_system_identifier() + if error is not None: + logger.exception(error) + event.fail("Failed to get system identifier") + return + + # Increment the current cluster counter in this application side based on the highest counter value. + promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) + promoted_cluster_counter += 1 + logger.debug("Promoted cluster counter: %s", promoted_cluster_counter) + + self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) + + # Emit an async replication changed event for this unit (to promote this cluster before demoting the + # other if this one is a standby cluster, which is needed to correctly setup the async replication + # when performing a switchover). + self._re_emit_async_relation_changed_event() + + # Set the status. + self.charm.unit.status = MaintenanceStatus("Promoting cluster...") + + @property + def _primary_cluster_endpoint(self) -> str: + """Return the endpoint from one of the sync-standbys, or from the primary if there is no sync-standby.""" + sync_standby_names = self.charm._patroni.get_sync_standby_names() + if len(sync_standby_names) > 0: + unit = self.model.get_unit(sync_standby_names[0]) + return self.charm.get_unit_ip(unit) + else: + return self.charm.get_unit_ip(self.charm.unit) + + def _re_emit_async_relation_changed_event(self) -> None: + """Re-emit the async relation changed event.""" + relation = self._relation + getattr(self.charm.on, f'{relation.name.replace("-", "_")}_relation_changed').emit( + relation, + app=relation.app, + unit=[unit for unit in relation.units if unit.app == relation.app][0], + ) + + @property + def _relation(self) -> Relation: + """Return the relation object.""" + for relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if relation is not None: + return relation + + def _remove_previous_cluster_information(self) -> None: + """Remove the previous cluster information.""" + client = Client() + for values in itertools.product( + [Endpoints, Service], + [ + f"patroni-{self.charm._name}", + f"patroni-{self.charm._name}-config", + f"patroni-{self.charm._name}-sync", + ], + ): + try: + client.delete( + values[0], + name=values[1], + namespace=self.charm._namespace, + ) + logger.debug(f"Deleted {values[0]} {values[1]}") + except ApiError as e: + # Ignore the error only when the resource doesn't exist. + if e.status.code != 404: + raise e + logger.debug(f"{values[0]} {values[1]} not found") + + @property + def _secret_label(self) -> str: + """Return the secret label.""" + return f"async-replication-secret-{self._get_highest_promoted_cluster_counter_value()}" + + def _stop_database(self, event: RelationChangedEvent) -> bool: + """Stop the database.""" + if ( + "stopped" not in self.charm._peers.data[self.charm.unit] + and not self._is_following_promoted_cluster() + ): + if not self.charm.unit.is_leader() and not self.container.exists(POSTGRESQL_DATA_PATH): + logger.debug("Early exit on_async_relation_changed: following promoted cluster.") + return False + + self.container.stop(self.charm._postgresql_service) + + if self.charm.unit.is_leader(): + # Remove the "cluster_initialised" flag to avoid self-healing in the update status hook. + self.charm._peers.data[self.charm.app].update({"cluster_initialised": ""}) + if not self._configure_standby_cluster(event): + return False + + # Remove and recreate the pgdata folder to enable replication of the data from the + # primary cluster. + logger.info("Removing and recreating pgdata folder") + self.container.exec(f"rm -r {POSTGRESQL_DATA_PATH}".split()).wait_output() + self.charm._create_pgdata(self.container) + + self.charm._peers.data[self.charm.unit].update({"stopped": "True"}) + + return True + + def update_async_replication_data(self) -> None: + """Updates the async-replication data, if the unit is the leader. + + This is used to update the standby units with the new primary information. + """ + relation = self._relation + if relation is None: + return + relation.data[self.charm.unit].update({"unit-address": self._get_unit_ip()}) + if self._is_primary_cluster() and self.charm.unit.is_leader(): + self._update_primary_cluster_data() + + def _update_primary_cluster_data( + self, promoted_cluster_counter: int = None, system_identifier: str = None + ) -> None: + """Update the primary cluster data.""" + async_relation = self._relation + + if promoted_cluster_counter is not None: + for relation in [async_relation, self.charm._peers]: + relation.data[self.charm.app].update({ + "promoted-cluster-counter": str(promoted_cluster_counter) + }) + + primary_cluster_data = {"endpoint": self._primary_cluster_endpoint} + + # Retrieve the secrets that will be shared between the clusters. + if async_relation.name == ASYNC_PRIMARY_RELATION: + secret = self._get_secret() + secret.grant(async_relation) + primary_cluster_data["secret-id"] = secret.id + + if system_identifier is not None: + primary_cluster_data["system-id"] = system_identifier + + async_relation.data[self.charm.app]["primary-cluster-data"] = json.dumps( + primary_cluster_data + ) + + def _wait_for_standby_leader(self, event: RelationChangedEvent) -> bool: + """Wait for the standby leader to be up and running.""" + try: + standby_leader = self.charm._patroni.get_standby_leader(check_whether_is_running=True) + except RetryError: + standby_leader = None + if not self.charm.unit.is_leader() and standby_leader is None: + self.charm.unit.status = WaitingStatus( + "Waiting for the standby leader start the database" + ) + logger.debug("Deferring on_async_relation_changed: standby leader hasn't started yet.") + event.defer() + return True + return False diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 6d3c50802f..2d4ee5f92f 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -51,6 +51,11 @@ bootstrap: command: pgbackrest --stanza={{ restore_stanza }} --pg1-path={{ storage_path }}/pgdata --set={{ backup_id }} --type=immediate --target-action=promote restore no_params: True keep_existing_recovery_conf: True + {% elif primary_cluster_endpoint %} + standby_cluster: + host: {{ primary_cluster_endpoint }} + port: 5432 + create_replica_methods: ["basebackup"] {% else %} initdb: - auth-host: md5 @@ -61,6 +66,9 @@ bootstrap: pg_hba: - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 + {%- for endpoint in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}/32 md5 + {%- endfor %} bypass_api_service: true log: dir: /var/log/postgresql @@ -116,6 +124,10 @@ postgresql: - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 + - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.6/32 md5 + {%- for endpoint in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}/32 md5 + {%- endfor %} {%- for endpoint in endpoints %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}.{{ namespace }}.svc.cluster.local md5 {%- endfor %} diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 3386fb3cae..2b7e7d2f03 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -128,6 +128,9 @@ def test_on_postgresql_pebble_ready(harness): patch( "charm.Patroni.primary_endpoint_ready", new_callable=PropertyMock ) as _primary_endpoint_ready, + patch( + "charm.PostgresqlOperatorCharm.enable_disable_extensions" + ) as _enable_disable_extensions, patch("charm.PostgresqlOperatorCharm.update_config"), patch("charm.PostgresqlOperatorCharm.postgresql") as _postgresql, patch( @@ -1208,6 +1211,9 @@ def test_handle_processes_failures(harness): with ( patch("charm.Patroni.reinitialize_postgresql") as _reinitialize_postgresql, patch("charm.Patroni.member_streaming", new_callable=PropertyMock) as _member_streaming, + patch( + "charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock + ) as _is_standby_leader, patch( "charm.PostgresqlOperatorCharm.is_primary", new_callable=PropertyMock ) as _is_primary, @@ -1273,6 +1279,7 @@ def test_handle_processes_failures(harness): # Test when the unit is a replica and it's not streaming from primary. _restart.reset_mock() _is_primary.return_value = False + _is_standby_leader.return_value = False _member_streaming.return_value = False for values in itertools.product( [None, RetryError(last_attempt=1)], [True, False], [True, False] @@ -1442,6 +1449,9 @@ def test_handle_postgresql_restart_need(harness): def test_set_active_status(harness): with ( patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started, + patch( + "charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock + ) as _is_standby_leader, patch("charm.Patroni.get_primary") as _get_primary, ): for values in itertools.product( @@ -1451,26 +1461,42 @@ def test_set_active_status(harness): harness.charm.unit.name, f"{harness.charm.app.name}/2", ], + [ + RetryError(last_attempt=1), + ConnectionError, + True, + False, + ], [True, False], ): harness.charm.unit.status = MaintenanceStatus("fake status") - _member_started.return_value = values[1] + _member_started.return_value = values[2] if isinstance(values[0], str): _get_primary.side_effect = None _get_primary.return_value = values[0] - harness.charm._set_active_status() - tc.assertIsInstance( - harness.charm.unit.status, - ActiveStatus - if values[0] == harness.charm.unit.name or values[1] - else MaintenanceStatus, - ) - tc.assertEqual( - harness.charm.unit.status.message, - "Primary" - if values[0] == harness.charm.unit.name - else ("" if values[1] else "fake status"), - ) + if values[0] != harness.charm.unit.name and not isinstance(values[1], bool): + _is_standby_leader.side_effect = values[1] + _is_standby_leader.return_value = None + harness.charm._set_active_status() + tc.assertIsInstance(harness.charm.unit.status, MaintenanceStatus) + else: + _is_standby_leader.side_effect = None + _is_standby_leader.return_value = values[1] + harness.charm._set_active_status() + tc.assertIsInstance( + harness.charm.unit.status, + ActiveStatus + if values[0] == harness.charm.unit.name or values[1] or values[2] + else MaintenanceStatus, + ) + tc.assertEqual( + harness.charm.unit.status.message, + "Primary" + if values[0] == harness.charm.unit.name + else ( + "Standby Leader" if values[1] else ("" if values[2] else "fake status") + ), + ) else: _get_primary.side_effect = values[0] _get_primary.return_value = None