From 8eb539713f5272e418b78362f6e5201535a77965 Mon Sep 17 00:00:00 2001 From: phvalguima Date: Mon, 29 Jan 2024 18:11:59 +0100 Subject: [PATCH] [DPE-3243][DPE-3246] Async backup and new list-backups output This method implements an async backup method: now, the charm will not wait anymore for the backup action to be finished. Users can check the status of backups using the list-backups command. Restore action closes all the target indices and the action will execute synchronously with the restore process itself. Indices that are closed will not be writable during that time. If the restore takes longer and the `juju run` action times out, then the action continues to run in the background. It is possible to see it happening with `juju status`. OpenSearch opens the indices after the restore action has been executed. This PR also fixes the integration tests in backups: now, we wait for the backup to asynchronously finish its task before moving on to the next ones. Closed tickets: * [DPE-3243](https://warthogs.atlassian.net/browse/DPE-3243): list-backups now contains only relevant information, in this case the backup-id and its status; the PR removes the "backup-type" column, as it has no meaning in OpenSearch * [DPE-3246](https://warthogs.atlassian.net/browse/DPE-3246): backups will be async and restores will be synchronous. Users can follow the restore in the juju status. In any case, the indices that are being restored will be closed, so the user needs to wait. --- .github/workflows/integration.yaml | 17 - actions.yaml | 4 +- lib/charms/opensearch/v0/constants_charm.py | 2 + lib/charms/opensearch/v0/helper_cluster.py | 12 +- .../opensearch/v0/opensearch_backups.py | 384 ++++++------ .../opensearch/v0/opensearch_base_charm.py | 20 +- lib/charms/opensearch/v0/opensearch_distro.py | 1 - .../v0/opensearch_plugin_manager.py | 6 +- tests/integration/ha/continuous_writes.py | 5 +- tests/integration/ha/helpers.py | 73 +++ tests/integration/ha/helpers_data.py | 19 + tests/integration/ha/test_backups.py | 309 ++++------ tests/unit/lib/test_backups.py | 548 +++++++++++++++++- tests/unit/lib/test_opensearch_base_charm.py | 4 +- 14 files changed, 998 insertions(+), 406 deletions(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 940de57bd..c29be9e4d 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -160,16 +160,6 @@ jobs: sudo rm -rf /usr/local/share/boost sudo rm -rf "$AGENT_TOOLSDIRECTORY" - - name: Setup microceph - uses: phvalguima/microceph-action@main - with: - channel: 'latest/edge' - devname: '/dev/sdi' - accesskey: 'accesskey' - secretkey: 'secretkey' - bucket: 'testbucket' - osdsize: '5G' - - name: Select tests id: select-tests run: | @@ -184,13 +174,6 @@ jobs: - name: Run backup integration run: | - # load microceph output - # ATM: remove the https:// reference and stick with http only - sed -i 's@https://@http://@g' microceph.source - for i in $(cat microceph.source); do export $i; done - export TEST_NUM_APP_UNITS=2 - - # Set kernel params sudo sysctl -w vm.max_map_count=262144 vm.swappiness=0 net.ipv4.tcp_retries2=5 tox run -e ha-backup-integration -- -m '${{ steps.select-tests.outputs.mark_expression }}' env: diff --git a/actions.yaml b/actions.yaml index 9b3c3607e..eb2392a57 100644 --- a/actions.yaml +++ b/actions.yaml @@ -60,6 +60,6 @@ restore: backup-id: type: integer description: | - A backup-id to identify the backup to restore. Format: + A backup-id to identify the backup to restore. 
Format: backup-id=. required: - - backup-id + - backup-id \ No newline at end of file diff --git a/lib/charms/opensearch/v0/constants_charm.py b/lib/charms/opensearch/v0/constants_charm.py index a0004c8bd..eb6f1ac76 100644 --- a/lib/charms/opensearch/v0/constants_charm.py +++ b/lib/charms/opensearch/v0/constants_charm.py @@ -70,6 +70,8 @@ HorizontalScaleUpSuggest = "Horizontal scale up advised: {} shards unassigned." WaitingForOtherUnitServiceOps = "Waiting for other units to complete the ops on their service." NewIndexRequested = "new index {index} requested" +RestoreInProgress = "Restore in progress..." +PluginConfigStart = "Plugin configuration started." # Relation Interfaces diff --git a/lib/charms/opensearch/v0/helper_cluster.py b/lib/charms/opensearch/v0/helper_cluster.py index f22fe909e..af60ffa66 100644 --- a/lib/charms/opensearch/v0/helper_cluster.py +++ b/lib/charms/opensearch/v0/helper_cluster.py @@ -6,6 +6,7 @@ from random import choice from typing import Dict, List, Optional +from charms.opensearch.v0.helper_enums import BaseStrEnum from charms.opensearch.v0.models import Node from charms.opensearch.v0.opensearch_distro import OpenSearchDistribution from tenacity import retry, stop_after_attempt, wait_exponential @@ -24,6 +25,13 @@ logger = logging.getLogger(__name__) +class IndexStateEnum(BaseStrEnum): + """Enum for index states.""" + + OPEN = "open" + CLOSED = "closed" + + class ClusterTopology: """Class for creating the best possible configuration for a Node.""" @@ -267,9 +275,9 @@ def indices( alt_hosts: Optional[List[str]] = None, ) -> List[Dict[str, str]]: """Get all shards of all indexes in the cluster.""" - idx = opensearch.request("GET", "/_cat/indices", host=host, alt_hosts=alt_hosts) + endpoint = "/_cat/indices?expand_wildcards=all" idx = {} - for index in opensearch.request("GET", "/_cat/indices", host=host, alt_hosts=alt_hosts): + for index in opensearch.request("GET", endpoint, host=host, alt_hosts=alt_hosts): idx[index["index"]] = {"health": index["health"], "status": index["status"]} return idx diff --git a/lib/charms/opensearch/v0/opensearch_backups.py b/lib/charms/opensearch/v0/opensearch_backups.py index bfc2fd03b..85a8eac3d 100644 --- a/lib/charms/opensearch/v0/opensearch_backups.py +++ b/lib/charms/opensearch/v0/opensearch_backups.py @@ -46,20 +46,19 @@ def __init__(...): import json import logging +import math from typing import Any, Dict, List, Set, Tuple -import requests from charms.data_platform_libs.v0.s3 import S3Requirer -from charms.opensearch.v0.helper_cluster import ClusterState +from charms.opensearch.v0.constants_charm import RestoreInProgress +from charms.opensearch.v0.helper_cluster import ClusterState, IndexStateEnum from charms.opensearch.v0.helper_enums import BaseStrEnum from charms.opensearch.v0.opensearch_exceptions import ( OpenSearchError, OpenSearchHttpError, ) -from charms.opensearch.v0.opensearch_internal_data import Scope from charms.opensearch.v0.opensearch_plugins import ( OpenSearchBackupPlugin, - OpenSearchPluginEventScope, OpenSearchPluginRelationClusterNotReadyError, PluginState, ) @@ -88,12 +87,7 @@ def __init__(...): S3_REPO_BASE_PATH = "/" -INDICES_TO_EXCLUDE_AT_RESTORE = { - ".opendistro_security", - ".opensearch-observability", - ".opensearch-sap-log-types-config", - ".opensearch-sap-pre-packaged-rules-config", -} +INDICES_TO_EXCLUDE_AT_RESTORE = {".opendistro_security", ".opensearch-observability"} REPO_NOT_CREATED_ERR = "repository type [s3] does not exist" REPO_NOT_ACCESS_ERR = f"[{S3_REPOSITORY}] path 
[{S3_REPO_BASE_PATH}] is not accessible" @@ -105,10 +99,22 @@ class OpenSearchBackupError(OpenSearchError): """Exception thrown when an opensearch backup-related action fails.""" +class OpenSearchRestoreError(OpenSearchError): + """Exception thrown when an opensearch restore-related action fails.""" + + class OpenSearchListBackupError(OpenSearchBackupError): """Exception thrown when internal list backups call fails.""" +class OpenSearchRestoreCheckError(OpenSearchRestoreError): + """Exception thrown when restore status check errors.""" + + +class OpenSearchRestoreIndexClosingError(OpenSearchRestoreError): + """Exception thrown when restore fails to close indices.""" + + class BackupServiceState(BaseStrEnum): """Enum for the states possible once plugin is enabled.""" @@ -153,21 +159,15 @@ def __init__(self, charm: Object): def _plugin_status(self): return self.charm.plugin_manager.get_plugin_status(OpenSearchBackupPlugin) - def _format_backup_list(self, backup_list: List[Tuple[Any]]) -> str: + def _format_backup_list(self, backups: List[Tuple[Any]]) -> str: """Formats provided list of backups as a table.""" - backups = [ - "{:<10s} | {:<12s} | {:s}".format(" backup-id ", "backup-type", "backup-status") - ] - backups.append("-" * len(backups[0])) - - import math + output = ["{:<10s} | {:s}".format(" backup-id ", "backup-status")] + output.append("-" * len(output[0])) - for backup_id, backup_type, backup_status in backup_list: + for backup_id, backup_status in backups: tab = " " * math.floor((10 - len(str(backup_id))) / 2) - backups.append( - "{:<10s} | {:<12s} | {:s}".format(f"{tab}{backup_id}", backup_type, backup_status) - ) - return "\n".join(backups) + output.append("{:<10s} | {:s}".format(f"{tab}{backup_id}", backup_status)) + return "\n".join(output) def _generate_backup_list_output(self, backups: Dict[str, Any]) -> str: """Generates a list of backups in a formatted table. @@ -178,15 +178,9 @@ def _generate_backup_list_output(self, backups: Dict[str, Any]) -> str: OpenSearchError: if the list of backups errors """ backup_list = [] - for id, backup in backups: + for id, backup in backups.items(): state = self.get_snapshot_status(backup["state"]) - backup_list.append( - ( - id, - "physical", - str(state) if state != BackupServiceState.SUCCESS else "finished", - ) - ) + backup_list.append((id, state.value)) return self._format_backup_list(backup_list) def _on_list_backups_action(self, event: ActionEvent) -> None: @@ -205,127 +199,181 @@ def _on_list_backups_action(self, event: ActionEvent) -> None: else: event.fail("Failed: invalid output format, must be either json or table") - def _restore_and_try_close_indices_if_needed( - self, backup_id: int - ) -> Tuple[Dict[str, Any], Set[str]]: - """Restores and processes any exception related to running indices. 
+ def _close_indices(self, indices: Set[str]) -> bool: + """Close a list of indices and return their status.""" + if not indices: + # The indices is empty, we do not need to check + return True + resp = self._request( + "POST", + f"{','.join(indices)}/_close", + payload={ + "ignore_unavailable": "true", + }, + ) + + # Trivial case, something went wrong + if not resp or not resp.get("acknowledged", False): + return False + + # There are two options here we return True: + # 1) ack=True and shards_ack=False with empty indices + # This means the indices are already closed + if not resp.get("shards_acknowledged", False): + if not resp.get("indices", {}): + return True + return False - Closing option is preferable, as the restore action as a whole may not succeed. - This method returns the final output and list of closed indices. The calling method - should also check the output for any other error messages, as a more generic error - (i.e. non-closing index error) will pass by as much as a successful restore. + # 2) ack=True and shards_ack=True with each index in resp["indices"] + # marked as closed=True + # The statement of explicit "is True" below assures we have a boolean + # as the response has the form of "true" or "false" originally + all_closed = all( + [state and state.get("closed") for state in resp.get("indices", {}).values()] + ) + if not all_closed: + return False + + # Finally, we can state it is all good + return True + + def _close_indices_if_needed(self, backup_id: int) -> Set[str]: + """Closes indices that will be restored. + + Returns a set of indices that were closed or raises an exception: + - OpenSearchRestoreIndexClosingError if any of the indices could not be closed. + + Raises: + OpenSearchHttpError + OpenSearchRestoreIndexClosingError """ - closed_idx = set() + backup_indices = self._list_backups()[backup_id]["indices"] + indices_to_close = set() + for index, state in ClusterState.indices(self.charm.opensearch).items(): + if ( + index in backup_indices + and state["status"] != IndexStateEnum.CLOSED + and index not in INDICES_TO_EXCLUDE_AT_RESTORE + ): + indices_to_close.add(index) + try: - for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(wait=30)): - with attempt: - # First, for overlapping indices - backup_indices = self._list_backups()[backup_id]["indices"] - to_close = [ - index - for index, state in ClusterState.indices(self.charm.opensearch).items() - if ( - index in backup_indices - and state["status"] != "close" - and index not in INDICES_TO_EXCLUDE_AT_RESTORE - ) - ] - # Try closing each index - for index in to_close: - # Try closing the index - o = self._request("POST", f"{index}/_close") - logger.debug( - f"_restore_and_try_close_indices_if_needed: request closing {index} returned {o}" - ) - closed_idx.add(index) - - output = self._request( - "POST", - f"_snapshot/{S3_REPOSITORY}/{backup_id}/_restore?wait_for_completion=false", - payload={ - "indices": ",".join( - [ - f"-{idx}" - for idx in INDICES_TO_EXCLUDE_AT_RESTORE & set(backup_indices) - ] - ), - "ignore_unavailable": False, - "include_global_state": False, - "include_aliases": True, - "partial": False, - }, - ) - logger.debug( - f"_restore_and_try_close_indices_if_needed retrying, output is: {output}" - ) - if ( - self.get_service_status(output) - == BackupServiceState.SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED - ): - # The .opensearch-sap-log-types-config, for example does not show - # in the _cat/indices but does block a restore operation. 
- to_close = output["error"]["reason"].split("[")[2].split("]")[0] - o = self._request("POST", f"{index}/_close") - logger.debug( - f"_restore_and_try_close_indices_if_needed: request closing {index} returned {o}" - ) - closed_idx.add(index) - # We still have non closed indices, try again - raise Exception() - # If status returns success, ensure all indices have been updated - # otherwise, raise an assert error and retry - shard_status = output["snapshot"]["shards"] - assert shard_status["total"] == shard_status["successful"] - except RetryError: - logger.error("_restore_and_try_close_indices_if_needed: fail all retries") - return output, closed_idx - - def _on_restore_backup_action(self, event: ActionEvent) -> None: # noqa: C901 + if not self._close_indices(indices_to_close): + raise OpenSearchRestoreIndexClosingError() + except OpenSearchError as e: + raise OpenSearchRestoreIndexClosingError(e) + return indices_to_close + + def _restore(self, backup_id: int) -> Dict[str, Any]: + """Runs the restore and processes the response.""" + backup_indices = self._list_backups()[backup_id]["indices"] + output = self._request( + "POST", + f"_snapshot/{S3_REPOSITORY}/{backup_id}/_restore?wait_for_completion=true", + payload={ + "indices": ",".join( + [f"-{idx}" for idx in INDICES_TO_EXCLUDE_AT_RESTORE & set(backup_indices)] + ), + "partial": False, # It is the default value, but we want to avoid partial restores + }, + ) + logger.debug(f"_restore: restore call returned {output}") + if ( + self.get_service_status(output) + == BackupServiceState.SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED + ): + to_close = output["error"]["reason"].split("[")[2].split("]")[0] + raise OpenSearchRestoreIndexClosingError(f"_restore: fails to close {to_close}") + + if "snapshot" not in output or "shards" not in output.get("snapshot"): + raise OpenSearchRestoreCheckError(f"_restore: unexpected response {output}") + + return output["snapshot"] + + def _is_restore_complete(self) -> bool: + """Checks if the restore is finished. + + Essentially, check for each index shard: for all type=SNAPSHOT and stage=DONE, return True. + """ + indices_status = self._request("GET", "/_recovery?human") + for info in indices_status.values(): + # Now, check the status of each shard + for shard in info["shards"]: + if shard["type"] == "SNAPSHOT" and shard["stage"] != "DONE": + return False + return True + + def _is_backup_available_for_restore(self, backup_id: int) -> bool: + """Checks if the backup_id exists and is ready for a restore.""" + backups = self._list_backups() + try: + return ( + backup_id in backups.keys() + and self.get_snapshot_status(backups[backup_id]["state"]) + == BackupServiceState.SUCCESS + ) + except OpenSearchListBackupError: + return False + + def _on_restore_backup_action(self, event: ActionEvent) -> None: """Restores a backup to the current cluster.""" if not self._can_unit_perform_backup(event): event.fail("Failed: backup service is not configured yet") return - + if not self._is_restore_complete(): + event.fail("Failed: previous restore is still in progress") + return + # Now, validate the backup is working backup_id = str(event.params.get("backup-id")) + if not self._is_backup_available_for_restore(backup_id): + event.fail(f"Failed: no backup-id {backup_id}") + return + + self.charm.status.set(MaintenanceStatus(RestoreInProgress)) # Restore will try to close indices if there is a matching name. # The goal is to leave the cluster in a running state, even if the restore fails. 
# In case of failure, then restore action must return a list of closed indices closed_idx = set() try: - backups = self._list_backups() - if backup_id not in backups.keys(): - event.fail(f"Failed: no backup-id {backup_id}, options available: {backups}") - return - backup_id_state = self.get_snapshot_status(backups[backup_id]["state"]) - if backup_id_state != BackupServiceState.SUCCESS: - event.fail( - f"Failed: no backup-id {backup_id} not successful, state is: {backup_id_state}" - ) - return - output, closed_idx = self._restore_and_try_close_indices_if_needed(backup_id) + closed_idx = self._close_indices_if_needed(backup_id) + output = self._restore(backup_id) logger.debug(f"Restore action: received response: {output}") logger.info(f"Restore action succeeded for backup_id {backup_id}") - except OpenSearchListBackupError as e: - event.fail(f"Failed: {e}, closed indices: {closed_idx}") + except ( + OpenSearchHttpError, + OpenSearchRestoreIndexClosingError, + OpenSearchRestoreCheckError, + ) as e: + event.fail(f"Failed: {e}") + return + + # Post execution checks + # Was the call successful? + state = self.get_service_status(output) + if state != BackupServiceState.SUCCESS: + event.fail(f"Restore failed with {state}") return - if self.get_service_status(output) != BackupServiceState.SUCCESS: - status = str(self.get_service_status(output)) - event.fail(f"Failed with: {status}, closed indices: {closed_idx}") - event.set_results({"state": "successful restore!", "closed-indices": str(closed_idx)}) - def _on_create_backup_action(self, event: ActionEvent) -> None: + shards = output.get("shards", {}) + if shards.get("successful", -1) != shards.get("total", 0): + event.fail("Failed to restore all the shards") + return + + msg = "Restore is complete" if self._is_restore_complete() else "Restore in progress..." + self.charm.status.clear(RestoreInProgress) + event.set_results( + {"backup-id": backup_id, "status": msg, "closed-indices": str(closed_idx)} + ) + + def _on_create_backup_action(self, event: ActionEvent) -> None: # noqa: C901 """Creates a backup from the current cluster.""" if not self._can_unit_perform_backup(event): - event.fail("Failed: backup service is not configured yet") + event.fail("Failed: backup service is not configured or busy") return new_backup_id = None try: - # Check if any backup is not running already, or RetryError happens - if self.is_backup_in_progress(): - event.fail("Backup still in progress: aborting this request...") - return # Increment by 1 the latest snapshot_id (set to 0 if no snapshot was previously made) new_backup_id = int(max(self._list_backups().keys() or [0])) + 1 logger.debug( @@ -335,21 +383,17 @@ def _on_create_backup_action(self, event: ActionEvent) -> None: "PUT", f"_snapshot/{S3_REPOSITORY}/{new_backup_id}?wait_for_completion=false", payload={ - "indices": "*", - "ignore_unavailable": False, - "include_global_state": True, - "partial": False, + "indices": "*", # Take all indices + "partial": False, # It is the default value, but we want to avoid partial backups }, ) ) ) logger.info(f"Backup request submitted with backup-id {new_backup_id}") + logger.info(f"Backup completed with backup-id {new_backup_id}") except ( - RetryError, - ValueError, OpenSearchHttpError, - requests.HTTPError, OpenSearchListBackupError, ) as e: event.fail(f"Failed with exception: {e}") @@ -367,15 +411,17 @@ def _can_unit_perform_backup(self, event: ActionEvent) -> bool: """ # First, validate the plugin is present and correctly configured. 
if self._plugin_status != PluginState.ENABLED: - event.fail(f"Failed: plugin is not ready yet, current status is {self._plugin_status}") + logger.warning( + f"Failed: plugin is not ready yet, current status is {self._plugin_status}" + ) return False # Then, check the repo status status = self._check_repo_status() if status != BackupServiceState.SUCCESS: - event.fail(f"Failed: repo status is {status}") + logger.warning(f"Failed: repo status is {status}") return False - return True + return not self.is_backup_in_progress() def _list_backups(self) -> Dict[int, str]: """Returns a mapping of snapshot ids / state.""" @@ -388,17 +434,32 @@ def _list_backups(self) -> Dict[int, str]: for snapshot in response.get("snapshots", []) } - def is_backup_in_progress(self, backup_id=None) -> bool: - """Returns True if backup is in progress, False otherwise.""" - return self._query_backup_status(backup_id) in [ + def is_backup_in_progress(self) -> bool: + """Returns True if backup is in progress, False otherwise. + + We filter the _query_backup_status() and seek for the following states: + - SNAPSHOT_IN_PROGRESS + """ + if self._query_backup_status() in [ BackupServiceState.SNAPSHOT_IN_PROGRESS, - ] + BackupServiceState.RESPONSE_FAILED_NETWORK, + ]: + # We have a backup in progress or we cannot reach the API + # taking the "safe path" of informing a backup is in progress + return True + return False def _query_backup_status(self, backup_id=None) -> BackupServiceState: - target = f"_snapshot/{S3_REPOSITORY}/" - target += f"{backup_id}" if backup_id else "_all" - output = self._request("GET", target) - logger.debug(f"Backup status: {output}") + try: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(5)): + with attempt: + target = f"_snapshot/{S3_REPOSITORY}/" + target += f"{backup_id}" if backup_id else "_all" + output = self._request("GET", target) + logger.debug(f"Backup status: {output}") + except RetryError as e: + logger.error(f"_request failed with: {e}") + return BackupServiceState.RESPONSE_FAILED_NETWORK return self.get_service_status(output) def _on_s3_credentials_changed(self, event: EventBase) -> None: @@ -424,7 +485,9 @@ def _on_s3_credentials_changed(self, event: EventBase) -> None: try: plugin = self.charm.plugin_manager.get_plugin(OpenSearchBackupPlugin) - self.charm.plugin_manager._configure_if_needed(plugin) + if self.charm.plugin_manager.status(plugin) == PluginState.ENABLED: + self.charm.plugin_manager.apply_config(plugin.disable()) + self.charm.plugin_manager.apply_config(plugin.config()) except OpenSearchError as e: if isinstance(e, OpenSearchPluginRelationClusterNotReadyError): self.charm.status.set(WaitingStatus("s3-changed: cluster not ready yet")) @@ -480,21 +543,9 @@ def _on_s3_broken(self, event: EventBase) -> None: # noqa: C901 self.charm.model.get_relation(S3_RELATION) and self.charm.model.get_relation(S3_RELATION).units ): - # There are still members in the relation, defer until it is finished - # Make a relation-change in the peer relation, so it triggers this unit back - counter = self.charm.peers_data.get(Scope.UNIT, "s3_broken") - if counter: - self.charm.peers_data.put(Scope.UNIT, "s3_broken", counter + 1) - else: - self.charm.peers_data.put(Scope.UNIT, "s3_broken", 1) event.defer() return - # Second part of this work-around - if self.charm.peers_data.get(Scope.UNIT, "s3_broken"): - # Now, we can delete it - self.charm.peers_data.delete(Scope.UNIT, "s3_broken") - self.charm.status.set(MaintenanceStatus("Disabling backup service...")) snapshot_status = 
self._check_snapshot_status() if snapshot_status in [ @@ -529,11 +580,9 @@ def _on_s3_broken(self, event: EventBase) -> None: # noqa: C901 self._execute_s3_broken_calls() try: - self.charm.plugin_manager.set_event_scope( - OpenSearchPluginEventScope.RELATION_BROKEN_EVENT - ) plugin = self.charm.plugin_manager.get_plugin(OpenSearchBackupPlugin) - self.charm.plugin_manager._disable_if_needed(plugin) + if self.charm.plugin_manager.status(plugin) == PluginState.ENABLED: + self.charm.plugin_manager.apply_config(plugin.disable()) except OpenSearchError as e: if isinstance(e, OpenSearchPluginRelationClusterNotReadyError): self.charm.status.set(WaitingStatus("s3-broken event: cluster not ready yet")) @@ -543,14 +592,8 @@ def _on_s3_broken(self, event: EventBase) -> None: # noqa: C901 ) # There was an unexpected error, log it and block the unit logger.error(e) - self.charm.plugin_manager.reset_event_scope() event.defer() return - # we need to do this task, so we will ask for an exception from lint - except: # noqa: E722 - self.charm.plugin_manager.reset_event_scope() - return - self.charm.plugin_manager.reset_event_scope() self.charm.status.set(ActiveStatus()) def _execute_s3_broken_calls(self): @@ -560,13 +603,13 @@ def _execute_s3_broken_calls(self): def _check_repo_status(self) -> BackupServiceState: try: return self.get_service_status(self._request("GET", f"_snapshot/{S3_REPOSITORY}")) - except (ValueError, OpenSearchHttpError, requests.HTTPError): + except OpenSearchHttpError: return BackupServiceState.RESPONSE_FAILED_NETWORK def _check_snapshot_status(self) -> BackupServiceState: try: return self.get_snapshot_status(self._request("GET", "/_snapshot/_status")) - except (ValueError, OpenSearchHttpError, requests.HTTPError): + except OpenSearchHttpError: return BackupServiceState.RESPONSE_FAILED_NETWORK def _get_endpoint_protocol(self, endpoint: str) -> str: @@ -642,7 +685,6 @@ def _request(self, *args, **kwargs) -> str: Raises: - ValueError - OpenSearchHttpError - - requests.HTTPError """ if "retries" not in kwargs.keys(): kwargs["retries"] = 6 diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py index 06ab0c1f9..061b5e6b3 100644 --- a/lib/charms/opensearch/v0/opensearch_base_charm.py +++ b/lib/charms/opensearch/v0/opensearch_base_charm.py @@ -21,6 +21,7 @@ COSUser, PeerRelationName, PluginConfigChangeError, + PluginConfigStart, RequestUnitServiceOps, SecurityIndexInitProgress, ServiceIsStopping, @@ -101,7 +102,7 @@ UpdateStatusEvent, ) from ops.framework import EventBase, EventSource -from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, WaitingStatus +from ops.model import BlockedStatus, MaintenanceStatus, WaitingStatus # The unique Charmhub library identifier, never change it LIBID = "cba015bae34642baa1b6bb27bb35a2f7" @@ -495,21 +496,22 @@ def _on_config_changed(self, event: ConfigChangedEvent): event.defer() return + self.status.set(MaintenanceStatus(PluginConfigStart)) try: if self.plugin_manager.run(): self.on[self.service_manager.name].acquire_lock.emit( callback_override="_restart_opensearch" ) - except OpenSearchPluginError as e: - logger.exception(e) - if isinstance(e, OpenSearchPluginRelationClusterNotReadyError): - logger.warn("Plugin management: cluster not ready yet at config changed") - else: - # There was an unexpected error, log it and block the unit - self.status.set(BlockedStatus(PluginConfigChangeError)) + except OpenSearchPluginRelationClusterNotReadyError: + logger.warning("Plugin management: 
cluster not ready yet at config changed") + event.defer() + return + except OpenSearchPluginError: + self.status.set(BlockedStatus(PluginConfigChangeError)) event.defer() return - self.status.set(ActiveStatus()) + self.status.clear(PluginConfigChangeError) + self.status.clear(PluginConfigStart) def _on_set_password_action(self, event: ActionEvent): """Set new admin password from user input or generate if not passed.""" diff --git a/lib/charms/opensearch/v0/opensearch_distro.py b/lib/charms/opensearch/v0/opensearch_distro.py index d65957567..812a00af8 100644 --- a/lib/charms/opensearch/v0/opensearch_distro.py +++ b/lib/charms/opensearch/v0/opensearch_distro.py @@ -208,7 +208,6 @@ def request( # noqa Raises: ValueError if method or endpoint are missing OpenSearchHttpError if hosts are unreachable - requests.HTTPError if connection to opensearch fails """ def full_urls() -> List[str]: diff --git a/lib/charms/opensearch/v0/opensearch_plugin_manager.py b/lib/charms/opensearch/v0/opensearch_plugin_manager.py index a00a4f0e9..181af8c46 100644 --- a/lib/charms/opensearch/v0/opensearch_plugin_manager.py +++ b/lib/charms/opensearch/v0/opensearch_plugin_manager.py @@ -219,7 +219,7 @@ def _configure_if_needed(self, plugin: OpenSearchPlugin) -> bool: # Leave this method if either user did not request to enable this plugin # or plugin has been already enabled. return False - return self._apply_config(plugin.config()) + return self.apply_config(plugin.config()) except KeyError as e: raise OpenSearchPluginMissingConfigError(e) @@ -234,11 +234,11 @@ def _disable_if_needed(self, plugin: OpenSearchPlugin) -> bool: # represents a plugin that has been installed but either not yet configured # or user explicitly disabled. return False - return self._apply_config(plugin.disable()) + return self.apply_config(plugin.disable()) except KeyError as e: raise OpenSearchPluginMissingConfigError(e) - def _apply_config(self, config: OpenSearchPluginConfig) -> bool: + def apply_config(self, config: OpenSearchPluginConfig) -> bool: """Runs the configuration changes as passed via OpenSearchPluginConfig. For each: configuration and secret diff --git a/tests/integration/ha/continuous_writes.py b/tests/integration/ha/continuous_writes.py index c285056ea..7481a2135 100644 --- a/tests/integration/ha/continuous_writes.py +++ b/tests/integration/ha/continuous_writes.py @@ -39,13 +39,14 @@ class ContinuousWrites: LAST_WRITTEN_VAL_PATH = "last_written_value" CERT_PATH = "/tmp/ca_chain.cert" - def __init__(self, ops_test: OpsTest, app: str): + def __init__(self, ops_test: OpsTest, app: str, initial_count: int = 0): self._ops_test = ops_test self._app = app self._is_stopped = True self._event = None self._queue = None self._process = None + self._initial_count = initial_count @retry( wait=wait_fixed(wait=5) + wait_random(0, 5), @@ -179,7 +180,7 @@ def _create_process(self): self._process = Process( target=ContinuousWrites._run_async, name="continuous_writes", - args=(self._event, self._queue, 0, True), + args=(self._event, self._queue, self._initial_count, True), ) def _stop_process(self): diff --git a/tests/integration/ha/helpers.py b/tests/integration/ha/helpers.py index b80e3bdae..4c73a8ea7 100644 --- a/tests/integration/ha/helpers.py +++ b/tests/integration/ha/helpers.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. 
+import json import logging import subprocess import time @@ -18,12 +19,14 @@ ) from tests.integration.ha.continuous_writes import ContinuousWrites +from tests.integration.ha.helpers_data import index_docs_count from tests.integration.helpers import ( get_application_unit_ids, get_application_unit_ids_hostnames, get_application_unit_ids_ips, http_request, juju_version_major, + run_action, ) OPENSEARCH_SERVICE_PATH = "/etc/systemd/system/snap.opensearch.daemon.service" @@ -437,3 +440,73 @@ async def print_logs(ops_test: OpsTest, app: str, unit_id: int, msg: str) -> str logger.info(f"\n\n\nServer Logs:\n{stdout}") return msg + + +async def wait_backup_finish(ops_test, leader_id): + """Waits the backup to finish and move to the finished state or throws a RetryException.""" + for attempt in Retrying(stop=stop_after_attempt(8), wait=wait_fixed(15)): + with attempt: + action = await run_action( + ops_test, leader_id, "list-backups", params={"output": "json"} + ) + # Expected format: + # namespace(status='completed', response={'return-code': 0, 'backups': '{"1": ...}'}) + backups = json.loads(action.response["backups"]) + logger.debug(f"Backups recovered: {backups}") + if action.status == "completed" and len(backups) > 0: + logger.debug(f"list-backups output: {action}") + return + + raise Exception("Backup not finished yet") + + +async def wait_restore_finish(ops_test, unit_ip): + """Waits the backup to finish and move to the finished state or throws a RetryException.""" + for attempt in Retrying(stop=stop_after_attempt(8), wait=wait_fixed(15)): + with attempt: + indices_status = await http_request( + ops_test, + "GET", + f"https://{unit_ip}:9200/_recovery?human", + ) + for info in indices_status.values(): + # Now, check the status of each shard + for shard in info["shards"]: + if shard["type"] == "SNAPSHOT" and shard["stage"] != "DONE": + raise Exception() + + +async def continuous_writes_increases(ops_test: OpsTest, unit_ip: str, app: str) -> bool: + """Asserts that TEST_BACKUP_INDEX is writable while under continuous writes. + + Given we are restoring an index, we need to make sure ContinuousWrites restart at + the tip of that index instead of doc_id = 0. + + Closes the writer at the end. 
+ """ + initial_count = await index_docs_count(ops_test, app, unit_ip, ContinuousWrites.INDEX_NAME) + logger.info( + f"Index {ContinuousWrites.INDEX_NAME} has {initial_count} documents, starting there" + ) + writer = ContinuousWrites(ops_test, app, initial_count=initial_count) + await writer.start() + time.sleep(5) + result = await writer.stop() + return result.count > initial_count + + +async def backup_cluster(ops_test: OpsTest, leader_id: int) -> bool: + """Runs the backup of the cluster.""" + action = await run_action(ops_test, leader_id, "create-backup") + logger.debug(f"create-backup output: {action}") + + await wait_backup_finish(ops_test, leader_id) + return action.status == "completed" + + +async def restore_cluster(ops_test: OpsTest, backup_id: int, unit_ip: str, leader_id: int) -> bool: + action = await run_action(ops_test, leader_id, "restore", params={"backup-id": backup_id}) + logger.debug(f"restore output: {action}") + + await wait_restore_finish(ops_test, unit_ip) + return action.status == "completed" diff --git a/tests/integration/ha/helpers_data.py b/tests/integration/ha/helpers_data.py index 3edd004ab..5b194e4a9 100644 --- a/tests/integration/ha/helpers_data.py +++ b/tests/integration/ha/helpers_data.py @@ -247,3 +247,22 @@ async def search( with attempt: # Raises RetryError if failed after "retries" resp = await http_request(ops_test, "GET", endpoint, payload=query, app=app) return resp["hits"]["hits"] + + +async def index_docs_count( + ops_test: OpsTest, + app: str, + unit_ip: str, + index_name: str, + retries: int = 15, +) -> int: + """Returns the number of documents in an index.""" + endpoint = f"https://{unit_ip}:9200/{index_name}/_count" + for attempt in Retrying( + stop=stop_after_attempt(retries), wait=wait_fixed(wait=5) + wait_random(0, 5) + ): + with attempt: # Raises RetryError if failed after "retries" + resp = await http_request(ops_test, "GET", endpoint, app=app) + if isinstance(resp["count"], int): + return resp["count"] + return int(resp["count"]) diff --git a/tests/integration/ha/test_backups.py b/tests/integration/ha/test_backups.py index 81219fde6..da74a3a3b 100644 --- a/tests/integration/ha/test_backups.py +++ b/tests/integration/ha/test_backups.py @@ -3,32 +3,35 @@ # See LICENSE file for licensing details. 
import asyncio -import json import logging import os +import random +import subprocess # from pathlib import Path # # import boto3 import pytest +import requests from pytest_operator.plugin import OpsTest from tests.integration.ha.continuous_writes import ContinuousWrites -from tests.integration.ha.helpers import app_name, assert_continuous_writes_consistency -from tests.integration.ha.helpers_data import ( - create_index, - default_doc, - index_doc, - search, +from tests.integration.ha.helpers import ( + app_name, + assert_continuous_writes_consistency, + backup_cluster, + continuous_writes_increases, + restore_cluster, ) +from tests.integration.ha.helpers_data import index_docs_count from tests.integration.ha.test_horizontal_scaling import IDLE_PERIOD from tests.integration.helpers import ( APP_NAME, MODEL_CONFIG, SERIES, - get_application_unit_ids_ips, get_leader_unit_id, get_leader_unit_ip, + get_reachable_unit_ips, http_request, run_action, ) @@ -99,9 +102,6 @@ # bucket_object.delete() -TEST_BACKUP_INDEX = "test_backup_index" - - @pytest.fixture() async def c_writes(ops_test: OpsTest): """Creates instance of the ContinuousWrites.""" @@ -114,13 +114,61 @@ async def c_writes_runner(ops_test: OpsTest, c_writes: ContinuousWrites): """Starts continuous write operations and clears writes at the end of the test.""" await c_writes.start() yield + + reachable_ip = random.choice(await get_reachable_unit_ips(ops_test)) + await http_request(ops_test, "GET", f"https://{reachable_ip}:9200/_cat/nodes", json_resp=False) + await http_request( + ops_test, "GET", f"https://{reachable_ip}:9200/_cat/shards", json_resp=False + ) + await c_writes.clear() logger.info("\n\n\n\nThe writes have been cleared.\n\n\n\n") +# TODO: Remove this method as soon as poetry gets merged. +@pytest.fixture(scope="session") +def microceph(): + """Starts microceph radosgw.""" + if "microceph" not in subprocess.check_output(["sudo", "snap", "list"]).decode(): + uceph = "/tmp/microceph.sh" + + with open(uceph, "w") as f: + # TODO: if this code stays, then the script below should be added as a file + # in the charm. + resp = requests.get( + "https://raw.githubusercontent.com/canonical/microceph-action/main/microceph.sh" + ) + f.write(resp.content.decode()) + + os.chmod(uceph, 0o755) + subprocess.check_output( + [ + "sudo", + uceph, + "-c", + "latest/edge", + "-d", + "/dev/sdc", + "-a", + "accesskey", + "-s", + "secretkey", + "-b", + "data-charms-testing", + "-z", + "5G", + ] + ) + ip = subprocess.check_output(["hostname", "-I"]).decode().split()[0] + # TODO: if this code stays, then we should generate random keys for the test. + return {"url": f"http://{ip}", "access-key": "accesskey", "secret-key": "secretkey"} + + @pytest.mark.abort_on_fail @pytest.mark.skip_if_deployed -async def test_build_and_deploy(ops_test: OpsTest) -> None: # , cloud_credentials) -> None: +async def test_build_and_deploy( + ops_test: OpsTest, microceph +) -> None: # , cloud_credentials) -> None: """Build and deploy an HA cluster of OpenSearch and corresponding S3 integration.""" # it is possible for users to provide their own cluster for HA testing. # Hence, check if there is a pre-existing cluster. 
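# ---------------------------------------------------------------------------
# Reviewer note (illustrative sketch, not part of the patch): the async-backup
# / sync-restore flow this PR describes boils down to the raw OpenSearch
# snapshot API calls below. HOST, AUTH and the repository name REPO are
# placeholders for this sketch only; the charm derives its own values.
import requests

HOST = "https://localhost:9200"  # placeholder endpoint
REPO = "charmed-s3-repository"   # placeholder repository name
AUTH = ("admin", "admin-password")  # placeholder credentials


def create_backup(backup_id: int) -> None:
    """Submit a snapshot and return immediately (async, as the charm action now does)."""
    requests.put(
        f"{HOST}/_snapshot/{REPO}/{backup_id}?wait_for_completion=false",
        json={"indices": "*", "partial": False},
        auth=AUTH,
        verify=False,
    ).raise_for_status()


def backup_state(backup_id: int) -> str:
    """Poll the snapshot state, which is what list-backups surfaces to the user."""
    resp = requests.get(f"{HOST}/_snapshot/{REPO}/_all", auth=AUTH, verify=False).json()
    states = {s["snapshot"]: s["state"] for s in resp.get("snapshots", [])}
    return states.get(str(backup_id), "UNKNOWN")


def restore_backup(backup_id: int) -> bool:
    """Run a synchronous restore, then confirm every SNAPSHOT shard reached DONE."""
    out = requests.post(
        f"{HOST}/_snapshot/{REPO}/{backup_id}/_restore?wait_for_completion=true",
        json={"partial": False},
        auth=AUTH,
        verify=False,
    ).json()
    recovery = requests.get(f"{HOST}/_recovery?human", auth=AUTH, verify=False).json()
    done = all(
        shard["stage"] == "DONE"
        for info in recovery.values()
        for shard in info["shards"]
        if shard["type"] == "SNAPSHOT"
    )
    return "snapshot" in out and done
# ---------------------------------------------------------------------------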
@@ -128,30 +176,12 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: # , cloud_credentia if await app_name(ops_test): return - s3_storage = None - if ( - "S3_BUCKET" in os.environ - and "S3_SERVER_URL" in os.environ - and "S3_REGION" in os.environ - and "S3_ACCESS_KEY" in os.environ - and "S3_SECRET_KEY" in os.environ - ): - s3_config = { - "bucket": os.environ["S3_BUCKET"], - "path": "/", - "endpoint": os.environ["S3_SERVER_URL"], - "region": os.environ["S3_REGION"], - } - s3_storage = "ceph" - elif "AWS_ACCESS_KEY" in os.environ and "AWS_SECRET_KEY" in os.environ: - s3_config = CLOUD_CONFIGS["aws"].copy() - s3_storage = "aws" - elif "GCP_ACCESS_KEY" in os.environ and "GCP_SECRET_KEY" in os.environ: - s3_config = CLOUD_CONFIGS["gcp"].copy() - s3_storage = "gcp" - else: - logger.exception("Missing S3 configs in os.environ.") - raise Exception("Missing s3") + s3_config = { + "bucket": "data-charms-testing", + "path": "/", + "endpoint": microceph["url"], + "region": "default", + } my_charm = await ops_test.build_charm(".") await ops_test.model.set_config(MODEL_CONFIG) @@ -160,21 +190,17 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: # , cloud_credentia tls_config = {"generate-self-signed-certificates": "true", "ca-common-name": "CN_CA"} # Convert to integer as environ always returns string - app_num_units = int(os.environ.get("TEST_NUM_APP_UNITS", None) or 3) + app_num_units = 3 await asyncio.gather( ops_test.model.deploy(TLS_CERTIFICATES_APP_NAME, channel="stable", config=tls_config), ops_test.model.deploy(S3_INTEGRATOR_NAME, channel="stable", config=s3_config), ops_test.model.deploy(my_charm, num_units=app_num_units, series=SERIES), ) - # Set the access/secret keys - if s3_storage == "ceph": - s3_creds = { - "access-key": os.environ["S3_ACCESS_KEY"], - "secret-key": os.environ["S3_SECRET_KEY"], - } - # else: - # s3_creds = cloud_credentials[s3_storage].copy() + s3_creds = { + "access-key": microceph["access-key"], + "secret-key": microceph["secret-key"], + } await run_action( ops_test, @@ -201,90 +227,32 @@ async def test_backup_cluster( ) -> None: """Runs the backup process whilst writing to the cluster into 'noisy-index'.""" app = (await app_name(ops_test)) or APP_NAME + leader_id = await get_leader_unit_id(ops_test) - units = await get_application_unit_ids_ips(ops_test, app=app) - leader_unit_ip = await get_leader_unit_ip(ops_test, app=app) - - await create_index(ops_test, app, leader_unit_ip, TEST_BACKUP_INDEX, r_shards=len(units) - 1) - - # index document - doc_id = TEST_BACKUP_DOC_ID - await index_doc(ops_test, app, leader_unit_ip, TEST_BACKUP_INDEX, doc_id) - - # check that the doc can be retrieved from any node - logger.info("Test backup index: searching") - for u_id, u_ip in units.items(): - docs = await search( - ops_test, - app, - u_ip, - TEST_BACKUP_INDEX, - query={"query": {"term": {"_id": doc_id}}}, - preference="_only_local", - ) - # Validate the index and document are present - assert len(docs) == 1 - assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) - - leader_id = await get_leader_unit_id(ops_test, app) - - action = await run_action(ops_test, leader_id, "create-backup") - logger.info(f"create-backup output: {action}") - - assert action.status == "completed" - - list_backups = await run_action(ops_test, leader_id, "list-backups", params={"output": "json"}) - logger.info(f"list-backups output: {list_backups}") - - # Expected format: - # namespace(status='completed', response={'return-code': 0, 'backups': '{"1": ...}'}) - 
backups = json.loads(list_backups.response["backups"]) - assert list_backups.status == "completed" - assert len(backups.keys()) == int(action.response["backup-id"]) - assert backups[action.response["backup-id"]]["state"] == "SUCCESS" - + assert await backup_cluster( + ops_test, + leader_id, + ) # continuous writes checks await assert_continuous_writes_consistency(ops_test, c_writes, app) @pytest.mark.abort_on_fail -async def test_restore_cluster( - ops_test: OpsTest, -) -> None: +async def test_restore_cluster(ops_test: OpsTest) -> None: """Deletes the TEST_BACKUP_INDEX, restores the cluster and tries to search for index.""" + unit_ip = await get_leader_unit_ip(ops_test) app = (await app_name(ops_test)) or APP_NAME + leader_id = await get_leader_unit_id(ops_test) - units = await get_application_unit_ids_ips(ops_test, app=app) - leader_id = await get_leader_unit_id(ops_test, app) - leader_unit_ip = await get_leader_unit_ip(ops_test, app=app) - - await http_request( + assert await restore_cluster( ops_test, - "DELETE", - f"https://{leader_unit_ip}:9200/{TEST_BACKUP_INDEX}", - app=app, + 1, # backup_id + unit_ip, + leader_id, ) - - action = await run_action(ops_test, leader_id, "restore", params={"backup-id": 1}) - logger.info(f"restore output: {action}") - assert action.status == "completed" - - # index document - doc_id = TEST_BACKUP_DOC_ID - # check that the doc can be retrieved from any node - logger.info("Test backup index: searching") - for u_id, u_ip in units.items(): - docs = await search( - ops_test, - app, - u_ip, - TEST_BACKUP_INDEX, - query={"query": {"term": {"_id": doc_id}}}, - preference="_only_local", - ) - # Validate the index and document are present - assert len(docs) == 1 - assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) + count = await index_docs_count(ops_test, app, unit_ip, ContinuousWrites.INDEX_NAME) + assert count > 0 + await continuous_writes_increases(ops_test, unit_ip, app) @pytest.mark.abort_on_fail @@ -294,8 +262,10 @@ async def test_restore_cluster_after_app_destroyed(ops_test: OpsTest) -> None: Restores the backup and then checks if the same TEST_BACKUP_INDEX is there. 
""" app = (await app_name(ops_test)) or APP_NAME + + logging.info("Destroying the application") await ops_test.model.remove_application(app, block_until_done=True) - app_num_units = int(os.environ.get("TEST_NUM_APP_UNITS", None) or 3) + app_num_units = 3 my_charm = await ops_test.build_charm(".") # Redeploy await asyncio.gather( @@ -311,103 +281,58 @@ async def test_restore_cluster_after_app_destroyed(ops_test: OpsTest) -> None: idle_period=IDLE_PERIOD, ) - units = await get_application_unit_ids_ips(ops_test, app=app) - leader_id = await get_leader_unit_id(ops_test, app) - - action = await run_action(ops_test, leader_id, "restore", params={"backup-id": 1}) - logger.info(f"restore output: {action}") - assert action.status == "completed" - - # index document - doc_id = TEST_BACKUP_DOC_ID - # check that the doc can be retrieved from any node - logger.info("Test backup index: searching") - for u_id, u_ip in units.items(): - docs = await search( - ops_test, - app, - u_ip, - TEST_BACKUP_INDEX, - query={"query": {"term": {"_id": doc_id}}}, - preference="_only_local", - ) - # Validate the index and document are present - assert len(docs) == 1 - assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) + leader_id = await get_leader_unit_id(ops_test) + leader_unit_ip = await get_leader_unit_ip(ops_test, app=app) + assert await restore_cluster( + ops_test, + 1, # backup_id + leader_unit_ip, + leader_id, + ) + # Count the number of docs in the index + count = await index_docs_count(ops_test, app, leader_unit_ip, ContinuousWrites.INDEX_NAME) + assert count > 0 + await continuous_writes_increases(ops_test, leader_unit_ip, app) @pytest.mark.abort_on_fail async def test_remove_and_readd_s3_relation(ops_test: OpsTest) -> None: """Removes and re-adds the s3-credentials relation to test backup and restore.""" app = (await app_name(ops_test)) or APP_NAME - units = await get_application_unit_ids_ips(ops_test, app=app) - leader_id = await get_leader_unit_id(ops_test, app) - leader_unit_ip = await get_leader_unit_ip(ops_test, app=app) + leader_id = await get_leader_unit_id(ops_test) + unit_ip = await get_leader_unit_ip(ops_test) logger.info("Remove s3-credentials relation") # Remove relation - await ops_test.model.applications[APP_NAME].destroy_relation( + await ops_test.model.applications[app].destroy_relation( "s3-credentials", f"{S3_INTEGRATOR_NAME}:s3-credentials" ) await ops_test.model.wait_for_idle( - apps=[APP_NAME], + apps=[app], status="active", timeout=1400, idle_period=IDLE_PERIOD, ) logger.info("Re-add s3-credentials relation") - await ops_test.model.relate(APP_NAME, S3_INTEGRATOR_NAME) + await ops_test.model.relate(app, S3_INTEGRATOR_NAME) await ops_test.model.wait_for_idle( - apps=[APP_NAME], + apps=[app], status="active", timeout=1400, idle_period=IDLE_PERIOD, ) - leader_id = await get_leader_unit_id(ops_test, app) - - action = await run_action(ops_test, leader_id, "create-backup") - logger.info(f"create-backup output: {action}") - - assert action.status == "completed" - - list_backups = await run_action(ops_test, leader_id, "list-backups", params={"output": "json"}) - logger.info(f"list-backups output: {list_backups}") - - # Expected format: - # namespace(status='completed', response={'return-code': 0, 'backups': '{"1": ...}'}) - backups = json.loads(list_backups.response["backups"]) - assert list_backups.status == "completed" - assert len(backups.keys()) == int(action.response["backup-id"]) - assert backups[action.response["backup-id"]]["state"] == "SUCCESS" - - await http_request( + 
assert await backup_cluster( ops_test, - "DELETE", - f"https://{leader_unit_ip}:9200/{TEST_BACKUP_INDEX}", - app=app, + leader_id, ) - - action = await run_action( - ops_test, leader_id, "restore", params={"backup-id": int(action.response["backup-id"])} + assert await restore_cluster( + ops_test, + 1, # backup_id + unit_ip, + leader_id, ) - logger.info(f"restore-backup output: {action}") - assert action.status == "completed" - - # index document - doc_id = TEST_BACKUP_DOC_ID - # check that the doc can be retrieved from any node - logger.info("Test backup index: searching") - for u_id, u_ip in units.items(): - docs = await search( - ops_test, - app, - u_ip, - TEST_BACKUP_INDEX, - query={"query": {"term": {"_id": doc_id}}}, - preference="_only_local", - ) - # Validate the index and document are present - assert len(docs) == 1 - assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) + count = await index_docs_count(ops_test, app, unit_ip, ContinuousWrites.INDEX_NAME) + assert count > 0 + await continuous_writes_increases(ops_test, unit_ip, app) diff --git a/tests/unit/lib/test_backups.py b/tests/unit/lib/test_backups.py index 0a6580edb..7072c4ae5 100644 --- a/tests/unit/lib/test_backups.py +++ b/tests/unit/lib/test_backups.py @@ -3,17 +3,29 @@ """Unit test for the opensearch_plugins library.""" import unittest -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, PropertyMock, patch import charms +import pytest +import tenacity from charms.opensearch.v0.constants_charm import PeerRelationName +from charms.opensearch.v0.helper_cluster import IndexStateEnum from charms.opensearch.v0.opensearch_backups import ( S3_RELATION, S3_REPOSITORY, + BackupServiceState, OpenSearchBackupPlugin, + OpenSearchListBackupError, + OpenSearchRestoreIndexClosingError, ) +from charms.opensearch.v0.opensearch_exceptions import OpenSearchError from charms.opensearch.v0.opensearch_health import HealthColors -from charms.opensearch.v0.opensearch_plugins import OpenSearchPluginConfig, PluginState +from charms.opensearch.v0.opensearch_plugins import ( + OpenSearchPluginConfig, + OpenSearchPluginError, + PluginState, +) +from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus from ops.testing import Harness from charm import OpenSearchOperatorCharm @@ -22,7 +34,427 @@ TEST_BASE_PATH = "/test" +LIST_BACKUPS_TRIAL = """ backup-id | backup-status +--------------------------- + backup1 | success + backup2 | snapshot failed for unknown reason + backup3 | snapshot in progress""" + + +@pytest.fixture(scope="session") +def harness(): + harness_obj = Harness(OpenSearchOperatorCharm) + harness_obj.begin() + charm = harness_obj.charm + # Override the config to simulate the TestPlugin + # As config.yaml does not exist, the setup below simulates it + charm.plugin_manager._charm_config = harness_obj.model._config + # Override the ConfigExposedPlugins + charms.opensearch.v0.opensearch_plugin_manager.ConfigExposedPlugins = { + "repository-s3": { + "class": OpenSearchBackupPlugin, + "config": None, + "relation": "s3-credentials", + }, + } + charm.opensearch.is_started = MagicMock(return_value=True) + charm.health.apply = MagicMock(return_value=HealthColors.GREEN) + # Mock retrials to speed up tests + charms.opensearch.v0.opensearch_backups.wait_fixed = MagicMock( + return_value=tenacity.wait.wait_fixed(0.1) + ) + + # Replace some unused methods that will be called as part of set_leader with mock + charm.service_manager._update_locks = MagicMock() + charm._put_admin_user = MagicMock() + 
harness_obj.add_relation(PeerRelationName, "opensearch") + harness_obj.set_leader(is_leader=True) + return harness_obj + + +@pytest.fixture(scope="session", autouse=True) +def cleanup_harnes(harness): + yield + harness.cleanup() + + +@pytest.fixture(scope="function") +def mock_request(): + with patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup._request") as mock: + yield mock + + +def test_can_unit_perform_backup_plugin_not_ready(harness, caplog): + plugin_method = "charms.opensearch.v0.opensearch_backups.OpenSearchBackup._plugin_status" + event = MagicMock() + with patch(plugin_method, new_callable=PropertyMock) as mock_plugin_status: + mock_plugin_status.return_value = PluginState.DISABLED + result = harness.charm.backup._can_unit_perform_backup(event) + + assert ( + caplog.records[-1].message + == f"Failed: plugin is not ready yet, current status is {PluginState.DISABLED}" + ) + assert caplog.records[-1].levelname == "WARNING" + assert not result + + +def test_can_unit_perform_backup_repo_status_failed(harness, caplog): + plugin_method = "charms.opensearch.v0.opensearch_backups.OpenSearchBackup._plugin_status" + event = MagicMock() + with patch(plugin_method, new_callable=PropertyMock) as mock_plugin_status: + mock_plugin_status.return_value = PluginState.ENABLED + harness.charm.backup._check_repo_status = MagicMock( + return_value=BackupServiceState.REPO_NOT_CREATED + ) + result = harness.charm.backup._can_unit_perform_backup(event) + assert ( + caplog.records[-1].message + == f"Failed: repo status is {BackupServiceState.REPO_NOT_CREATED}" + ) + assert caplog.records[-1].levelname == "WARNING" + assert not result + + +def test_can_unit_perform_backup_backup_in_progress(harness, caplog): + plugin_method = "charms.opensearch.v0.opensearch_backups.OpenSearchBackup._plugin_status" + event = MagicMock() + with patch(plugin_method, new_callable=PropertyMock) as mock_plugin_status: + mock_plugin_status.return_value = PluginState.ENABLED + harness.charm.backup._check_repo_status = MagicMock( + return_value=BackupServiceState.SUCCESS + ) + harness.charm.backup.is_backup_in_progress = MagicMock(return_value=True) + result = harness.charm.backup._can_unit_perform_backup(event) + assert not caplog.records + assert not result + + +@pytest.mark.parametrize( + "leader,request_value,result_value", + [ + # Test leader + request_value that should return True + ( + False, + { + "index1": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index2": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + }, + True, + ), + ( + True, + { + "index1": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index2": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + }, + True, + ), + # Test leader + request_value that should return False + ( + False, + { + "index1": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index2": {"shards": [{"type": "SNAPSHOT", "stage": "IN_PROGRESS"}]}, + }, + False, + ), + ( + True, + { + "index1": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index2": {"shards": [{"type": "SNAPSHOT", "stage": "IN_PROGRESS"}]}, + }, + False, + ), + # Test leader + request_value that should return True + ( + False, + { + "index1": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index2": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index3": {"shards": [{"type": "NOT_SNAP", "stage": "DONE"}]}, + }, + True, + ), + ( + True, + { + "index1": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index2": {"shards": [{"type": "SNAPSHOT", "stage": 
"DONE"}]}, + "index3": {"shards": [{"type": "NOT_SNAP", "stage": "DONE"}]}, + }, + True, + ), + # Test leader + request_value that should return False + ( + False, + { + "index1": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index2": {"shards": [{"type": "SNAPSHOT", "stage": "IN_PROGRESS"}]}, + "index3": {"shards": [{"type": "NOT_SNAP", "stage": "DONE"}]}, + }, + False, + ), + ( + True, + { + "index1": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index2": {"shards": [{"type": "SNAPSHOT", "stage": "IN_PROGRESS"}]}, + "index3": {"shards": [{"type": "NOT_SNAP", "stage": "DONE"}]}, + }, + False, + ), + ], +) +def test_restore_finished_true(harness, mock_request, leader, request_value, result_value): + harness.charm.backup.charm.unit.is_leader = MagicMock(return_value=leader) + mock_request.return_value = request_value + assert harness.charm.backup._is_restore_complete() == result_value + + +@pytest.mark.parametrize( + "list_backup_response,cluster_state,req_response,exception_raised", + [ + # Check if only indices in backup-id=1 are closed + ( + {1: {"indices": ["index1", "index2"]}}, + { + "index1": {"status": IndexStateEnum.OPEN}, + "index2": {"status": IndexStateEnum.OPEN}, + "index3": {"status": IndexStateEnum.OPEN}, + }, + { + "acknowledged": True, + "shards_acknowledged": True, + "indices": { + "index1": { + "closed": True, + }, + "index2": { + "closed": True, + }, + }, # represents the closed indices + }, + False, + ), + # Check if only indices in backup-id=1 are closed + ( + { + 1: {"indices": ["index1", "index2"]}, + 2: {"indices": ["index3"]}, + }, + { + "index1": {"status": IndexStateEnum.OPEN}, + "index2": {"status": IndexStateEnum.OPEN}, + "index3": {"status": IndexStateEnum.OPEN}, + }, + { + "acknowledged": True, + "shards_acknowledged": True, + "indices": { + "index1": { + "closed": True, + }, + "index2": { + "closed": True, + }, + }, # represents the closed indices + }, + False, + ), + # Check if already closed indices are skipped + ( + { + 1: {"indices": ["index1", "index2"]}, + 2: {"indices": ["index3"]}, + }, + { + "index1": {"status": IndexStateEnum.OPEN}, + "index2": {"status": IndexStateEnum.CLOSED}, + "index3": {"status": IndexStateEnum.OPEN}, + }, + { + "acknowledged": True, + "shards_acknowledged": True, + "indices": { + "index1": { + "closed": True, + }, + }, # represents the closed indices + }, + False, + ), + # Represents an error where index2 is not closed + ( + {1: {"indices": ["index1", "index2"]}}, + { + "index1": {"status": IndexStateEnum.OPEN}, + "index2": {"status": IndexStateEnum.OPEN}, + "index3": {"status": IndexStateEnum.OPEN}, + }, + { + "acknowledged": True, + "shards_acknowledged": True, + "indices": { + "index1": { + "closed": True, + }, + "index2": { + "closed": False, + }, + }, # represents the closed indices + }, + True, + ), + # Represents an error where request failed + ( + {1: {"indices": ["index1", "index2"]}}, + { + "index1": {"status": IndexStateEnum.OPEN}, + "index2": {"status": IndexStateEnum.OPEN}, + "index3": {"status": IndexStateEnum.OPEN}, + }, + {"acknowledged": True, "shards_acknowledged": True, "indices": {}}, + True, + ), + # Represents an error where request failed + ( + {1: {"indices": ["index1", "index2"]}}, + { + "index1": {"status": IndexStateEnum.OPEN}, + "index2": {"status": IndexStateEnum.OPEN}, + "index3": {"status": IndexStateEnum.OPEN}, + }, + { + "acknowledged": False, + }, + True, + ), + ], +) +def test_close_indices_if_needed( + harness, mock_request, list_backup_response, cluster_state, 
req_response, exception_raised
+):
+    harness.charm.backup._list_backups = MagicMock(return_value=list_backup_response)
+    charms.opensearch.v0.opensearch_backups.ClusterState.indices = MagicMock(
+        return_value=cluster_state
+    )
+    mock_request.return_value = req_response
+    try:
+        idx = harness.charm.backup._close_indices_if_needed(1)
+    except OpenSearchError as e:
+        assert isinstance(e, OpenSearchRestoreIndexClosingError) and exception_raised
+    else:
+        idx = {
+            i
+            for i in list_backup_response[1]["indices"]
+            if (i in cluster_state.keys() and cluster_state[i]["status"] != IndexStateEnum.CLOSED)
+        }
+        mock_request.assert_called_with(
+            "POST",
+            f"{','.join(idx)}/_close",
+            payload={
+                "ignore_unavailable": "true",
+            },
+        )
+
+
+@pytest.mark.parametrize(
+    "test_type,s3_units,snapshot_status,is_leader,apply_config_exc",
+    [
+        (
+            "s3-still-units-present",
+            ["some_unit"],  # This is a dummy value, so we trigger the .units check
+            None,
+            True,
+            None,
+        ),
+        (
+            "snapshot-in-progress",
+            None,
+            BackupServiceState.SNAPSHOT_IN_PROGRESS,
+            True,
+            None,
+        ),
+        (
+            "apply-config-error",
+            None,
+            BackupServiceState.SUCCESS,
+            True,
+            OpenSearchPluginError("Error"),
+        ),
+        # This test case validates that a non-leader unit also goes through
+        # and eventually calls apply_config
+        (
+            "apply-config-error-not-leader",
+            None,
+            BackupServiceState.SUCCESS,
+            False,
+            OpenSearchPluginError("Error"),
+        ),
+        (
+            "success",
+            None,
+            BackupServiceState.SUCCESS,
+            True,
+            None,
+        ),
+    ],
+)
+def test_on_s3_broken_steps(
+    harness, test_type, s3_units, snapshot_status, is_leader, apply_config_exc
+):
+    relation = MagicMock()
+    relation.units = s3_units
+    harness.charm.model.get_relation = MagicMock(return_value=relation)
+    event = MagicMock()
+    harness.charm.backup._execute_s3_broken_calls = MagicMock()
+    harness.charm.plugin_manager.apply_config = (
+        MagicMock(side_effect=apply_config_exc) if apply_config_exc else MagicMock()
+    )
+    harness.charm.backup._check_snapshot_status = MagicMock(return_value=snapshot_status)
+    harness.charm.unit.is_leader = MagicMock(return_value=is_leader)
+    harness.charm.plugin_manager.get_plugin = MagicMock()
+    harness.charm.plugin_manager.status = MagicMock(return_value=PluginState.ENABLED)
+    harness.charm.status.set = MagicMock()
+
+    # Call the method
+    harness.charm.backup._on_s3_broken(event)
+
+    if test_type == "s3-still-units-present":
+        event.defer.assert_called()
+        harness.charm.backup._execute_s3_broken_calls.assert_not_called()
+    elif test_type == "snapshot-in-progress":
+        event.defer.assert_called()
+        harness.charm.status.set.assert_any_call(MaintenanceStatus("Disabling backup service..."))
+        harness.charm.status.set.assert_any_call(
+            MaintenanceStatus(
+                "Disabling backup postponed until backup in progress: snapshot in progress"
+            )
+        )
+        harness.charm.backup._execute_s3_broken_calls.assert_not_called()
+    elif test_type == "apply-config-error" or test_type == "apply-config-error-not-leader":
+        event.defer.assert_called()
+        harness.charm.status.set.assert_any_call(MaintenanceStatus("Disabling backup service..."))
+        harness.charm.status.set.assert_any_call(
+            BlockedStatus("Unexpected error during plugin configuration, check the logs")
+        )
+        harness.charm.backup._execute_s3_broken_calls.assert_called_once()
+    elif test_type == "success":
+        
event.defer.assert_not_called() + harness.charm.status.set.assert_any_call(MaintenanceStatus("Disabling backup service...")) + harness.charm.status.set.assert_any_call(ActiveStatus()) + harness.charm.backup._execute_s3_broken_calls.assert_called_once() + + class TestBackups(unittest.TestCase): + maxDiff = None + def setUp(self) -> None: self.harness = Harness(OpenSearchOperatorCharm) self.addCleanup(self.harness.cleanup) @@ -42,6 +474,10 @@ def setUp(self) -> None: } self.charm.opensearch.is_started = MagicMock(return_value=True) self.charm.health.apply = MagicMock(return_value=HealthColors.GREEN) + # Mock retrials to speed up tests + charms.opensearch.v0.opensearch_backups.wait_fixed = MagicMock( + return_value=tenacity.wait.wait_fixed(0.1) + ) # Replace some unused methods that will be called as part of set_leader with mock self.charm._put_admin_user = MagicMock() @@ -64,7 +500,7 @@ def test_get_endpoint_protocol(self) -> None: @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.status") @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup.apply_api_config_if_needed") - @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager._apply_config") + @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.apply_config") @patch("charms.opensearch.v0.opensearch_distro.OpenSearchDistribution.version") def test_00_update_relation_data(self, __, mock_apply_config, _, mock_status) -> None: """Tests if new relation without data returns.""" @@ -99,7 +535,7 @@ def test_00_update_relation_data(self, __, mock_apply_config, _, mock_status) -> @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup._request") @patch("charms.opensearch.v0.opensearch_distro.OpenSearchDistribution.request") @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.status") - def test_01_apply_api_config_if_needed(self, mock_status, _, mock_request) -> None: + def test_apply_api_config_if_needed(self, mock_status, _, mock_request) -> None: """Tests the application of post-restart steps.""" self.harness.update_relation_data( self.s3_rel_id, @@ -132,12 +568,46 @@ def test_01_apply_api_config_if_needed(self, mock_status, _, mock_request) -> No }, ) + def test_on_list_backups_action(self): + event = MagicMock() + event.params = {"output": "table"} + self.charm.backup._list_backups = MagicMock(return_value={"backup1": {"state": "SUCCESS"}}) + self.charm.backup._generate_backup_list_output = MagicMock( + return_value="backup1 | finished" + ) + self.charm.backup._on_list_backups_action(event) + event.set_results.assert_called_with({"backups": "backup1 | finished"}) + + def test_on_list_backups_action_in_json_format(self): + event = MagicMock() + event.params = {"output": "json"} + self.charm.backup._list_backups = MagicMock(return_value={"backup1": {"state": "SUCCESS"}}) + self.charm.backup._generate_backup_list_output = MagicMock( + return_value="backup1 | finished" + ) + self.charm.backup._on_list_backups_action(event) + event.set_results.assert_called_with({"backups": '{"backup1": {"state": "SUCCESS"}}'}) + + def test_is_restore_complete(self): + rel = MagicMock() + rel.data = {self.charm.app: {"restore_in_progress": "index1,index2"}} + self.charm.model.get_relation = MagicMock(return_value=rel) + self.charm.backup._request = MagicMock( + return_value={ + "index1": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index2": {"shards": [{"type": "SNAPSHOT", "stage": "DONE"}]}, + "index3": {"shards": [{"type": 
"PRIMARY", "stage": "DONE"}]}, + } + ) + result = self.charm.backup._is_restore_complete() + self.assertTrue(result) + @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup.apply_api_config_if_needed") - @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager._apply_config") + @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.apply_config") @patch("charms.opensearch.v0.opensearch_distro.OpenSearchDistribution.request") @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup._execute_s3_broken_calls") @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.status") - def test_20_relation_broken( + def test_relation_broken( self, mock_status, mock_execute_s3_broken_calls, @@ -166,3 +636,69 @@ def test_20_relation_broken( ], ).__dict__ ) + + def test_format_backup_list(self): + """Tests the format of the backup list.""" + backup_list = { + "backup1": {"state": "SUCCESS"}, + "backup2": {"state": "FAILED"}, + "backup3": {"state": "IN_PROGRESS"}, + } + self.assertEqual( + self.charm.backup._generate_backup_list_output(backup_list), LIST_BACKUPS_TRIAL + ) + + def test_can_unit_perform_backup_success(self): + plugin_method = "charms.opensearch.v0.opensearch_backups.OpenSearchBackup._plugin_status" + event = MagicMock() + with patch(plugin_method, new_callable=PropertyMock) as mock_plugin_status: + mock_plugin_status.return_value = PluginState.ENABLED + self.charm.backup._check_repo_status = MagicMock( + return_value=BackupServiceState.SUCCESS + ) + self.charm.backup.is_backup_in_progress = MagicMock(return_value=False) + result = self.charm.backup._can_unit_perform_backup(event) + self.assertFalse(event.fail.called) + self.assertTrue(result) + + @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup._request") + def test_on_create_backup_action_success(self, mock_request): + event = MagicMock() + self.charm.backup._can_unit_perform_backup = MagicMock(return_value=True) + self.charm.backup.is_backup_in_progress = MagicMock(return_value=False) + self.charm.backup._list_backups = MagicMock(return_value={}) + self.charm.backup.get_service_status = MagicMock(return_value="Backup completed.") + self.charm.backup._on_create_backup_action(event) + event.set_results.assert_called_with({"backup-id": 1, "status": "Backup is running."}) + assert mock_request.call_args[0][0] == "PUT" + assert ( + mock_request.call_args[0][1] + == f"_snapshot/{S3_REPOSITORY}/1?wait_for_completion=false" + ) + + def test_on_create_backup_action_failure(self): + event = MagicMock() + self.charm.backup._can_unit_perform_backup = MagicMock(return_value=False) + self.charm.backup._on_create_backup_action(event) + event.fail.assert_called_with("Failed: backup service is not configured or busy") + + def test_on_create_backup_action_backup_in_progress(self): + event = MagicMock() + self.charm.backup._check_repo_status = MagicMock(return_value=BackupServiceState.SUCCESS) + self.charm.backup.is_backup_in_progress = MagicMock(return_value=True) + plugin_method = "charms.opensearch.v0.opensearch_backups.OpenSearchBackup._plugin_status" + with patch(plugin_method, new_callable=PropertyMock) as mock_plugin_status: + mock_plugin_status.return_value = PluginState.ENABLED + self.charm.backup._on_create_backup_action(event) + mock_plugin_status.assert_called_once() + event.fail.assert_called_with("Failed: backup service is not configured or busy") + + def test_on_create_backup_action_exception(self): + event = MagicMock() + 
self.charm.backup._can_unit_perform_backup = MagicMock(return_value=True) + self.charm.backup.is_backup_in_progress = MagicMock(return_value=False) + self.charm.backup._list_backups = MagicMock( + side_effect=OpenSearchListBackupError("Backup error") + ) + self.charm.backup._on_create_backup_action(event) + event.fail.assert_called_with("Failed with exception: Backup error") diff --git a/tests/unit/lib/test_opensearch_base_charm.py b/tests/unit/lib/test_opensearch_base_charm.py index 995c58563..90295e87f 100644 --- a/tests/unit/lib/test_opensearch_base_charm.py +++ b/tests/unit/lib/test_opensearch_base_charm.py @@ -201,10 +201,12 @@ def test_on_start( _initialize_security_index.assert_called_once() self.assertTrue(self.peers_data.get(Scope.APP, "security_index_initialised")) + @patch(f"{BASE_LIB_PATH}.opensearch_backups.OpenSearchBackup.is_backup_in_progress") + @patch(f"{BASE_LIB_PATH}.opensearch_backups.OpenSearchBackup._is_restore_complete") @patch(f"{BASE_CHARM_CLASS}._stop_opensearch") @patch(f"{BASE_LIB_PATH}.opensearch_base_charm.cert_expiration_remaining_hours") @patch(f"{BASE_LIB_PATH}.opensearch_users.OpenSearchUserManager.remove_users_and_roles") - def test_on_update_status(self, _, cert_expiration_remaining_hours, _stop_opensearch): + def test_on_update_status(self, _, cert_expiration_remaining_hours, _stop_opensearch, __, ___): """Test on update status.""" with patch( f"{self.OPENSEARCH_DISTRO}.missing_sys_requirements"