[DPE-5827] Set all nodes to synchronous replicas #672

Open: wants to merge 18 commits into main
4 changes: 2 additions & 2 deletions src/charm.py
@@ -405,7 +405,7 @@ def _on_get_primary(self, event: ActionEvent) -> None:
except RetryError as e:
logger.error(f"failed to get primary with error {e}")

def _updated_synchronous_node_count(self, num_units: int | None = None) -> bool:
def updated_synchronous_node_count(self, num_units: int | None = None) -> bool:
"""Tries to update synchronous_node_count configuration and reports the result."""
try:
self._patroni.update_synchronous_node_count(num_units)
@@ -439,7 +439,7 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:

if "cluster_initialised" not in self._peers.data[
self.app
] or not self._updated_synchronous_node_count(len(self._units_ips)):
] or not self.updated_synchronous_node_count(len(self._units_ips)):
logger.debug("Deferring on_peer_relation_departed: cluster not initialized")
event.defer()
return
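The rename from `_updated_synchronous_node_count` to `updated_synchronous_node_count` makes the helper public, so other modules (the upgrade handler in `src/upgrade.py` below) can call it on the charm. The diff elides the end of the method; the sketch below is an assumption about its overall shape, not the verbatim charm code, and the class name is hypothetical.

```python
# Hypothetical sketch of the now-public helper: it wraps the Patroni call and
# reports success as a bool so callers such as _on_peer_relation_departed can
# defer the event when the update fails.
import logging

from tenacity import RetryError

logger = logging.getLogger(__name__)


class CharmSketch:
    def __init__(self, patroni):
        self._patroni = patroni  # object exposing update_synchronous_node_count()

    def updated_synchronous_node_count(self, num_units: int | None = None) -> bool:
        """Tries to update synchronous_node_count configuration and reports the result."""
        try:
            self._patroni.update_synchronous_node_count(num_units)
            return True
        except RetryError:
            logger.debug("Unable to update synchronous_node_count")
            return False
```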
6 changes: 4 additions & 2 deletions src/cluster.py
@@ -647,7 +647,8 @@ def render_patroni_yml_file(
stanza=stanza,
restore_stanza=restore_stanza,
version=self.get_postgresql_version().split(".")[0],
minority_count=self.planned_units // 2,
# -1 for leader
synchronous_node_count=self.planned_units - 1,
pg_parameters=parameters,
primary_cluster_endpoint=self.charm.async_replication.get_primary_cluster_endpoint(),
extra_replication_endpoints=self.charm.async_replication.get_standby_endpoints(),
@@ -789,6 +790,7 @@ def remove_raft_member(self, member_ip: str) -> None:
raise RemoveRaftMemberFailedError() from None

if not result.startswith("SUCCESS"):
logger.debug("Remove raft member: Remove call not successful")
raise RemoveRaftMemberFailedError()

@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=10))
@@ -860,7 +862,7 @@ def update_synchronous_node_count(self, units: int | None = None) -> None:
with attempt:
r = requests.patch(
f"{self._patroni_url}/config",
json={"synchronous_node_count": units // 2},
json={"synchronous_node_count": units - 1},
[Author comment: REST calls should also use the new value.]

verify=self.verify,
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
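The second hunk keeps the live reconfiguration path consistent with the rendered template: Patroni exposes a `PATCH /config` endpoint, and the charm now sends `units - 1` as `synchronous_node_count` there as well. Below is a minimal standalone sketch of that call; the URL, credentials, and timeout are placeholders rather than the charm's actual values.

```python
# Standalone sketch of the Patroni dynamic-configuration update; host,
# credentials, and timeout are placeholders, not the charm's values.
import requests


def set_synchronous_node_count(units: int, patroni_url: str = "http://localhost:8008") -> None:
    response = requests.patch(
        f"{patroni_url}/config",
        # all members except the leader become synchronous standbys
        json={"synchronous_node_count": units - 1},
        auth=("patroni", "secret"),  # placeholder credentials
        timeout=10,
    )
    response.raise_for_status()
```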
1 change: 1 addition & 0 deletions src/upgrade.py
@@ -146,6 +146,7 @@ def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None:
# Update the configuration.
self.charm.unit.status = MaintenanceStatus("updating configuration")
self.charm.update_config()
self.charm.updated_synchronous_node_count(len(self.charm._units_ips))

self.charm.unit.status = MaintenanceStatus("refreshing the snap")
self.charm._install_snap_packages(packages=SNAP_PACKAGES, refresh=True)
2 changes: 1 addition & 1 deletion templates/patroni.yml.j2
@@ -59,7 +59,7 @@ bootstrap:
retry_timeout: 10
maximum_lag_on_failover: 1048576
synchronous_mode: true
synchronous_node_count: {{ minority_count }}
synchronous_node_count: {{ synchronous_node_count }}
postgresql:
use_pg_rewind: true
remove_data_directory_on_rewind_failure: true
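The template now receives the pre-computed `synchronous_node_count` instead of deriving a minority count. The snippet below only illustrates how the rendered value differs between the old and new formulas for a few cluster sizes.

```python
# Old formula (minority_count) vs. new formula (all members except the leader).
for planned_units in (1, 3, 5):
    old_value = planned_units // 2   # previous minority count
    new_value = planned_units - 1    # synchronous_node_count after this PR
    print(f"{planned_units} units -> old: {old_value}, new: {new_value}")
# 1 units -> old: 0, new: 0
# 3 units -> old: 1, new: 2
# 5 units -> old: 2, new: 4
```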
2 changes: 1 addition & 1 deletion tests/integration/ha_tests/helpers.py
@@ -106,7 +106,7 @@ async def are_writes_increasing(
with attempt:
more_writes, _ = await count_writes(
ops_test,
down_unit=down_unit,
down_unit=down_units[0],
use_ip_from_inside=use_ip_from_inside,
extra_model=extra_model,
)
10 changes: 5 additions & 5 deletions tests/integration/ha_tests/test_replication.py
@@ -63,13 +63,13 @@ async def test_reelection(ops_test: OpsTest, continuous_writes, primary_start_ti

# Remove the primary unit.
primary_name = await get_primary(ops_test, app)
await ops_test.model.destroy_units(
primary_name,
)
await ops_test.model.destroy_units(primary_name)

# Wait and get the primary again (which can be any unit, including the previous primary).
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active")
async with ops_test.fast_forward("60s"):
await ops_test.model.wait_for_idle(
apps=[app, APPLICATION_NAME], status="active", idle_period=30
)

await are_writes_increasing(ops_test, primary_name)

3 changes: 1 addition & 2 deletions tests/integration/helpers.py
@@ -1043,15 +1043,14 @@ def switchover(
)
assert response.status_code == 200
app_name = current_primary.split("/")[0]
minority_count = len(ops_test.model.applications[app_name].units) // 2
for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(2), reraise=True):
with attempt:
response = requests.get(f"http://{primary_ip}:8008/cluster")
assert response.status_code == 200
standbys = len([
member for member in response.json()["members"] if member["role"] == "sync_standby"
])
assert standbys >= minority_count
assert standbys == len(ops_test.model.applications[app_name].units) - 1


async def wait_for_idle_on_blocked(
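The switchover helper now expects every non-leader member to report as `sync_standby`, rather than only a minority. The sketch below shows the shape of a Patroni `/cluster` response this assertion relies on, for a hypothetical three-unit cluster; member names are illustrative.

```python
# Illustrative /cluster payload for a 3-unit cluster (member names are made up).
cluster = {
    "members": [
        {"name": "postgresql-0", "role": "leader"},
        {"name": "postgresql-1", "role": "sync_standby"},
        {"name": "postgresql-2", "role": "sync_standby"},
    ]
}
standbys = len([m for m in cluster["members"] if m["role"] == "sync_standby"])
assert standbys == len(cluster["members"]) - 1  # all non-leader members are synchronous
```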
2 changes: 1 addition & 1 deletion tests/unit/test_charm.py
@@ -2189,7 +2189,7 @@ def test_on_peer_relation_departed(harness):
patch("charm.Patroni.are_all_members_ready") as _are_all_members_ready,
patch("charm.PostgresqlOperatorCharm._get_ips_to_remove") as _get_ips_to_remove,
patch(
"charm.PostgresqlOperatorCharm._updated_synchronous_node_count"
"charm.PostgresqlOperatorCharm.updated_synchronous_node_count"
) as _updated_synchronous_node_count,
patch("charm.Patroni.remove_raft_member") as _remove_raft_member,
patch("charm.PostgresqlOperatorCharm._unit_ip") as _unit_ip,
2 changes: 1 addition & 1 deletion tests/unit/test_cluster.py
@@ -345,7 +345,7 @@ def test_render_patroni_yml_file(peers_ips, patroni):
rewind_user=REWIND_USER,
rewind_password=rewind_password,
version=postgresql_version,
minority_count=patroni.planned_units // 2,
synchronous_node_count=patroni.planned_units - 1,
raft_password=raft_password,
patroni_password=patroni_password,
)
5 changes: 5 additions & 0 deletions tests/unit/test_upgrade.py
@@ -106,6 +106,9 @@ def test_on_upgrade_granted(harness):
patch("charm.Patroni.start_patroni") as _start_patroni,
patch("charm.PostgresqlOperatorCharm._install_snap_packages") as _install_snap_packages,
patch("charm.PostgresqlOperatorCharm.update_config") as _update_config,
patch(
"charm.PostgresqlOperatorCharm.updated_synchronous_node_count"
) as _updated_synchronous_node_count,
):
# Test when the charm fails to start Patroni.
mock_event = MagicMock()
@@ -174,6 +177,7 @@ def test_on_upgrade_granted(harness):
_member_started.reset_mock()
_cluster_members.reset_mock()
mock_event.defer.reset_mock()
_updated_synchronous_node_count.reset_mock()
_is_replication_healthy.return_value = True
with harness.hooks_disabled():
harness.set_leader(True)
@@ -184,6 +188,7 @@
_set_unit_completed.assert_called_once()
_set_unit_failed.assert_not_called()
_on_upgrade_changed.assert_called_once()
_updated_synchronous_node_count.assert_called_once_with(2)


def test_pre_upgrade_check(harness):