From 988d3cafc89d94b63df7440e19d07d296d82cecc Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Jul 2024 16:56:36 -0300 Subject: [PATCH] [DPE-1177] Change 'master' to 'primary' in Patroni leader role (#532) * Change 'master' to 'primary' in Patroni leader role Signed-off-by: Marcelo Henrique Neppel * Handle upgrade and rollback Signed-off-by: Marcelo Henrique Neppel * Add integration tests Signed-off-by: Marcelo Henrique Neppel * Check labels and whether writes are increasing (the latter after the first unit upgraded) Signed-off-by: Marcelo Henrique Neppel * Check labels and whether writes are increasing (the latter after the first unit upgraded) in the rollback test Signed-off-by: Marcelo Henrique Neppel * Fix rollback revision and labels check Signed-off-by: Marcelo Henrique Neppel * Fix standby cluster initialisation Signed-off-by: Marcelo Henrique Neppel * Fix upgrade test Signed-off-by: Marcelo Henrique Neppel * Avoid errors on deploy and set specific revisions Signed-off-by: Marcelo Henrique Neppel * Add charm series Signed-off-by: Marcelo Henrique Neppel * Remove Retrying Signed-off-by: Marcelo Henrique Neppel --------- Signed-off-by: Marcelo Henrique Neppel --- lib/charms/postgresql_k8s/v0/postgresql.py | 13 +- src/charm.py | 54 ++++-- src/dependency.json | 6 +- src/relations/async_replication.py | 22 +-- src/upgrade.py | 17 ++ tests/integration/ha_tests/helpers.py | 41 +++- .../ha_tests/test_rollback_to_master_label.py | 178 ++++++++++++++++++ tests/integration/ha_tests/test_upgrade.py | 36 ++-- .../ha_tests/test_upgrade_to_primary_label.py | 135 +++++++++++++ tests/unit/test_charm.py | 39 +++- 10 files changed, 470 insertions(+), 71 deletions(-) create mode 100644 tests/integration/ha_tests/test_rollback_to_master_label.py create mode 100644 tests/integration/ha_tests/test_upgrade_to_primary_label.py diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 64e6144332..f7d361b5a9 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -27,7 +27,6 @@ from ops.model import Relation from psycopg2 import sql from psycopg2.sql import Composed -from tenacity import Retrying, stop_after_attempt, wait_fixed # The unique Charmhub library identifier, never change it LIBID = "24ee217a54e840a598ff21a079c3e678" @@ -37,7 +36,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 32 +LIBPATCH = 33 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -129,12 +128,10 @@ def _connect_to_database( psycopg2 connection object. """ host = database_host if database_host is not None else self.primary_host - for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(3), reraise=True): - with attempt: - connection = psycopg2.connect( - f"dbname='{database if database else self.database}' user='{self.user}' host='{host}'" - f"password='{self.password}' connect_timeout=1" - ) + connection = psycopg2.connect( + f"dbname='{database if database else self.database}' user='{self.user}' host='{host}'" + f"password='{self.password}' connect_timeout=1" + ) connection.autocommit = True return connection diff --git a/src/charm.py b/src/charm.py index ee446d9bd9..a2d210b6f0 100755 --- a/src/charm.py +++ b/src/charm.py @@ -173,7 +173,6 @@ def __init__(self, *args): self.framework.observe(self.on.postgresql_pebble_ready, self._on_postgresql_pebble_ready) self.framework.observe(self.on.pgdata_storage_detaching, self._on_pgdata_storage_detaching) self.framework.observe(self.on.stop, self._on_stop) - self.framework.observe(self.on.upgrade_charm, self._on_upgrade_charm) self.framework.observe(self.on.get_password_action, self._on_get_password) self.framework.observe(self.on.set_password_action, self._on_set_password) self.framework.observe(self.on.get_primary_action, self._on_get_primary) @@ -186,6 +185,7 @@ def __init__(self, *args): relation_name="upgrade", substrate="k8s", ) + self.framework.observe(self.on.upgrade_charm, self._on_upgrade_charm) self.postgresql_client_relation = PostgreSQLProvider(self) self.legacy_db_relation = DbProvides(self, admin=False) self.legacy_db_admin_relation = DbProvides(self, admin=True) @@ -779,12 +779,13 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: return # Create resources and add labels needed for replication. - try: - self._create_services() - except ApiError: - logger.exception("failed to create k8s services") - self.unit.status = BlockedStatus("failed to create k8s services") - return + if self.upgrade.idle: + try: + self._create_services() + except ApiError: + logger.exception("failed to create k8s services") + self.unit.status = BlockedStatus("failed to create k8s services") + return # Remove departing units when the leader changes. self._remove_from_endpoints(self._get_endpoints_to_remove()) @@ -911,12 +912,23 @@ def _initialize_cluster(self, event: WorkloadEvent) -> bool: return False # Create resources and add labels needed for replication - try: - self._create_services() - except ApiError: - logger.exception("failed to create k8s services") - self.unit.status = BlockedStatus("failed to create k8s services") - return False + if self.upgrade.idle: + try: + self._create_services() + except ApiError: + logger.exception("failed to create k8s services") + self.unit.status = BlockedStatus("failed to create k8s services") + return False + + async_replication_primary_cluster = self.async_replication.get_primary_cluster() + if ( + async_replication_primary_cluster is not None + and async_replication_primary_cluster != self.app + ): + logger.debug( + "Early exit _initialize_cluster: not the primary cluster in async replication" + ) + return True if not self._patroni.primary_endpoint_ready: logger.debug( @@ -953,12 +965,13 @@ def is_blocked(self) -> bool: def _on_upgrade_charm(self, _) -> None: # Recreate k8s resources and add labels required for replication # when the pod loses them (like when it's deleted). - try: - self._create_services() - except ApiError: - logger.exception("failed to create k8s services") - self.unit.status = BlockedStatus("failed to create k8s services") - return + if self.upgrade.idle: + try: + self._create_services() + except ApiError: + logger.exception("failed to create k8s services") + self.unit.status = BlockedStatus("failed to create k8s services") + return try: self._patch_pod_labels(self.unit.name) @@ -1001,7 +1014,7 @@ def _create_services(self) -> None: ) services = { - "primary": "master", + "primary": "primary", "replicas": "replica", } for service_name_suffix, role_selector in services.items(): @@ -1474,6 +1487,7 @@ def _postgresql_layer(self) -> Layer: "group": WORKLOAD_OS_GROUP, "environment": { "PATRONI_KUBERNETES_LABELS": f"{{application: patroni, cluster-name: {self.cluster_name}}}", + "PATRONI_KUBERNETES_LEADER_LABEL_VALUE": "primary", "PATRONI_KUBERNETES_NAMESPACE": self._namespace, "PATRONI_KUBERNETES_USE_ENDPOINTS": "true", "PATRONI_NAME": pod_name, diff --git a/src/dependency.json b/src/dependency.json index 5e7e39d58d..fbe4dc6884 100644 --- a/src/dependency.json +++ b/src/dependency.json @@ -3,12 +3,12 @@ "dependencies": {"pgbouncer": ">0"}, "name": "postgresql", "upgrade_supported": ">0", - "version": "1" + "version": "2" }, "rock": { "dependencies": {}, "name": "charmed-postgresql", "upgrade_supported": "^14", - "version": "14.9" + "version": "14.11" } -} \ No newline at end of file +} diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index a5cf8153a4..c85b07ea5c 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -140,7 +140,7 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool: # Check if this cluster is already the primary cluster. If so, fail the action telling that it's already # the primary cluster. - primary_cluster = self._get_primary_cluster() + primary_cluster = self.get_primary_cluster() if self.charm.app == primary_cluster: event.fail("This cluster is already the primary cluster.") return False @@ -217,7 +217,7 @@ def _get_highest_promoted_cluster_counter_value(self) -> str: promoted_cluster_counter = relation_promoted_cluster_counter return promoted_cluster_counter - def _get_primary_cluster(self) -> Optional[Application]: + def get_primary_cluster(self) -> Optional[Application]: """Return the primary cluster.""" primary_cluster = None promoted_cluster_counter = "0" @@ -240,7 +240,7 @@ def _get_primary_cluster(self) -> Optional[Application]: def get_primary_cluster_endpoint(self) -> Optional[str]: """Return the primary cluster endpoint.""" - primary_cluster = self._get_primary_cluster() + primary_cluster = self.get_primary_cluster() if primary_cluster is None or self.charm.app == primary_cluster: return None relation = self._relation @@ -252,7 +252,7 @@ def get_primary_cluster_endpoint(self) -> Optional[str]: def get_all_primary_cluster_endpoints(self) -> List[str]: """Return all the primary cluster endpoints.""" relation = self._relation - primary_cluster = self._get_primary_cluster() + primary_cluster = self.get_primary_cluster() # List the primary endpoints only for the standby cluster. if relation is None or primary_cluster is None or self.charm.app == primary_cluster: return [] @@ -294,7 +294,7 @@ def _get_secret(self) -> Secret: def get_standby_endpoints(self) -> List[str]: """Return the standby endpoints.""" relation = self._relation - primary_cluster = self._get_primary_cluster() + primary_cluster = self.get_primary_cluster() # List the standby endpoints only for the primary cluster. if relation is None or primary_cluster is None or self.charm.app != primary_cluster: return [] @@ -456,7 +456,7 @@ def _handle_replication_change(self, event: ActionEvent) -> bool: def _is_following_promoted_cluster(self) -> bool: """Return True if this unit is following the promoted cluster.""" - if self._get_primary_cluster() is None: + if self.get_primary_cluster() is None: return False return ( self.charm._peers.data[self.charm.unit].get("unit-promoted-cluster-counter") @@ -465,7 +465,7 @@ def _is_following_promoted_cluster(self) -> bool: def is_primary_cluster(self) -> bool: """Return the primary cluster name.""" - return self.charm.app == self._get_primary_cluster() + return self.charm.app == self.get_primary_cluster() def _on_async_relation_broken(self, _) -> None: if self.charm._peers is None or "departing" in self.charm._peers.data[self.charm.unit]: @@ -493,7 +493,7 @@ def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: if self.charm.unit.is_leader(): self._set_app_status() - primary_cluster = self._get_primary_cluster() + primary_cluster = self.get_primary_cluster() logger.debug("Primary cluster: %s", primary_cluster) if primary_cluster is None: logger.debug("Early exit on_async_relation_changed: No primary cluster found.") @@ -559,7 +559,7 @@ def _on_async_relation_departed(self, event: RelationDepartedEvent) -> None: def _on_create_replication(self, event: ActionEvent) -> None: """Set up asynchronous replication between two clusters.""" - if self._get_primary_cluster() is not None: + if self.get_primary_cluster() is not None: event.fail("There is already a replication set up.") return @@ -580,7 +580,7 @@ def _on_promote_to_primary(self, event: ActionEvent) -> None: """Promote this cluster to the primary cluster.""" if ( self.charm.app.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE - and self._get_primary_cluster() is None + and self.get_primary_cluster() is None ): event.fail( "No primary cluster found. Run `create-replication` action in the cluster where the offer was created." @@ -683,7 +683,7 @@ def _set_app_status(self) -> None: if self._relation is None: self.charm.app.status = ActiveStatus() return - primary_cluster = self._get_primary_cluster() + primary_cluster = self.get_primary_cluster() if primary_cluster is None: self.charm.app.status = ActiveStatus() else: diff --git a/src/upgrade.py b/src/upgrade.py index e41ababbff..1a9976d78a 100644 --- a/src/upgrade.py +++ b/src/upgrade.py @@ -56,6 +56,22 @@ def __init__(self, charm, model: BaseModel, **kwargs) -> None: ) self.framework.observe(self.charm.on.upgrade_charm, self._on_upgrade_charm_check_legacy) + def _handle_label_change(self) -> None: + """Handle the label change from `master` to `primary`.""" + unit_number = int(self.charm.unit.name.split("/")[1]) + if unit_number == 1: + # If the unit is the last to be upgraded before unit zero, + # trigger a switchover, so one of the upgraded units becomes + # the primary. + try: + self.charm._patroni.switchover() + except SwitchoverFailedError as e: + logger.warning(f"Switchover failed: {e}") + if len(self.charm._peers.units) == 0 or unit_number == 1: + # If the unit is the last to be upgraded before unit zero + # or the only unit in the cluster, update the label. + self.charm._create_services() + @property def is_no_sync_member(self) -> bool: """Whether this member shouldn't be a synchronous standby (when it's a replica).""" @@ -111,6 +127,7 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: in self.charm._patroni.cluster_members and self.charm._patroni.is_replication_healthy ): + self._handle_label_change() logger.debug("Upgraded unit is healthy. Set upgrade state to `completed`") self.set_unit_completed() else: diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 790ddd3667..ae027edc25 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -7,8 +7,10 @@ import subprocess import tarfile import tempfile +import zipfile from datetime import datetime -from typing import Dict, Optional, Set, Tuple +from pathlib import Path +from typing import Dict, Optional, Set, Tuple, Union import kubernetes as kubernetes import psycopg2 @@ -44,6 +46,7 @@ get_unit_address, run_command_on_unit, ) +from ..new_relations.helpers import get_application_relation_data PORT = 5432 @@ -421,6 +424,25 @@ async def get_patroni_setting(ops_test: OpsTest, setting: str) -> Optional[int]: return int(primary_start_timeout) if primary_start_timeout is not None else None +async def get_instances_roles(ops_test: OpsTest): + """Return the roles of the instances in the cluster.""" + labels = {} + client = Client() + app = await app_name(ops_test) + for unit in ops_test.model.applications[app].units: + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)): + with attempt: + pod = client.get( + res=Pod, + name=unit.name.replace("/", "-"), + namespace=ops_test.model.info.name, + ) + if "role" not in pod.metadata.labels: + raise ValueError(f"role label not available for {unit.name}") + labels[unit.name] = pod.metadata.labels["role"] + return labels + + async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> Optional[int]: """Get the value of a PostgreSQL parameter from Patroni API. @@ -483,6 +505,23 @@ async def get_sync_standby(model: Model, application_name: str) -> str: return member["name"] +async def inject_dependency_fault( + ops_test: OpsTest, application_name: str, charm_file: Union[str, Path] +) -> None: + """Inject a dependency fault into the PostgreSQL charm.""" + # Query running dependency to overwrite with incompatible version. + dependencies = await get_application_relation_data( + ops_test, application_name, "upgrade", "dependencies" + ) + loaded_dependency_dict = json.loads(dependencies) + loaded_dependency_dict["charm"]["upgrade_supported"] = "^15" + loaded_dependency_dict["charm"]["version"] = "15.0" + + # Overwrite dependency.json with incompatible version. + with zipfile.ZipFile(charm_file, mode="a") as charm_zip: + charm_zip.writestr("src/dependency.json", json.dumps(loaded_dependency_dict)) + + async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool: """Test a connection to a PostgreSQL server.""" try: diff --git a/tests/integration/ha_tests/test_rollback_to_master_label.py b/tests/integration/ha_tests/test_rollback_to_master_label.py new file mode 100644 index 0000000000..86b33c5699 --- /dev/null +++ b/tests/integration/ha_tests/test_rollback_to_master_label.py @@ -0,0 +1,178 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +import asyncio +import logging +import operator +import shutil +from pathlib import Path + +import pytest +from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_attempt, wait_fixed + +from .. import markers +from ..architecture import architecture +from ..helpers import ( + APPLICATION_NAME, + CHARM_SERIES, + DATABASE_APP_NAME, + get_leader_unit, + get_primary, + get_unit_by_index, +) +from .helpers import ( + are_writes_increasing, + check_writes, + get_instances_roles, + inject_dependency_fault, + start_continuous_writes, +) + +logger = logging.getLogger(__name__) + +TIMEOUT = 600 + + +@pytest.mark.group(1) +@markers.amd64_only # TODO: remove after arm64 stable release +@pytest.mark.abort_on_fail +async def test_deploy_stable(ops_test: OpsTest) -> None: + """Simple test to ensure that the PostgreSQL and application charms get deployed.""" + await asyncio.gather( + ops_test.model.deploy( + DATABASE_APP_NAME, + num_units=3, + channel="14/stable", + revision=(280 if architecture == "arm64" else 281), + series=CHARM_SERIES, + trust=True, + ), + ops_test.model.deploy( + APPLICATION_NAME, + num_units=1, + channel="latest/edge", + ), + ) + logger.info("Wait for applications to become active") + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, APPLICATION_NAME], status="active", raise_on_error=False + ) + assert len(ops_test.model.applications[DATABASE_APP_NAME].units) == 3 + instances_roles = await get_instances_roles(ops_test) + assert operator.countOf(instances_roles.values(), "master") == 1 + assert operator.countOf(instances_roles.values(), "primary") == 0 + assert operator.countOf(instances_roles.values(), "replica") == 2 + + +@pytest.mark.group(1) +@markers.amd64_only # TODO: remove after arm64 stable release +async def test_fail_and_rollback(ops_test, continuous_writes) -> None: + # Start an application that continuously writes data to the database. + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + # Check whether writes are increasing. + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + + for attempt in Retrying(stop=stop_after_attempt(2), wait=wait_fixed(30), reraise=True): + with attempt: + logger.info("Run pre-upgrade-check action") + action = await leader_unit.run_action("pre-upgrade-check") + await action.wait() + + # Ensure the primary has changed to the first unit. + primary_name = await get_primary(ops_test, DATABASE_APP_NAME) + assert primary_name == f"{DATABASE_APP_NAME}/0" + + local_charm = await ops_test.build_charm(".") + if isinstance(local_charm, str): + filename = local_charm.split("/")[-1] + else: + filename = local_charm.name + fault_charm = Path("/tmp/", filename) + shutil.copy(local_charm, fault_charm) + + logger.info("Inject dependency fault") + await inject_dependency_fault(ops_test, DATABASE_APP_NAME, fault_charm) + + application = ops_test.model.applications[DATABASE_APP_NAME] + + logger.info("Refresh the charm") + await application.refresh(path=fault_charm) + + logger.info("Get first upgrading unit") + # Highest ordinal unit always the first to upgrade. + unit = get_unit_by_index(DATABASE_APP_NAME, application.units, 2) + + logger.info("Wait for upgrade to fail on first upgrading unit") + async with ops_test.fast_forward("60s"): + await ops_test.model.block_until( + lambda: unit.workload_status == "blocked", + timeout=TIMEOUT, + ) + instances_roles = await get_instances_roles(ops_test) + assert operator.countOf(instances_roles.values(), "master") == 1 + assert operator.countOf(instances_roles.values(), "primary") == 0 + assert operator.countOf(instances_roles.values(), "replica") == 2 + + logger.info("Ensure continuous_writes while in failure state on remaining units") + await are_writes_increasing(ops_test) + + logger.info("Re-run pre-upgrade-check action") + action = await leader_unit.run_action("pre-upgrade-check") + await action.wait() + + logger.info("Re-refresh the charm") + await ops_test.juju( + "refresh", DATABASE_APP_NAME, "--switch", "postgresql-k8s", "--channel", "14/stable" + ) + + async with ops_test.fast_forward("60s"): + await ops_test.model.block_until( + lambda: unit.workload_status_message == "upgrade completed", timeout=TIMEOUT + ) + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], idle_period=30, timeout=TIMEOUT + ) + + # Check whether writes are increasing. + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + instances_roles = await get_instances_roles(ops_test) + assert operator.countOf(instances_roles.values(), "master") == 1 + assert operator.countOf(instances_roles.values(), "primary") == 0 + assert operator.countOf(instances_roles.values(), "replica") == 2 + + logger.info("Resume upgrade") + action = await leader_unit.run_action("resume-upgrade") + await action.wait() + + logger.info("Wait for application to recover") + async with ops_test.fast_forward("60s"): + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", timeout=TIMEOUT + ) + + instances_roles = await get_instances_roles(ops_test) + assert operator.countOf(instances_roles.values(), "master") == 1 + assert operator.countOf(instances_roles.values(), "primary") == 0 + assert operator.countOf(instances_roles.values(), "replica") == 2 + + logger.info("Ensure continuous_writes after rollback procedure") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("Checking whether no writes were lost") + await check_writes(ops_test) + + # Remove fault charm file. + fault_charm.unlink() diff --git a/tests/integration/ha_tests/test_upgrade.py b/tests/integration/ha_tests/test_upgrade.py index ccd6cccfd5..215ca65995 100644 --- a/tests/integration/ha_tests/test_upgrade.py +++ b/tests/integration/ha_tests/test_upgrade.py @@ -2,12 +2,9 @@ # See LICENSE file for licensing details. import asyncio -import json import logging import shutil -import zipfile from pathlib import Path -from typing import Union import pytest from lightkube import Client @@ -24,10 +21,10 @@ get_primary, get_unit_by_index, ) -from ..new_relations.helpers import get_application_relation_data from .helpers import ( are_writes_increasing, check_writes, + inject_dependency_fault, start_continuous_writes, ) @@ -57,7 +54,10 @@ async def test_deploy_latest(ops_test: OpsTest) -> None: logger.info("Wait for applications to become active") async with ops_test.fast_forward(): await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME, APPLICATION_NAME], status="active", raise_on_error=False + apps=[DATABASE_APP_NAME, APPLICATION_NAME], + status="active", + raise_on_error=False, + timeout=1000, ) assert len(ops_test.model.applications[DATABASE_APP_NAME].units) == 3 @@ -124,6 +124,10 @@ async def test_upgrade_from_edge(ops_test: OpsTest, continuous_writes) -> None: apps=[DATABASE_APP_NAME], idle_period=30, timeout=TIMEOUT ) + # Check whether writes are increasing. + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + logger.info("Resume upgrade") leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) action = await leader_unit.run_action("resume-upgrade") @@ -221,6 +225,10 @@ async def test_fail_and_rollback(ops_test, continuous_writes) -> None: apps=[DATABASE_APP_NAME], idle_period=30, timeout=TIMEOUT ) + # Check whether writes are increasing. + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + logger.info("Resume upgrade") action = await leader_unit.run_action("resume-upgrade") await action.wait() @@ -241,21 +249,3 @@ async def test_fail_and_rollback(ops_test, continuous_writes) -> None: # Remove fault charm file. fault_charm.unlink() - - -@pytest.mark.group(1) -async def inject_dependency_fault( - ops_test: OpsTest, application_name: str, charm_file: Union[str, Path] -) -> None: - """Inject a dependency fault into the PostgreSQL charm.""" - # Query running dependency to overwrite with incompatible version. - dependencies = await get_application_relation_data( - ops_test, application_name, "upgrade", "dependencies" - ) - loaded_dependency_dict = json.loads(dependencies) - loaded_dependency_dict["charm"]["upgrade_supported"] = "^15" - loaded_dependency_dict["charm"]["version"] = "15.0" - - # Overwrite dependency.json with incompatible version. - with zipfile.ZipFile(charm_file, mode="a") as charm_zip: - charm_zip.writestr("src/dependency.json", json.dumps(loaded_dependency_dict)) diff --git a/tests/integration/ha_tests/test_upgrade_to_primary_label.py b/tests/integration/ha_tests/test_upgrade_to_primary_label.py new file mode 100644 index 0000000000..2243217445 --- /dev/null +++ b/tests/integration/ha_tests/test_upgrade_to_primary_label.py @@ -0,0 +1,135 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +import asyncio +import logging +import operator + +import pytest +from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_attempt, wait_fixed + +from .. import markers +from ..helpers import ( + APPLICATION_NAME, + DATABASE_APP_NAME, + get_leader_unit, + get_primary, + get_unit_by_index, +) +from .helpers import ( + are_writes_increasing, + check_writes, + get_instances_roles, + start_continuous_writes, +) + +logger = logging.getLogger(__name__) + +TIMEOUT = 600 + + +@pytest.mark.group(1) +@markers.amd64_only # TODO: remove after arm64 stable release +@pytest.mark.abort_on_fail +async def test_deploy_stable(ops_test: OpsTest) -> None: + """Simple test to ensure that the PostgreSQL and application charms get deployed.""" + await asyncio.gather( + ops_test.model.deploy( + DATABASE_APP_NAME, + num_units=3, + channel="14/stable", + trust=True, + ), + ops_test.model.deploy( + APPLICATION_NAME, + num_units=1, + channel="latest/edge", + ), + ) + logger.info("Wait for applications to become active") + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, APPLICATION_NAME], status="active", raise_on_error=False + ) + assert len(ops_test.model.applications[DATABASE_APP_NAME].units) == 3 + instances_roles = await get_instances_roles(ops_test) + assert operator.countOf(instances_roles.values(), "master") == 1 + assert operator.countOf(instances_roles.values(), "primary") == 0 + assert operator.countOf(instances_roles.values(), "replica") == 2 + + +@pytest.mark.group(1) +@markers.amd64_only # TODO: remove after arm64 stable release +async def test_upgrade(ops_test, continuous_writes) -> None: + # Start an application that continuously writes data to the database. + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + # Check whether writes are increasing. + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + + for attempt in Retrying(stop=stop_after_attempt(2), wait=wait_fixed(30), reraise=True): + with attempt: + logger.info("Run pre-upgrade-check action") + action = await leader_unit.run_action("pre-upgrade-check") + await action.wait() + + # Ensure the primary has changed to the first unit. + primary_name = await get_primary(ops_test, DATABASE_APP_NAME) + assert primary_name == f"{DATABASE_APP_NAME}/0" + + local_charm = await ops_test.build_charm(".") + application = ops_test.model.applications[DATABASE_APP_NAME] + + logger.info("Refresh the charm") + await application.refresh(path=local_charm) + + logger.info("Get first upgrading unit") + # Highest ordinal unit always the first to upgrade. + unit = get_unit_by_index(DATABASE_APP_NAME, application.units, 2) + + async with ops_test.fast_forward("60s"): + await ops_test.model.block_until( + lambda: unit.workload_status_message == "upgrade completed", timeout=TIMEOUT + ) + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], idle_period=30, timeout=TIMEOUT + ) + + # Check whether writes are increasing. + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + instances_roles = await get_instances_roles(ops_test) + assert operator.countOf(instances_roles.values(), "master") == 1 + assert operator.countOf(instances_roles.values(), "primary") == 0 + assert operator.countOf(instances_roles.values(), "replica") == 2 + + logger.info("Resume upgrade") + action = await leader_unit.run_action("resume-upgrade") + await action.wait() + + logger.info("Wait for application to upgrade") + async with ops_test.fast_forward("60s"): + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", timeout=TIMEOUT + ) + + instances_roles = await get_instances_roles(ops_test) + assert operator.countOf(instances_roles.values(), "master") == 0 + assert operator.countOf(instances_roles.values(), "primary") == 1 + assert operator.countOf(instances_roles.values(), "replica") == 2 + + logger.info("Ensure continuous_writes after upgrade") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("Checking whether no writes were lost") + await check_writes(ops_test) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index a14e4678c6..5d9b4c5e27 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -62,6 +62,7 @@ def test_on_leader_elected(harness): patch("charm.Patroni.reload_patroni_configuration"), patch("charm.PostgresqlOperatorCharm._patch_pod_labels"), patch("charm.PostgresqlOperatorCharm._create_services") as _create_services, + patch("charm.PostgreSQLUpgrade.idle", new_callable=PropertyMock) as _idle, ): rel_id = harness.model.get_relation(PEER).id # Check that a new password was generated on leader election and nothing is done @@ -128,12 +129,20 @@ def test_on_leader_elected(harness): response = Mock() response.json.return_value = {"code": 403} _create_services.side_effect = ApiError(response=response) + _idle.return_value = True harness.set_leader(False) harness.set_leader() assert isinstance(harness.charm.unit.status, BlockedStatus) assert harness.charm.unit.status.message == "failed to create k8s services" + # No error when upgrading the cluster. + harness.charm.unit.status = ActiveStatus() + _idle.return_value = False + harness.set_leader(False) + harness.set_leader() + assert isinstance(harness.charm.unit.status, ActiveStatus) + # No trust when annotating _client.return_value.get.side_effect = ApiError(response=response) harness.set_leader(False) @@ -179,12 +188,13 @@ def test_on_postgresql_pebble_ready(harness): patch("charm.PostgresqlOperatorCharm.postgresql") as _postgresql, patch( "charm.PostgresqlOperatorCharm._create_services", - side_effect=[None, _FakeApiError, None], + side_effect=[None, _FakeApiError, _FakeApiError, None], ) as _create_services, patch("charm.Patroni.member_started") as _member_started, patch( "charm.PostgresqlOperatorCharm.push_tls_files_to_workload" ) as _push_tls_files_to_workload, + patch("charm.PostgreSQLUpgrade.idle", new_callable=PropertyMock) as _idle, patch("charm.PostgresqlOperatorCharm._patch_pod_labels"), patch("charm.PostgresqlOperatorCharm._on_leader_elected"), patch("charm.PostgresqlOperatorCharm._create_pgdata") as _create_pgdata, @@ -192,7 +202,7 @@ def test_on_postgresql_pebble_ready(harness): _rock_postgresql_version.return_value = "14.7" # Mock the primary endpoint ready property values. - _primary_endpoint_ready.side_effect = [False, True] + _primary_endpoint_ready.side_effect = [False, True, True] # Check that the initial plan is empty. harness.set_can_connect(POSTGRESQL_CONTAINER, True) @@ -210,12 +220,19 @@ def test_on_postgresql_pebble_ready(harness): tc.assertTrue(isinstance(harness.model.unit.status, WaitingStatus)) _set_active_status.assert_not_called() - # Check for a Blocked status when a failure happens . + # Check for a Blocked status when a failure happens. + _idle.return_value = True harness.container_pebble_ready(POSTGRESQL_CONTAINER) tc.assertTrue(isinstance(harness.model.unit.status, BlockedStatus)) _set_active_status.assert_not_called() + # No error when upgrading the cluster. + _idle.return_value = False + harness.container_pebble_ready(POSTGRESQL_CONTAINER) + _set_active_status.assert_called_once() + # Check for the Active status. + _set_active_status.reset_mock() _push_tls_files_to_workload.reset_mock() harness.container_pebble_ready(POSTGRESQL_CONTAINER) plan = harness.get_container_pebble_plan(POSTGRESQL_CONTAINER) @@ -742,15 +759,26 @@ def test_on_upgrade_charm(harness): "charms.data_platform_libs.v0.upgrade.DataUpgrade._upgrade_supported_check" ) as _upgrade_supported_check, patch( - "charm.PostgresqlOperatorCharm._patch_pod_labels", side_effect=[_FakeApiError, None] + "charm.PostgresqlOperatorCharm._patch_pod_labels", + side_effect=[None, _FakeApiError, None], ) as _patch_pod_labels, patch( "charm.PostgresqlOperatorCharm._create_services", side_effect=[_FakeApiError, None, None], ) as _create_services, + patch("charm.PostgreSQLUpgrade.idle", new_callable=PropertyMock) as _idle, ): - # Test with a problem happening when trying to create the k8s resources. + # Test when the cluster is being upgraded. harness.charm.unit.status = ActiveStatus() + _idle.return_value = False + harness.charm.on.upgrade_charm.emit() + _create_services.assert_not_called() + _patch_pod_labels.assert_called_once() + tc.assertTrue(isinstance(harness.charm.unit.status, ActiveStatus)) + + # Test with a problem happening when trying to create the k8s resources. + _patch_pod_labels.reset_mock() + _idle.return_value = True harness.charm.on.upgrade_charm.emit() _create_services.assert_called_once() _patch_pod_labels.assert_not_called() @@ -852,6 +880,7 @@ def test_postgresql_layer(harness): "group": "postgres", "environment": { "PATRONI_KUBERNETES_LABELS": f"{{application: patroni, cluster-name: patroni-{harness.charm._name}}}", + "PATRONI_KUBERNETES_LEADER_LABEL_VALUE": "primary", "PATRONI_KUBERNETES_NAMESPACE": harness.charm._namespace, "PATRONI_KUBERNETES_USE_ENDPOINTS": "true", "PATRONI_NAME": "postgresql-k8s-0",