From f0102f48e7f6fc75ff4cb743c5045668a5fec7f6 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 29 Nov 2023 17:53:05 -0300 Subject: [PATCH 1/6] Experimental standby cluster Signed-off-by: Marcelo Henrique Neppel --- actions.yaml | 17 ++ metadata.yaml | 5 + src/charm.py | 2 + src/cluster.py | 15 +- src/coordinator_ops.py | 207 +++++++++++++++ src/relations/async_replication.py | 399 +++++++++++++++++++++++++++++ templates/patroni.yml.j2 | 8 + 7 files changed, 651 insertions(+), 2 deletions(-) create mode 100644 src/coordinator_ops.py create mode 100644 src/relations/async_replication.py diff --git a/actions.yaml b/actions.yaml index 44231ad4a9..a60db6765f 100644 --- a/actions.yaml +++ b/actions.yaml @@ -17,6 +17,23 @@ list-backups: description: Lists backups in s3 storage. pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. +promote-standby-cluster: + description: Promotes the standby cluster of choice to be the new primary cluster. Must be run against the leader unit of the standby cluster. + params: + force: + type: boolean + default: False + description: | + WARNING: setting this option to True WILL WIPE OUT your current primary cluster! + If both this option and "force-really-really-mean-it" are set to true, then this unit will take over the primary role. + It only works in the case of cross-cluster replication, where both clusters are connected to each other through the async-primary relation. + force-really-really-mean-it: + type: boolean + default: False + description: | + WARNING: setting this option to True WILL WIPE OUT your current primary cluster! + If both this option and "force" are set to true, then this unit will take over the primary role. + It only works in the case of cross-cluster replication, where both clusters are connected to each other through the async-primary relation. restore: description: Restore a database backup using pgBackRest. S3 credentials are retrieved from a relation with the S3 integrator charm. diff --git a/metadata.yaml b/metadata.yaml index b4bd1a4331..51ff15120f 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -26,6 +26,8 @@ peers: interface: upgrade provides: + async-primary: + interface: async_replication database: interface: postgresql_client db: @@ -37,6 +39,9 @@ provides: limit: 1 requires: + async-replica: + interface: async_replication + limit: 1 certificates: interface: tls-certificates limit: 1 diff --git a/src/charm.py b/src/charm.py index 3fe0cf6845..90411127d5 100755 --- a/src/charm.py +++ b/src/charm.py @@ -80,6 +80,7 @@ USER, USER_PASSWORD_KEY, ) +from relations.async_replication import PostgreSQLAsyncReplication from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model @@ -153,6 +154,7 @@ def __init__(self, *args): ], log_slots=[f"{POSTGRESQL_SNAP_NAME}:logs"], ) + self.async_manager = PostgreSQLAsyncReplication(self) def patroni_scrape_config(self) -> List[Dict]: """Generates scrape config for the Patroni metrics endpoint.""" diff --git a/src/cluster.py b/src/cluster.py index 6f5c748813..457166d4f4 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -464,6 +464,9 @@ def render_patroni_yml_file( # Open the template patroni.yml file. with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) + + primary = self._charm.async_manager.get_primary_data() + # Render the template file with the correct values.
rendered = template.render( conf_path=PATRONI_CONF_PATH, @@ -479,8 +482,12 @@ def render_patroni_yml_file( scope=self.cluster_name, self_ip=self.unit_ip, superuser=USER, - superuser_password=self.superuser_password, - replication_password=self.replication_password, + superuser_password=primary["superuser-password"] + if primary + else self.superuser_password, + replication_password=primary["replication-password"] + if primary + else self.replication_password, rewind_user=REWIND_USER, rewind_password=self.rewind_password, enable_pgbackrest=stanza is not None, @@ -491,6 +498,10 @@ def render_patroni_yml_file( version=self.get_postgresql_version().split(".")[0], minority_count=self.planned_units // 2, pg_parameters=parameters, + standby_cluster_endpoint=primary["endpoint"] if primary else None, + extra_replication_endpoints={"{}/32".format(primary["endpoint"])} + if primary + else self._charm.async_manager.standby_endpoints(), ) self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) diff --git a/src/coordinator_ops.py b/src/coordinator_ops.py new file mode 100644 index 0000000000..30a10e8b6b --- /dev/null +++ b/src/coordinator_ops.py @@ -0,0 +1,207 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""The coordinated ops manager is a class that ensures a certain activity is run together. + +The concept is similar to the "cohort" in snaps, where all units wait until they can +proceed to execute a certain activity, for example, restarting your service. +The process starts with the leader issuing a new coordination request. Effectively, +that is implemented by increasing the __coord_counter by 1 at the app level. +__coord_approved is set to "False". +Each unit receives a relation-changed, which is then re-issued as a _coordinator_requested +event. Once the unit has done its task, it should ack the request. +Each unit should ack the request by setting its own __coord_counter +equal to the app's value. +Once all units have ack'ed the __coord_counter, the leader switches the +__coord_approved to "True". All units will then process that new change as a +"coordinator-approved" event and execute the activity they have been waiting for. +If there is a need to coordinate several activities in sequence, e.g. coordinated stop and then +coordinated start, it is recommended that the leader unit publishes a _requested event twice, as follows: + class MyCharm: + def __init__(self, *args): + self.stop_coordinator = CoordinatedOpsManager(relation, tag="_stop_my_charm") + self.start_coordinator = CoordinatedOpsManager(relation, tag="_start_my_charm") + self.framework.observe( + self.stop_coordinator.on.coordinator_requested, + self._on_coordinator_requested + ) + self.framework.observe( + self.stop_coordinator.on.coordinator_approved, + self._on_coordinator_approved + ) + self.framework.observe( + self.start_coordinator.on.coordinator_requested, + self._on_coordinator_requested + ) + self.framework.observe( + self.start_coordinator.on.coordinator_approved, + self._on_coordinator_approved + ) + def _a_method(): + # A method that kick-starts the restarting coordination + ......
+ if self.charm.unit.is_leader(): + self.stop_coordinator.coordinate() + def _on_coordinator_requested(self, event): + if self.service_is_running and event.tag == "_stop_my_charm": + # We are in the stop-phase + self.service.stop() + self.stop_coordinator.acknowledge(event) + elif event.tag == "_start_my_charm": + # We are in the starting-phase + self.service.start() + self.start_coordinator.acknowledge(event) + def _on_coordinator_approved(self, event): + # All units have ack'ed the activity, which means we have stopped. + if self.charm.unit.is_leader() and event.tag == "_stop_my_charm": + # Now kickstart the restarting process + self.start_coordinator.coordinate() +""" + + +import logging +from typing import AnyStr + +from ops.charm import ( + CharmBase, + CharmEvents, + EventSource, + RelationChangedEvent, ) +from ops.framework import EventBase, Handle, Object + +logger = logging.getLogger(__name__) + + +class CoordinatorEventBase(EventBase): + """Base event for the coordination activities.""" + + def __init__(self, handle: "Handle", tag: str): + super().__init__(handle) + self._tag = tag + + @property + def tag(self): + """Returns the tag identifying this coordinator instance.""" + return self._tag + + +class CoordinatorRequestedEvent(CoordinatorEventBase): + """Event to signal that the leader requested the units to coordinate a new activity.""" + + def __init__(self, handle: "Handle", tag: str): + super().__init__(handle, tag) + + +class CoordinatorApprovedEvent(CoordinatorEventBase): + """Event to signal that all units ack'ed the coordination request and can proceed.""" + + def __init__(self, handle: "Handle", tag: str): + super().__init__(handle, tag) + + +class CoordinatorCharmEvents(CharmEvents): + """Events emitted by the coordinator to the charm.""" + + coordinator_approved = EventSource(CoordinatorApprovedEvent) + coordinator_requested = EventSource(CoordinatorRequestedEvent) + + +class CoordinatedOpsManager(Object): + """Coordinates activities that require the entire peer group to act at once.""" + + on = CoordinatorCharmEvents() + + def __init__(self, charm: CharmBase, relation: AnyStr, tag: AnyStr = ""): + super().__init__(charm, relation) + self.tag = tag + self.relation = relation + self.app = charm.app + self.name = relation + tag # use the tag to separate multiple coordinator objects + # in the same charm class. + self.charm = charm # Maintain a reference to charm, so we can emit events. + self.framework.observe(charm.on[self.relation].relation_changed, self._on_relation_changed) + + @property + def under_coordination(self): + """Returns True if _coord_approved is "False", i.e. a coordination round is still in progress.""" + return ( + self.model.get_relation(self.relation) + .data[self.app] + .get(f"_{self.name}_coord_approved", "True") + == "False" + ) + + def coordinate(self): + """Processes a request to start a new coordination activity. + + If we are the leader, bump the app-level counter for self.name and reset the approval flag. + """ + logger.info("coordinate: starting") + if self.charm.unit.is_leader(): + counter = ( + self.model.get_relation(self.relation) + .data[self.app] + .get(f"_{self.name}_coord_counter", 0) + ) + self.model.get_relation(self.relation).data[self.app][ + f"_{self.name}_coord_counter" + ] = str(counter + 1 if counter < 10000000 else 0) + self.model.get_relation(self.relation).data[self.app][ + f"_{self.name}_coord_approved" + ] = "False" + logger.info("coordinate: tasks executed") + + def acknowledge(self, event): + """Runs the ack of the latest requested coordination.
+ + Each unit will set their own _counter to the same value as app's. + """ + coord_counter = f"_{self.name}_coord_counter" + self.model.get_relation(self.relation).data[self.charm.unit][coord_counter] = str( + self.model.get_relation(self.relation).data[self.app].get(coord_counter, 0) + ) + logger.info("acknowledge: updated internal counter") + + if not self.charm.unit.is_leader(): + # Nothing to do anymore. + logger.info("acknowledge: this unit is not a leader") + return + + relation = self.model.get_relation(self.relation) + # Now, the leader must check if everyone has ack'ed + for unit in relation.units: + if relation.data[unit].get(coord_counter, "0") != relation.data[self.app].get( + coord_counter, "0" + ): + logger.info(f"acknowledge: {unit.name} still has a different coord_counter") + # We defer the event until _coord_approved == True. + # If we have _coord_counter differing, then we are not yet there. + event.defer() + return + logger.info("acknowledge: all units are set, set coord_approved == True") + # Just confirmed we have all units ack'ed. Now, set the approval. + relation.data[self.app][f"_{self.name}_coord_approved"] = "True" + + def _on_relation_changed(self: CharmBase, _: RelationChangedEvent): + """Process relation changed. + + First, determine whether this unit has received a new request for coordination. + Then, if we are the leader, fire off a coordinator requested event. + """ + logger.info("coordinator: starting _on_relation_changed") + relation_data = self.model.get_relation(self.relation).data[self.app] + unit_data = self.model.get_relation(self.relation).data[self.charm.unit] + + if relation_data.get(f"_{self.name}_coord_approved", "False") == "True": + logger.info("coordinator: _on_relation_changed -- coordinator approved") + # We are approved to move on, issue the coordinator_approved event. + self.on.coordinator_approved.emit(self.tag) + return + coord_counter = f"_{self.name}_coord_counter" + if coord_counter in relation_data and relation_data.get( + coord_counter, "0" + ) != unit_data.get(coord_counter, "0"): + logger.info("coordinator: _on_relation_changed -- coordinator requested") + self.on.coordinator_requested.emit(self.tag) + return diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py new file mode 100644 index 0000000000..5bab58c6aa --- /dev/null +++ b/src/relations/async_replication.py @@ -0,0 +1,399 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Implements the state-machine. + +1) First async replication relation is made: both units get blocked waiting for a leader +2) User runs the promote action against one of the clusters +3) The cluster moves leader and sets the async-replication data, marking itself as leader +4) The other units receive that new information and update themselves to become standby-leaders. 
+""" + +import json +import logging +import os +import pwd +import shutil +from pathlib import Path +from subprocess import PIPE, run +from typing import Dict, Set + +from charms.operator_libs_linux.v2 import snap +from ops.charm import ( + ActionEvent, + CharmBase, +) +from ops.framework import Object +from ops.model import ( + Unit, +) + +from constants import PATRONI_CONF_PATH +from coordinator_ops import CoordinatedOpsManager +from src.constants import POSTGRESQL_DATA_PATH + +logger = logging.getLogger(__name__) + + +ASYNC_PRIMARY_RELATION = "async-primary" +ASYNC_REPLICA_RELATION = "async-replica" + + +class MoreThanOnePrimarySelectedError(Exception): + """Represents more than one primary has been selected.""" + + +def _get_pod_ip(): + """Reads some files to quickly figure out its own pod IP. + + It should work for any Ubuntu-based image + """ + with open("/etc/hosts") as f: + hosts = f.read() + with open("/etc/hostname") as f: + hostname = f.read().replace("\n", "") + line = [ln for ln in hosts.split("\n") if ln.find(hostname) >= 0][0] + return line.split("\t")[0] + + +class PostgreSQLAsyncReplication(Object): + """Defines the async-replication management logic.""" + + def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION) -> None: + super().__init__(charm, relation_name) + self.relation_name = relation_name + self.charm = charm + self.restart_coordinator = CoordinatedOpsManager(charm, "restart", tag="_asyncreplica") + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_primary_changed + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_standby_changed + ) + self.framework.observe( + self.restart_coordinator.on.coordinator_requested, self._on_coordination_request + ) + self.framework.observe( + self.restart_coordinator.on.coordinator_approved, self._on_coordination_approval + ) + + # Departure events + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_departure + ) + + # Actions + self.framework.observe( + self.charm.on.promote_standby_cluster_action, self._on_promote_standby_cluster + ) + + # We treat both relations above as actually the same. 
+ # The big difference appears only at promote/demote actions + self.relation_set = { + *set(self.charm.model.relations[ASYNC_PRIMARY_RELATION]), + *set(self.charm.model.relations[ASYNC_REPLICA_RELATION]), + } + + @property + def endpoint(self) -> str: + """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" + for rel in self.relation_set: + return str(self.charm.model.get_binding(rel).network.ingress_address) + return None + + def standby_endpoints(self) -> Set[str]: + """Returns the set of IPs used by each standby unit with a /32 mask.""" + standby_endpoints = set() + for rel in self.relation_set: + for unit in self._all_units(rel): + if not rel.data[unit].get("elected", None): + standby_endpoints.add("{}/32".format(str(rel.data[unit]["ingress-address"]))) + if "pod-address" in rel.data[unit]: + standby_endpoints.add("{}/32".format(str(rel.data[unit]["pod-address"]))) + return standby_endpoints + + def get_primary_data(self) -> Dict[str, str]: + """Returns the primary info, if available and if the primary cluster is ready.""" + for rel in self.relation_set: + for unit in self._all_units(rel): + if "elected" in rel.data[unit] and unit.name == self.charm.unit.name: + # If this unit is the leader, then return None + return None + if rel.data[unit].get("elected", None) and rel.data[unit].get( + "primary-cluster-ready", None + ): + elected_data = json.loads(rel.data[unit]["elected"]) + return { + "endpoint": str(elected_data["endpoint"]), + "replication-password": elected_data["replication-password"], + "superuser-password": elected_data["superuser-password"], + } + return None + + def _all_units(self, relation): + found_units = {*relation.units, self.charm.unit} + logger.debug(f"Units found: {found_units}") + return found_units + + def _all_replica_published_pod_ips(self) -> bool: + for rel in self.relation_set: + for unit in self._all_units(rel): + if "elected" in rel.data[unit]: + # This is the leader unit, it will not publish its own pod address + continue + if "pod-address" not in rel.data[unit]: + return False + return True + + def _on_departure(self, _): + for rel in [ + self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(ASYNC_PRIMARY_RELATION), + ]: + if not rel: # if no relation exits, then it rel == None + continue + if "pod-address" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["pod-address"] + if "elected" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["elected"] + if "primary-cluster-ready" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["primary-cluster-ready"] + + snap_cache = snap.SnapCache() + charmed_postgresql_snap = snap_cache["charmed-postgresql"] + if not charmed_postgresql_snap.present: + raise Exception("Cannot start/stop service, snap is not yet installed.") + charmed_postgresql_snap.stop(services=["patroni"]) + self.charm.update_config() + charmed_postgresql_snap.start(services=["patroni"]) + + def _on_primary_changed(self, event): + """Triggers a configuration change in the primary units.""" + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not primary_relation: + return + logger.info("_on_primary_changed: primary_relation exists") + + primary = self._check_if_primary_already_selected() + if not primary: + # primary may not be available because the action of promoting a cluster was + # executed way after the relation changes. 
+ # Defer it until + event.defer() + return + logger.info("_on_primary_changed: primary cluster exists") + + if primary.name != self.charm.unit.name: + # this unit is not the system leader + return + logger.info("_on_primary_changed: unit is the primary's leader") + + if not self._all_replica_published_pod_ips(): + # We will have more events happening, no need for retrigger + event.defer() + return + logger.info("_on_primary_changed: all replicas published pod details") + + # This unit is the leader, generate a new configuration and leave. + # There is nothing to do for the leader. + snap_cache = snap.SnapCache() + charmed_postgresql_snap = snap_cache["charmed-postgresql"] + if not charmed_postgresql_snap.present: + raise Exception("Cannot start/stop service, snap is not yet installed.") + charmed_postgresql_snap.stop(services=["patroni"]) + self.charm.update_config() + charmed_postgresql_snap.start(services=["patroni"]) + + # Retrigger the other units' async-replica-changed + primary_relation.data[self.charm.unit]["primary-cluster-ready"] = "True" + + def _on_standby_changed(self, event): # noqa C901 + """Triggers a configuration change.""" + replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) + if not replica_relation: + return + logger.info("_on_standby_changed: replica relation available") + + primary = self._check_if_primary_already_selected() + if not primary: + return + logger.info("_on_standby_changed: primary is present") + + # Check if we have already published pod-address. If not, then we are waiting + # for the leader to catch all the pod ips and restart itself + if "pod-address" not in replica_relation.data[self.charm.unit]: + replica_relation.data[self.charm.unit]["pod-address"] = _get_pod_ip() + # Finish here and wait for the retrigger from the primary cluster + event.defer() + return + logger.info("_on_standby_changed: pod-address published in own replica databag") + + if not self.get_primary_data(): + # We've made thus far. + # However, the get_primary_data will return != None ONLY if the primary cluster + # is ready and configured. Until then, we wait. + event.defer() + return + logger.info("_on_standby_changed: primary cluster is ready") + + ################ + # Initiate restart logic + ################ + + # We need to: + # 1) Stop all standby units + # 2) Delete the k8s service + # 3) Remove the pgdata folder + # 4) Start all standby units + # For that, the peer leader must first stop its own service and then, issue a + # coordination request to all units. All units ack that request once they all have + # their service stopped. + # Then, we get an approved coordination from the leader, which triggers the + # steps 2-4. + if self.charm.unit.is_leader() and not self.restart_coordinator.under_coordination: + # The leader now requests a ack from each unit that they have stopped. + self.restart_coordinator.coordinate() + + def _on_coordination_request(self, event): + # Stop the service. + # We need all replicas to be stopped, so we can remove the patroni-postgresql-k8s + # service from Kubernetes and not getting it recreated! + # We will restart the it once the cluster is ready. 
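# Illustrative sketch (not from the patch): the restart coordination used here follows the
# counter/approval protocol from src/coordinator_ops.py. The dicts below stand in for the
# Juju app and unit relation databags, which the real charm accesses through
# `relation.data[...]`; everything else mirrors the documented handshake.
def leader_requests(app_bag: dict, name: str = "restart_asyncreplica") -> None:
    """Leader opens a new round: bump the counter and clear the approval flag."""
    counter = int(app_bag.get(f"_{name}_coord_counter", "0"))
    app_bag[f"_{name}_coord_counter"] = str(counter + 1)
    app_bag[f"_{name}_coord_approved"] = "False"


def unit_acknowledges(app_bag: dict, unit_bag: dict, name: str = "restart_asyncreplica") -> None:
    """A unit acks (after stopping Patroni) by copying the app-level counter."""
    unit_bag[f"_{name}_coord_counter"] = app_bag.get(f"_{name}_coord_counter", "0")


def leader_approves_if_ready(app_bag: dict, unit_bags: list, name: str = "restart_asyncreplica") -> bool:
    """Leader flips the approval flag once every unit's counter matches the app's."""
    target = app_bag.get(f"_{name}_coord_counter", "0")
    if all(bag.get(f"_{name}_coord_counter") == target for bag in unit_bags):
        app_bag[f"_{name}_coord_approved"] = "True"
        return True
    return False


# One coordinated-stop round with three units:
app_bag, unit_bags = {}, [{}, {}, {}]
leader_requests(app_bag)
for bag in unit_bags:
    unit_acknowledges(app_bag, bag)
assert leader_approves_if_ready(app_bag, unit_bags)  # all units acked, so the restart is approved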
+ snap_cache = snap.SnapCache() + charmed_postgresql_snap = snap_cache["charmed-postgresql"] + if not charmed_postgresql_snap.present: + raise Exception("Cannot start/stop service, snap is not yet installed.") + charmed_postgresql_snap.stop(services=["patroni"]) + self.restart_coordinator.acknowledge(event) + + def _on_coordination_approval(self, event): + """Runs when the coordinator guaranteed all units have stopped.""" + if self.charm.unit.is_leader(): + # Remove previous cluster information to make it possible to initialise a new cluster. + logger.info("Removing previous cluster information") + + def demote(): + pw_record = pwd.getpwnam("snap_daemon") + + def result(): + os.setgid(pw_record.pw_gid) + os.setuid(pw_record.pw_uid) + + return result + + process = run( + [ + "charmed-postgresql.patronictl", + "-c", + f"{PATRONI_CONF_PATH}/patroni.yaml", + "remove", + self.charm.cluster_name, + ], + input=f"{self.charm.cluster_name}\nYes I am aware".encode(), + stdout=PIPE, + stderr=PIPE, + preexec_fn=demote(), + timeout=10, + ) + if process.returncode != 0: + raise Exception( + f"Failed to remove previous cluster information with error: {process.stderr.decode()}" + ) + + # Clean folder and generate configuration. + try: + path = Path(POSTGRESQL_DATA_PATH) + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove contents of the data directory with error: {str(e)}" + ) + + self.charm.update_config() + logger.info("_on_standby_changed: configuration done, waiting for restart of the service") + + # We are ready to restart the service now: all peers have configured themselves. + snap_cache = snap.SnapCache() + charmed_postgresql_snap = snap_cache["charmed-postgresql"] + if not charmed_postgresql_snap.present: + raise Exception("Cannot start/stop service, snap is not yet installed.") + charmed_postgresql_snap.start(services=["patroni"]) + + def _get_primary_candidates(self): + rel = self.model.get_relation(ASYNC_PRIMARY_RELATION) + return rel.units if rel else [] + + def _check_if_primary_already_selected(self) -> Unit: + """Returns the unit if a primary is present.""" + result = None + if not self.relation_set: + return None + for rel in self.relation_set: + for unit in self._all_units(rel): + if "elected" in rel.data[unit] and not result: + result = unit + elif "elected" in rel.data[unit] and result: + raise MoreThanOnePrimarySelectedError + return result + + def _on_promote_standby_cluster(self, event: ActionEvent) -> None: + """Moves a standby cluster to a primary, if none is present.""" + if ( + "cluster_initialised" not in self.charm._peers.data[self.charm.app] + or not self.charm._patroni.member_started + ): + event.fail("Cluster not initialized yet.") + return + + if not self.charm.unit.is_leader(): + event.fail("Not the charm leader unit.") + return + + # Now, publish that this unit is the leader + if not self.endpoint: + event.fail("No relation found.") + return + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not primary_relation: + event.fail("No primary relation") + return + + # Check if this is a take over from a standby cluster + if event.params.get("force", False) and event.params.get( + "force-really-really-mean-it", False + ): + pass + + # Let the exception error the unit + unit = self._check_if_primary_already_selected() + if unit: + event.fail(f"Cannot promote - {unit.name} is already primary: demote it first") + return + + # If this is a standby-leader, then execute switchover logic + # 
TODO + primary_relation.data[self.charm.unit]["elected"] = json.dumps( + { + "endpoint": self.endpoint, + "replication-password": self.charm._patroni._replication_password, + "superuser-password": self.charm._patroni._superuser_password, + } + ) + + # Now, check if postgresql it had originally published its pod IP in the + # replica relation databag. Delete it, if yes. + replica_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not replica_relation or "pod-address" not in replica_relation.data[self.charm.unit]: + return + del replica_relation.data[self.charm.unit]["pod-address"] + # event.set_result() diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 0732ec7156..6af6888291 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -103,6 +103,11 @@ bootstrap: command: pgbackrest {{ pgbackrest_configuration_file }} --stanza={{ restore_stanza }} --pg1-path={{ data_path }} --set={{ backup_id }} --type=immediate --target-action=promote restore no_params: True keep_existing_recovery_conf: True + {% elif standby_cluster_endpoint %} + standby_cluster: + host: {{ standby_cluster_endpoint }} + port: 5432 + create_replica_methods: ["basebackup"] {% else %} initdb: - encoding: UTF8 @@ -145,6 +150,9 @@ postgresql: - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 + {%- for e in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ e }} md5 + {%- endfor -%} # Allow replications connections from other cluster members. {%- for peer_ip in peers_ips %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ peer_ip }}/0 md5 From b6e5c82aa09163f727bd2c8a2b428b44eafa8f56 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 4 Dec 2023 10:43:29 -0300 Subject: [PATCH 2/6] Minor fixes of copy/paste Signed-off-by: Marcelo Henrique Neppel --- poetry.lock | 22 +++++++++++- src/charm.py | 1 + src/cluster.py | 7 ++-- src/relations/async_replication.py | 54 +++++++++++------------------- 4 files changed, 47 insertions(+), 37 deletions(-) diff --git a/poetry.lock b/poetry.lock index eb43369890..24122c3fab 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. 
[[package]] name = "appnope" @@ -877,6 +877,16 @@ files = [ {file = "MarkupSafe-2.1.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:5bbe06f8eeafd38e5d0a4894ffec89378b6c6a625ff57e3028921f8ff59318ac"}, {file = "MarkupSafe-2.1.3-cp311-cp311-win32.whl", hash = "sha256:dd15ff04ffd7e05ffcb7fe79f1b98041b8ea30ae9234aed2a9168b5797c3effb"}, {file = "MarkupSafe-2.1.3-cp311-cp311-win_amd64.whl", hash = "sha256:134da1eca9ec0ae528110ccc9e48041e0828d79f24121a1a146161103c76e686"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:f698de3fd0c4e6972b92290a45bd9b1536bffe8c6759c62471efaa8acb4c37bc"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:aa57bd9cf8ae831a362185ee444e15a93ecb2e344c8e52e4d721ea3ab6ef1823"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ffcc3f7c66b5f5b7931a5aa68fc9cecc51e685ef90282f4a82f0f5e9b704ad11"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:47d4f1c5f80fc62fdd7777d0d40a2e9dda0a05883ab11374334f6c4de38adffd"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1f67c7038d560d92149c060157d623c542173016c4babc0c1913cca0564b9939"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:9aad3c1755095ce347e26488214ef77e0485a3c34a50c5a5e2471dff60b9dd9c"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:14ff806850827afd6b07a5f32bd917fb7f45b046ba40c57abdb636674a8b559c"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8f9293864fe09b8149f0cc42ce56e3f0e54de883a9de90cd427f191c346eb2e1"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-win32.whl", hash = "sha256:715d3562f79d540f251b99ebd6d8baa547118974341db04f5ad06d5ea3eb8007"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-win_amd64.whl", hash = "sha256:1b8dd8c3fd14349433c79fa8abeb573a55fc0fdd769133baac1f5e07abf54aeb"}, {file = "MarkupSafe-2.1.3-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:8e254ae696c88d98da6555f5ace2279cf7cd5b3f52be2b5cf97feafe883b58d2"}, {file = "MarkupSafe-2.1.3-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cb0932dc158471523c9637e807d9bfb93e06a95cbf010f1a38b98623b929ef2b"}, {file = "MarkupSafe-2.1.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9402b03f1a1b4dc4c19845e5c749e3ab82d5078d16a2a4c2cd2df62d57bb0707"}, @@ -1567,6 +1577,7 @@ files = [ {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"}, + {file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"}, {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"}, {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"}, {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = 
"sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"}, @@ -1574,8 +1585,15 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"}, + {file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"}, {file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"}, {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, + {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, + {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, + {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, + {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, + {file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"}, {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"}, @@ -1592,6 +1610,7 @@ files = [ {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"}, + {file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"}, {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"}, {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"}, {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"}, @@ -1599,6 +1618,7 @@ files = [ {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"}, + {file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"}, {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"}, {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"}, {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"}, diff --git a/src/charm.py b/src/charm.py index 90411127d5..568076abc5 100755 --- a/src/charm.py +++ b/src/charm.py @@ -656,6 +656,7 @@ def _hosts(self) -> set: def _patroni(self) -> Patroni: """Returns an instance of the Patroni object.""" return Patroni( + self, self._unit_ip, self.cluster_name, self._member_name, diff --git a/src/cluster.py b/src/cluster.py index 457166d4f4..32d1d38a42 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -67,6 +67,7 @@ class Patroni: def __init__( self, + charm, unit_ip: str, cluster_name: str, member_name: str, @@ -80,6 +81,7 @@ def __init__( """Initialize the Patroni class. Args: + charm: PostgreSQL charm instance. unit_ip: IP address of the current unit cluster_name: name of the cluster member_name: name of the member inside the cluster @@ -90,6 +92,7 @@ def __init__( rewind_password: password for the user used on rewinds tls_enabled: whether TLS is enabled """ + self.charm = charm self.unit_ip = unit_ip self.cluster_name = cluster_name self.member_name = member_name @@ -465,7 +468,7 @@ def render_patroni_yml_file( with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) - primary = self._charm.async_manager.get_primary_data() + primary = self.charm.async_manager.get_primary_data() # Render the template file with the correct values. rendered = template.render( @@ -501,7 +504,7 @@ def render_patroni_yml_file( standby_cluster_endpoint=primary["endpoint"] if primary else None, extra_replication_endpoints={"{}/32".format(primary["endpoint"])} if primary - else self._charm.async_manager.standby_endpoints(), + else self.charm.async_manager.standby_endpoints(), ) self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 5bab58c6aa..eee2687763 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -28,9 +28,8 @@ Unit, ) -from constants import PATRONI_CONF_PATH +from constants import PATRONI_CONF_PATH, POSTGRESQL_DATA_PATH from coordinator_ops import CoordinatedOpsManager -from src.constants import POSTGRESQL_DATA_PATH logger = logging.getLogger(__name__) @@ -43,19 +42,6 @@ class MoreThanOnePrimarySelectedError(Exception): """Represents more than one primary has been selected.""" -def _get_pod_ip(): - """Reads some files to quickly figure out its own pod IP. 
- - It should work for any Ubuntu-based image - """ - with open("/etc/hosts") as f: - hosts = f.read() - with open("/etc/hostname") as f: - hostname = f.read().replace("\n", "") - line = [ln for ln in hosts.split("\n") if ln.find(hostname) >= 0][0] - return line.split("\t")[0] - - class PostgreSQLAsyncReplication(Object): """Defines the async-replication management logic.""" @@ -117,8 +103,8 @@ def standby_endpoints(self) -> Set[str]: for unit in self._all_units(rel): if not rel.data[unit].get("elected", None): standby_endpoints.add("{}/32".format(str(rel.data[unit]["ingress-address"]))) - if "pod-address" in rel.data[unit]: - standby_endpoints.add("{}/32".format(str(rel.data[unit]["pod-address"]))) + if "unit-address" in rel.data[unit]: + standby_endpoints.add("{}/32".format(str(rel.data[unit]["unit-address"]))) return standby_endpoints def get_primary_data(self) -> Dict[str, str]: @@ -144,13 +130,13 @@ def _all_units(self, relation): logger.debug(f"Units found: {found_units}") return found_units - def _all_replica_published_pod_ips(self) -> bool: + def _all_replica_published_unit_ips(self) -> bool: for rel in self.relation_set: for unit in self._all_units(rel): if "elected" in rel.data[unit]: - # This is the leader unit, it will not publish its own pod address + # This is the leader unit, it will not publish its own unit address continue - if "pod-address" not in rel.data[unit]: + if "unit-address" not in rel.data[unit]: return False return True @@ -161,8 +147,8 @@ def _on_departure(self, _): ]: if not rel: # if no relation exits, then it rel == None continue - if "pod-address" in rel.data[self.charm.unit]: - del rel.data[self.charm.unit]["pod-address"] + if "unit-address" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["unit-address"] if "elected" in rel.data[self.charm.unit]: del rel.data[self.charm.unit]["elected"] if "primary-cluster-ready" in rel.data[self.charm.unit]: @@ -197,11 +183,11 @@ def _on_primary_changed(self, event): return logger.info("_on_primary_changed: unit is the primary's leader") - if not self._all_replica_published_pod_ips(): + if not self._all_replica_published_unit_ips(): # We will have more events happening, no need for retrigger event.defer() return - logger.info("_on_primary_changed: all replicas published pod details") + logger.info("_on_primary_changed: all replicas published unit details") # This unit is the leader, generate a new configuration and leave. # There is nothing to do for the leader. @@ -228,14 +214,14 @@ def _on_standby_changed(self, event): # noqa C901 return logger.info("_on_standby_changed: primary is present") - # Check if we have already published pod-address. If not, then we are waiting - # for the leader to catch all the pod ips and restart itself - if "pod-address" not in replica_relation.data[self.charm.unit]: - replica_relation.data[self.charm.unit]["pod-address"] = _get_pod_ip() + # Check if we have already published unit-address. If not, then we are waiting + # for the leader to catch all the unit ips and restart itself + if "unit-address" not in replica_relation.data[self.charm.unit]: + replica_relation.data[self.charm.unit]["unit-address"] = self.charm._unit_ip # Finish here and wait for the retrigger from the primary cluster event.defer() return - logger.info("_on_standby_changed: pod-address published in own replica databag") + logger.info("_on_standby_changed: unit-address published in own replica databag") if not self.get_primary_data(): # We've made thus far. 
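# Illustrative sketch (not from the patch): after the pod-address -> unit-address rename above,
# each standby publishes its unit IP and the rendering side turns the collected set into /32
# pg_hba entries through the template's `extra_replication_endpoints` variable. A standalone
# render of that fragment (the template string is a simplified paraphrase of
# templates/patroni.yml.j2, not the file itself):
from jinja2 import Template

hba_fragment = Template(
    "{% for e in extra_replication_endpoints %}"
    "- {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ e }} md5\n"
    "{% endfor %}"
)
print(
    hba_fragment.render(
        enable_tls=False,
        extra_replication_endpoints={"10.15.30.7/32", "10.15.30.8/32"},
    )
)
# Prints one "- host replication replication <ip>/32 md5" entry per standby unit.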
@@ -385,15 +371,15 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: primary_relation.data[self.charm.unit]["elected"] = json.dumps( { "endpoint": self.endpoint, - "replication-password": self.charm._patroni._replication_password, - "superuser-password": self.charm._patroni._superuser_password, + "replication-password": self.charm._patroni.replication_password, + "superuser-password": self.charm._patroni.superuser_password, } ) - # Now, check if postgresql it had originally published its pod IP in the + # Now, check if postgresql had originally published its unit IP in the # replica relation databag. Delete it, if yes. replica_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) - if not replica_relation or "pod-address" not in replica_relation.data[self.charm.unit]: + if not replica_relation or "unit-address" not in replica_relation.data[self.charm.unit]: return - del replica_relation.data[self.charm.unit]["pod-address"] + del replica_relation.data[self.charm.unit]["unit-address"] # event.set_result() From 4d262fda58f31c5b7118a55433465e74a297c6f3 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 7 Feb 2024 08:39:01 -0300 Subject: [PATCH 3/6] Add VM specific code Signed-off-by: Marcelo Henrique Neppel --- src/coordinator_ops.py | 18 +- src/relations/async_replication.py | 128 +++++--- tests/integration/ha_tests/helpers.py | 9 +- .../ha_tests/test_async_replication.py | 289 ++++++++++++++++++ 4 files changed, 399 insertions(+), 45 deletions(-) create mode 100644 tests/integration/ha_tests/test_async_replication.py diff --git a/src/coordinator_ops.py b/src/coordinator_ops.py index 30a10e8b6b..2bd1c2becc 100644 --- a/src/coordinator_ops.py +++ b/src/coordinator_ops.py @@ -1,26 +1,34 @@ -# Copyright 2023 Canonical Ltd. +# Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. """The coordinated ops manager is a class that ensures a certain activity is run together. The concept is similar to the "cohort" in snaps, where all units wait until they can proceed to execute a certain activity, for example, restarting your service. + The process starts with the leader issuing a new coordination request. Effectively, that is implemented by increasing the __coord_counter by 1 at the app level. __coord_approved is set to "False". + Each unit receives a relation-changed, which is then re-issued as a _coordinator_requested event. Once the unit has done its task, it should ack the request. Each unit should ack the request by setting its own __coord_counter equal to the app's value. + Once all units have ack'ed the __coord_counter, the leader switches the __coord_approved to "True". All units will then process that new change as a "coordinator-approved" event and execute the activity they have been waiting for. + If there is a need to coordinate several activities in sequence, e.g. coordinated stop and then coordinated start, it is recommended that the leader unit publishes a _requested event twice, as follows: + + class MyCharm: + def __init__(self, *args): self.stop_coordinator = CoordinatedOpsManager(relation, tag="_stop_my_charm") self.start_coordinator = CoordinatedOpsManager(relation, tag="_start_my_charm") + self.framework.observe( self.stop_coordinator.on.coordinator_requested, self._on_coordinator_requested @@ -37,11 +45,13 @@ def __init__(self, *args): self.start_coordinator.on.coordinator_approved, self._on_coordinator_approved ) + def _a_method(): # A method that kick-starts the restarting coordination ......
if self.charm.unit.is_leader(): self.stop_coordinator.coordinate() + def _on_coordinator_requested(self, event): if self.service_is_running and event.tag == "_stop_my_charm": # We are in the stop-phase @@ -51,6 +61,7 @@ def _on_coordinator_requested(self, event): # we are in the starting-phase self.service.start() self.start_coordinator.acknowledge(event) + def _on_coordinator_approved(self, event): # All units have ack'ed the activity, which means we have stopped. if self.charm.unit.is_leader() and event.tag == "_stop_my_charm": @@ -139,10 +150,10 @@ def coordinate(self): """ logger.info("coordinate: starting") if self.charm.unit.is_leader(): - counter = ( + counter = int( self.model.get_relation(self.relation) .data[self.app] - .get(f"_{self.name}_coord_counter", 0) + .get(f"_{self.name}_coord_counter", "0") ) self.model.get_relation(self.relation).data[self.app][ f"_{self.name}_coord_counter" @@ -187,6 +198,7 @@ def _on_relation_changed(self: CharmBase, _: RelationChangedEvent): """Process relation changed. First, determine whether this unit has received a new request for coordination. + Then, if we are the leader, fire off a coordinator requested event. """ logger.info("coordinator: starting _on_relation_changed") diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index eee2687763..29b65b5fad 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -16,7 +16,7 @@ import shutil from pathlib import Path from subprocess import PIPE, run -from typing import Dict, Set +from typing import Dict, Optional, Set, Tuple from charms.operator_libs_linux.v2 import snap from ops.charm import ( @@ -28,7 +28,13 @@ Unit, ) -from constants import PATRONI_CONF_PATH, POSTGRESQL_DATA_PATH +from constants import ( + APP_SCOPE, + MONITORING_PASSWORD_KEY, + PATRONI_CONF_PATH, + POSTGRESQL_DATA_PATH, + REWIND_PASSWORD_KEY, +) from coordinator_ops import CoordinatedOpsManager logger = logging.getLogger(__name__) @@ -250,19 +256,32 @@ def _on_standby_changed(self, event): # noqa C901 self.restart_coordinator.coordinate() def _on_coordination_request(self, event): - # Stop the service. - # We need all replicas to be stopped, so we can remove the patroni-postgresql-k8s - # service from Kubernetes and not getting it recreated! - # We will restart the it once the cluster is ready. + self.restart_coordinator.acknowledge(event) + + def _on_coordination_approval(self, event): + """Runs when the coordinator guaranteed all units have stopped.""" snap_cache = snap.SnapCache() charmed_postgresql_snap = snap_cache["charmed-postgresql"] if not charmed_postgresql_snap.present: - raise Exception("Cannot start/stop service, snap is not yet installed.") + raise Exception("Cannot restart service, snap is not yet installed.") charmed_postgresql_snap.stop(services=["patroni"]) - self.restart_coordinator.acknowledge(event) - def _on_coordination_approval(self, event): - """Runs when the coordinator guaranteed all units have stopped.""" + # Clean folder and generate configuration. + try: + path = Path(POSTGRESQL_DATA_PATH) + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove contents of the data directory with error: {str(e)}" + ) + + self.charm.update_config() + logger.info("_on_standby_changed: configuration done, waiting for restart of the service") + + # We are ready to restart the service now: all peers have configured themselves. 
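# Illustrative sketch (not from the patch): with the reordering in this hunk, every unit rebuilds
# itself as a standby-cluster member in the same sequence. Condensed into one helper (same snap
# calls as the charm; `update_config` is passed in as a callable only to keep the sketch
# self-contained):
import shutil
from pathlib import Path

from charms.operator_libs_linux.v2 import snap


def rebuild_as_standby(data_path: str, update_config) -> None:
    patroni_snap = snap.SnapCache()["charmed-postgresql"]
    if not patroni_snap.present:
        raise Exception("Cannot restart service, snap is not yet installed.")
    patroni_snap.stop(services=["patroni"])
    path = Path(data_path)
    if path.exists() and path.is_dir():
        shutil.rmtree(path)  # the old timeline must go before re-seeding from the primary
    update_config()  # re-renders patroni.yaml, now including the standby_cluster section
    patroni_snap.start(services=["patroni"])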
+ charmed_postgresql_snap.start(services=["patroni"]) + if self.charm.unit.is_leader(): # Remove previous cluster information to make it possible to initialise a new cluster. logger.info("Removing previous cluster information") @@ -284,7 +303,7 @@ def result(): "remove", self.charm.cluster_name, ], - input=f"{self.charm.cluster_name}\nYes I am aware".encode(), + input=f"{self.charm.cluster_name}\nYes I am aware\npostgresql-0\n".encode(), stdout=PIPE, stderr=PIPE, preexec_fn=demote(), @@ -295,26 +314,6 @@ def result(): f"Failed to remove previous cluster information with error: {process.stderr.decode()}" ) - # Clean folder and generate configuration. - try: - path = Path(POSTGRESQL_DATA_PATH) - if path.exists() and path.is_dir(): - shutil.rmtree(path) - except OSError as e: - raise Exception( - f"Failed to remove contents of the data directory with error: {str(e)}" - ) - - self.charm.update_config() - logger.info("_on_standby_changed: configuration done, waiting for restart of the service") - - # We are ready to restart the service now: all peers have configured themselves. - snap_cache = snap.SnapCache() - charmed_postgresql_snap = snap_cache["charmed-postgresql"] - if not charmed_postgresql_snap.present: - raise Exception("Cannot start/stop service, snap is not yet installed.") - charmed_postgresql_snap.start(services=["patroni"]) - def _get_primary_candidates(self): rel = self.model.get_relation(ASYNC_PRIMARY_RELATION) return rel.units if rel else [] @@ -354,12 +353,6 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: event.fail("No primary relation") return - # Check if this is a take over from a standby cluster - if event.params.get("force", False) and event.params.get( - "force-really-really-mean-it", False - ): - pass - # Let the exception error the unit unit = self._check_if_primary_already_selected() if unit: @@ -367,7 +360,6 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: return # If this is a standby-leader, then execute switchover logic - # TODO primary_relation.data[self.charm.unit]["elected"] = json.dumps( { "endpoint": self.endpoint, @@ -382,4 +374,62 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: if not replica_relation or "unit-address" not in replica_relation.data[self.charm.unit]: return del replica_relation.data[self.charm.unit]["unit-address"] - # event.set_result() + + def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: + """Returns the PostgreSQL system identifier from this instance.""" + + def demote(): + pw_record = pwd.getpwnam("snap_daemon") + + def result(): + os.setgid(pw_record.pw_gid) + os.setuid(pw_record.pw_uid) + + return result + + process = run( + [ + f'/snap/charmed-postgresql/current/usr/lib/postgresql/{self.charm._patroni.get_postgresql_version().split(".")[0]}/bin/pg_controldata', + POSTGRESQL_DATA_PATH, + ], + stdout=PIPE, + stderr=PIPE, + preexec_fn=demote(), + ) + if process.returncode != 0: + return None, process.stderr.decode() + system_identifier = [ + line + for line in process.stdout.decode().splitlines() + if "Database system identifier" in line + ][0].split(" ")[-1] + return system_identifier, None + + def update_async_replication_data(self) -> None: + """Updates the async-replication data, if the unit is the leader. + + This is used to update the standby units with the new primary information. + If the unit is not the leader, then the data is removed from its databag. 
+ """ + if "promoted" not in self.charm.app_peer_data: + return + + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if self.charm.unit.is_leader(): + system_identifier, error = self.get_system_identifier() + if error is not None: + raise Exception(f"Failed to get system identifier: {error}") + primary_relation.data[self.charm.unit]["elected"] = json.dumps( + { + "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret( + APP_SCOPE, MONITORING_PASSWORD_KEY + ), + "replication-password": self.charm._patroni._replication_password, + "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), + "superuser-password": self.charm._patroni._superuser_password, + "system-id": system_identifier, + } + ) + else: + primary_relation.data[self.charm.unit]["elected"] = "" diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 276e0a1b2d..9d3b8c4f71 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -3,6 +3,7 @@ import os import random import subprocess +from juju.model import Model from pathlib import Path from tempfile import mkstemp from typing import Dict, Optional, Set, Tuple @@ -85,7 +86,7 @@ async def are_writes_increasing(ops_test, down_unit: str = None) -> None: assert more_writes[member] > count, f"{member}: writes not continuing to DB" -async def app_name(ops_test: OpsTest, application_name: str = "postgresql") -> Optional[str]: +async def app_name(ops_test: OpsTest, application_name: str = "postgresql", model: Model = None) -> Optional[str]: """Returns the name of the cluster running PostgreSQL. This is important since not all deployments of the PostgreSQL charm have the application name @@ -93,8 +94,10 @@ async def app_name(ops_test: OpsTest, application_name: str = "postgresql") -> O Note: if multiple clusters are running PostgreSQL this will return the one first found. """ - status = await ops_test.model.get_status() - for app in ops_test.model.applications: + if model is None: + model = ops_test.model + status = await model.get_status() + for app in model.applications: if application_name in status["applications"][app]["charm"]: return app diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py new file mode 100644 index 0000000000..c3e3153c8b --- /dev/null +++ b/tests/integration/ha_tests/test_async_replication.py @@ -0,0 +1,289 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
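# Illustrative sketch (not from the patch): get_system_identifier() added above shells out to
# pg_controldata and keeps only the "Database system identifier" line. The same parsing can be
# exercised against canned output (the sample below is fabricated for illustration):
sample_output = """pg_control version number:            1300
Catalog version number:               202209061
Database system identifier:           7304268453186210821
Database cluster state:               in production"""

system_identifier = [
    line for line in sample_output.splitlines() if "Database system identifier" in line
][0].split(" ")[-1]
assert system_identifier == "7304268453186210821"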
+import contextlib +import logging +from asyncio import gather +from typing import Optional + +import pytest as pytest +from juju.controller import Controller +from juju.model import Model +from lightkube import Client +from lightkube.resources.core_v1 import Pod +from pytest_operator.plugin import OpsTest + +from tests.integration.ha_tests.helpers import ( + are_writes_increasing, + check_writes, + get_standby_leader, + get_sync_standby, + start_continuous_writes, app_name, +) +from tests.integration.helpers import ( + APPLICATION_NAME, + DATABASE_APP_NAME, + build_and_deploy, + get_leader_unit, + wait_for_relation_removed_between, +) +from tests.integration.juju_ import juju_major_version + +logger = logging.getLogger(__name__) + + +@contextlib.asynccontextmanager +async def fast_forward( + model: Model, fast_interval: str = "10s", slow_interval: Optional[str] = None +): + """Adaptation of OpsTest.fast_forward to work with different models.""" + update_interval_key = "update-status-hook-interval" + if slow_interval: + interval_after = slow_interval + else: + interval_after = (await model.get_config())[update_interval_key] + + await model.set_config({update_interval_key: fast_interval}) + yield + await model.set_config({update_interval_key: interval_after}) + + +@pytest.fixture(scope="module") +async def controller(first_model) -> Controller: + """Return the controller.""" + return await first_model.get_controller() + + +@pytest.fixture(scope="module") +def first_model(ops_test: OpsTest) -> Model: + """Return the first model.""" + first_model = ops_test.model + return first_model + + +@pytest.fixture(scope="module") +async def second_model(controller, first_model) -> Model: + """Create and return the second model.""" + second_model_name = f"{first_model.info.name}-other" + await controller.add_model(second_model_name) + second_model = Model() + await second_model.connect(model_name=second_model_name) + return second_model + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_deploy_async_replication_setup( + ops_test: OpsTest, first_model: Model, second_model: Model +) -> None: + """Build and deploy two PostgreSQL cluster in two separate models to test async replication.""" + if not await app_name(ops_test): + await build_and_deploy(ops_test, 3, wait_for_idle=False) + if not await app_name(ops_test, second_model): + await build_and_deploy(ops_test, 3, wait_for_idle=False, model=second_model) + await ops_test.model.deploy(APPLICATION_NAME, num_units=1) + + async with ops_test.fast_forward(), fast_forward(second_model): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME, APPLICATION_NAME], + status="active", + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + ), + ) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_async_replication( + ops_test: OpsTest, + controller: Controller, + first_model: Model, + second_model: Model, + continuous_writes, +) -> None: + """Test async replication between two PostgreSQL clusters.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + offer_endpoint = ( + f"{DATABASE_APP_NAME}:async-primary" if juju_major_version == 2 else "async-primary" + ) + await first_model.create_offer(offer_endpoint, "async-primary", DATABASE_APP_NAME) + await second_model.consume( + 
f"admin/{first_model.info.name}.async-primary", controller=controller + ) + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + await second_model.relate(DATABASE_APP_NAME, "async-primary") + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Run the promote action. + logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-standby-cluster") + await run_action.wait() + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +async def test_break_and_reestablish_relation( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that the relation can be broken and re-established.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + logger.info("breaking the relation") + await second_model.applications[DATABASE_APP_NAME].remove_relation( + "async-replica", "async-primary" + ) + wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + logger.info("reestablishing the relation") + await second_model.relate(DATABASE_APP_NAME, "async-primary") + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Run the promote action. 
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-standby-cluster") + await run_action.wait() + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +async def test_async_replication_failover_in_main_cluster( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that async replication fails over correctly.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) + logger.info(f"Sync-standby: {sync_standby}") + logger.info("deleting the sync-standby pod") + client = Client(namespace=first_model.info.name) + client.delete(Pod, name=sync_standby.replace("/", "-")) + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + # Check that the sync-standby unit is not the same as before. + new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) + logger.info(f"New sync-standby: {new_sync_standby}") + assert new_sync_standby != sync_standby, "Sync-standby is the same as before" + + logger.info("Ensure continuous_writes after the crashed unit") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). 
+ logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +async def test_async_replication_failover_in_secondary_cluster( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that async replication fails back correctly.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) + logger.info(f"Standby leader: {standby_leader}") + logger.info("deleting the standby leader pod") + client = Client(namespace=second_model.info.name) + client.delete(Pod, name=standby_leader.replace("/", "-")) + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + # Check that the standby leader unit is not the same as before. + new_standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) + assert new_standby_leader != standby_leader, "Standby leader is the same as before" + + logger.info("Ensure continuous_writes after the crashed unit") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) From c2bf0c9ebc7b1d51606e3f84e8a85fd900c5993b Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 8 Feb 2024 09:53:56 -0300 Subject: [PATCH 4/6] Add updates from K8S PR Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 148 ++++++++++++------ tests/integration/ha_tests/helpers.py | 36 +++++ .../ha_tests/test_async_replication.py | 47 +++--- tests/integration/helpers.py | 33 ++++ 4 files changed, 199 insertions(+), 65 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 29b65b5fad..232e8d5c46 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -14,11 +14,12 @@ import os import pwd import shutil +import subprocess +from datetime import datetime from pathlib import Path from subprocess import PIPE, run from typing import Dict, Optional, Set, Tuple -from charms.operator_libs_linux.v2 import snap from ops.charm import ( ActionEvent, CharmBase, @@ -26,14 +27,18 @@ from ops.framework import Object from ops.model import ( Unit, + WaitingStatus, ) +from tenacity import Retrying, stop_after_attempt, wait_fixed from constants import ( APP_SCOPE, MONITORING_PASSWORD_KEY, PATRONI_CONF_PATH, POSTGRESQL_DATA_PATH, + REPLICATION_PASSWORD_KEY, REWIND_PASSWORD_KEY, + USER_PASSWORD_KEY, ) from coordinator_ops import CoordinatedOpsManager @@ -98,9 +103,12 @@ def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION @property def endpoint(self) -> str: """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" - for rel in self.relation_set: - return str(self.charm.model.get_binding(rel).network.ingress_address) - return None + sync_standby_names = self.charm._patroni.get_sync_standby_names() + if len(sync_standby_names) > 0: + unit 
= self.model.get_unit(sync_standby_names[0]) + return self.charm.get_unit_ip(unit) + else: + return self.charm.get_unit_ip(self.charm.unit) def standby_endpoints(self) -> Set[str]: """Returns the set of IPs used by each standby unit with a /32 mask.""" @@ -109,24 +117,27 @@ def standby_endpoints(self) -> Set[str]: for unit in self._all_units(rel): if not rel.data[unit].get("elected", None): standby_endpoints.add("{}/32".format(str(rel.data[unit]["ingress-address"]))) - if "unit-address" in rel.data[unit]: - standby_endpoints.add("{}/32".format(str(rel.data[unit]["unit-address"]))) + if "pod-address" in rel.data[unit]: + standby_endpoints.add("{}/32".format(str(rel.data[unit]["pod-address"]))) return standby_endpoints - def get_primary_data(self) -> Dict[str, str]: + def get_primary_data(self) -> Optional[Dict[str, str]]: """Returns the primary info, if available and if the primary cluster is ready.""" for rel in self.relation_set: for unit in self._all_units(rel): if "elected" in rel.data[unit] and unit.name == self.charm.unit.name: # If this unit is the leader, then return None return None + if rel.data[unit].get("elected", None) and rel.data[unit].get( "primary-cluster-ready", None ): elected_data = json.loads(rel.data[unit]["elected"]) return { "endpoint": str(elected_data["endpoint"]), + "monitoring-password": elected_data["monitoring-password"], "replication-password": elected_data["replication-password"], + "rewind-password": elected_data["rewind-password"], "superuser-password": elected_data["superuser-password"], } return None @@ -153,20 +164,22 @@ def _on_departure(self, _): ]: if not rel: # if no relation exits, then it rel == None continue - if "unit-address" in rel.data[self.charm.unit]: - del rel.data[self.charm.unit]["unit-address"] + if "pod-address" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["pod-address"] if "elected" in rel.data[self.charm.unit]: del rel.data[self.charm.unit]["elected"] if "primary-cluster-ready" in rel.data[self.charm.unit]: del rel.data[self.charm.unit]["primary-cluster-ready"] + if self.charm.unit.is_leader() and "promoted" in self.charm.app_peer_data: + del self.charm.app_peer_data["promoted"] - snap_cache = snap.SnapCache() - charmed_postgresql_snap = snap_cache["charmed-postgresql"] - if not charmed_postgresql_snap.present: - raise Exception("Cannot start/stop service, snap is not yet installed.") - charmed_postgresql_snap.stop(services=["patroni"]) + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") self.charm.update_config() - charmed_postgresql_snap.start(services=["patroni"]) + if not self.charm._patroni.start_patroni(): + raise Exception("Failed to start patroni service.") def _on_primary_changed(self, event): """Triggers a configuration change in the primary units.""" @@ -193,17 +206,17 @@ def _on_primary_changed(self, event): # We will have more events happening, no need for retrigger event.defer() return - logger.info("_on_primary_changed: all replicas published unit details") + logger.info("_on_primary_changed: all replicas published pod details") # This unit is the leader, generate a new configuration and leave. # There is nothing to do for the leader. 
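
The get_primary_data hunk above only hands the payload back once the remote side has both published "elected" and flagged "primary-cluster-ready", and it deliberately returns None on the elected unit itself. A condensed sketch of that gating logic, using plain dicts keyed by unit name in place of relation data (an illustration, not the charm's API):

    import json
    from typing import Dict, Optional

    def primary_data(units: Dict[str, Dict[str, str]], own_unit: str) -> Optional[Dict[str, str]]:
        """Return data only from a ready, foreign primary; never from this unit itself."""
        for unit_name, bag in units.items():
            if "elected" in bag and unit_name == own_unit:
                # This unit is the elected leader: nothing to consume.
                return None
            if bag.get("elected") and bag.get("primary-cluster-ready"):
                return json.loads(bag["elected"])
        return None

    units = {
        "postgresql/0": {"elected": json.dumps({"endpoint": "10.0.0.10"}), "primary-cluster-ready": "True"},
        "postgresql/1": {},
    }
    print(primary_data(units, "postgresql/1"))  # {'endpoint': '10.0.0.10'}
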
- snap_cache = snap.SnapCache() - charmed_postgresql_snap = snap_cache["charmed-postgresql"] - if not charmed_postgresql_snap.present: - raise Exception("Cannot start/stop service, snap is not yet installed.") - charmed_postgresql_snap.stop(services=["patroni"]) + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") self.charm.update_config() - charmed_postgresql_snap.start(services=["patroni"]) + if not self.charm._patroni.start_patroni(): + raise Exception("Failed to start patroni service.") # Retrigger the other units' async-replica-changed primary_relation.data[self.charm.unit]["primary-cluster-ready"] = "True" @@ -229,7 +242,8 @@ def _on_standby_changed(self, event): # noqa C901 return logger.info("_on_standby_changed: unit-address published in own replica databag") - if not self.get_primary_data(): + primary_data = self.get_primary_data() + if not primary_data: # We've made thus far. # However, the get_primary_data will return != None ONLY if the primary cluster # is ready and configured. Until then, we wait. @@ -237,6 +251,27 @@ def _on_standby_changed(self, event): # noqa C901 return logger.info("_on_standby_changed: primary cluster is ready") + if "system-id" not in replica_relation.data[self.charm.unit]: + system_identifier, error = self.get_system_identifier() + if error is not None: + raise Exception(f"Failed to get system identifier: {error}") + replica_relation.data[self.charm.unit]["system-id"] = system_identifier + + if self.charm.unit.is_leader(): + self.charm.set_secret( + APP_SCOPE, MONITORING_PASSWORD_KEY, primary_data["monitoring-password"] + ) + self.charm.set_secret( + APP_SCOPE, USER_PASSWORD_KEY, primary_data["superuser-password"] + ) + self.charm.set_secret( + APP_SCOPE, REPLICATION_PASSWORD_KEY, primary_data["replication-password"] + ) + self.charm.set_secret( + APP_SCOPE, REWIND_PASSWORD_KEY, primary_data["rewind-password"] + ) + del self.charm._peers.data[self.charm.app]["cluster_initialised"] + ################ # Initiate restart logic ################ @@ -244,7 +279,7 @@ def _on_standby_changed(self, event): # noqa C901 # We need to: # 1) Stop all standby units # 2) Delete the k8s service - # 3) Remove the pgdata folder + # 3) Remove the pgdata folder (if the clusters are different) # 4) Start all standby units # For that, the peer leader must first stop its own service and then, issue a # coordination request to all units. All units ack that request once they all have @@ -256,33 +291,51 @@ def _on_standby_changed(self, event): # noqa C901 self.restart_coordinator.coordinate() def _on_coordination_request(self, event): + # Stop the service. + # We need all replicas to be stopped, so we can remove the previous cluster info. + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3), reraise=True): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") + + replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) + for unit in replica_relation.units: + if "elected" not in replica_relation.data[unit]: + continue + elected_data = json.loads(replica_relation.data[unit]["elected"]) + if "system-id" not in elected_data: + continue + if replica_relation.data[self.charm.unit]["system-id"] != elected_data["system-id"]: + if self.charm.unit.is_leader(): + # Store current data in a ZIP file, clean folder and generate configuration. 
+ logger.info("Creating backup of pgdata folder") + subprocess.check_call( + f"tar -zcf {POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.zip {POSTGRESQL_DATA_PATH}".split() + ) + logger.info("Removing and recreating pgdata folder") + try: + path = Path(POSTGRESQL_DATA_PATH) + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove contents of the data directory with error: {str(e)}" + ) + break self.restart_coordinator.acknowledge(event) def _on_coordination_approval(self, event): """Runs when the coordinator guaranteed all units have stopped.""" - snap_cache = snap.SnapCache() - charmed_postgresql_snap = snap_cache["charmed-postgresql"] - if not charmed_postgresql_snap.present: - raise Exception("Cannot restart service, snap is not yet installed.") - charmed_postgresql_snap.stop(services=["patroni"]) - - # Clean folder and generate configuration. - try: - path = Path(POSTGRESQL_DATA_PATH) - if path.exists() and path.is_dir(): - shutil.rmtree(path) - except OSError as e: - raise Exception( - f"Failed to remove contents of the data directory with error: {str(e)}" - ) - self.charm.update_config() - logger.info("_on_standby_changed: configuration done, waiting for restart of the service") - - # We are ready to restart the service now: all peers have configured themselves. - charmed_postgresql_snap.start(services=["patroni"]) + logger.info( + "_on_coordination_approval: configuration done, waiting for restart of the service" + ) if self.charm.unit.is_leader(): + # We are ready to restart the service now: all peers have configured themselves. + if not self.charm._patroni.start_patroni(): + raise Exception("Failed to start patroni service.") + # Remove previous cluster information to make it possible to initialise a new cluster. logger.info("Removing previous cluster information") @@ -313,12 +366,17 @@ def result(): raise Exception( f"Failed to remove previous cluster information with error: {process.stderr.decode()}" ) + self.charm._peers.data[self.charm.app]["cluster_initialised"] = "True" + else: + self.charm.unit.status = WaitingStatus("waiting for primary to be ready") + event.defer() + return def _get_primary_candidates(self): rel = self.model.get_relation(ASYNC_PRIMARY_RELATION) return rel.units if rel else [] - def _check_if_primary_already_selected(self) -> Unit: + def _check_if_primary_already_selected(self) -> Optional[Unit]: """Returns the unit if a primary is present.""" result = None if not self.relation_set: diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index dd55ee2e1c..b1fac352a1 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -360,6 +360,42 @@ def get_random_unit(ops_test: OpsTest, app: str) -> str: return random.choice(ops_test.model.applications[app].units).name +async def get_standby_leader(model: Model, application_name: str) -> str: + """Get the standby leader name. + + Args: + model: the model instance. + application_name: the name of the application to get the value for. + + Returns: + the name of the standby leader. 
+ """ + status = await model.get_status() + first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"] + cluster = get_patroni_cluster(first_unit_ip) + for member in cluster["members"]: + if member["role"] == "standby_leader": + return member["name"] + + +async def get_sync_standby(model: Model, application_name: str) -> str: + """Get the sync_standby name. + + Args: + model: the model instance. + application_name: the name of the application to get the value for. + + Returns: + the name of the sync standby. + """ + status = await model.get_status() + first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"] + cluster = get_patroni_cluster(first_unit_ip) + for member in cluster["members"]: + if member["role"] == "sync_standby": + return member["name"] + + async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> str: """Use the charm action to retrieve the password from provided application. diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 10aa96b5ff..cc86e7e296 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -9,11 +9,16 @@ import pytest as pytest from juju.controller import Controller from juju.model import Model -from lightkube import Client -from lightkube.resources.core_v1 import Pod from pytest_operator.plugin import OpsTest -from tests.integration.ha_tests.helpers import ( +from ..helpers import ( + APPLICATION_NAME, + DATABASE_APP_NAME, + get_leader_unit, + wait_for_relation_removed_between, +) +from ..juju_ import juju_major_version +from .helpers import ( app_name, are_writes_increasing, check_writes, @@ -21,14 +26,6 @@ get_sync_standby, start_continuous_writes, ) -from tests.integration.helpers import ( - APPLICATION_NAME, - DATABASE_APP_NAME, - build_and_deploy, - get_leader_unit, - wait_for_relation_removed_between, -) -from tests.integration.juju_ import juju_major_version logger = logging.getLogger(__name__) @@ -79,9 +76,19 @@ async def test_deploy_async_replication_setup( ) -> None: """Build and deploy two PostgreSQL cluster in two separate models to test async replication.""" if not await app_name(ops_test): - await build_and_deploy(ops_test, 3, wait_for_idle=False) - if not await app_name(ops_test, second_model): - await build_and_deploy(ops_test, 3, wait_for_idle=False, model=second_model) + charm = await ops_test.build_charm(".") + await ops_test.model.deploy( + charm, + num_units=3, + config={"profile": "testing"}, + ) + if not await app_name(ops_test, model=second_model): + charm = await ops_test.build_charm(".") + await second_model.deploy( + charm, + num_units=3, + config={"profile": "testing"}, + ) await ops_test.model.deploy(APPLICATION_NAME, num_units=1) async with ops_test.fast_forward(), fast_forward(second_model): @@ -89,10 +96,12 @@ async def test_deploy_async_replication_setup( first_model.wait_for_idle( apps=[DATABASE_APP_NAME, APPLICATION_NAME], status="active", + timeout=1500, ), second_model.wait_for_idle( apps=[DATABASE_APP_NAME], status="active", + timeout=1500, ), ) @@ -230,9 +239,8 @@ async def test_async_replication_failover_in_main_cluster( sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) logger.info(f"Sync-standby: {sync_standby}") - logger.info("deleting the sync-standby pod") - client = Client(namespace=first_model.info.name) - client.delete(Pod, name=sync_standby.replace("/", 
"-")) + logger.info("removing the sync-standby unit") + await first_model.applications[DATABASE_APP_NAME].remove_unit(sync_standby) async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): await gather( @@ -267,9 +275,8 @@ async def test_async_replication_failover_in_secondary_cluster( standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) logger.info(f"Standby leader: {standby_leader}") - logger.info("deleting the standby leader pod") - client = Client(namespace=second_model.info.name) - client.delete(Pod, name=standby_leader.replace("/", "-")) + logger.info("removing the standby leader unit") + await second_model.applications[DATABASE_APP_NAME].remove_unit(standby_leader) async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): await gather( diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 0bbecae6c0..7227481bc3 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -16,6 +16,7 @@ import psycopg2 import requests import yaml +from juju.model import Model from juju.unit import Unit from pytest_operator.plugin import OpsTest from tenacity import ( @@ -761,6 +762,18 @@ async def check_tls_patroni_api(ops_test: OpsTest, unit_name: str, enabled: bool return False +def has_relation_exited( + ops_test: OpsTest, endpoint_one: str, endpoint_two: str, model: Model = None +) -> bool: + """Returns true if the relation between endpoint_one and endpoint_two has been removed.""" + relations = model.relations if model is not None else ops_test.model.relations + for rel in relations: + endpoints = [endpoint.name for endpoint in rel.endpoints] + if endpoint_one in endpoints and endpoint_two in endpoints: + return False + return True + + def remove_chown_workaround(original_charm_filename: str, patched_charm_filename: str) -> None: """Remove the chown workaround from the charm.""" with zipfile.ZipFile(original_charm_filename, "r") as charm_file, zipfile.ZipFile( @@ -978,3 +991,23 @@ async def wait_for_idle_on_blocked( ), ops_test.model.block_until(lambda: unit.workload_status_message == status_message), ) + + +def wait_for_relation_removed_between( + ops_test: OpsTest, endpoint_one: str, endpoint_two: str, model: Model = None +) -> None: + """Wait for relation to be removed before checking if it's waiting or idle. + + Args: + ops_test: running OpsTest instance + endpoint_one: one endpoint of the relation. Doesn't matter if it's provider or requirer. + endpoint_two: the other endpoint of the relation. + model: optional model to check for the relation. + """ + try: + for attempt in Retrying(stop=stop_after_delay(3 * 60), wait=wait_fixed(3)): + with attempt: + if has_relation_exited(ops_test, endpoint_one, endpoint_two, model): + break + except RetryError: + assert False, "Relation failed to exit after 3 minutes." 
From cf93239438a31537043a42c8580ebae2b3ad7605 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 20 Mar 2024 20:08:11 -0300 Subject: [PATCH 5/6] Updates from K8S PR Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 41 ++- tests/integration/ha_tests/helpers.py | 63 +++-- .../ha_tests/test_async_replication.py | 250 +++++++++--------- tests/integration/helpers.py | 7 +- 4 files changed, 206 insertions(+), 155 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 232e8d5c46..b9d5a74f1a 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -27,9 +27,9 @@ from ops.framework import Object from ops.model import ( Unit, - WaitingStatus, + WaitingStatus, MaintenanceStatus, ActiveStatus, ) -from tenacity import Retrying, stop_after_attempt, wait_fixed +from tenacity import Retrying, stop_after_attempt, wait_fixed, RetryError from constants import ( APP_SCOPE, @@ -106,9 +106,9 @@ def endpoint(self) -> str: sync_standby_names = self.charm._patroni.get_sync_standby_names() if len(sync_standby_names) > 0: unit = self.model.get_unit(sync_standby_names[0]) - return self.charm.get_unit_ip(unit) + return self.charm._get_unit_ip(unit) else: - return self.charm.get_unit_ip(self.charm.unit) + return self.charm._get_unit_ip(self.charm.unit) def standby_endpoints(self) -> Set[str]: """Returns the set of IPs used by each standby unit with a /32 mask.""" @@ -186,6 +186,7 @@ def _on_primary_changed(self, event): primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) if not primary_relation: return + self.charm.unit.status = MaintenanceStatus("configuring main cluster") logger.info("_on_primary_changed: primary_relation exists") primary = self._check_if_primary_already_selected() @@ -193,39 +194,50 @@ def _on_primary_changed(self, event): # primary may not be available because the action of promoting a cluster was # executed way after the relation changes. # Defer it until + logger.debug("defer _on_primary_changed: primary not present") event.defer() return logger.info("_on_primary_changed: primary cluster exists") if primary.name != self.charm.unit.name: # this unit is not the system leader + logger.debug("early exit _on_primary_changed: unit is not the primary's leader") + self.charm.unit.status = ActiveStatus() return logger.info("_on_primary_changed: unit is the primary's leader") if not self._all_replica_published_unit_ips(): # We will have more events happening, no need for retrigger + logger.debug("defer _on_primary_changed: not all replicas published pod details") event.defer() return logger.info("_on_primary_changed: all replicas published pod details") # This unit is the leader, generate a new configuration and leave. # There is nothing to do for the leader. 
- for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): - with attempt: - if not self.charm._patroni.stop_patroni(): - raise Exception("Failed to stop patroni service.") + try: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") + except RetryError: + logger.debug("defer _on_primary_changed: failed to stop the container") + event.defer() + return self.charm.update_config() if not self.charm._patroni.start_patroni(): raise Exception("Failed to start patroni service.") # Retrigger the other units' async-replica-changed primary_relation.data[self.charm.unit]["primary-cluster-ready"] = "True" + self.charm.unit.status = ActiveStatus() def _on_standby_changed(self, event): # noqa C901 """Triggers a configuration change.""" replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) if not replica_relation: return + self.charm.unit.status = MaintenanceStatus("configuring standby cluster") logger.info("_on_standby_changed: replica relation available") primary = self._check_if_primary_already_selected() @@ -272,6 +284,9 @@ def _on_standby_changed(self, event): # noqa C901 ) del self.charm._peers.data[self.charm.app]["cluster_initialised"] + if "cluster_initialised" in self.charm._peers.data[self.charm.app]: + return + ################ # Initiate restart logic ################ @@ -290,6 +305,8 @@ def _on_standby_changed(self, event): # noqa C901 # The leader now requests a ack from each unit that they have stopped. self.restart_coordinator.coordinate() + self.charm.unit.status = WaitingStatus("waiting for promotion of the main cluster") + def _on_coordination_request(self, event): # Stop the service. # We need all replicas to be stopped, so we can remove the previous cluster info. @@ -417,12 +434,20 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: event.fail(f"Cannot promote - {unit.name} is already primary: demote it first") return + system_identifier, error = self.get_system_identifier() + if error is not None: + event.fail(f"Failed to get system identifier: {error}") + return + # If this is a standby-leader, then execute switchover logic primary_relation.data[self.charm.unit]["elected"] = json.dumps( { "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), "replication-password": self.charm._patroni.replication_password, + "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), "superuser-password": self.charm._patroni.superuser_password, + "system-id": system_identifier, } ) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index b1fac352a1..3c171d7126 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -189,11 +189,12 @@ async def is_cluster_updated(ops_test: OpsTest, primary_name: str) -> None: ), "secondary not up to date with the cluster after restarting." 
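
In the _on_standby_changed hunk above, the standby leader copies the promoted cluster's published passwords into its own application secrets so both clusters authenticate replication with the same credentials. A toy sketch of that copy step, with a fake secret store standing in for the charm's set_secret helper and simplified key names in place of the charm's password-key constants:

    from typing import Dict

    class FakeSecretStore:
        """Stand-in for the charm's secret helpers, for illustration only."""

        def __init__(self) -> None:
            self.store: Dict[str, str] = {}

        def set_secret(self, scope: str, key: str, value: str) -> None:
            self.store[f"{scope}:{key}"] = value

    def sync_cluster_passwords(secrets: FakeSecretStore, primary_data: Dict[str, str]) -> None:
        """Copy the promoted cluster's passwords so replication can authenticate."""
        for key in ("monitoring-password", "superuser-password", "replication-password", "rewind-password"):
            secrets.set_secret("app", key, primary_data[key])

    secrets = FakeSecretStore()
    sync_cluster_passwords(secrets, {
        "monitoring-password": "a", "superuser-password": "b",
        "replication-password": "c", "rewind-password": "d",
    })
    print(sorted(secrets.store))
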
-async def check_writes(ops_test) -> int: +async def check_writes(ops_test, extra_model: Model = None) -> int: """Gets the total writes from the test charm and compares to the writes from db.""" total_expected_writes = await stop_continuous_writes(ops_test) - actual_writes, max_number_written = await count_writes(ops_test) + actual_writes, max_number_written = await count_writes(ops_test, extra_model=extra_model) for member, count in actual_writes.items(): + print(f"member: {member}, count: {count}, max_number_written: {max_number_written[member]}, total_expected_writes: {total_expected_writes}") assert ( count == max_number_written[member] ), f"{member}: writes to the db were missed: count of actual writes different from the max number written." @@ -202,15 +203,22 @@ async def check_writes(ops_test) -> int: async def count_writes( - ops_test: OpsTest, down_unit: str = None + ops_test: OpsTest, down_unit: str = None, extra_model: Model = None ) -> Tuple[Dict[str, int], Dict[str, int]]: """Count the number of writes in the database.""" app = await app_name(ops_test) password = await get_password(ops_test, app, down_unit) - for unit in ops_test.model.applications[app].units: - if unit.name != down_unit: - cluster = get_patroni_cluster(await get_unit_ip(ops_test, unit.name)) - break + members = [] + for model in [ops_test.model, extra_model]: + if model is None: + continue + for unit in model.applications[app].units: + if unit.name != down_unit: + members_data = get_patroni_cluster(await get_unit_ip(ops_test, unit.name))["members"] + for index, member_data in enumerate(members_data): + members_data[index]["model"] = model.info.name + members.extend(members_data) + break down_ips = [] if down_unit: for unit in ops_test.model.applications[app].units: @@ -219,7 +227,7 @@ async def count_writes( down_ips.append(await get_unit_ip(ops_test, unit.name)) count = {} maximum = {} - for member in cluster["members"]: + for member in members: if member["role"] != "replica" and member["host"] not in down_ips: host = member["host"] @@ -228,12 +236,23 @@ async def count_writes( f" host='{host}' password='{password}' connect_timeout=10" ) - with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: - cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") - results = cursor.fetchone() - count[member["name"]] = results[0] - maximum[member["name"]] = results[1] - connection.close() + member_name = f'{member["model"]}.{member["name"]}' + connection = None + try: + with psycopg2.connect( + connection_string + ) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") + results = cursor.fetchone() + count[member_name] = results[0] + maximum[member_name] = results[1] + except psycopg2.Error: + # Error raised when the connection is not possible. + count[member_name] = -1 + maximum[member_name] = -1 + finally: + if connection is not None: + connection.close() return count, maximum @@ -412,20 +431,24 @@ async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> st return action.results["password"] -async def get_unit_ip(ops_test: OpsTest, unit_name: str) -> str: +async def get_unit_ip(ops_test: OpsTest, unit_name: str, model: Model = None) -> str: """Wrapper for getting unit ip. 
Args: ops_test: The ops test object passed into every test case unit_name: The name of the unit to get the address + model: Optional model instance to use Returns: The (str) ip of the unit """ - application = unit_name.split("/")[0] - for unit in ops_test.model.applications[application].units: - if unit.name == unit_name: - break - return await instance_ip(ops_test, unit.machine.hostname) + if model is None: + application = unit_name.split("/")[0] + for unit in ops_test.model.applications[application].units: + if unit.name == unit_name: + break + return await instance_ip(ops_test, unit.machine.hostname) + else: + return get_unit_address(ops_test, unit_name) @retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index cc86e7e296..dd8e70b854 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -170,128 +170,128 @@ async def test_async_replication( await check_writes(ops_test, extra_model=second_model) -@pytest.mark.group(1) -async def test_break_and_reestablish_relation( - ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes -) -> None: - """Test that the relation can be broken and re-established.""" - logger.info("starting continuous writes to the database") - await start_continuous_writes(ops_test, DATABASE_APP_NAME) - - logger.info("checking whether writes are increasing") - await are_writes_increasing(ops_test) - - logger.info("breaking the relation") - await second_model.applications[DATABASE_APP_NAME].remove_relation( - "async-replica", "async-primary" - ) - wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): - await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - ) - - logger.info("reestablishing the relation") - await second_model.relate(DATABASE_APP_NAME, "async-primary") - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): - await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - ) - - logger.info("checking whether writes are increasing") - await are_writes_increasing(ops_test) - - # Run the promote action. - logger.info("Get leader unit") - leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) - assert leader_unit is not None, "No leader unit found" - logger.info("promoting the first cluster") - run_action = await leader_unit.run_action("promote-standby-cluster") - await run_action.wait() - - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): - await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - ) - - logger.info("checking whether writes are increasing") - await are_writes_increasing(ops_test) - - # Verify that no writes to the database were missed after stopping the writes - # (check that all the units have all the writes). 
- logger.info("checking whether no writes were lost") - await check_writes(ops_test, extra_model=second_model) - - -@pytest.mark.group(1) -async def test_async_replication_failover_in_main_cluster( - ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes -) -> None: - """Test that async replication fails over correctly.""" - logger.info("starting continuous writes to the database") - await start_continuous_writes(ops_test, DATABASE_APP_NAME) - - logger.info("checking whether writes are increasing") - await are_writes_increasing(ops_test) - - sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) - logger.info(f"Sync-standby: {sync_standby}") - logger.info("removing the sync-standby unit") - await first_model.applications[DATABASE_APP_NAME].remove_unit(sync_standby) - - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): - await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - ) - - # Check that the sync-standby unit is not the same as before. - new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) - logger.info(f"New sync-standby: {new_sync_standby}") - assert new_sync_standby != sync_standby, "Sync-standby is the same as before" - - logger.info("Ensure continuous_writes after the crashed unit") - await are_writes_increasing(ops_test) - - # Verify that no writes to the database were missed after stopping the writes - # (check that all the units have all the writes). - logger.info("checking whether no writes were lost") - await check_writes(ops_test, extra_model=second_model) - - -@pytest.mark.group(1) -async def test_async_replication_failover_in_secondary_cluster( - ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes -) -> None: - """Test that async replication fails back correctly.""" - logger.info("starting continuous writes to the database") - await start_continuous_writes(ops_test, DATABASE_APP_NAME) - - logger.info("checking whether writes are increasing") - await are_writes_increasing(ops_test) - - standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) - logger.info(f"Standby leader: {standby_leader}") - logger.info("removing the standby leader unit") - await second_model.applications[DATABASE_APP_NAME].remove_unit(standby_leader) - - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): - await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - ) - - # Check that the standby leader unit is not the same as before. - new_standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) - assert new_standby_leader != standby_leader, "Standby leader is the same as before" - - logger.info("Ensure continuous_writes after the crashed unit") - await are_writes_increasing(ops_test) - - # Verify that no writes to the database were missed after stopping the writes - # (check that all the units have all the writes). 
- logger.info("checking whether no writes were lost") - await check_writes(ops_test, extra_model=second_model) +# @pytest.mark.group(1) +# async def test_break_and_reestablish_relation( +# ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +# ) -> None: +# """Test that the relation can be broken and re-established.""" +# logger.info("starting continuous writes to the database") +# await start_continuous_writes(ops_test, DATABASE_APP_NAME) +# +# logger.info("checking whether writes are increasing") +# await are_writes_increasing(ops_test) +# +# logger.info("breaking the relation") +# await second_model.applications[DATABASE_APP_NAME].remove_relation( +# "async-replica", "async-primary" +# ) +# wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) +# async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): +# await gather( +# first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), +# second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), +# ) +# +# logger.info("reestablishing the relation") +# await second_model.relate(DATABASE_APP_NAME, "async-primary") +# async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): +# await gather( +# first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), +# second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), +# ) +# +# logger.info("checking whether writes are increasing") +# await are_writes_increasing(ops_test) +# +# # Run the promote action. +# logger.info("Get leader unit") +# leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) +# assert leader_unit is not None, "No leader unit found" +# logger.info("promoting the first cluster") +# run_action = await leader_unit.run_action("promote-standby-cluster") +# await run_action.wait() +# +# async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): +# await gather( +# first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), +# second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), +# ) +# +# logger.info("checking whether writes are increasing") +# await are_writes_increasing(ops_test) +# +# # Verify that no writes to the database were missed after stopping the writes +# # (check that all the units have all the writes). 
+# logger.info("checking whether no writes were lost") +# await check_writes(ops_test, extra_model=second_model) +# +# +# @pytest.mark.group(1) +# async def test_async_replication_failover_in_main_cluster( +# ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +# ) -> None: +# """Test that async replication fails over correctly.""" +# logger.info("starting continuous writes to the database") +# await start_continuous_writes(ops_test, DATABASE_APP_NAME) +# +# logger.info("checking whether writes are increasing") +# await are_writes_increasing(ops_test) +# +# sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) +# logger.info(f"Sync-standby: {sync_standby}") +# logger.info("removing the sync-standby unit") +# await first_model.applications[DATABASE_APP_NAME].remove_unit(sync_standby) +# +# async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): +# await gather( +# first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), +# second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), +# ) +# +# # Check that the sync-standby unit is not the same as before. +# new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) +# logger.info(f"New sync-standby: {new_sync_standby}") +# assert new_sync_standby != sync_standby, "Sync-standby is the same as before" +# +# logger.info("Ensure continuous_writes after the crashed unit") +# await are_writes_increasing(ops_test) +# +# # Verify that no writes to the database were missed after stopping the writes +# # (check that all the units have all the writes). +# logger.info("checking whether no writes were lost") +# await check_writes(ops_test, extra_model=second_model) +# +# +# @pytest.mark.group(1) +# async def test_async_replication_failover_in_secondary_cluster( +# ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +# ) -> None: +# """Test that async replication fails back correctly.""" +# logger.info("starting continuous writes to the database") +# await start_continuous_writes(ops_test, DATABASE_APP_NAME) +# +# logger.info("checking whether writes are increasing") +# await are_writes_increasing(ops_test) +# +# standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) +# logger.info(f"Standby leader: {standby_leader}") +# logger.info("removing the standby leader unit") +# await second_model.applications[DATABASE_APP_NAME].remove_unit(standby_leader) +# +# async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): +# await gather( +# first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), +# second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), +# ) +# +# # Check that the standby leader unit is not the same as before. +# new_standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) +# assert new_standby_leader != standby_leader, "Standby leader is the same as before" +# +# logger.info("Ensure continuous_writes after the crashed unit") +# await are_writes_increasing(ops_test) +# +# # Verify that no writes to the database were missed after stopping the writes +# # (check that all the units have all the writes). 
+# logger.info("checking whether no writes were lost") +# await check_writes(ops_test, extra_model=second_model) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 7227481bc3..82298c6eda 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -643,17 +643,20 @@ async def get_tls_ca( return json.loads(relation_data[0]["application-data"]["certificates"])[0].get("ca") -def get_unit_address(ops_test: OpsTest, unit_name: str) -> str: +def get_unit_address(ops_test: OpsTest, unit_name: str, model: Model = None) -> str: """Get unit IP address. Args: ops_test: The ops test framework instance unit_name: The name of the unit + model: Optional model to use to get the unit address Returns: IP address of the unit """ - return ops_test.model.units.get(unit_name).public_address + if model is None: + model = ops_test.model + return model.units.get(unit_name).public_address async def check_tls(ops_test: OpsTest, unit_name: str, enabled: bool) -> bool: From 37edc516a4bf559af359fefd10041f455071500e Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 28 Mar 2024 15:48:35 -0300 Subject: [PATCH 6/6] Update from k8s code Signed-off-by: Marcelo Henrique Neppel --- src/cluster.py | 61 ++- src/relations/async_replication.py | 170 +++++-- tests/integration/ha_tests/helpers.py | 22 +- .../ha_tests/test_async_replication.py | 481 ++++++++++++------ 4 files changed, 524 insertions(+), 210 deletions(-) diff --git a/src/cluster.py b/src/cluster.py index 0f100bb8a0..c53f249be1 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -46,10 +46,22 @@ RUNNING_STATES = ["running", "streaming"] +class ClusterNotPromotedError(Exception): + """Raised when a cluster is not promoted.""" + + class NotReadyError(Exception): """Raised when not all cluster members healthy or finished initial sync.""" +class EndpointNotReadyError(Exception): + """Raised when an endpoint is not ready.""" + + +class StandbyClusterAlreadyPromotedError(Exception): + """Raised when a standby cluster is already promoted.""" + + class RemoveRaftMemberFailedError(Exception): """Raised when a remove raft member failed for some reason.""" @@ -243,6 +255,32 @@ def get_primary(self, unit_name_pattern=False) -> str: primary = "/".join(primary.rsplit("-", 1)) return primary + def get_standby_leader(self, unit_name_pattern=False) -> str: + """Get standby leader instance. + + Args: + unit_name_pattern: whether to convert pod name to unit name + + Returns: + standby leader pod or unit name. + """ + # Request info from cluster endpoint (which returns all members of the cluster). + for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): + with attempt: + url = self._get_alternative_patroni_url(attempt) + cluster_status = requests.get( + f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", + verify=self.verify, + timeout=API_REQUEST_TIMEOUT, + ) + for member in cluster_status.json()["members"]: + if member["role"] == "standby_leader": + standby_leader = member["name"] + if unit_name_pattern: + # Change the last dash to / in order to match unit name pattern. 
+ standby_leader = "/".join(standby_leader.rsplit("-", 1)) + return standby_leader + def get_sync_standby_names(self) -> List[str]: """Get the list of sync standby unit names.""" sync_standbys = [] @@ -298,12 +336,12 @@ def are_all_members_ready(self) -> bool: except RetryError: return False - # Check if all members are running and one of them is a leader (primary), - # because sometimes there may exist (for some period of time) only - # replicas after a failed switchover. + # Check if all members are running and one of them is a leader (primary) or + # a standby leader, because sometimes there may exist (for some period of time) + # only replicas after a failed switchover. return all( member["state"] in RUNNING_STATES for member in cluster_status.json()["members"] - ) and any(member["role"] == "leader" for member in cluster_status.json()["members"]) + ) and any(member["role"] in ["leader", "standby_leader"] for member in cluster_status.json()["members"]) def get_patroni_health(self) -> Dict[str, str]: """Gets, retires and parses the Patroni health endpoint.""" @@ -425,6 +463,19 @@ def is_member_isolated(self) -> bool: return len(cluster_status.json()["members"]) == 0 + def promote_standby_cluster(self) -> None: + """Promote a standby cluster to be a regular cluster.""" + config_response = requests.get(f"{self._patroni_url}/config", verify=self.verify) + if "standby_cluster" not in config_response.json(): + raise StandbyClusterAlreadyPromotedError("standby cluster is already promoted") + requests.patch( + f"{self._patroni_url}/config", verify=self.verify, json={"standby_cluster": None} + ) + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if self.get_primary() is None: + raise ClusterNotPromotedError("cluster not promoted") + def render_file(self, path: str, content: str, mode: int) -> None: """Write a content rendered from a template to a file. @@ -506,7 +557,7 @@ def render_patroni_yml_file( if primary else self.charm.async_manager.standby_endpoints(), ) - self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) + self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o644) def start_patroni(self) -> bool: """Start Patroni service using snap. diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index b9d5a74f1a..494c181b1e 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -1,13 +1,7 @@ -# Copyright 2023 Canonical Ltd. +# Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -"""Implements the state-machine. - -1) First async replication relation is made: both units get blocked waiting for a leader -2) User runs the promote action against one of the clusters -3) The cluster moves leader and sets the async-replication data, marking itself as leader -4) The other units receive that new information and update themselves to become standby-leaders. 
-""" +"""Implements the state-machine.""" import json import logging @@ -26,11 +20,15 @@ ) from ops.framework import Object from ops.model import ( + ActiveStatus, + MaintenanceStatus, + Relation, Unit, - WaitingStatus, MaintenanceStatus, ActiveStatus, + WaitingStatus, ) -from tenacity import Retrying, stop_after_attempt, wait_fixed, RetryError +from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed +from cluster import ClusterNotPromotedError, StandbyClusterAlreadyPromotedError from constants import ( APP_SCOPE, MONITORING_PASSWORD_KEY, @@ -180,6 +178,7 @@ def _on_departure(self, _): self.charm.update_config() if not self.charm._patroni.start_patroni(): raise Exception("Failed to start patroni service.") + self.charm.unit.status = ActiveStatus() def _on_primary_changed(self, event): """Triggers a configuration change in the primary units.""" @@ -237,9 +236,9 @@ def _on_standby_changed(self, event): # noqa C901 replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) if not replica_relation: return - self.charm.unit.status = MaintenanceStatus("configuring standby cluster") logger.info("_on_standby_changed: replica relation available") + self.charm.unit.status = MaintenanceStatus("configuring standby cluster") primary = self._check_if_primary_already_selected() if not primary: return @@ -310,6 +309,7 @@ def _on_standby_changed(self, event): # noqa C901 def _on_coordination_request(self, event): # Stop the service. # We need all replicas to be stopped, so we can remove the previous cluster info. + self.charm.unit.status = MaintenanceStatus("stopping database to enable async replication") for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3), reraise=True): with attempt: if not self.charm._patroni.stop_patroni(): @@ -338,60 +338,107 @@ def _on_coordination_request(self, event): raise Exception( f"Failed to remove contents of the data directory with error: {str(e)}" ) + os.mkdir(POSTGRESQL_DATA_PATH) + os.chmod(POSTGRESQL_DATA_PATH, 0o750) + self.charm._patroni._change_owner(POSTGRESQL_DATA_PATH) break + + # Remove previous cluster information to make it possible to initialise a new cluster. + logger.info("Removing previous cluster information") + try: + path = Path(f"{PATRONI_CONF_PATH}/raft") + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove previous cluster information with error: {str(e)}" + ) + self.restart_coordinator.acknowledge(event) def _on_coordination_approval(self, event): """Runs when the coordinator guaranteed all units have stopped.""" + self.charm.unit.status = MaintenanceStatus("starting database to enable async replication") + self.charm.update_config() logger.info( "_on_coordination_approval: configuration done, waiting for restart of the service" ) + # We are ready to restart the service now: all peers have configured themselves. + if not self.charm._patroni.start_patroni(): + raise Exception("Failed to start patroni service.") + + try: + for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): + with attempt: + if not self.charm._patroni.member_started: + raise Exception + except RetryError: + logger.debug("defer _on_coordination_approval: database hasn't started yet") + event.defer() + return + if self.charm.unit.is_leader(): - # We are ready to restart the service now: all peers have configured themselves. 
- if not self.charm._patroni.start_patroni(): - raise Exception("Failed to start patroni service.") - - # Remove previous cluster information to make it possible to initialise a new cluster. - logger.info("Removing previous cluster information") - - def demote(): - pw_record = pwd.getpwnam("snap_daemon") - - def result(): - os.setgid(pw_record.pw_gid) - os.setuid(pw_record.pw_uid) - - return result - - process = run( - [ - "charmed-postgresql.patronictl", - "-c", - f"{PATRONI_CONF_PATH}/patroni.yaml", - "remove", - self.charm.cluster_name, - ], - input=f"{self.charm.cluster_name}\nYes I am aware\npostgresql-0\n".encode(), - stdout=PIPE, - stderr=PIPE, - preexec_fn=demote(), - timeout=10, + self._handle_leader_startup(event) + elif not self.charm._patroni.get_standby_leader(unit_name_pattern=True): + self.charm.unit.status = WaitingStatus("waiting for standby leader to be ready") + event.defer() + return + self.charm.unit.status = ActiveStatus() + + def _handle_leader_startup(self, event) -> None: + diverging_databases = False + try: + for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): + with attempt: + if ( + self.charm._patroni.get_standby_leader(unit_name_pattern=True) + != self.charm.unit.name + ): + raise Exception + except RetryError: + diverging_databases = True + if diverging_databases: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3), reraise=True): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") + + logger.info( + "Removing and recreating pgdata folder due to diverging databases between this and the other cluster" ) - if process.returncode != 0: + try: + path = Path(POSTGRESQL_DATA_PATH) + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: raise Exception( - f"Failed to remove previous cluster information with error: {process.stderr.decode()}" + f"Failed to remove contents of the data directory with error: {str(e)}" ) - self.charm._peers.data[self.charm.app]["cluster_initialised"] = "True" - else: - self.charm.unit.status = WaitingStatus("waiting for primary to be ready") + os.mkdir(POSTGRESQL_DATA_PATH) + os.chmod(POSTGRESQL_DATA_PATH, 0o750) + self.charm._patroni._change_owner(POSTGRESQL_DATA_PATH) + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3), reraise=True): + with attempt: + if not self.charm._patroni.start_patroni(): + raise Exception("Failed to start patroni service.") + try: + for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): + with attempt: + if not self.charm._patroni.member_started: + raise Exception + except RetryError: + logger.debug("defer _on_coordination_approval: database hasn't started yet") + event.defer() + return + + if not self.charm._patroni.are_all_members_ready(): + self.charm.unit.status = WaitingStatus("waiting for all members to be ready") event.defer() return - def _get_primary_candidates(self): - rel = self.model.get_relation(ASYNC_PRIMARY_RELATION) - return rel.units if rel else [] + self.charm._peers.data[self.charm.app]["cluster_initialised"] = "True" def _check_if_primary_already_selected(self) -> Optional[Unit]: """Returns the unit if a primary is present.""" @@ -424,16 +471,32 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: event.fail("No relation found.") return primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) - if not primary_relation: - event.fail("No primary relation") - return - + if primary_relation: + 
self._promote_standby_cluster_from_two_clusters(event, primary_relation) + else: + # Remove the standby cluster information from the Patroni configuration. + try: + self.charm._patroni.promote_standby_cluster() + except Exception as e: + event.fail(str(e)) + + def _promote_standby_cluster_from_two_clusters( + self, event: ActionEvent, primary_relation: Relation + ) -> None: # Let the exception error the unit unit = self._check_if_primary_already_selected() if unit: - event.fail(f"Cannot promote - {unit.name} is already primary: demote it first") + event.fail(f"Cannot promote - {self.charm.app.name} is already the main cluster") return + try: + self.charm._patroni.promote_standby_cluster() + except StandbyClusterAlreadyPromotedError: + # Ignore this error for non-standby clusters. + pass + except ClusterNotPromotedError as e: + event.fail(str(e)) + system_identifier, error = self.get_system_identifier() if error is not None: event.fail(f"Failed to get system identifier: {error}") @@ -450,6 +513,7 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: "system-id": system_identifier, } ) + self.charm.app_peer_data["promoted"] = "True" # Now, check if postgresql it had originally published its unit IP in the # replica relation databag. Delete it, if yes. diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 3c171d7126..07b62dbf9b 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -75,14 +75,16 @@ async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool: return True -async def are_writes_increasing(ops_test, down_unit: str = None) -> None: +async def are_writes_increasing(ops_test, down_unit: str = None, extra_model: Model = None) -> None: """Verify new writes are continuing by counting the number of writes.""" - writes, _ = await count_writes(ops_test, down_unit=down_unit) + writes, _ = await count_writes(ops_test, down_unit=down_unit, extra_model=extra_model) for member, count in writes.items(): for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): with attempt: - more_writes, _ = await count_writes(ops_test, down_unit=down_unit) - assert more_writes[member] > count, f"{member}: writes not continuing to DB" + more_writes, _ = await count_writes(ops_test, down_unit=down_unit, extra_model=extra_model) + assert ( + more_writes[member] > count + ), f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})" async def app_name( @@ -669,24 +671,26 @@ async def is_secondary_up_to_date(ops_test: OpsTest, unit_name: str, expected_wr return True -async def start_continuous_writes(ops_test: OpsTest, app: str) -> None: +async def start_continuous_writes(ops_test: OpsTest, app: str, model: Model = None) -> None: """Start continuous writes to PostgreSQL.""" # Start the process by relating the application to the database or # by calling the action if the relation already exists. 
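+    # Default to the test's current model when no explicit model is given,
+    # so existing single-model callers keep working unchanged.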
+ if model is None: + model = ops_test.model relations = [ relation - for relation in ops_test.model.applications[app].relations + for relation in model.applications[app].relations if not relation.is_peer and f"{relation.requires.application_name}:{relation.requires.name}" == f"{APPLICATION_NAME}:first-database" ] if not relations: - await ops_test.model.relate(app, f"{APPLICATION_NAME}:first-database") - await ops_test.model.wait_for_idle(status="active", timeout=1000) + await model.relate(app, f"{APPLICATION_NAME}:first-database") + await model.wait_for_idle(status="active", timeout=1000) for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): with attempt: action = ( - await ops_test.model.applications[APPLICATION_NAME] + await model.applications[APPLICATION_NAME] .units[0] .run_action("start-continuous-writes") ) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index dd8e70b854..9e193a8c47 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -6,18 +6,22 @@ from asyncio import gather from typing import Optional +import psycopg2 import pytest as pytest from juju.controller import Controller from juju.model import Model from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed from ..helpers import ( APPLICATION_NAME, DATABASE_APP_NAME, get_leader_unit, + get_password, + get_primary, + get_unit_address, wait_for_relation_removed_between, ) -from ..juju_ import juju_major_version from .helpers import ( app_name, are_writes_increasing, @@ -30,6 +34,11 @@ logger = logging.getLogger(__name__) +FAST_INTERVAL = "60s" +IDLE_PERIOD = 30 +TIMEOUT = 2000 + + @contextlib.asynccontextmanager async def fast_forward( model: Model, fast_interval: str = "10s", slow_interval: Optional[str] = None @@ -69,10 +78,27 @@ async def second_model(controller, first_model) -> Model: return second_model +@pytest.fixture +async def second_model_continuous_writes(second_model) -> None: + """Cleans up continuous writes on the second model after a test run.""" + yield + # Clear the written data at the end. 
+ for attempt in Retrying(stop=stop_after_delay(10), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await second_model.applications[APPLICATION_NAME] + .units[0] + .run_action("clear-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to clear up continuous_writes table" + + @pytest.mark.group(1) +@pytest.mark.juju3 @pytest.mark.abort_on_fail async def test_deploy_async_replication_setup( - ops_test: OpsTest, first_model: Model, second_model: Model + ops_test: OpsTest, first_model: Model, second_model: Model, charm ) -> None: """Build and deploy two PostgreSQL cluster in two separate models to test async replication.""" if not await app_name(ops_test): @@ -90,23 +116,25 @@ async def test_deploy_async_replication_setup( config={"profile": "testing"}, ) await ops_test.model.deploy(APPLICATION_NAME, num_units=1) + await second_model.deploy(APPLICATION_NAME, num_units=1) async with ops_test.fast_forward(), fast_forward(second_model): await gather( first_model.wait_for_idle( apps=[DATABASE_APP_NAME, APPLICATION_NAME], status="active", - timeout=1500, + timeout=TIMEOUT, ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], + apps=[DATABASE_APP_NAME, APPLICATION_NAME], status="active", - timeout=1500, + timeout=TIMEOUT, ), ) @pytest.mark.group(1) +@pytest.mark.juju3 @pytest.mark.abort_on_fail async def test_async_replication( ops_test: OpsTest, @@ -122,26 +150,233 @@ async def test_async_replication( logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) - offer_endpoint = ( - f"{DATABASE_APP_NAME}:async-primary" if juju_major_version == 2 else "async-primary" - ) - await first_model.create_offer(offer_endpoint, "async-primary", DATABASE_APP_NAME) + await first_model.create_offer("async-primary", "async-primary", DATABASE_APP_NAME) await second_model.consume( f"admin/{first_model.info.name}.async-primary", controller=controller ) - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), ) await second_model.relate(DATABASE_APP_NAME, "async-primary") - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Run the promote action. 
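+    # Promoting the first cluster makes it the leader of the cross-cluster setup,
+    # while the second cluster keeps replicating from it.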
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-standby-cluster") + await run_action.wait() + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_switchover( + ops_test: OpsTest, + controller: Controller, + first_model: Model, + second_model: Model, + second_model_continuous_writes, +): + """Test switching over to the second cluster.""" + logger.info("breaking the relation") + await second_model.applications[DATABASE_APP_NAME].remove_relation( + "async-replica", "async-primary" + ) + wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica" + await ops_test.juju(*second_offer_command.split()) + await second_model.consume( + f"admin/{first_model.info.name}.async-replica", controller=controller + ) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + await second_model.relate(DATABASE_APP_NAME, "async-replica") + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + # Run the promote action. 
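+    # Promoting the second cluster reverses the replication direction: it becomes
+    # the new leader and the first cluster follows it.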
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the second cluster") + run_action = await leader_unit.run_action("promote-standby-cluster") + await run_action.wait() + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME, model=second_model) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_promote_standby( + ops_test: OpsTest, + controller: Controller, + first_model: Model, + second_model: Model, + second_model_continuous_writes, +) -> None: + """Test promoting the standby cluster.""" + logger.info("breaking the relation") + await second_model.applications[DATABASE_APP_NAME].remove_relation( + "async-primary", "async-replica" + ) + wait_for_relation_removed_between(ops_test, "async-replica", "async-primary", first_model) + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + # Run the promote action. 
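+    # With the relation removed, promoting the first cluster turns it back into a
+    # standalone primary so writes can resume against it.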
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-standby-cluster") + await run_action.wait() + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("removing the previous data") + primary = await get_primary(ops_test) + address = get_unit_address(ops_test, primary) + password = await get_password(ops_test, primary) + database_name = f'{APPLICATION_NAME.replace("-", "_")}_first_database' + connection = None + try: + connection = psycopg2.connect( + f"dbname={database_name} user=operator password={password} host={address}" + ) + connection.autocommit = True + cursor = connection.cursor() + cursor.execute("DROP TABLE IF EXISTS continuous_writes;") + except psycopg2.Error as e: + assert False, f"Failed to drop continuous writes table: {e}" + finally: + if connection is not None: + connection.close() + + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_reestablish_relation( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that the relation can be broken and re-established.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + logger.info("reestablishing the relation") + await second_model.relate(DATABASE_APP_NAME, "async-primary") + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), ) logger.info("checking whether writes are increasing") @@ -155,10 +390,14 @@ async def test_async_replication( run_action = await leader_unit.run_action("promote-standby-cluster") await run_action.wait() - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), ) logger.info("checking whether writes are increasing") @@ -170,128 
+409,84 @@ async def test_async_replication( await check_writes(ops_test, extra_model=second_model) -# @pytest.mark.group(1) -# async def test_break_and_reestablish_relation( -# ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes -# ) -> None: -# """Test that the relation can be broken and re-established.""" -# logger.info("starting continuous writes to the database") -# await start_continuous_writes(ops_test, DATABASE_APP_NAME) -# -# logger.info("checking whether writes are increasing") -# await are_writes_increasing(ops_test) -# -# logger.info("breaking the relation") -# await second_model.applications[DATABASE_APP_NAME].remove_relation( -# "async-replica", "async-primary" -# ) -# wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) -# async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): -# await gather( -# first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), -# second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), -# ) -# -# logger.info("reestablishing the relation") -# await second_model.relate(DATABASE_APP_NAME, "async-primary") -# async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): -# await gather( -# first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), -# second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), -# ) -# -# logger.info("checking whether writes are increasing") -# await are_writes_increasing(ops_test) -# -# # Run the promote action. -# logger.info("Get leader unit") -# leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) -# assert leader_unit is not None, "No leader unit found" -# logger.info("promoting the first cluster") -# run_action = await leader_unit.run_action("promote-standby-cluster") -# await run_action.wait() -# -# async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): -# await gather( -# first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), -# second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), -# ) -# -# logger.info("checking whether writes are increasing") -# await are_writes_increasing(ops_test) -# -# # Verify that no writes to the database were missed after stopping the writes -# # (check that all the units have all the writes). 
-# logger.info("checking whether no writes were lost") -# await check_writes(ops_test, extra_model=second_model) -# -# -# @pytest.mark.group(1) -# async def test_async_replication_failover_in_main_cluster( -# ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes -# ) -> None: -# """Test that async replication fails over correctly.""" -# logger.info("starting continuous writes to the database") -# await start_continuous_writes(ops_test, DATABASE_APP_NAME) -# -# logger.info("checking whether writes are increasing") -# await are_writes_increasing(ops_test) -# -# sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) -# logger.info(f"Sync-standby: {sync_standby}") -# logger.info("removing the sync-standby unit") -# await first_model.applications[DATABASE_APP_NAME].remove_unit(sync_standby) -# -# async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): -# await gather( -# first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), -# second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), -# ) -# -# # Check that the sync-standby unit is not the same as before. -# new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) -# logger.info(f"New sync-standby: {new_sync_standby}") -# assert new_sync_standby != sync_standby, "Sync-standby is the same as before" -# -# logger.info("Ensure continuous_writes after the crashed unit") -# await are_writes_increasing(ops_test) -# -# # Verify that no writes to the database were missed after stopping the writes -# # (check that all the units have all the writes). -# logger.info("checking whether no writes were lost") -# await check_writes(ops_test, extra_model=second_model) -# -# -# @pytest.mark.group(1) -# async def test_async_replication_failover_in_secondary_cluster( -# ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes -# ) -> None: -# """Test that async replication fails back correctly.""" -# logger.info("starting continuous writes to the database") -# await start_continuous_writes(ops_test, DATABASE_APP_NAME) -# -# logger.info("checking whether writes are increasing") -# await are_writes_increasing(ops_test) -# -# standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) -# logger.info(f"Standby leader: {standby_leader}") -# logger.info("removing the standby leader unit") -# await second_model.applications[DATABASE_APP_NAME].remove_unit(standby_leader) -# -# async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): -# await gather( -# first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), -# second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), -# ) -# -# # Check that the standby leader unit is not the same as before. -# new_standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) -# assert new_standby_leader != standby_leader, "Standby leader is the same as before" -# -# logger.info("Ensure continuous_writes after the crashed unit") -# await are_writes_increasing(ops_test) -# -# # Verify that no writes to the database were missed after stopping the writes -# # (check that all the units have all the writes). 
-# logger.info("checking whether no writes were lost")
-# await check_writes(ops_test, extra_model=second_model)
+@pytest.mark.group(1)
+@pytest.mark.juju3
+@pytest.mark.abort_on_fail
+async def test_async_replication_failover_in_main_cluster(
+    ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes
+) -> None:
+    """Test that async replication fails over correctly."""
+    logger.info("starting continuous writes to the database")
+    await start_continuous_writes(ops_test, DATABASE_APP_NAME)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
+    logger.info(f"Sync-standby: {sync_standby}")
+    logger.info("removing the sync-standby unit")
+    await first_model.applications[DATABASE_APP_NAME].destroy_units(sync_standby)
+
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+
+    # Check that the sync-standby unit is not the same as before.
+    new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
+    logger.info(f"New sync-standby: {new_sync_standby}")
+    assert new_sync_standby != sync_standby, "Sync-standby is the same as before"
+
+    logger.info("Ensure continuous_writes after the crashed unit")
+    await are_writes_increasing(ops_test)
+
+    # Verify that no writes to the database were missed after stopping the writes
+    # (check that all the units have all the writes).
+    logger.info("checking whether no writes were lost")
+    await check_writes(ops_test, extra_model=second_model)
+
+
+@pytest.mark.group(1)
+@pytest.mark.juju3
+@pytest.mark.abort_on_fail
+async def test_async_replication_failover_in_secondary_cluster(
+    ops_test: OpsTest,
+    first_model: Model,
+    second_model: Model,
+    continuous_writes,
+    primary_start_timeout,
+) -> None:
+    """Test that async replication fails back correctly."""
+    logger.info("starting continuous writes to the database")
+    await start_continuous_writes(ops_test, DATABASE_APP_NAME)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME)
+    logger.info(f"Standby leader: {standby_leader}")
+    logger.info("removing the standby leader unit")
+    await second_model.applications[DATABASE_APP_NAME].destroy_units(standby_leader)
+
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+
+    logger.info("Ensure continuous_writes after the crashed unit")
+    await are_writes_increasing(ops_test)
+
+    # Verify that no writes to the database were missed after stopping the writes
+    # (check that all the units have all the writes).
+    logger.info("checking whether no writes were lost")
+    await check_writes(ops_test, extra_model=second_model)
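
Note for reviewers: the two failover tests above rely on get_sync_standby and
get_standby_leader helpers that are not shown here. As a rough, illustrative
sketch only (the function name, the sample addresses, and the assumption of
plain HTTP on Patroni's default port 8008 are mine, not part of this change),
the role information they need is exposed by Patroni's /cluster REST endpoint,
which reports members with the roles "sync_standby" and "standby_leader":

    # Illustrative sketch only -- not part of this patch. Assumes Patroni's REST
    # API is reachable over plain HTTP on the default port 8008 of any member.
    from typing import Optional

    import requests

    def get_member_with_role(member_ip: str, role: str) -> Optional[str]:
        """Return the name of the first Patroni member reporting the given role."""
        cluster = requests.get(f"http://{member_ip}:8008/cluster", timeout=10).json()
        for member in cluster.get("members", []):
            if member.get("role") == role:
                return member.get("name")
        return None

    # Example (hypothetical addresses): the main cluster reports a "sync_standby"
    # member and the standby cluster reports a "standby_leader" member.
    # get_member_with_role("10.0.0.5", "sync_standby")
    # get_member_with_role("10.1.0.7", "standby_leader")

The real helpers may gather this differently (for example via patronictl or the
charm's own wrappers); the sketch only shows where the roles ultimately come from.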