diff --git a/lib/charms/data_platform_libs/v0/data_interfaces.py b/lib/charms/data_platform_libs/v0/data_interfaces.py
index 59a97226a4..a2162aa0ba 100644
--- a/lib/charms/data_platform_libs/v0/data_interfaces.py
+++ b/lib/charms/data_platform_libs/v0/data_interfaces.py
@@ -331,7 +331,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent):
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 37
+LIBPATCH = 38
 
 PYDEPS = ["ops>=2.0.0"]
 
@@ -2606,6 +2606,14 @@ def set_version(self, relation_id: int, version: str) -> None:
         """
         self.update_relation_data(relation_id, {"version": version})
 
+    def set_subordinated(self, relation_id: int) -> None:
+        """Raises the subordinated flag in the application relation databag.
+
+        Args:
+            relation_id: the identifier for a particular relation.
+        """
+        self.update_relation_data(relation_id, {"subordinated": "true"})
+
 
 class DatabaseProviderEventHandlers(EventHandlers):
     """Provider-side of the database relation handlers."""
@@ -2842,6 +2850,21 @@ def _on_relation_created_event(self, event: RelationCreatedEvent) -> None:
 
     def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
         """Event emitted when the database relation has changed."""
+        is_subordinate = False
+        remote_unit_data = None
+        for key in event.relation.data.keys():
+            if isinstance(key, Unit) and not key.name.startswith(self.charm.app.name):
+                remote_unit_data = event.relation.data[key]
+            elif isinstance(key, Application) and key.name != self.charm.app.name:
+                is_subordinate = event.relation.data[key].get("subordinated") == "true"
+
+        if is_subordinate:
+            if not remote_unit_data:
+                return
+
+            if remote_unit_data.get("state") != "ready":
+                return
+
         # Check which data has changed to emit customs events.
         diff = self._diff(event)
 
diff --git a/lib/charms/data_platform_libs/v0/upgrade.py b/lib/charms/data_platform_libs/v0/upgrade.py
index 18a58ff09e..4d909d644d 100644
--- a/lib/charms/data_platform_libs/v0/upgrade.py
+++ b/lib/charms/data_platform_libs/v0/upgrade.py
@@ -285,7 +285,7 @@ def restart(self, event) -> None:
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 17
+LIBPATCH = 18
 
 PYDEPS = ["pydantic>=1.10,<2", "poetry-core"]
 
@@ -921,7 +921,7 @@ def _on_upgrade_charm(self, event: UpgradeCharmEvent) -> None:
             self.charm.unit.status = WaitingStatus("other units upgrading first...")
             self.peer_relation.data[self.charm.unit].update({"state": "ready"})
 
-            if self.charm.app.planned_units() == 1:
+            if len(self.app_units) == 1:
                 # single unit upgrade, emit upgrade_granted event right away
                 getattr(self.on, "upgrade_granted").emit()
 
diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py
index 98e0589b18..ca2b2f6a54 100644
--- a/tests/integration/ha_tests/helpers.py
+++ b/tests/integration/ha_tests/helpers.py
@@ -59,7 +59,7 @@ class ProcessRunningError(Exception):
     """Raised when a process is running when it is not expected to be."""
 
 
-async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:
+async def are_all_db_processes_down(ops_test: OpsTest, process: str, signal: str) -> bool:
     """Verifies that all units of the charm do not have the DB process running."""
     app = await app_name(ops_test)
     if "/" in process:
@@ -68,7 +68,7 @@ async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:
         pgrep_cmd = ("pgrep", "-x", process)
 
     try:
-        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
+        for attempt in Retrying(stop=stop_after_delay(400), wait=wait_fixed(3)):
             with attempt:
                 for unit in ops_test.model.applications[app].units:
                     _, processes, _ = await ops_test.juju("ssh", unit.name, *pgrep_cmd)
@@ -79,6 +79,9 @@ async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:
 
                     # If something was returned, there is a running process.
                     if len(processes) > 0:
+                        logger.info("Unit %s not yet down" % unit.name)
+                        # Try to rekill the unit
+                        await send_signal_to_process(ops_test, unit.name, process, signal)
                         raise ProcessRunningError
     except RetryError:
         return False
diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py
index 63d5b5abaa..f23094169b 100644
--- a/tests/integration/ha_tests/test_self_healing.py
+++ b/tests/integration/ha_tests/test_self_healing.py
@@ -95,6 +95,7 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
 
 
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_storage_re_use(ops_test, continuous_writes):
     """Verifies that database units with attached storage correctly repurpose storage.
@@ -142,6 +143,7 @@ async def test_storage_re_use(ops_test, continuous_writes):
 
 
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 @pytest.mark.parametrize("process", DB_PROCESSES)
 async def test_kill_db_process(
     ops_test: OpsTest, process: str, continuous_writes, primary_start_timeout
@@ -170,6 +172,7 @@ async def test_kill_db_process(
 
 
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 @pytest.mark.parametrize("process", DB_PROCESSES)
 async def test_freeze_db_process(
     ops_test: OpsTest, process: str, continuous_writes, primary_start_timeout
@@ -208,6 +211,7 @@ async def test_freeze_db_process(
 
 
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 @pytest.mark.parametrize("process", DB_PROCESSES)
 async def test_restart_db_process(
     ops_test: OpsTest, process: str, continuous_writes, primary_start_timeout
@@ -236,6 +240,7 @@ async def test_restart_db_process(
 
 
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 @pytest.mark.parametrize("process", DB_PROCESSES)
 @pytest.mark.parametrize("signal", ["SIGTERM", "SIGKILL"])
 async def test_full_cluster_restart(
@@ -272,7 +277,7 @@ async def test_full_cluster_restart(
     # of all replicas being down at the same time.
     try:
         assert await are_all_db_processes_down(
-            ops_test, process
+            ops_test, process, signal
         ), "Not all units down at the same time."
     finally:
         if process == PATRONI_PROCESS:
@@ -304,6 +309,7 @@ async def test_full_cluster_restart(
 
 
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 @pytest.mark.unstable
 async def test_forceful_restart_without_data_and_transaction_logs(
     ops_test: OpsTest,
@@ -380,6 +386,7 @@ async def test_forceful_restart_without_data_and_transaction_logs(
 
 
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_network_cut(ops_test: OpsTest, continuous_writes, primary_start_timeout):
     """Completely cut and restore network."""
     # Locate primary unit.
@@ -468,6 +475,7 @@ async def test_network_cut(ops_test: OpsTest, continuous_writes, primary_start_t
 
 
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_network_cut_without_ip_change(
     ops_test: OpsTest, continuous_writes, primary_start_timeout
 ):
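
For context on the data_interfaces.py changes above, here is a minimal sketch (not code from this repository) of how the provider side of the new contract could be used by a subordinate charm: the leader raises the "subordinated" flag via the new set_subordinated() API, and each unit publishes state == "ready" in its unit databag on the relation, which is exactly what the requirer-side guard in _on_relation_changed_event waits for before emitting custom events. The charm class name and the "database" relation name are illustrative assumptions.

# Sketch only: a hypothetical subordinate provider charm using the new API.
from ops.charm import CharmBase
from ops.main import main

from charms.data_platform_libs.v0.data_interfaces import (
    DatabaseProvides,
    DatabaseRequestedEvent,
)


class SubordinateDBCharm(CharmBase):
    """Hypothetical subordinate charm providing the database interface."""

    def __init__(self, *args):
        super().__init__(*args)
        self.database = DatabaseProvides(self, relation_name="database")
        self.framework.observe(
            self.database.on.database_requested, self._on_database_requested
        )

    def _on_database_requested(self, event: DatabaseRequestedEvent) -> None:
        if self.unit.is_leader():
            # Raise the "subordinated" flag in the application databag so the
            # requirer defers processing until the remote unit reports ready.
            self.database.set_subordinated(event.relation.id)

        # Publish per-unit readiness; the requirer's _on_relation_changed_event
        # returns early until it sees state == "ready" in this unit databag.
        event.relation.data[self.unit].update({"state": "ready"})


if __name__ == "__main__":
    main(SubordinateDBCharm)

No extra wiring is needed on the requirer side: the new guard simply returns until both the "subordinated" flag and the remote unit's "ready" state are present, so custom events fire only once the subordinate unit has finished setting up.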