Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5714] Filter out degraded read only endpoints #679

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,23 @@ def is_member_isolated(self) -> bool:

return len(cluster_status.json()["members"]) == 0

def are_replicas_up(self) -> dict[str, bool] | None:
    """Map each cluster member's host to whether it is in a healthy state.

    Queries the Patroni ``/cluster`` endpoint; a member counts as up when
    its reported state is either ``running`` or ``streaming``.

    Returns:
        Dict keyed by member host with a boolean health flag, or ``None``
        when the cluster state could not be retrieved (best effort — the
        failure is logged, never raised).
    """
    try:
        cluster_info = requests.get(
            f"{self._patroni_url}/cluster",
            verify=self.verify,
            auth=self._patroni_auth,
            timeout=PATRONI_TIMEOUT,
        )
        return {
            member["host"]: member["state"] in ("running", "streaming")
            for member in cluster_info.json()["members"]
        }
    except Exception:
        logger.exception("Unable to get the state of the cluster")
        return None

def promote_standby_cluster(self) -> None:
"""Promote a standby cluster to be a regular cluster."""
config_response = requests.get(
Expand Down
5 changes: 5 additions & 0 deletions src/relations/postgresql_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ def update_endpoints(self, event: DatabaseRequestedEvent = None) -> None:
# If there are no replicas, remove the read-only endpoint.
replicas_endpoint = list(self.charm.members_ips - {self.charm.primary_endpoint})
replicas_endpoint.sort()
cluster_state = self.charm._patroni.are_replicas_up()
if cluster_state:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are getting the primary endpoint just above, so we should be able to get the cluster endpoint as well, but if we can't, no filtering will be done.

replicas_endpoint = [
replica for replica in replicas_endpoint if cluster_state.get(replica, False)
]
read_only_endpoints = (
",".join(f"{x}:{DATABASE_PORT}" for x in replicas_endpoint)
if len(replicas_endpoint) > 0
Expand Down
41 changes: 40 additions & 1 deletion tests/integration/new_relations/test_new_relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,19 @@
import pytest
import yaml
from pytest_operator.plugin import OpsTest
from tenacity import Retrying, stop_after_attempt, wait_fixed

from .. import markers
from ..helpers import CHARM_BASE, assert_sync_standbys, get_leader_unit, scale_application
from ..helpers import (
CHARM_BASE,
assert_sync_standbys,
get_leader_unit,
get_machine_from_unit,
get_primary,
scale_application,
start_machine,
stop_machine,
)
from ..juju_ import juju_major_version
from .helpers import (
build_connection_string,
Expand Down Expand Up @@ -184,6 +194,35 @@ async def test_database_relation_with_charm_libraries(ops_test: OpsTest):
cursor.execute("DROP TABLE test;")


@pytest.mark.group("new_relations_tests")
@pytest.mark.abort_on_fail
async def test_filter_out_degraded_replicas(ops_test: OpsTest):
    """Check that a stopped replica is filtered out of the read-only endpoints."""
    primary = await get_primary(ops_test, f"{DATABASE_APP_NAME}/0")
    replica = next(
        unit.name
        for unit in ops_test.model.applications[DATABASE_APP_NAME].units
        if unit.name != primary
    )
    machine = await get_machine_from_unit(ops_test, replica)
    await stop_machine(ops_test, machine)

    # Topology observer runs every half a minute
    await asyncio.sleep(60)

    # There are two units in the cluster, so filtering out the down replica
    # leaves no read-only endpoints at all: the relation key must be unset.
    for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(3), reraise=True):
        with attempt:
            data = await get_application_relation_data(
                ops_test,
                APPLICATION_APP_NAME,
                FIRST_DATABASE_RELATION_NAME,
                "read-only-endpoints",
            )
            assert data is None

    # Bring the replica back so subsequent tests start from a healthy cluster.
    await start_machine(ops_test, machine)
    await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=200)


@pytest.mark.group("new_relations_tests")
async def test_user_with_extra_roles(ops_test: OpsTest):
"""Test superuser actions and the request for more permissions."""
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,3 +663,21 @@ def test_update_patroni_restart_condition(patroni, new_restart_condition):
f"Restart={new_restart_condition}"
)
_run.assert_called_once_with(["/bin/systemctl", "daemon-reload"])


def test_are_replicas_up(patroni):
    """are_replicas_up maps member hosts to health flags and yields None on error."""
    with patch("requests.get") as _get:
        members = [
            {"host": "1.1.1.1", "state": "running"},
            {"host": "2.2.2.2", "state": "streaming"},
            {"host": "3.3.3.3", "state": "other state"},
        ]
        _get.return_value.json.return_value = {"members": members}

        # Only running/streaming members count as healthy.
        expected = {"1.1.1.1": True, "2.2.2.2": True, "3.3.3.3": False}
        assert patroni.are_replicas_up() == expected

        # Any failure while querying Patroni is swallowed and reported as None.
        _get.side_effect = Exception
        assert patroni.are_replicas_up() is None
45 changes: 43 additions & 2 deletions tests/unit/test_postgresql_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,12 @@ def test_update_endpoints_with_event(harness):
patch(
"charm.PostgresqlOperatorCharm.members_ips",
new_callable=PropertyMock,
return_value={"1.1.1.1", "2.2.2.2"},
) as _members_ips,
patch("charm.Patroni.get_primary", new_callable=PropertyMock) as _get_primary,
patch(
"charm.Patroni.are_replicas_up", return_value={"1.1.1.1": True, "2.2.2.2": True}
) as _are_replicas_up,
patch(
"relations.postgresql_provider.DatabaseProvides.fetch_my_relation_data"
) as _fetch_my_relation_data,
Expand All @@ -227,7 +231,6 @@ def test_update_endpoints_with_event(harness):
# Mock the members_ips list to simulate different scenarios
# (with and without a replica).
rel_id = harness.model.get_relation(RELATION_NAME).id
_members_ips.side_effect = [{"1.1.1.1", "2.2.2.2"}, {"1.1.1.1"}]

# Add two different relations.
rel_id = harness.add_relation(RELATION_NAME, "application")
Expand Down Expand Up @@ -259,6 +262,7 @@ def test_update_endpoints_with_event(harness):
_fetch_my_relation_data.assert_called_once_with([2], ["password"])

# Also test with only a primary instance.
_members_ips.return_value = {"1.1.1.1"}
harness.charm.postgresql_client_relation.update_endpoints(mock_event)
assert harness.get_relation_data(rel_id, harness.charm.app.name) == {
"endpoints": "1.1.1.1:5432",
Expand All @@ -277,15 +281,18 @@ def test_update_endpoints_without_event(harness):
patch(
"charm.PostgresqlOperatorCharm.members_ips",
new_callable=PropertyMock,
return_value={"1.1.1.1", "2.2.2.2"},
) as _members_ips,
patch("charm.Patroni.get_primary", new_callable=PropertyMock) as _get_primary,
patch(
"charm.Patroni.are_replicas_up", return_value={"1.1.1.1": True, "2.2.2.2": True}
) as _are_replicas_up,
patch(
"relations.postgresql_provider.DatabaseProvides.fetch_my_relation_data"
) as _fetch_my_relation_data,
):
# Mock the members_ips list to simulate different scenarios
# (with and without a replica).
_members_ips.side_effect = [{"1.1.1.1", "2.2.2.2"}, {"1.1.1.1", "2.2.2.2"}, {"1.1.1.1"}]
rel_id = harness.model.get_relation(RELATION_NAME).id

# Don't set data if no password
Expand Down Expand Up @@ -340,7 +347,41 @@ def test_update_endpoints_without_event(harness):
}
_fetch_my_relation_data.assert_called_once_with(None, ["password"])

# Filter out missing replica
_members_ips.return_value = {"1.1.1.1", "2.2.2.2", "3.3.3.3"}
harness.charm.postgresql_client_relation.update_endpoints()
assert harness.get_relation_data(rel_id, harness.charm.app.name) == {
"endpoints": "1.1.1.1:5432",
"read-only-endpoints": "2.2.2.2:5432",
"uris": "postgresql://relation-2:[email protected]:5432/test_db",
"tls": "False",
}
assert harness.get_relation_data(another_rel_id, harness.charm.app.name) == {
"endpoints": "1.1.1.1:5432",
"read-only-endpoints": "2.2.2.2:5432",
"uris": "postgresql://relation-3:[email protected]:5432/test_db2",
"tls": "False",
}

# Don't filter if unable to get cluster status
_are_replicas_up.return_value = None
harness.charm.postgresql_client_relation.update_endpoints()
assert harness.get_relation_data(rel_id, harness.charm.app.name) == {
"endpoints": "1.1.1.1:5432",
"read-only-endpoints": "2.2.2.2:5432,3.3.3.3:5432",
"uris": "postgresql://relation-2:[email protected]:5432/test_db",
"tls": "False",
}
assert harness.get_relation_data(another_rel_id, harness.charm.app.name) == {
"endpoints": "1.1.1.1:5432",
"read-only-endpoints": "2.2.2.2:5432,3.3.3.3:5432",
"uris": "postgresql://relation-3:[email protected]:5432/test_db2",
"tls": "False",
}

# Also test with only a primary instance.
_members_ips.return_value = {"1.1.1.1"}
_are_replicas_up.return_value = {"1.1.1.1": True}
harness.charm.postgresql_client_relation.update_endpoints()
assert harness.get_relation_data(rel_id, harness.charm.app.name) == {
"endpoints": "1.1.1.1:5432",
Expand Down
Loading