Skip to content

Commit

Permalink
[DPE-3684] Factor out minor changes from the raft reinit PR (#681)
Browse files Browse the repository at this point in the history
* Factor out minor changes from the raft reinit PR

* Bump ruff

* Missed namespace change
  • Loading branch information
dragomirp authored Nov 25, 2024
1 parent 3eb537d commit 58fc973
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 88 deletions.
80 changes: 60 additions & 20 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pydantic = "^1.10.18"
poetry-core = "^1.9.1"
pyOpenSSL = "^24.2.1"
jinja2 = "^3.1.4"
pysyncobj = "^0.3.13"
psutil = "^6.0.0"

[tool.poetry.group.charm-libs.dependencies]
# data_platform_libs/v0/data_interfaces.py
Expand All @@ -38,7 +40,7 @@ opentelemetry-exporter-otlp-proto-http = "1.21.0"
optional = true

[tool.poetry.group.format.dependencies]
ruff = "^0.7.1"
ruff = "^0.8.0"

[tool.poetry.group.lint]
optional = true
Expand Down Expand Up @@ -106,7 +108,7 @@ line-length = 99

[tool.ruff.lint]
explicit-preview-rules = true
select = ["A", "E", "W", "F", "C", "N", "D", "I001", "B", "CPY", "RUF", "S", "SIM", "UP", "TCH"]
select = ["A", "E", "W", "F", "C", "N", "D", "I001", "B", "CPY", "RUF", "S", "SIM", "UP", "TC"]
extend-ignore = [
"D203",
"D204",
Expand Down
26 changes: 17 additions & 9 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,25 +511,31 @@ def _on_pgdata_storage_detaching(self, _) -> None:
if self.primary_endpoint:
self._update_relation_endpoints()

def _on_peer_relation_changed(self, event: HookEvent): # noqa: C901
"""Reconfigure cluster members when something changes."""
def _peer_relation_changed_checks(self, event: HookEvent) -> bool:
"""Split off from _on_peer_relation_changed to reduce complexity."""
# Prevents the cluster from being reconfigured before it's bootstrapped in the leader.
if "cluster_initialised" not in self._peers.data[self.app]:
logger.debug("Deferring on_peer_relation_changed: cluster not initialized")
event.defer()
return
return False

# If the unit is the leader, it can reconfigure the cluster.
if self.unit.is_leader() and not self._reconfigure_cluster(event):
event.defer()
return
return False

if self._update_member_ip():
return
return False

# Don't update this member before it's part of the members list.
if self._unit_ip not in self.members_ips:
logger.debug("Early exit on_peer_relation_changed: Unit not in the members list")
return False
return True

def _on_peer_relation_changed(self, event: HookEvent):
"""Reconfigure cluster members when something changes."""
if not self._peer_relation_changed_checks(event):
return

# Update the list of the cluster members in the replicas to make them know each other.
Expand Down Expand Up @@ -736,14 +742,16 @@ def add_cluster_member(self, member: str) -> None:
def _get_unit_ip(self, unit: Unit) -> str | None:
"""Get the IP address of a specific unit."""
# Check if host is current host.
ip = None
if unit == self.unit:
return str(self.model.get_binding(PEER).network.bind_address)
ip = self.model.get_binding(PEER).network.bind_address
# Check if host is a peer.
elif unit in self._peers.data:
return str(self._peers.data[unit].get("private-address"))
ip = self._peers.data[unit].get("private-address")
# Return None if the unit is neither a peer nor the current unit.
else:
return None
if ip:
return str(ip)
return None

@property
def _hosts(self) -> set:
Expand Down
37 changes: 17 additions & 20 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import requests
from charms.operator_libs_linux.v2 import snap
from jinja2 import Template
from pysyncobj.utility import TcpUtility, UtilityException
from tenacity import (
AttemptManager,
RetryError,
Expand Down Expand Up @@ -766,32 +767,28 @@ def remove_raft_member(self, member_ip: str) -> None:
"""
# Suppressing since the call will be removed soon
# Get the status of the raft cluster.
raft_status = subprocess.check_output([ # noqa
"charmed-postgresql.syncobj-admin",
"-conn",
"127.0.0.1:2222",
"-pass",
self.raft_password,
"-status",
]).decode("UTF-8")
syncobj_util = TcpUtility(password=self.raft_password, timeout=3)

raft_host = "127.0.0.1:2222"
try:
raft_status = syncobj_util.executeCommand(raft_host, ["status"])
except UtilityException:
logger.warning("Remove raft member: Cannot connect to raft cluster")
raise RemoveRaftMemberFailedError() from None

# Check whether the member is still part of the raft cluster.
if not member_ip or member_ip not in raft_status:
if not member_ip or f"partner_node_status_server_{member_ip}:2222" not in raft_status:
return

# Suppressing since the call will be removed soon
# Remove the member from the raft cluster.
result = subprocess.check_output([ # noqa
"charmed-postgresql.syncobj-admin",
"-conn",
"127.0.0.1:2222",
"-pass",
self.raft_password,
"-remove",
f"{member_ip}:2222",
]).decode("UTF-8")

if "SUCCESS" not in result:
try:
result = syncobj_util.executeCommand(raft_host, ["remove", f"{member_ip}:2222"])
except UtilityException:
logger.debug("Remove raft member: Remove call failed")
raise RemoveRaftMemberFailedError() from None

if not result.startswith("SUCCESS"):
raise RemoveRaftMemberFailedError()

@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=10))
Expand Down
53 changes: 43 additions & 10 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,27 @@ async def are_writes_increasing(
extra_model: Model = None,
) -> None:
"""Verify new writes are continuing by counting the number of writes."""
down_units = [down_unit] if isinstance(down_unit, str) or not down_unit else down_unit
writes, _ = await count_writes(
ops_test,
down_unit=down_unit,
down_unit=down_units[0],
use_ip_from_inside=use_ip_from_inside,
extra_model=extra_model,
)
for member, count in writes.items():
for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
with attempt:
more_writes, _ = await count_writes(
ops_test,
down_unit=down_unit,
use_ip_from_inside=use_ip_from_inside,
extra_model=extra_model,
)
logger.info(f"Initial writes {writes}")

for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3), reraise=True):
with attempt:
more_writes, _ = await count_writes(
ops_test,
down_unit=down_unit,
use_ip_from_inside=use_ip_from_inside,
extra_model=extra_model,
)
logger.info(f"Retry writes {more_writes}")
for member, count in writes.items():
if "/".join(member.split(".", 1)[-1].rsplit("-", 1)) in down_units:
continue
assert more_writes[member] > count, (
f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})"
)
Expand Down Expand Up @@ -616,6 +622,33 @@ async def is_replica(ops_test: OpsTest, unit_name: str, use_ip_from_inside: bool
return False


async def get_cluster_roles(
    ops_test: OpsTest, unit_name: str, use_ip_from_inside: bool = False
) -> dict[str, str | list[str] | None]:
    """Return the cluster members grouped by the role they report.

    The member list is fetched from the ``/cluster`` endpoint on port 8008 of
    the given unit.

    Args:
        ops_test: The ops test framework instance.
        unit_name: Name of the unit whose endpoint is queried.
        use_ip_from_inside: Whether to resolve the unit IP from inside the unit
            instead of asking for it from the outside.

    Returns:
        A dict with the keys ``"primaries"``, ``"sync_standbys"`` and
        ``"replicas"``, each mapping to a list of member names. Members with
        role ``"leader"`` go to ``"primaries"``, members with role
        ``"sync_standby"`` go to ``"sync_standbys"`` and every other role is
        counted as a replica.
    """
    unit_ip = await (
        get_ip_from_inside_the_unit(ops_test, unit_name)
        if use_ip_from_inside
        else get_unit_ip(ops_test, unit_name)
    )

    members = {"replicas": [], "primaries": [], "sync_standbys": []}
    cluster_info = requests.get(f"http://{unit_ip}:8008/cluster")
    member_list = cluster_info.json()["members"]
    logger.info(f"Cluster members are: {member_list}")

    # Any role that is not listed here is treated as a plain replica.
    bucket_by_role = {"leader": "primaries", "sync_standby": "sync_standbys"}
    for member in member_list:
        bucket = bucket_by_role.get(member["role"], "replicas")
        # Replace the last "-" of the member name with "/" (e.g. "app-0" -> "app/0").
        name = "/".join(member["name"].rsplit("-", 1))
        members[bucket].append(name)

    return members


async def instance_ip(ops_test: OpsTest, instance: str) -> str:
"""Translate juju instance name to IP.
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/ha_tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# Copyright 2021 Canonical Ltd.
# See LICENSE file for licensing details.

import asyncio
import logging
from asyncio import TimeoutError

import pytest
from juju import tag
Expand Down Expand Up @@ -163,7 +163,7 @@ async def test_app_resources_conflicts_v3(ops_test: OpsTest, charm: str):
await ops_test.model.wait_for_idle(
apps=[DUP_APPLICATION_NAME], timeout=1000, status="blocked"
)
except TimeoutError:
except asyncio.TimeoutError:
logger.info("Application is not in blocked state. Checking logs...")

# Since application have postgresql db in storage from external application it should not be able to connect due to new password
Expand Down Expand Up @@ -211,7 +211,7 @@ async def test_app_resources_conflicts_v2(ops_test: OpsTest, charm: str):
await ops_test.model.wait_for_idle(
apps=[DUP_APPLICATION_NAME], timeout=1000, status="blocked"
)
except TimeoutError:
except asyncio.TimeoutError:
logger.info("Application is not in blocked state. Checking logs...")

# Since application have postgresql db in storage from external application it should not be able to connect due to new password
Expand Down
Loading

0 comments on commit 58fc973

Please sign in to comment.