Skip to content

Commit

Permalink
Merge branch 'main' into increase-coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
dragomirp committed Jun 25, 2024
2 parents 01d2c14 + 713f6e4 commit 8ce88a0
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 37 deletions.
7 changes: 5 additions & 2 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import platform
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Literal, Optional, Set, get_args

Expand Down Expand Up @@ -352,7 +353,7 @@ def primary_endpoint(self) -> Optional[str]:
logger.debug("primary endpoint early exit: Peer relation not joined yet.")
return None
try:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
for attempt in Retrying(stop=stop_after_delay(5), wait=wait_fixed(3)):
with attempt:
primary = self._patroni.get_primary()
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
Expand Down Expand Up @@ -855,6 +856,7 @@ def _on_cluster_topology_change(self, _):

def _on_install(self, event: InstallEvent) -> None:
"""Install prerequisites for the application."""
logger.debug("Install start time: %s", datetime.now())
if not self._is_storage_attached():
self._reboot_on_detached_storage(event)
return
Expand Down Expand Up @@ -1162,7 +1164,8 @@ def _start_primary(self, event: StartEvent) -> None:
# was fully initialised.
self.enable_disable_extensions()

self.unit.status = ActiveStatus()
logger.debug("Active workload time: %s", datetime.now())
self._set_primary_status_message()

def _start_replica(self, event) -> None:
"""Configure the replica if the cluster was already initialised."""
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def are_all_members_ready(self) -> bool:

def get_patroni_health(self) -> Dict[str, str]:
"""Gets, retries and parses the Patroni health endpoint."""
for attempt in Retrying(stop=stop_after_delay(90), wait=wait_fixed(3)):
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(7)):
with attempt:
r = requests.get(
f"{self._patroni_url}/health",
Expand Down
3 changes: 2 additions & 1 deletion src/relations/async_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool:
filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz"
subprocess.check_call(f"tar -zcf {filename} {POSTGRESQL_DATA_PATH}".split())
logger.warning("Please review the backup file %s and handle its removal", filename)
self.charm.app_peer_data["suppress-oversee-users"] = "true"
return True

def get_all_primary_cluster_endpoints(self) -> List[str]:
Expand Down Expand Up @@ -481,7 +482,7 @@ def is_primary_cluster(self) -> bool:
return self.charm.app == self._get_primary_cluster()

def _on_async_relation_broken(self, _) -> None:
if "departing" in self.charm._peers.data[self.charm.unit]:
if not self.charm._peers or "departing" in self.charm._peers.data[self.charm.unit]:
logger.debug("Early exit on_async_relation_broken: Skipping departing unit.")
return

Expand Down
19 changes: 12 additions & 7 deletions src/relations/postgresql_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ def oversee_users(self) -> None:
if not self.charm.unit.is_leader():
return

delete_user = "suppress-oversee-users" not in self.charm.app_peer_data

# Retrieve database users.
try:
database_users = {
Expand All @@ -159,13 +161,16 @@ def oversee_users(self) -> None:

# Delete the users that exist in the database but not in the active relations.
for user in database_users - relation_users:
try:
logger.info("Remove relation user: %s", user)
self.charm.set_secret(APP_SCOPE, user, None)
self.charm.set_secret(APP_SCOPE, f"{user}-database", None)
self.charm.postgresql.delete_user(user)
except PostgreSQLDeleteUserError:
logger.error(f"Failed to delete user {user}")
if delete_user:
try:
logger.info("Remove relation user: %s", user)
self.charm.set_secret(APP_SCOPE, user, None)
self.charm.set_secret(APP_SCOPE, f"{user}-database", None)
self.charm.postgresql.delete_user(user)
except PostgreSQLDeleteUserError:
logger.error("Failed to delete user %s", user)
else:
logger.info("Stale relation user detected: %s", user)

def update_endpoints(self, event: DatabaseRequestedEvent = None) -> None:
"""Set the read/write and read-only endpoints."""
Expand Down
6 changes: 4 additions & 2 deletions src/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ def _on_upgrade_charm_check_legacy(self) -> None:

peers_state = list(filter(lambda state: state != "", self.unit_states))

if len(peers_state) == len(self.peer_relation.units) and (
set(peers_state) == {"ready"} or len(peers_state) == 0
if (
len(peers_state) == len(self.peer_relation.units)
and (set(peers_state) == {"ready"} or len(peers_state) == 0)
and self.charm.is_cluster_initialised
):
if self.charm._patroni.member_started:
# All peers have set the state to ready
Expand Down
48 changes: 47 additions & 1 deletion tests/integration/ha_tests/test_async_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
IDLE_PERIOD = 5
TIMEOUT = 2000

DATA_INTEGRATOR_APP_NAME = "data-integrator"


@contextlib.asynccontextmanager
async def fast_forward(
Expand Down Expand Up @@ -115,6 +117,14 @@ async def test_deploy_async_replication_setup(
num_units=CLUSTER_SIZE,
config={"profile": "testing"},
)
if not await app_name(ops_test, DATA_INTEGRATOR_APP_NAME):
await ops_test.model.deploy(
DATA_INTEGRATOR_APP_NAME,
num_units=1,
channel="latest/edge",
config={"database-name": "testdb"},
)
await ops_test.model.relate(DATABASE_APP_NAME, DATA_INTEGRATOR_APP_NAME)
if not await app_name(ops_test, model=second_model):
charm = await ops_test.build_charm(".")
await second_model.deploy(
Expand All @@ -128,7 +138,7 @@ async def test_deploy_async_replication_setup(
async with ops_test.fast_forward(), fast_forward(second_model):
await gather(
first_model.wait_for_idle(
apps=[DATABASE_APP_NAME, APPLICATION_NAME],
apps=[DATABASE_APP_NAME, APPLICATION_NAME, DATA_INTEGRATOR_APP_NAME],
status="active",
timeout=TIMEOUT,
),
Expand Down Expand Up @@ -218,6 +228,19 @@ async def test_async_replication(
await check_writes(ops_test, extra_model=second_model)


@pytest.mark.group(1)
@markers.juju3
@pytest.mark.abort_on_fail
async def test_get_data_integrator_credentials(
    ops_test: OpsTest,
):
    """Fetch the data-integrator credentials via the ``get-credentials`` action.

    The action results are stored in a module-level global so that the later
    switchover test can verify the same credentials still work afterwards.
    """
    unit = ops_test.model.applications[DATA_INTEGRATOR_APP_NAME].units[0]
    action = await unit.run_action(action_name="get-credentials")
    result = await action.wait()
    # pytest tests cannot return values to other tests, so pass the
    # credentials along through a module-level global.
    global data_integrator_credentials
    data_integrator_credentials = result.results


@pytest.mark.group(1)
@markers.juju3
@pytest.mark.abort_on_fail
Expand Down Expand Up @@ -273,6 +296,29 @@ async def test_switchover(
await are_writes_increasing(ops_test, extra_model=second_model)


@pytest.mark.group(1)
@markers.juju3
@pytest.mark.abort_on_fail
async def test_data_integrator_creds_keep_on_working(
    ops_test: OpsTest,
    second_model: Model,
) -> None:
    """Check that the previously fetched credentials still work after switchover.

    Connects to the (new) primary in the second model using the credentials
    captured by ``test_get_data_integrator_credentials``; merely opening the
    connection proves the relation user survived the switchover.
    """
    user = data_integrator_credentials["postgresql"]["username"]
    password = data_integrator_credentials["postgresql"]["password"]
    database = data_integrator_credentials["postgresql"]["database"]

    any_unit = second_model.applications[DATABASE_APP_NAME].units[0].name
    primary = await get_primary(ops_test, any_unit, second_model)
    address = second_model.units.get(primary).public_address

    connstr = f"dbname='{database}' user='{user}' host='{address}' port='5432' password='{password}' connect_timeout=1"
    connection = None
    try:
        with psycopg2.connect(connstr) as connection:
            pass
    finally:
        # psycopg2's context manager only ends the transaction; it does not
        # close the connection, so close it explicitly. Guard against
        # connect() itself failing, in which case `connection` would be
        # unbound and the bare close() would raise NameError, masking the
        # real connection error.
        if connection is not None:
            connection.close()


@pytest.mark.group(1)
@markers.juju3
@pytest.mark.abort_on_fail
Expand Down
7 changes: 5 additions & 2 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,17 +621,20 @@ async def get_password(ops_test: OpsTest, unit_name: str, username: str = "opera
stop=stop_after_attempt(10),
wait=wait_exponential(multiplier=1, min=2, max=30),
)
async def get_primary(ops_test: OpsTest, unit_name: str) -> str:
async def get_primary(ops_test: OpsTest, unit_name: str, model=None) -> str:
    """Get the primary unit.

    Args:
        ops_test: ops_test instance.
        unit_name: the name of the unit.
        model: Model to use.

    Returns:
        the current primary unit.
    """
    # Fall back to the default model when no explicit model is given.
    target_model = model or ops_test.model
    pending = await target_model.units.get(unit_name).run_action("get-primary")
    completed = await pending.wait()
    return completed.results["primary"]

Expand Down
60 changes: 39 additions & 21 deletions tests/unit/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import platform
import subprocess
from unittest import TestCase
from unittest.mock import MagicMock, Mock, PropertyMock, call, mock_open, patch, sentinel

import pytest
Expand Down Expand Up @@ -37,6 +38,9 @@

CREATE_CLUSTER_CONF_PATH = "/etc/postgresql-common/createcluster.d/pgcharm.conf"

# used for assert functions
tc = TestCase()


@pytest.fixture(autouse=True)
def harness():
Expand Down Expand Up @@ -165,7 +169,9 @@ def test_patroni_scrape_config_tls(harness):


def test_primary_endpoint(harness):
with patch(
with patch("charm.stop_after_delay", new_callable=PropertyMock) as _stop_after_delay, patch(
"charm.wait_fixed", new_callable=PropertyMock
) as _wait_fixed, patch(
"charm.PostgresqlOperatorCharm._units_ips",
new_callable=PropertyMock,
return_value={"1.1.1.1", "1.1.1.2"},
Expand All @@ -174,6 +180,10 @@ def test_primary_endpoint(harness):
_patroni.return_value.get_primary.return_value = sentinel.primary
assert harness.charm.primary_endpoint == "1.1.1.1"

# Check needed to ensure a fast charm deployment.
_stop_after_delay.assert_called_once_with(5)
_wait_fixed.assert_called_once_with(3)

_patroni.return_value.get_member_ip.assert_called_once_with(sentinel.primary)
_patroni.return_value.get_primary.assert_called_once_with()

Expand Down Expand Up @@ -547,6 +557,9 @@ def test_enable_disable_extensions(harness, caplog):
@patch_network_get(private_address="1.1.1.1")
def test_on_start(harness):
with (
patch(
"charm.PostgresqlOperatorCharm._set_primary_status_message"
) as _set_primary_status_message,
patch(
"charm.PostgresqlOperatorCharm.enable_disable_extensions"
) as _enable_disable_extensions,
Expand Down Expand Up @@ -622,7 +635,7 @@ def test_on_start(harness):
assert _postgresql.create_user.call_count == 4 # Considering the previous failed call.
_oversee_users.assert_called_once()
_enable_disable_extensions.assert_called_once()
assert isinstance(harness.model.unit.status, ActiveStatus)
_set_primary_status_message.assert_called_once()


@patch_network_get(private_address="1.1.1.1")
Expand Down Expand Up @@ -2309,16 +2322,21 @@ def test_update_new_unit_status(harness):
handle_read_only_mode.assert_not_called()
assert isinstance(harness.charm.unit.status, WaitingStatus)

@patch("charm.Patroni.member_started", new_callable=PropertyMock)
@patch("charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock)
@patch("charm.Patroni.get_primary")
def test_set_active_status(self, _get_primary, _is_standby_leader, _member_started):

def test_set_primary_status_message(harness):
with (
patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started,
patch(
"charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock
) as _is_standby_leader,
patch("charm.Patroni.get_primary") as _get_primary,
):
for values in itertools.product(
[
RetryError(last_attempt=1),
ConnectionError,
self.charm.unit.name,
f"{self.charm.app.name}/2",
harness.charm.unit.name,
f"{harness.charm.app.name}/2",
],
[
RetryError(last_attempt=1),
Expand All @@ -2328,34 +2346,34 @@ def test_set_active_status(self, _get_primary, _is_standby_leader, _member_start
],
[True, False],
):
self.charm.unit.status = MaintenanceStatus("fake status")
harness.charm.unit.status = MaintenanceStatus("fake status")
_member_started.return_value = values[2]
if isinstance(values[0], str):
_get_primary.side_effect = None
_get_primary.return_value = values[0]
if values[0] != self.charm.unit.name and not isinstance(values[1], bool):
if values[0] != harness.charm.unit.name and not isinstance(values[1], bool):
_is_standby_leader.side_effect = values[1]
_is_standby_leader.return_value = None
self.charm._set_active_status()
self.assertIsInstance(self.charm.unit.status, MaintenanceStatus)
harness.charm._set_primary_status_message()
tc.assertIsInstance(harness.charm.unit.status, MaintenanceStatus)
else:
_is_standby_leader.side_effect = None
_is_standby_leader.return_value = values[1]
self.charm._set_active_status()
self.assertIsInstance(
self.charm.unit.status,
harness.charm._set_primary_status_message()
tc.assertIsInstance(
harness.charm.unit.status,
ActiveStatus
if values[0] == self.charm.unit.name or values[1] or values[2]
if values[0] == harness.charm.unit.name or values[1] or values[2]
else MaintenanceStatus,
)
self.assertEqual(
self.charm.unit.status.message,
tc.assertEqual(
harness.charm.unit.status.message,
"Primary"
if values[0] == self.charm.unit.name
if values[0] == harness.charm.unit.name
else ("Standby" if values[1] else ("" if values[2] else "fake status")),
)
else:
_get_primary.side_effect = values[0]
_get_primary.return_value = None
self.charm._set_active_status()
self.assertIsInstance(self.charm.unit.status, MaintenanceStatus)
harness.charm._set_primary_status_message()
tc.assertIsInstance(harness.charm.unit.status, MaintenanceStatus)
23 changes: 23 additions & 0 deletions tests/unit/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def json(self):
"http://server1/cluster": {
"members": [{"name": "postgresql-0", "host": "1.1.1.1", "role": "leader", "lag": "1"}]
},
"http://server1/health": {"state": "running"},
"http://server4/cluster": {"members": []},
}
if args[0] in data:
Expand Down Expand Up @@ -128,6 +129,28 @@ def test_get_member_ip(peers_ips, patroni):
tc.assertIsNone(ip)


def test_get_patroni_health(peers_ips, patroni):
    """Verify health-endpoint parsing and that retries give up on an unreachable API."""
    with (
        patch("cluster.stop_after_delay", new_callable=PropertyMock) as _stop_after_delay,
        patch("cluster.wait_fixed", new_callable=PropertyMock) as _wait_fixed,
        patch("charm.Patroni._patroni_url", new_callable=PropertyMock) as _patroni_url,
        patch("requests.get", side_effect=mocked_requests_get) as _get,
    ):
        # Test when the Patroni API is reachable.
        _patroni_url.return_value = "http://server1"
        health = patroni.get_patroni_health()

        # Check needed to ensure a fast charm deployment.
        _stop_after_delay.assert_called_once_with(60)
        _wait_fixed.assert_called_once_with(7)

        tc.assertEqual(health, {"state": "running"})

        # Test when the Patroni API is not reachable.
        _patroni_url.return_value = "http://server2"
        with tc.assertRaises(tenacity.RetryError):
            patroni.get_patroni_health()


def test_get_postgresql_version(peers_ips, patroni):
with patch("charm.snap.SnapClient") as _snap_client:
# TODO test a real implementation
Expand Down
Loading

0 comments on commit 8ce88a0

Please sign in to comment.