diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 3c84df48ae..9aa04982cc 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -485,7 +485,7 @@ def is_primary_cluster(self) -> bool: return self.charm.app == self._get_primary_cluster() def _on_async_relation_broken(self, _) -> None: - if "departing" in self.charm._peers.data[self.charm.unit]: + if not self.charm._peers or "departing" in self.charm._peers.data[self.charm.unit]: logger.debug("Early exit on_async_relation_broken: Skipping departing unit.") return diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 96bdb3afa4..5faf249308 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -4,7 +4,7 @@ import contextlib import logging import subprocess -from asyncio import gather +from asyncio import gather, sleep from typing import Optional import psycopg2 @@ -41,6 +41,8 @@ IDLE_PERIOD = 5 TIMEOUT = 2000 +DATA_INTEGRATOR_APP_NAME = "data-integrator" + @contextlib.asynccontextmanager async def fast_forward( @@ -115,6 +117,14 @@ async def test_deploy_async_replication_setup( num_units=CLUSTER_SIZE, config={"profile": "testing"}, ) + if not await app_name(ops_test, DATA_INTEGRATOR_APP_NAME): + await ops_test.model.deploy( + DATA_INTEGRATOR_APP_NAME, + num_units=1, + channel="latest/edge", + config={"database-name": "testdb"}, + ) + await ops_test.model.relate(DATABASE_APP_NAME, DATA_INTEGRATOR_APP_NAME) if not await app_name(ops_test, model=second_model): charm = await ops_test.build_charm(".") await second_model.deploy( @@ -128,7 +138,7 @@ async def test_deploy_async_replication_setup( async with ops_test.fast_forward(), fast_forward(second_model): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME, APPLICATION_NAME], + apps=[DATABASE_APP_NAME, APPLICATION_NAME, DATA_INTEGRATOR_APP_NAME], status="active", timeout=TIMEOUT, ), @@ -218,6 +228,19 @@ async def test_async_replication( await check_writes(ops_test, extra_model=second_model) +@pytest.mark.group(1) +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_get_data_integrator_credentials( + ops_test: OpsTest, +): + unit = ops_test.model.applications[DATA_INTEGRATOR_APP_NAME].units[0] + action = await unit.run_action(action_name="get-credentials") + result = await action.wait() + global data_integrator_credentials + data_integrator_credentials = result.results + + @pytest.mark.group(1) @markers.juju3 @pytest.mark.abort_on_fail @@ -273,6 +296,43 @@ async def test_switchover( await are_writes_increasing(ops_test, extra_model=second_model) +@pytest.mark.group(1) +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_data_integrator_creds_keep_on_working( + ops_test: OpsTest, + second_model: Model, +) -> None: + user = data_integrator_credentials["postgresql"]["username"] + password = data_integrator_credentials["postgresql"]["password"] + database = data_integrator_credentials["postgresql"]["database"] + + any_unit = second_model.applications[DATABASE_APP_NAME].units[0].name + primary = await get_primary(ops_test, any_unit) + address = get_unit_address(ops_test, primary) + + connstr = f"dbname='{database}' user='{user}' host='{address}' port='5432' password='{password}' connect_timeout=1" + try: + with psycopg2.connect(connstr) as connection: + pass + finally: + connection.close() + + logger.info("Re-enable oversee users") + action = await primary.run_action(action_name="reenable-oversee-users") + await action.wait() + + async with ops_test.fast_forward(): + await sleep(20) + try: + with psycopg2.connect(connstr) as connection: + assert False + except psycopg2.errors.InsufficientPrivilege: + logger.info("Data integrator creds purged") + finally: + connection.close() + + @pytest.mark.group(1) @markers.juju3 @pytest.mark.abort_on_fail