Skip to content

Commit

Permalink
Check for sync standbys before scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
dragomirp committed Mar 1, 2024
1 parent e78531a commit b616a87
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 100 deletions.
100 changes: 50 additions & 50 deletions lib/charms/tls_certificates_interface/v2/tls_certificates.py

Large diffs are not rendered by default.

68 changes: 49 additions & 19 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import platform
import subprocess
from typing import Dict, List, Literal, Optional, Set, get_args
from typing import Dict, List, Literal, Optional, Set, Tuple, get_args

from charms.data_platform_libs.v0.data_interfaces import DataPeer, DataPeerUnit
from charms.data_platform_libs.v0.data_models import TypedCharmBase
Expand Down Expand Up @@ -44,7 +44,7 @@
Unit,
WaitingStatus,
)
from pysyncobj.utility import TcpUtility
from pysyncobj.utility import TcpUtility, UtilityException
from tenacity import RetryError, Retrying, retry, stop_after_attempt, stop_after_delay, wait_fixed

from backups import PostgreSQLBackups
Expand Down Expand Up @@ -994,7 +994,9 @@ def _on_start(self, event: StartEvent) -> None:
# Bootstrap the cluster in the leader unit.
self._start_primary(event)

def _remove_raft_node(self, syncobj_util: TcpUtility, partner: str, current: str) -> None:
def _remove_raft_node(
self, syncobj_util: TcpUtility, partner: str, current: str, is_leader: bool
) -> None:
"""Try to remove a raft member calling a partner node."""
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3), reraise=True):
with attempt:
Expand All @@ -1003,24 +1005,30 @@ def _remove_raft_node(self, syncobj_util: TcpUtility, partner: str, current: str
raise Exception("Failed to stop service")
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3), reraise=True):
with attempt:
status = syncobj_util.executeCommand(partner, ["status"])
if not (status := self._get_raft_status(syncobj_util, partner)):
raise Exception("Cannot get status")

if f"partner_node_status_server_{current}" not in status:
logger.debug("cannot remove raft member: not part of the cluster")
logger.debug("Raft member already removed")
return
if is_leader:
if status["leader"].address == current:
logger.warning("cannot remove raft member: still leader")
raise Exception("Raft member is still leader")
removal_result = syncobj_util.executeCommand(partner, ["remove", current])
if removal_result.startswith("FAIL"):
if not removal_result.startswith("SUCCESS"):
logger.warning("failed to remove raft member: %s", removal_result)
raise Exception("Failed to remove the unit")
raise Exception("Failed to remove raft member")

def _on_stop(self, _) -> None:
syncobj_util = TcpUtility(timeout=3)
raft_host = "localhost:2222"
# Try to call a different unit
for ip in self._units_ips:
if ip != self._unit_ip:
raft_host = f"{ip}:2222"
break
status = syncobj_util.executeCommand(raft_host, ["status"])
def _get_raft_status(self, syncobj_util: TcpUtility, host: str) -> Optional[Dict]:
"""Get raft status."""
try:
return syncobj_util.executeCommand(host, ["status"])
except UtilityException:
return None

def _parse_raft_partners(self, status: Dict) -> Tuple[List[str], List[str]]:
"""Collect raft partner and ready nodes."""
partners = []
ready = []
for key in status.keys():
Expand All @@ -1029,15 +1037,37 @@ def _on_stop(self, _) -> None:
partners.append(partner)
if status[key] == 2:
ready.append(partner)
return partners, ready

def _on_stop(self, _) -> None:
syncobj_util = TcpUtility(timeout=3)
raft_host = "localhost:2222"
# Try to call a different unit
status = None
for ip in self._units_ips:
if ip != self._unit_ip:
raft_host = f"{ip}:2222"
if not (status := self._get_raft_status(syncobj_util, raft_host)):
continue
break
if not status:
raft_host = "localhost:2222"
if not (status := self._get_raft_status(syncobj_util, raft_host)):
logger.warning("Stopping unit: all raft members are unreachable")
self._patroni.stop_patroni()
return

partners, ready = self._parse_raft_partners(status)
if not ready and not partners:
logger.debug("Terminating last node")
logger.debug("Terminating the last raft member")
self._patroni.stop_patroni()
return
if not ready:
raise Exception("Cannot stop unit: All other members are connecting")
raise Exception("Cannot stop unit: All other members are still connecting")

is_leader = status["leader"] and status["leader"].host == self._unit_ip
try:
self._remove_raft_node(syncobj_util, ready[0], status["self"].address)
self._remove_raft_node(syncobj_util, ready[0], status["self"].address, is_leader)
except Exception:
self._patroni.start_patroni()
raise
Expand Down
38 changes: 17 additions & 21 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import logging
import os
import pwd
import subprocess
from typing import Any, Dict, List, Optional, Set

import requests
from charms.operator_libs_linux.v2 import snap
from jinja2 import Template
from pysyncobj.utility import TcpUtility, UtilityException
from tenacity import (
AttemptManager,
RetryError,
Expand Down Expand Up @@ -564,31 +564,27 @@ def remove_raft_member(self, member_ip: str) -> None:
is not part of the raft cluster.
"""
# Get the status of the raft cluster.
raft_status = subprocess.check_output(
[
"charmed-postgresql.syncobj-admin",
"-conn",
"127.0.0.1:2222",
"-status",
]
).decode("UTF-8")
syncobj_util = TcpUtility(timeout=3)

raft_host = "127.0.0.1:2222"
try:
raft_status = syncobj_util.executeCommand(raft_host, ["status"])
except UtilityException:
logger.warning("Remove raft member: Cannot connect to raft cluster")
raise RemoveRaftMemberFailedError()

# Check whether the member is still part of the raft cluster.
if not member_ip or member_ip not in raft_status:
if not member_ip or f"partner_node_status_server_{member_ip}:2222" not in raft_status:
return

# Remove the member from the raft cluster.
result = subprocess.check_output(
[
"charmed-postgresql.syncobj-admin",
"-conn",
"127.0.0.1:2222",
"-remove",
f"{member_ip}:2222",
]
).decode("UTF-8")

if "SUCCESS" not in result:
try:
result = syncobj_util.executeCommand(raft_host, ["remove", f"{member_ip}:2222"])
except UtilityException:
logger.debug("Remove raft member: Remove call failed")
raise RemoveRaftMemberFailedError()

if not result.startswith("SUCCESS"):
raise RemoveRaftMemberFailedError()

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
Expand Down
13 changes: 7 additions & 6 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
wait_fixed,
)

from ..helpers import APPLICATION_NAME, db_connect, get_unit_address, run_command_on_unit
from ..helpers import (
APPLICATION_NAME,
db_connect,
get_patroni_cluster,
get_unit_address,
run_command_on_unit,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -109,11 +115,6 @@ async def app_name(ops_test: OpsTest, application_name: str = "postgresql") -> O
return None


def get_patroni_cluster(unit_ip: str) -> Dict[str, str]:
resp = requests.get(f"http://{unit_ip}:8008/cluster")
return resp.json()


async def change_patroni_setting(
ops_test: OpsTest, setting: str, value: int, use_random_unit: bool = False
) -> None:
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/ha_tests/test_restore_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
CHARM_SERIES,
db_connect,
get_password,
get_patroni_cluster,
get_primary,
get_unit_address,
set_password,
)
from .helpers import (
add_unit_with_storage,
get_patroni_cluster,
reused_full_cluster_recovery_storage,
storage_id,
)
Expand Down
16 changes: 16 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,22 @@ def change_primary_start_timeout(
)


def get_patroni_cluster(unit_ip: str) -> Dict[str, str]:
    """Return the cluster topology reported by the Patroni REST API on *unit_ip*."""
    response = requests.get(f"http://{unit_ip}:8008/cluster")
    return response.json()


def assert_sync_standbys(unit_ip: str, standbys: int) -> None:
    """Block until the cluster reports at least *standbys* sync standbys.

    Polls the Patroni cluster endpoint (every 3s, up to 60s) and re-raises
    the final AssertionError if the expected count is never reached.
    """
    for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3), reraise=True):
        with attempt:
            members = get_patroni_cluster(unit_ip)["members"]
            found = sum(1 for member in members if member["role"] == "sync_standby")
            assert found >= standbys, "Less than expected standbys"


async def check_database_users_existence(
ops_test: OpsTest,
users_that_should_exist: List[str],
Expand Down
8 changes: 6 additions & 2 deletions tests/integration/new_relations/test_new_relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import yaml
from pytest_operator.plugin import OpsTest

from ..helpers import CHARM_SERIES, scale_application
from ..helpers import CHARM_SERIES, assert_sync_standbys, scale_application
from ..juju_ import juju_major_version
from .helpers import (
build_connection_string,
Expand Down Expand Up @@ -356,10 +356,14 @@ async def test_relation_data_is_updated_correctly_when_scaling(ops_test: OpsTest
apps=[DATABASE_APP_NAME], status="active", timeout=1500, wait_for_exact_units=4
)

assert_sync_standbys(
ops_test.model.applications[DATABASE_APP_NAME].units[0].public_address, 2
)

# Remove the original units.
await ops_test.model.applications[DATABASE_APP_NAME].destroy_units(*units_to_remove)
await ops_test.model.wait_for_idle(
apps=[DATABASE_APP_NAME], status="active", timeout=3000, wait_for_exact_units=2
apps=[DATABASE_APP_NAME], status="active", timeout=1500, wait_for_exact_units=2
)

# Get the updated connection data and assert it can be used
Expand Down
9 changes: 8 additions & 1 deletion tests/integration/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
APPLICATION_NAME,
CHARM_SERIES,
DATABASE_APP_NAME,
assert_sync_standbys,
build_connection_string,
check_database_users_existence,
check_databases_creation,
Expand All @@ -40,6 +41,7 @@


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_mailman3_core_db(ops_test: OpsTest, charm: str) -> None:
"""Deploy Mailman3 Core to test the 'db' relation."""
async with ops_test.fast_forward():
Expand Down Expand Up @@ -106,6 +108,7 @@ async def test_mailman3_core_db(ops_test: OpsTest, charm: str) -> None:


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_relation_data_is_updated_correctly_when_scaling(ops_test: OpsTest):
"""Test that relation data, like connection data, is updated correctly when scaling."""
# Retrieve the list of current database unit names.
Expand All @@ -118,10 +121,14 @@ async def test_relation_data_is_updated_correctly_when_scaling(ops_test: OpsTest
apps=[DATABASE_APP_NAME], status="active", timeout=1500, wait_for_exact_units=4
)

assert_sync_standbys(
ops_test.model.applications[DATABASE_APP_NAME].units[0].public_address, 2
)

# Remove the original units.
await ops_test.model.applications[DATABASE_APP_NAME].destroy_units(*units_to_remove)
await ops_test.model.wait_for_idle(
apps=[DATABASE_APP_NAME], status="active", timeout=3000, wait_for_exact_units=2
apps=[DATABASE_APP_NAME], status="active", timeout=1500, wait_for_exact_units=2
)

# Get the updated connection data and assert it can be used
Expand Down

0 comments on commit b616a87

Please sign in to comment.