diff --git a/actions.yaml b/actions.yaml index 44231ad4a9..7364321fa7 100644 --- a/actions.yaml +++ b/actions.yaml @@ -17,6 +17,12 @@ list-backups: description: Lists backups in s3 storage. pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. +promote-cluster: + description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit. + params: + force-promotion: + type: boolean + description: Force the promotion of a cluster when there is already a primary cluster. restore: description: Restore a database backup using pgBackRest. S3 credentials are retrieved from a relation with the S3 integrator charm. diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 574e157780..8783f76814 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -18,6 +18,7 @@ Any charm using this library should import the `psycopg2` or `psycopg2-binary` dependency. 
""" + import logging from collections import OrderedDict from typing import Dict, List, Optional, Set, Tuple @@ -35,7 +36,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 24 +LIBPATCH = 26 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -358,9 +359,7 @@ def _generate_database_privileges_statements( statements.append( """UPDATE pg_catalog.pg_largeobject_metadata SET lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}') -WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format( - user, self.user - ) +WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format(user, self.user) ) else: for schema in schemas: @@ -477,11 +476,11 @@ def set_up_database(self) -> None: """Set up postgres database with the right permissions.""" connection = None try: - self.create_user( - "admin", - extra_user_roles="pg_read_all_data,pg_write_all_data", - ) with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';") + if cursor.fetchone() is not None: + return + # Allow access to the postgres database only to the system users. cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;") cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;") @@ -491,6 +490,10 @@ def set_up_database(self) -> None: sql.Identifier(user) ) ) + self.create_user( + "admin", + extra_user_roles="pg_read_all_data,pg_write_all_data", + ) cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;") except psycopg2.Error as e: logger.error(f"Failed to set up databases: {e}") @@ -562,18 +565,16 @@ def build_postgresql_parameters( parameters = {} for config, value in config_options.items(): # Filter config option not related to PostgreSQL parameters. 
- if not config.startswith( - ( - "durability", - "instance", - "logging", - "memory", - "optimizer", - "request", - "response", - "vacuum", - ) - ): + if not config.startswith(( + "durability", + "instance", + "logging", + "memory", + "optimizer", + "request", + "response", + "vacuum", + )): continue parameter = "_".join(config.split("_")[1:]) if parameter in ["date_style", "time_zone"]: @@ -594,8 +595,8 @@ def build_postgresql_parameters( # and the remaining as cache memory. shared_buffers = int(available_memory * 0.25) effective_cache_size = int(available_memory - shared_buffers) - parameters.setdefault("shared_buffers", f"{int(shared_buffers/10**6)}MB") - parameters.update({"effective_cache_size": f"{int(effective_cache_size/10**6)}MB"}) + parameters.setdefault("shared_buffers", f"{int(shared_buffers / 10**6)}MB") + parameters.update({"effective_cache_size": f"{int(effective_cache_size / 10**6)}MB"}) else: # Return default parameters.setdefault("shared_buffers", "128MB") diff --git a/metadata.yaml b/metadata.yaml index b4bd1a4331..cd29daf381 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -26,6 +26,10 @@ peers: interface: upgrade provides: + async-primary: + interface: async_replication + limit: 1 + optional: true database: interface: postgresql_client db: @@ -37,6 +41,10 @@ provides: limit: 1 requires: + async-replica: + interface: async_replication + limit: 1 + optional: true certificates: interface: tls-certificates limit: 1 diff --git a/src/charm.py b/src/charm.py index dd9a3b8fca..4eeca6aba6 100755 --- a/src/charm.py +++ b/src/charm.py @@ -86,6 +86,7 @@ USER, USER_PASSWORD_KEY, ) +from relations.async_replication import PostgreSQLAsyncReplication from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model @@ -166,6 +167,7 @@ def __init__(self, *args): self.legacy_db_admin_relation = DbProvides(self, admin=True) 
self.backup = PostgreSQLBackups(self, "s3-parameters") self.tls = PostgreSQLTLS(self, PEER) + self.async_replication = PostgreSQLAsyncReplication(self) self.restart_manager = RollingOpsManager( charm=self, relation="restart", callback=self._restart ) @@ -321,6 +323,8 @@ def primary_endpoint(self) -> Optional[str]: for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): with attempt: primary = self._patroni.get_primary() + if primary is None and (standby_leader := self._patroni.get_standby_leader()): + primary = standby_leader primary_endpoint = self._patroni.get_member_ip(primary) # Force a retry if there is no primary or the member that was # returned is not in the list of the current cluster members @@ -420,6 +424,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE) return + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + def _on_pgdata_storage_detaching(self, _) -> None: # Change the primary if it's the unit that is being removed. try: @@ -513,9 +520,13 @@ def _on_peer_relation_changed(self, event: HookEvent): # Restart the workload if it's stuck on the starting state after a timeline divergence # due to a backup that was restored. - if not self.is_primary and ( - self._patroni.member_replication_lag == "unknown" - or int(self._patroni.member_replication_lag) > 1000 + if ( + not self.is_primary + and not self.is_standby_leader + and ( + self._patroni.member_replication_lag == "unknown" + or int(self._patroni.member_replication_lag) > 1000 + ) ): self._patroni.reinitialize_postgresql() logger.debug("Deferring on_peer_relation_changed: reinitialising replica") @@ -551,8 +562,7 @@ def _update_new_unit_status(self) -> None: # a failed switchover, so wait until the primary is elected. 
if self.primary_endpoint: self._update_relation_endpoints() - if not self.is_blocked: - self.unit.status = ActiveStatus() + self.async_replication.handle_read_only_mode() else: self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE) @@ -688,6 +698,7 @@ def _hosts(self) -> set: def _patroni(self) -> Patroni: """Returns an instance of the Patroni object.""" return Patroni( + self, self._unit_ip, self.cluster_name, self._member_name, @@ -704,6 +715,11 @@ def is_primary(self) -> bool: """Return whether this unit is the primary instance.""" return self.unit.name == self._patroni.get_primary(unit_name_pattern=True) + @property + def is_standby_leader(self) -> bool: + """Return whether this unit is the standby leader instance.""" + return self.unit.name == self._patroni.get_standby_leader(unit_name_pattern=True) + @property def is_tls_enabled(self) -> bool: """Return whether TLS is enabled.""" @@ -902,6 +918,9 @@ def _on_config_changed(self, event) -> None: if self.is_blocked and "Configuration Error" in self.unit.status.message: self.unit.status = ActiveStatus() + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + if not self.unit.is_leader(): return @@ -929,6 +948,9 @@ def enable_disable_extensions(self, database: str = None) -> None: Args: database: optional database where to enable/disable the extension. """ + if self._patroni.get_primary() is None: + logger.debug("Early exit enable_disable_extensions: standby cluster") + return spi_module = ["refint", "autoinc", "insert_username", "moddatetime"] plugins_exception = {"uuid_ossp": '"uuid-ossp"'} original_status = self.unit.status @@ -1188,6 +1210,9 @@ def _on_set_password(self, event: ActionEvent) -> None: # Other units Patroni configuration will be reloaded in the peer relation changed event. self.update_config() + # Update the password in the async replication data. 
class ClusterNotPromotedError(Exception):
    """Raised when a cluster could not be (or was not) promoted."""


class NotReadyError(Exception):
    """Raised when not all cluster members healthy or finished initial sync."""


class EndpointNotReadyError(Exception):
    """Raised when an endpoint is not ready."""


class StandbyClusterAlreadyPromotedError(Exception):
    """Raised when a standby cluster is already promoted."""
def promote_standby_cluster(self) -> None:
    """Promote a standby cluster to be a regular (primary) cluster.

    Patches the Patroni ``/config`` endpoint to drop the ``standby_cluster``
    section from the DCS, then waits until a primary appears.

    Raises:
        StandbyClusterAlreadyPromotedError: if the Patroni config has no
            ``standby_cluster`` section (there is nothing to promote).
        ClusterNotPromotedError: if no primary shows up after the patch
            (surfaced through tenacity's retry machinery once the 60s
            budget is exhausted).
    """
    # FIX: these two requests previously had no timeout and could hang the
    # charm hook indefinitely if the Patroni API stalled. Use the same
    # API_REQUEST_TIMEOUT as every other Patroni API call in this module.
    config_response = requests.get(
        f"{self._patroni_url}/config",
        verify=self.verify,
        timeout=API_REQUEST_TIMEOUT,
    )
    if "standby_cluster" not in config_response.json():
        raise StandbyClusterAlreadyPromotedError("standby cluster is already promoted")
    requests.patch(
        f"{self._patroni_url}/config",
        verify=self.verify,
        json={"standby_cluster": None},
        timeout=API_REQUEST_TIMEOUT,
    )
    # Wait for the promotion to take effect (a primary gets elected).
    for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
        with attempt:
            if self.get_primary() is None:
                raise ClusterNotPromotedError("cluster not promoted")
not promoted") + def render_file(self, path: str, content: str, mode: int) -> None: """Write a content rendered from a template to a file. @@ -475,6 +538,7 @@ def render_patroni_yml_file( data_path=POSTGRESQL_DATA_PATH, enable_tls=enable_tls, member_name=self.member_name, + partner_addrs=self.charm.async_replication.get_partner_addresses(), peers_ips=self.peers_ips, pgbackrest_configuration_file=PGBACKREST_CONFIGURATION_FILE, scope=self.cluster_name, @@ -492,6 +556,8 @@ def render_patroni_yml_file( version=self.get_postgresql_version().split(".")[0], minority_count=self.planned_units // 2, pg_parameters=parameters, + primary_cluster_endpoint=self.charm.async_replication.get_primary_cluster_endpoint(), + extra_replication_endpoints=self.charm.async_replication.get_standby_endpoints(), ) self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py new file mode 100644 index 0000000000..1e4df96c88 --- /dev/null +++ b/src/relations/async_replication.py @@ -0,0 +1,697 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Async Replication implementation. + +The highest "promoted-cluster-counter" value is used to determine the primary cluster. +The application in any side of the relation which has the highest value in its application +relation databag is considered the primary cluster. + +The "unit-promoted-cluster-counter" field in the unit relation databag is used to determine +if the unit is following the promoted cluster. If the value is the same as the highest value +in the application relation databag, then the unit is following the promoted cluster. +Otherwise, it's needed to restart the database in the unit to follow the promoted cluster +if the unit is from the standby cluster (the one that was not promoted). 
+""" + +import json +import logging +import os +import pwd +import shutil +import subprocess +from datetime import datetime +from pathlib import Path +from subprocess import PIPE, run +from typing import List, Optional, Tuple + +from ops import ( + ActionEvent, + Application, + BlockedStatus, + MaintenanceStatus, + Object, + Relation, + RelationChangedEvent, + RelationDepartedEvent, + Secret, + SecretNotFoundError, + WaitingStatus, +) +from tenacity import RetryError, Retrying, stop_after_attempt, stop_after_delay, wait_fixed + +from cluster import ClusterNotPromotedError, NotReadyError, StandbyClusterAlreadyPromotedError +from constants import ( + APP_SCOPE, + PATRONI_CONF_PATH, + PEER, + POSTGRESQL_DATA_PATH, +) + +logger = logging.getLogger(__name__) + + +ASYNC_PRIMARY_RELATION = "async-primary" +ASYNC_REPLICA_RELATION = "async-replica" +READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" + + +class PostgreSQLAsyncReplication(Object): + """Defines the async-replication management logic.""" + + def __init__(self, charm): + """Constructor.""" + super().__init__(charm, "postgresql") + self.charm = charm + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_joined, self._on_async_relation_joined + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_joined, self._on_async_relation_joined + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_async_relation_changed + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_async_relation_changed + ) + + # Departure events + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, + self._on_async_relation_departed, + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, + self._on_async_relation_departed, + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_async_relation_broken + ) + 
self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_async_relation_broken + ) + + # Actions + self.framework.observe(self.charm.on.promote_cluster_action, self._on_promote_cluster) + + def _can_promote_cluster(self, event: ActionEvent) -> bool: + """Check if the cluster can be promoted.""" + if not self.charm.is_cluster_initialised: + event.fail("Cluster not initialised yet.") + return False + + # Check if there is a relation. If not, see if there is a standby leader. If so promote it to leader. If not, + # fail the action telling that there is no relation and no standby leader. + relation = self._relation + if relation is None: + standby_leader = self.charm._patroni.get_standby_leader() + if standby_leader is not None: + try: + self.charm._patroni.promote_standby_cluster() + if ( + self.charm.is_blocked + and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm._peers.data[self.charm.app].update({ + "promoted-cluster-counter": "" + }) + self.charm._set_primary_status_message() + except (StandbyClusterAlreadyPromotedError, ClusterNotPromotedError) as e: + event.fail(str(e)) + return False + event.fail("No relation and no standby leader found.") + return False + + # Check if this cluster is already the primary cluster. If so, fail the action telling that it's already + # the primary cluster. + primary_cluster = self._get_primary_cluster() + if self.charm.app == primary_cluster: + event.fail("This cluster is already the primary cluster.") + return False + + # To promote the other cluster if there is already a primary cluster, the action must be called with + # `force-promotion=true`. If not, fail the action telling that the other cluster is already the primary. + if relation.app == primary_cluster: + if not event.params.get("force-promotion"): + event.fail( + f"{relation.app.name} is already the primary cluster. Pass `force-promotion=true` to promote anyway." 
def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool:
    """Configure the standby cluster.

    Syncs the system users' passwords from the primary cluster (read from the
    cross-cluster secret) and, when the local data directory belongs to a
    different database (different system identifier than the one published by
    the primary cluster), archives it to a tar.gz backup before it is later
    reinitialised.

    Returns:
        False when the event was deferred (secret not shared yet) and the
        caller should return early; True otherwise.
    """
    relation = self._relation
    if relation.name == ASYNC_REPLICA_RELATION:
        # Update the secrets between the clusters.
        primary_cluster_info = relation.data[relation.app].get("primary-cluster-data")
        secret_id = (
            None
            if primary_cluster_info is None
            else json.loads(primary_cluster_info).get("secret-id")
        )
        try:
            secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label)
        except SecretNotFoundError:
            logger.debug("Secret not found, deferring event")
            event.defer()
            return False
        credentials = secret.peek_content()
        for key, password in credentials.items():
            user = key.split("-password")[0]
            self.charm.set_secret(APP_SCOPE, key, password)
            logger.debug("Synced %s password", user)
    system_identifier, error = self.get_system_identifier()
    if error is not None:
        raise Exception(error)
    if system_identifier != relation.data[relation.app].get("system-id"):
        # Store current data in a tar.gz file.
        logger.info("Creating backup of pgdata folder")
        filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz"
        # BUG FIX: the archive path previously was the literal string
        # "(unknown)" instead of the computed `filename`, so the backup was
        # written to a bogus location while the log message pointed at the
        # intended one. Pass the arguments as a list (no string splitting).
        subprocess.check_call(["tar", "-zcf", filename, POSTGRESQL_DATA_PATH])
        logger.warning("Please review the backup file %s and handle its removal", filename)
    return True
def _get_primary_cluster(self) -> Optional[Application]:
    """Return the application that currently is the primary cluster.

    The primary cluster is the application (the local app, through the peer
    databag, or the remote app, through the async relation databag) holding
    the highest "promoted-cluster-counter" value.

    Returns:
        the primary cluster application, or None if no cluster was promoted yet.
    """
    primary_cluster = None
    promoted_cluster_counter = "0"
    for async_relation in [
        self.model.get_relation(ASYNC_PRIMARY_RELATION),
        self.model.get_relation(ASYNC_REPLICA_RELATION),
    ]:
        if async_relation is None:
            continue
        for app, relation_data in {
            async_relation.app: async_relation.data,
            self.charm.app: self.charm._peers.data,
        }.items():
            databag = relation_data[app]
            relation_promoted_cluster_counter = databag.get("promoted-cluster-counter", "0")
            # BUG FIX: compare the counters numerically. The previous string
            # comparison breaks as soon as a counter reaches two digits
            # ("10" < "9" lexicographically), which could elect the wrong
            # primary cluster. This now matches the int() comparison used in
            # _get_highest_promoted_cluster_counter_value().
            if int(relation_promoted_cluster_counter) > int(promoted_cluster_counter):
                promoted_cluster_counter = relation_promoted_cluster_counter
                primary_cluster = app
    return primary_cluster
+ secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + return secret + except SecretNotFoundError: + logger.debug("Secret not found, creating a new one") + pass + + app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app") + content = app_secret.peek_content() + + # Filter out unnecessary secrets. + shared_content = dict(filter(lambda x: "password" in x[0], content.items())) + + return self.charm.model.app.add_secret(content=shared_content, label=self._secret_label) + + def get_standby_endpoints(self) -> List[str]: + """Return the standby endpoints.""" + relation = self._relation + primary_cluster = self._get_primary_cluster() + # List the standby endpoints only for the primary cluster. + if relation is None or primary_cluster is None or self.charm.app != primary_cluster: + return [] + return [ + relation.data[unit].get("unit-address") + for relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ] + if relation is not None + for unit in relation.units + if relation.data[unit].get("unit-address") is not None + ] + + def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: + """Returns the PostgreSQL system identifier from this instance.""" + + def demote(): + pw_record = pwd.getpwnam("snap_daemon") + + def result(): + os.setgid(pw_record.pw_gid) + os.setuid(pw_record.pw_uid) + + return result + + process = run( + [ + f'/snap/charmed-postgresql/current/usr/lib/postgresql/{self.charm._patroni.get_postgresql_version().split(".")[0]}/bin/pg_controldata', + POSTGRESQL_DATA_PATH, + ], + stdout=PIPE, + stderr=PIPE, + preexec_fn=demote(), + ) + if process.returncode != 0: + return None, process.stderr.decode() + system_identifier = [ + line + for line in process.stdout.decode().splitlines() + if "Database system identifier" in line + ][0].split(" ")[-1] + return system_identifier, None + + def _handle_database_start(self, event: 
def handle_read_only_mode(self) -> None:
    """Handle read-only mode (standby cluster that lost the relation with the primary cluster)."""
    counter = self.charm._peers.data[self.charm.app].get("promoted-cluster-counter", "")

    # Refresh the regular status message unless the unit is blocked for a
    # reason other than the read-only marker ("0" counter + read-only status).
    if not self.charm.is_blocked or (
        counter != "0"
        and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE
    ):
        self.charm._set_primary_status_message()

    # Re-read the unit status here: the call above may have just changed it.
    if counter == "0" and self.charm.unit.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE:
        self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE)
def _on_async_relation_changed(self, event: RelationChangedEvent) -> None:
    """Update the Patroni configuration if one of the clusters was already promoted."""
    primary_cluster = self._get_primary_cluster()
    logger.debug("Primary cluster: %s", primary_cluster)
    if primary_cluster is None:
        logger.debug("Early exit on_async_relation_changed: No primary cluster found.")
        return

    if self._configure_primary_cluster(primary_cluster, event):
        return

    # Non-leader units that already follow the promoted cluster need no work.
    if not self.charm.unit.is_leader() and self._is_following_promoted_cluster():
        logger.debug("Early exit on_async_relation_changed: following promoted cluster.")
        return

    if not self._stop_database(event):
        return

    # A unit is ready once it either stopped its database or already follows
    # the promoted cluster (highest counter).
    highest_counter = self._get_highest_promoted_cluster_counter_value()
    peer_data = self.charm._peers.data
    all_units_ready = all(
        "stopped" in peer_data[unit]
        or peer_data[unit].get("unit-promoted-cluster-counter") == highest_counter
        for unit in {*self.charm._peers.units, self.charm.unit}
    )
    if not all_units_ready:
        self.charm.unit.status = WaitingStatus(
            "Waiting for the database to be stopped in all units"
        )
        logger.debug("Deferring on_async_relation_changed: not all units stopped.")
        event.defer()
        return

    if self._wait_for_standby_leader(event):
        return

    # Update the asynchronous replication configuration and start the database.
    self.charm.update_config()
    if not self.charm._patroni.start_patroni():
        raise Exception("Failed to start patroni service.")

    self._handle_database_start(event)
+ promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) + promoted_cluster_counter += 1 + logger.debug("Promoted cluster counter: %s", promoted_cluster_counter) + + self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) + + # Emit an async replication changed event for this unit (to promote this cluster before demoting the + # other if this one is a standby cluster, which is needed to correctly setup the async replication + # when performing a switchover). + self._re_emit_async_relation_changed_event() + + # Set the status. + self.charm.unit.status = MaintenanceStatus("Promoting cluster...") + + @property + def _primary_cluster_endpoint(self) -> str: + """Return the endpoint from one of the sync-standbys, or from the primary if there is no sync-standby.""" + sync_standby_names = self.charm._patroni.get_sync_standby_names() + if len(sync_standby_names) > 0: + unit = self.model.get_unit(sync_standby_names[0]) + return self.charm._get_unit_ip(unit) + else: + return self.charm._get_unit_ip(self.charm.unit) + + def _re_emit_async_relation_changed_event(self) -> None: + """Re-emit the async relation changed event.""" + relation = self._relation + getattr(self.charm.on, f'{relation.name.replace("-", "_")}_relation_changed').emit( + relation, + app=relation.app, + unit=[unit for unit in relation.units if unit.app == relation.app][0], + ) + + def _reinitialise_pgdata(self) -> None: + """Reinitialise the pgdata folder.""" + try: + path = Path(POSTGRESQL_DATA_PATH) + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove contents of the data directory with error: {str(e)}" + ) + os.mkdir(POSTGRESQL_DATA_PATH) + os.chmod(POSTGRESQL_DATA_PATH, 0o750) + self.charm._patroni._change_owner(POSTGRESQL_DATA_PATH) + + @property + def _relation(self) -> Relation: + """Return the relation object.""" + for relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), 
+ self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if relation is not None: + return relation + + @property + def _secret_label(self) -> str: + """Return the secret label.""" + return f"async-replication-secret-{self._get_highest_promoted_cluster_counter_value()}" + + def _stop_database(self, event: RelationChangedEvent) -> bool: + """Stop the database.""" + if ( + "stopped" not in self.charm._peers.data[self.charm.unit] + and not self._is_following_promoted_cluster() + ): + if not self.charm.unit.is_leader() and not os.path.exists(POSTGRESQL_DATA_PATH): + logger.debug("Early exit on_async_relation_changed: following promoted cluster.") + return False + + try: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") + except RetryError: + logger.debug("Deferring on_async_relation_changed: patroni hasn't stopped yet.") + event.defer() + return False + + if self.charm.unit.is_leader(): + # Remove the "cluster_initialised" flag to avoid self-healing in the update status hook. + self.charm._peers.data[self.charm.app].update({"cluster_initialised": ""}) + if not self._configure_standby_cluster(event): + return False + + # Remove and recreate the pgdata folder to enable replication of the data from the + # primary cluster. + logger.info("Removing and recreating pgdata folder") + self._reinitialise_pgdata() + + # Remove previous cluster information to make it possible to initialise a new cluster. 
+ logger.info("Removing previous cluster information") + try: + path = Path(f"{PATRONI_CONF_PATH}/raft") + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove previous cluster information with error: {str(e)}" + ) + + self.charm._peers.data[self.charm.unit].update({"stopped": "True"}) + + return True + + def update_async_replication_data(self) -> None: + """Updates the async-replication data, if the unit is the leader. + + This is used to update the standby units with the new primary information. + """ + relation = self._relation + if relation is None: + return + relation.data[self.charm.unit].update({"unit-address": self.charm._unit_ip}) + if self._is_primary_cluster() and self.charm.unit.is_leader(): + self._update_primary_cluster_data() + + def _update_primary_cluster_data( + self, promoted_cluster_counter: int = None, system_identifier: str = None + ) -> None: + """Update the primary cluster data.""" + async_relation = self._relation + + if promoted_cluster_counter is not None: + for relation in [async_relation, self.charm._peers]: + relation.data[self.charm.app].update({ + "promoted-cluster-counter": str(promoted_cluster_counter) + }) + + # Update the data in the relation. + primary_cluster_data = {"endpoint": self._primary_cluster_endpoint} + + # Retrieve the secrets that will be shared between the clusters. 
+ if async_relation.name == ASYNC_PRIMARY_RELATION: + secret = self._get_secret() + secret.grant(async_relation) + primary_cluster_data["secret-id"] = secret.id + + if system_identifier is not None: + primary_cluster_data["system-id"] = system_identifier + + async_relation.data[self.charm.app]["primary-cluster-data"] = json.dumps( + primary_cluster_data + ) + + def _wait_for_standby_leader(self, event: RelationChangedEvent) -> bool: + """Wait for the standby leader to be up and running.""" + try: + standby_leader = self.charm._patroni.get_standby_leader(check_whether_is_running=True) + except RetryError: + standby_leader = None + if not self.charm.unit.is_leader() and standby_leader is None: + if self.charm._patroni.is_member_isolated: + self.charm._patroni.restart_patroni() + self.charm.unit.status = WaitingStatus("Restarting Patroni to rejoin the cluster") + logger.debug( + "Deferring on_async_relation_changed: restarting Patroni to rejoin the cluster." + ) + event.defer() + return True + self.charm.unit.status = WaitingStatus( + "Waiting for the standby leader start the database" + ) + logger.debug("Deferring on_async_relation_changed: standby leader hasn't started yet.") + event.defer() + return True + return False diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 0732ec7156..1a12292807 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -37,11 +37,11 @@ ctl: raft: data_dir: {{ conf_path }}/raft self_addr: '{{ self_ip }}:2222' - {% if peers_ips -%} + {% if partner_addrs -%} partner_addrs: {% endif -%} - {% for peer_ip in peers_ips -%} - - {{ peer_ip }}:2222 + {% for partner_addr in partner_addrs -%} + - {{ partner_addr }}:2222 {% endfor %} bootstrap: @@ -103,6 +103,11 @@ bootstrap: command: pgbackrest {{ pgbackrest_configuration_file }} --stanza={{ restore_stanza }} --pg1-path={{ data_path }} --set={{ backup_id }} --type=immediate --target-action=promote restore no_params: True keep_existing_recovery_conf: True + {% elif 
primary_cluster_endpoint %} + standby_cluster: + host: {{ primary_cluster_endpoint }} + port: 5432 + create_replica_methods: ["basebackup"] {% else %} initdb: - encoding: UTF8 @@ -146,6 +151,9 @@ postgresql: {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 # Allow replications connections from other cluster members. + {%- for endpoint in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}/32 md5 + {%- endfor %} {%- for peer_ip in peers_ips %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ peer_ip }}/0 md5 {% endfor %} diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 456784f0fb..415048e6ad 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -320,9 +320,11 @@ def test_on_config_changed(harness): def test_check_extension_dependencies(harness): - with patch("subprocess.check_output", return_value=b"C"), patch.object( - PostgresqlOperatorCharm, "postgresql", Mock() - ): + with patch("charm.Patroni.get_primary") as _get_primary, patch( + "subprocess.check_output", return_value=b"C" + ), patch.object(PostgresqlOperatorCharm, "postgresql", Mock()): + _get_primary.return_value = harness.charm.unit + # Test when plugins dependencies exception is not caused config = { "plugin_address_standardizer_enable": False, @@ -348,9 +350,13 @@ def test_check_extension_dependencies(harness): def test_enable_disable_extensions(harness, caplog): - with patch("subprocess.check_output", return_value=b"C"), patch.object( - PostgresqlOperatorCharm, "postgresql", Mock() - ) as postgresql_mock: + with patch("charm.Patroni.get_primary") as _get_primary, patch( + "charm.PostgresqlOperatorCharm._unit_ip" + ), patch("charm.PostgresqlOperatorCharm._patroni"), patch( + "subprocess.check_output", return_value=b"C" + ), patch.object(PostgresqlOperatorCharm, "postgresql", Mock()) as postgresql_mock: + _get_primary.return_value = harness.charm.unit 
+ # Test when all extensions install/uninstall succeed. postgresql_mock.enable_disable_extension.side_effect = None with caplog.at_level(logging.ERROR): @@ -835,6 +841,12 @@ def test_on_update_status(harness): "charm.Patroni.member_replication_lag", new_callable=PropertyMock ) as _member_replication_lag, patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started, + patch( + "charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock + ) as _is_standby_leader, + patch( + "charm.PostgresqlOperatorCharm.is_primary", new_callable=PropertyMock + ) as _is_primary, patch( "charm.PostgresqlOperatorCharm._update_relation_endpoints" ) as _update_relation_endpoints, @@ -867,6 +879,8 @@ def test_on_update_status(harness): # Test the reinitialisation of the replica when its lag is unknown # after a restart. _set_primary_status_message.reset_mock() + _is_primary.return_value = False + _is_standby_leader.return_value = False _member_started.return_value = False _is_member_isolated.return_value = False _member_replication_lag.return_value = "unknown" @@ -1354,9 +1368,7 @@ def test_validate_config_options(harness): def test_on_peer_relation_changed(harness): with ( patch("charm.snap.SnapCache"), - patch( - "charm.PostgresqlOperatorCharm._update_relation_endpoints" - ) as _update_relation_endpoints, + patch("charm.PostgresqlOperatorCharm._update_new_unit_status") as _update_new_unit_status, patch( "charm.PostgresqlOperatorCharm.primary_endpoint", new_callable=PropertyMock ) as _primary_endpoint, @@ -1369,6 +1381,7 @@ def test_on_peer_relation_changed(harness): patch( "charm.Patroni.member_replication_lag", new_callable=PropertyMock ) as _member_replication_lag, + patch("charm.PostgresqlOperatorCharm.is_standby_leader") as _is_standby_leader, patch("charm.PostgresqlOperatorCharm.is_primary") as _is_primary, patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started, patch("charm.Patroni.start_patroni") as 
_start_patroni, @@ -1417,20 +1430,19 @@ def test_on_peer_relation_changed(harness): _update_member_ip.assert_called_once() _update_config.assert_called_once() _start_patroni.assert_called_once() - _update_relation_endpoints.assert_called_once() - assert isinstance(harness.model.unit.status, ActiveStatus) + _update_new_unit_status.assert_called_once() # Test when the cluster member updates its IP. _update_member_ip.reset_mock() _update_config.reset_mock() _start_patroni.reset_mock() - _update_relation_endpoints.reset_mock() _update_member_ip.return_value = True + _update_new_unit_status.reset_mock() harness.charm._on_peer_relation_changed(mock_event) _update_member_ip.assert_called_once() _update_config.assert_not_called() _start_patroni.assert_not_called() - _update_relation_endpoints.assert_not_called() + _update_new_unit_status.assert_not_called() # Test when the unit fails to update the Patroni configuration. _update_member_ip.return_value = False @@ -1438,7 +1450,7 @@ def test_on_peer_relation_changed(harness): harness.charm._on_peer_relation_changed(mock_event) _update_config.assert_called_once() _start_patroni.assert_not_called() - _update_relation_endpoints.assert_not_called() + _update_new_unit_status.assert_not_called() assert isinstance(harness.model.unit.status, BlockedStatus) # Test when Patroni hasn't started yet in the unit. 
@@ -1446,7 +1458,7 @@ def test_on_peer_relation_changed(harness): _member_started.return_value = False harness.charm._on_peer_relation_changed(mock_event) _start_patroni.assert_called_once() - _update_relation_endpoints.assert_not_called() + _update_new_unit_status.assert_not_called() assert isinstance(harness.model.unit.status, WaitingStatus) # Test when Patroni has already started but this is a replica with a @@ -1458,6 +1470,7 @@ def test_on_peer_relation_changed(harness): _check_stanza.reset_mock() _start_stop_pgbackrest_service.reset_mock() _is_primary.return_value = values[0] + _is_standby_leader.return_value = values[0] _member_replication_lag.return_value = values[1] harness.charm.unit.status = ActiveStatus() harness.charm.on.database_peers_relation_changed.emit(relation) @@ -2211,6 +2224,9 @@ def test_on_peer_relation_departed(harness): def test_update_new_unit_status(harness): with ( + patch( + "relations.async_replication.PostgreSQLAsyncReplication.handle_read_only_mode" + ) as handle_read_only_mode, patch( "charm.PostgresqlOperatorCharm._update_relation_endpoints" ) as _update_relation_endpoints, @@ -2218,23 +2234,72 @@ def test_update_new_unit_status(harness): "charm.PostgresqlOperatorCharm.primary_endpoint", new_callable=PropertyMock ) as _primary_endpoint, ): - # Test when the charm is blocked. + # Test when the primary endpoint is reachable. _primary_endpoint.return_value = "endpoint" - harness.charm.unit.status = BlockedStatus("fake blocked status") + harness.charm.unit.status = MaintenanceStatus("fake status") harness.charm._update_new_unit_status() _update_relation_endpoints.assert_called_once() - assert isinstance(harness.charm.unit.status, BlockedStatus) - - # Test when the charm is not blocked. 
- _update_relation_endpoints.reset_mock() - harness.charm.unit.status = WaitingStatus() - harness.charm._update_new_unit_status() - _update_relation_endpoints.assert_called_once() - assert isinstance(harness.charm.unit.status, ActiveStatus) + handle_read_only_mode.assert_called_once() + assert not isinstance(harness.charm.unit.status, WaitingStatus) # Test when the primary endpoint is not reachable yet. _update_relation_endpoints.reset_mock() + handle_read_only_mode.reset_mock() _primary_endpoint.return_value = None harness.charm._update_new_unit_status() _update_relation_endpoints.assert_not_called() + handle_read_only_mode.assert_not_called() assert isinstance(harness.charm.unit.status, WaitingStatus) + + @patch("charm.Patroni.member_started", new_callable=PropertyMock) + @patch("charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock) + @patch("charm.Patroni.get_primary") + def test_set_active_status(self, _get_primary, _is_standby_leader, _member_started): + for values in itertools.product( + [ + RetryError(last_attempt=1), + ConnectionError, + self.charm.unit.name, + f"{self.charm.app.name}/2", + ], + [ + RetryError(last_attempt=1), + ConnectionError, + True, + False, + ], + [True, False], + ): + self.charm.unit.status = MaintenanceStatus("fake status") + _member_started.return_value = values[2] + if isinstance(values[0], str): + _get_primary.side_effect = None + _get_primary.return_value = values[0] + if values[0] != self.charm.unit.name and not isinstance(values[1], bool): + _is_standby_leader.side_effect = values[1] + _is_standby_leader.return_value = None + self.charm._set_active_status() + self.assertIsInstance(self.charm.unit.status, MaintenanceStatus) + else: + _is_standby_leader.side_effect = None + _is_standby_leader.return_value = values[1] + self.charm._set_active_status() + self.assertIsInstance( + self.charm.unit.status, + ActiveStatus + if values[0] == self.charm.unit.name or values[1] or values[2] + else MaintenanceStatus, + 
) + self.assertEqual( + self.charm.unit.status.message, + "Primary" + if values[0] == self.charm.unit.name + else ( + "Standby Leader" if values[1] else ("" if values[2] else "fake status") + ), + ) + else: + _get_primary.side_effect = values[0] + _get_primary.return_value = None + self.charm._set_active_status() + self.assertIsInstance(self.charm.unit.status, MaintenanceStatus) diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 6c5f227af6..e94f3bb2bd 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -9,8 +9,10 @@ import tenacity as tenacity from charms.operator_libs_linux.v2 import snap from jinja2 import Template +from ops.testing import Harness from tenacity import stop_after_delay +from charm import PostgresqlOperatorCharm from cluster import Patroni from constants import ( PATRONI_CONF_PATH, @@ -54,9 +56,18 @@ def peers_ips(): yield peers_ips +@pytest.fixture() +def harness(): + harness = Harness(PostgresqlOperatorCharm) + harness.begin() + yield harness + harness.cleanup() + + @pytest.fixture(autouse=True) -def patroni(peers_ips): +def patroni(harness, peers_ips): patroni = Patroni( + harness.charm, "1.1.1.1", "postgresql", "postgresql-0", @@ -252,6 +263,10 @@ def test_render_file(peers_ips, patroni): def test_render_patroni_yml_file(peers_ips, patroni): with ( + patch( + "relations.async_replication.PostgreSQLAsyncReplication.get_partner_addresses", + return_value=["2.2.2.2", "3.3.3.3"], + ) as _get_partner_addresses, patch("charm.Patroni.get_postgresql_version") as _get_postgresql_version, patch("charm.Patroni.render_file") as _render_file, patch("charm.Patroni._create_directory"), @@ -275,6 +290,7 @@ def test_render_patroni_yml_file(peers_ips, patroni): log_path=PATRONI_LOGS_PATH, postgresql_log_path=POSTGRESQL_LOGS_PATH, member_name=member_name, + partner_addrs=["2.2.2.2", "3.3.3.3"], peers_ips=peers_ips, scope=scope, self_ip=patroni.unit_ip,