diff --git a/src/charm.py b/src/charm.py
index ce525bed36..f7f9675d59 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -10,6 +10,7 @@
 import platform
 import subprocess
 import sys
+from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Literal, Optional, Set, get_args
@@ -352,7 +353,7 @@ def primary_endpoint(self) -> Optional[str]:
             logger.debug("primary endpoint early exit: Peer relation not joined yet.")
             return None
         try:
-            for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
+            for attempt in Retrying(stop=stop_after_delay(5), wait=wait_fixed(3)):
                 with attempt:
                     primary = self._patroni.get_primary()
                     if primary is None and (standby_leader := self._patroni.get_standby_leader()):
@@ -855,6 +856,7 @@ def _on_cluster_topology_change(self, _):

     def _on_install(self, event: InstallEvent) -> None:
         """Install prerequisites for the application."""
+        logger.debug("Install start time: %s", datetime.now())
         if not self._is_storage_attached():
             self._reboot_on_detached_storage(event)
             return
@@ -1162,7 +1164,8 @@ def _start_primary(self, event: StartEvent) -> None:
             # was fully initialised.
             self.enable_disable_extensions()

-        self.unit.status = ActiveStatus()
+        logger.debug("Active workload time: %s", datetime.now())
+        self._set_primary_status_message()

     def _start_replica(self, event) -> None:
         """Configure the replica if the cluster was already initialised."""
diff --git a/src/cluster.py b/src/cluster.py
index 29a42e87b5..14651400e9 100644
--- a/src/cluster.py
+++ b/src/cluster.py
@@ -362,7 +362,7 @@ def are_all_members_ready(self) -> bool:

     def get_patroni_health(self) -> Dict[str, str]:
         """Gets, retries and parses the Patroni health endpoint."""
-        for attempt in Retrying(stop=stop_after_delay(90), wait=wait_fixed(3)):
+        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(7)):
             with attempt:
                 r = requests.get(
                     f"{self._patroni_url}/health",
diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py
index e2adab6d8e..49eb672a9e 100644
--- a/src/relations/async_replication.py
+++ b/src/relations/async_replication.py
@@ -193,6 +193,7 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool:
             filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz"
             subprocess.check_call(f"tar -zcf {filename} {POSTGRESQL_DATA_PATH}".split())
             logger.warning("Please review the backup file %s and handle its removal", filename)
+        self.charm.app_peer_data["suppress-oversee-users"] = "true"
         return True

     def get_all_primary_cluster_endpoints(self) -> List[str]:
@@ -481,7 +482,7 @@ def is_primary_cluster(self) -> bool:
         return self.charm.app == self._get_primary_cluster()

     def _on_async_relation_broken(self, _) -> None:
-        if "departing" in self.charm._peers.data[self.charm.unit]:
+        if not self.charm._peers or "departing" in self.charm._peers.data[self.charm.unit]:
             logger.debug("Early exit on_async_relation_broken: Skipping departing unit.")
             return
diff --git a/src/relations/postgresql_provider.py b/src/relations/postgresql_provider.py
index 326ad27698..6b105fa602 100644
--- a/src/relations/postgresql_provider.py
+++ b/src/relations/postgresql_provider.py
@@ -137,6 +137,8 @@ def oversee_users(self) -> None:
         if not self.charm.unit.is_leader():
             return

+        delete_user = "suppress-oversee-users" not in self.charm.app_peer_data
+
         # Retrieve database users.
         try:
             database_users = {
@@ -159,13 +161,16 @@ def oversee_users(self) -> None:

         # Delete users that exist in the database but not in the active relations.
         for user in database_users - relation_users:
-            try:
-                logger.info("Remove relation user: %s", user)
-                self.charm.set_secret(APP_SCOPE, user, None)
-                self.charm.set_secret(APP_SCOPE, f"{user}-database", None)
-                self.charm.postgresql.delete_user(user)
-            except PostgreSQLDeleteUserError:
-                logger.error(f"Failed to delete user {user}")
+            if delete_user:
+                try:
+                    logger.info("Remove relation user: %s", user)
+                    self.charm.set_secret(APP_SCOPE, user, None)
+                    self.charm.set_secret(APP_SCOPE, f"{user}-database", None)
+                    self.charm.postgresql.delete_user(user)
+                except PostgreSQLDeleteUserError:
+                    logger.error("Failed to delete user %s", user)
+            else:
+                logger.info("Stale relation user detected: %s", user)

     def update_endpoints(self, event: DatabaseRequestedEvent = None) -> None:
         """Set the read/write and read-only endpoints."""
diff --git a/src/upgrade.py b/src/upgrade.py
index f8fce9ab13..7fa435b651 100644
--- a/src/upgrade.py
+++ b/src/upgrade.py
@@ -115,8 +115,10 @@ def _on_upgrade_charm_check_legacy(self) -> None:

         peers_state = list(filter(lambda state: state != "", self.unit_states))

-        if len(peers_state) == len(self.peer_relation.units) and (
-            set(peers_state) == {"ready"} or len(peers_state) == 0
+        if (
+            len(peers_state) == len(self.peer_relation.units)
+            and (set(peers_state) == {"ready"} or len(peers_state) == 0)
+            and self.charm.is_cluster_initialised
         ):
             if self.charm._patroni.member_started:
                 # All peers have set the state to ready
diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py
index 96bdb3afa4..3d95292229 100644
--- a/tests/integration/ha_tests/test_async_replication.py
+++ b/tests/integration/ha_tests/test_async_replication.py
@@ -41,6 +41,8 @@
 IDLE_PERIOD = 5
 TIMEOUT = 2000

+DATA_INTEGRATOR_APP_NAME = "data-integrator"
+

 @contextlib.asynccontextmanager
 async def fast_forward(
@@ -115,6 +117,14 @@ async def test_deploy_async_replication_setup(
             num_units=CLUSTER_SIZE,
             config={"profile": "testing"},
         )
+    if not await app_name(ops_test, DATA_INTEGRATOR_APP_NAME):
+        await ops_test.model.deploy(
+            DATA_INTEGRATOR_APP_NAME,
+            num_units=1,
+            channel="latest/edge",
+            config={"database-name": "testdb"},
+        )
+    await ops_test.model.relate(DATABASE_APP_NAME, DATA_INTEGRATOR_APP_NAME)
     if not await app_name(ops_test, model=second_model):
         charm = await ops_test.build_charm(".")
         await second_model.deploy(
@@ -128,7 +138,7 @@
     async with ops_test.fast_forward(), fast_forward(second_model):
         await gather(
             first_model.wait_for_idle(
-                apps=[DATABASE_APP_NAME, APPLICATION_NAME],
+                apps=[DATABASE_APP_NAME, APPLICATION_NAME, DATA_INTEGRATOR_APP_NAME],
                 status="active",
                 timeout=TIMEOUT,
             ),
@@ -218,6 +228,19 @@ async def test_async_replication(
     await check_writes(ops_test, extra_model=second_model)


+@pytest.mark.group(1)
+@markers.juju3
+@pytest.mark.abort_on_fail
+async def test_get_data_integrator_credentials(
+    ops_test: OpsTest,
+):
+    unit = ops_test.model.applications[DATA_INTEGRATOR_APP_NAME].units[0]
+    action = await unit.run_action(action_name="get-credentials")
+    result = await action.wait()
+    global data_integrator_credentials
+    data_integrator_credentials = result.results
+
+
 @pytest.mark.group(1)
 @markers.juju3
 @pytest.mark.abort_on_fail
@@ -273,6 +296,31 @@ async def test_switchover(
     await are_writes_increasing(ops_test, extra_model=second_model)


+@pytest.mark.group(1)
+@markers.juju3
+@pytest.mark.abort_on_fail
+async def test_data_integrator_creds_keep_on_working(
+    ops_test: OpsTest,
+    second_model: Model,
+) -> None:
+    user = data_integrator_credentials["postgresql"]["username"]
+    password = data_integrator_credentials["postgresql"]["password"]
+    database = data_integrator_credentials["postgresql"]["database"]
+
+    any_unit = second_model.applications[DATABASE_APP_NAME].units[0].name
+    primary = await get_primary(ops_test, any_unit, second_model)
+    address = second_model.units.get(primary).public_address
+
+    connstr = f"dbname='{database}' user='{user}' host='{address}' port='5432' password='{password}' connect_timeout=1"
+    # Bind the name before the try block to avoid a NameError in finally if connect() fails.
+    connection = None
+    try:
+        connection = psycopg2.connect(connstr)
+    finally:
+        if connection is not None:
+            connection.close()
+
+
 @pytest.mark.group(1)
 @markers.juju3
 @pytest.mark.abort_on_fail
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index 8a2ad24aa3..67fa1b59f2 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -621,17 +621,20 @@ async def get_password(ops_test: OpsTest, unit_name: str, username: str = "opera
     stop=stop_after_attempt(10),
     wait=wait_exponential(multiplier=1, min=2, max=30),
 )
-async def get_primary(ops_test: OpsTest, unit_name: str) -> str:
+async def get_primary(ops_test: OpsTest, unit_name: str, model=None) -> str:
     """Get the primary unit.

     Args:
         ops_test: ops_test instance.
         unit_name: the name of the unit.
+        model: model to use (defaults to ops_test.model).

     Returns:
         the current primary unit.
     """
-    action = await ops_test.model.units.get(unit_name).run_action("get-primary")
+    if not model:
+        model = ops_test.model
+    action = await model.units.get(unit_name).run_action("get-primary")
     action = await action.wait()
     return action.results["primary"]
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index 7637394f74..1e0cc81d02 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -4,6 +4,7 @@
 import logging
 import platform
 import subprocess
+from unittest import TestCase
 from unittest.mock import MagicMock, Mock, PropertyMock, call, mock_open, patch, sentinel

 import pytest
@@ -37,6 +38,9 @@
 CREATE_CLUSTER_CONF_PATH = "/etc/postgresql-common/createcluster.d/pgcharm.conf"

+# Used for assert functions.
+tc = TestCase()
+

 @pytest.fixture(autouse=True)
 def harness():
@@ -165,7 +169,9 @@ def test_patroni_scrape_config_tls(harness):


 def test_primary_endpoint(harness):
-    with patch(
+    with patch("charm.stop_after_delay", new_callable=PropertyMock) as _stop_after_delay, patch(
+        "charm.wait_fixed", new_callable=PropertyMock
+    ) as _wait_fixed, patch(
         "charm.PostgresqlOperatorCharm._units_ips",
         new_callable=PropertyMock,
         return_value={"1.1.1.1", "1.1.1.2"},
@@ -174,6 +180,10 @@ def test_primary_endpoint(harness):
         _patroni.return_value.get_primary.return_value = sentinel.primary
         assert harness.charm.primary_endpoint == "1.1.1.1"

+        # Check needed to ensure a fast charm deployment.
+        _stop_after_delay.assert_called_once_with(5)
+        _wait_fixed.assert_called_once_with(3)
+
         _patroni.return_value.get_member_ip.assert_called_once_with(sentinel.primary)
         _patroni.return_value.get_primary.assert_called_once_with()
@@ -547,6 +557,9 @@ def test_enable_disable_extensions(harness, caplog):
 @patch_network_get(private_address="1.1.1.1")
 def test_on_start(harness):
     with (
+        patch(
+            "charm.PostgresqlOperatorCharm._set_primary_status_message"
+        ) as _set_primary_status_message,
         patch(
             "charm.PostgresqlOperatorCharm.enable_disable_extensions"
         ) as _enable_disable_extensions,
@@ -622,7 +635,7 @@ def test_on_start(harness):
         assert _postgresql.create_user.call_count == 4  # Considering the previous failed call.
         _oversee_users.assert_called_once()
         _enable_disable_extensions.assert_called_once()
-        assert isinstance(harness.model.unit.status, ActiveStatus)
+        _set_primary_status_message.assert_called_once()


 @patch_network_get(private_address="1.1.1.1")
@@ -2309,16 +2322,21 @@ def test_update_new_unit_status(harness):
         handle_read_only_mode.assert_not_called()
         assert isinstance(harness.charm.unit.status, WaitingStatus)

-    @patch("charm.Patroni.member_started", new_callable=PropertyMock)
-    @patch("charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock)
-    @patch("charm.Patroni.get_primary")
-    def test_set_active_status(self, _get_primary, _is_standby_leader, _member_started):
+
+def test_set_primary_status_message(harness):
+    with (
+        patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started,
+        patch(
+            "charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock
+        ) as _is_standby_leader,
+        patch("charm.Patroni.get_primary") as _get_primary,
+    ):
         for values in itertools.product(
             [
                 RetryError(last_attempt=1),
                 ConnectionError,
-                self.charm.unit.name,
-                f"{self.charm.app.name}/2",
+                harness.charm.unit.name,
+                f"{harness.charm.app.name}/2",
             ],
             [
                 RetryError(last_attempt=1),
@@ -2328,34 +2346,34 @@ def test_set_active_status(self, _get_primary, _is_standby_leader, _member_start
             ],
             [True, False],
         ):
-            self.charm.unit.status = MaintenanceStatus("fake status")
+            harness.charm.unit.status = MaintenanceStatus("fake status")
             _member_started.return_value = values[2]
             if isinstance(values[0], str):
                 _get_primary.side_effect = None
                 _get_primary.return_value = values[0]
-                if values[0] != self.charm.unit.name and not isinstance(values[1], bool):
+                if values[0] != harness.charm.unit.name and not isinstance(values[1], bool):
                     _is_standby_leader.side_effect = values[1]
                     _is_standby_leader.return_value = None
-                    self.charm._set_active_status()
-                    self.assertIsInstance(self.charm.unit.status, MaintenanceStatus)
+                    harness.charm._set_primary_status_message()
+                    tc.assertIsInstance(harness.charm.unit.status, MaintenanceStatus)
                 else:
                     _is_standby_leader.side_effect = None
                     _is_standby_leader.return_value = values[1]
-                    self.charm._set_active_status()
-                    self.assertIsInstance(
-                        self.charm.unit.status,
+                    harness.charm._set_primary_status_message()
+                    tc.assertIsInstance(
+                        harness.charm.unit.status,
                         ActiveStatus
-                        if values[0] == self.charm.unit.name or values[1] or values[2]
+                        if values[0] == harness.charm.unit.name or values[1] or values[2]
                         else MaintenanceStatus,
                     )
-                    self.assertEqual(
-                        self.charm.unit.status.message,
+                    tc.assertEqual(
+                        harness.charm.unit.status.message,
                         "Primary"
-                        if values[0] == self.charm.unit.name
+                        if values[0] == harness.charm.unit.name
                         else ("Standby" if values[1] else ("" if values[2] else "fake status")),
                     )
             else:
                 _get_primary.side_effect = values[0]
                 _get_primary.return_value = None
-                self.charm._set_active_status()
-                self.assertIsInstance(self.charm.unit.status, MaintenanceStatus)
+                harness.charm._set_primary_status_message()
+                tc.assertIsInstance(harness.charm.unit.status, MaintenanceStatus)
diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py
index 2aac95e8b1..dbe2cb3901 100644
--- a/tests/unit/test_cluster.py
+++ b/tests/unit/test_cluster.py
@@ -42,6 +42,7 @@ def json(self):
             "http://server1/cluster": {
                 "members": [{"name": "postgresql-0", "host": "1.1.1.1", "role": "leader", "lag": "1"}]
             },
+            "http://server1/health": {"state": "running"},
             "http://server4/cluster": {"members": []},
         }
         if args[0] in data:
@@ -128,6 +129,28 @@ def test_get_member_ip(peers_ips, patroni):
     tc.assertIsNone(ip)


+def test_get_patroni_health(peers_ips, patroni):
+    with patch("cluster.stop_after_delay", new_callable=PropertyMock) as _stop_after_delay, patch(
+        "cluster.wait_fixed", new_callable=PropertyMock
+    ) as _wait_fixed, patch(
+        "charm.Patroni._patroni_url", new_callable=PropertyMock
+    ) as _patroni_url, patch("requests.get", side_effect=mocked_requests_get) as _get:
+        # Test when the Patroni API is reachable.
+        _patroni_url.return_value = "http://server1"
+        health = patroni.get_patroni_health()
+
+        # Check needed to ensure a fast charm deployment.
+        _stop_after_delay.assert_called_once_with(60)
+        _wait_fixed.assert_called_once_with(7)
+
+        tc.assertEqual(health, {"state": "running"})
+
+        # Test when the Patroni API is not reachable.
+        _patroni_url.return_value = "http://server2"
+        with tc.assertRaises(tenacity.RetryError):
+            patroni.get_patroni_health()
+
+
 def test_get_postgresql_version(peers_ips, patroni):
     with patch("charm.snap.SnapClient") as _snap_client:
         # TODO test a real implementation
diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py
index 97ccedef17..b9112119e1 100644
--- a/tests/unit/test_upgrade.py
+++ b/tests/unit/test_upgrade.py
@@ -56,6 +56,41 @@ def test_log_rollback(harness):
     )


+@pytest.mark.parametrize(
+    "unit_states,is_cluster_initialised,call",
+    [
+        (["ready"], False, False),
+        (["ready", "ready"], True, False),
+        (["idle"], False, False),
+        (["idle"], True, False),
+        (["ready"], True, True),
+    ],
+)
+def test_on_upgrade_charm_check_legacy(harness, unit_states, is_cluster_initialised, call):
+    with (
+        patch(
+            "charms.data_platform_libs.v0.upgrade.DataUpgrade.state",
+            new_callable=PropertyMock(return_value=None),
+        ) as _state,
+        patch(
+            "charms.data_platform_libs.v0.upgrade.DataUpgrade.unit_states",
+            new_callable=PropertyMock(return_value=unit_states),
+        ) as _unit_states,
+        patch(
+            "charm.PostgresqlOperatorCharm.is_cluster_initialised",
+            new_callable=PropertyMock(return_value=is_cluster_initialised),
+        ) as _is_cluster_initialised,
+        patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started,
+        patch(
+            "upgrade.PostgreSQLUpgrade._prepare_upgrade_from_legacy"
+        ) as _prepare_upgrade_from_legacy,
+    ):
+        with harness.hooks_disabled():
+            harness.set_leader(True)
+
+        harness.charm.upgrade._on_upgrade_charm_check_legacy()
+        _member_started.assert_called_once() if call else _member_started.assert_not_called()
+
+
 @patch_network_get(private_address="1.1.1.1")
 def test_on_upgrade_granted(harness):
     with (