[DPE-3660] Add backups testing infra (#181)
Re-adds the backup testing infra and AWS backup testing, as described
in [DPE-3660](https://warthogs.atlassian.net/browse/DPE-3660), on top of
the new CI testing infra.

It is broken into two types of tests:
1. Per-cloud tests: backup, restore, relation removal/re-add, and disaster
recovery
2. All-clouds tests: configuration changes combined with backup/restore, and
checks of the expected API call responses

The tests will use different groups, so they can be executed in parallel.
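As an illustration only (not part of this commit), here is a minimal sketch of how a per-cloud test could be grouped and chained on top of the helpers added in `tests/integration/ha/helpers.py`. The `group` marker, the import path, and the `ops_test`/`app`/`leader_id`/`unit_ip` fixtures are assumptions; the helper names and signatures match the diff below.

```python
# Hedged sketch: marker, import path, and fixtures are assumptions; the helper
# signatures mirror tests/integration/ha/helpers.py from this change.
import pytest

from tests.integration.ha.helpers import (
    assert_restore_indices_and_compare_consistency,
    create_backup,
    restore,
)


@pytest.mark.group(1)  # one group per cloud, so groups can run in parallel
@pytest.mark.abort_on_fail
async def test_backup_and_restore(ops_test, app, leader_id, unit_ip):
    """Create a backup, restore it, and compare the document counts."""
    backup_id = await create_backup(ops_test, leader_id, unit_ip)
    assert await restore(ops_test, backup_id, unit_ip, leader_id)
    await assert_restore_indices_and_compare_consistency(
        ops_test, app, leader_id, unit_ip, backup_id
    )
```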

[DPE-3660]:
https://warthogs.atlassian.net/browse/DPE-3660?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

---------

Co-authored-by: Mehdi Bendriss <[email protected]>
phvalguima and Mehdi-Bendriss authored Apr 15, 2024
1 parent 3ad1deb commit 09252b7
Showing 4 changed files with 687 additions and 27 deletions.
6 changes: 3 additions & 3 deletions lib/charms/opensearch/v0/opensearch_backups.py
@@ -198,7 +198,7 @@ def _on_list_backups_action(self, event: ActionEvent) -> None:
f"List backups action failed - {str(e)} - check the application logs for the full stack trace."
)
if event.params.get("output").lower() == "json":
event.set_results({"backups": (json.dumps(backups)).replace("_", "-")})
event.set_results({"backups": json.dumps(backups)})
elif event.params.get("output").lower() == "table":
event.set_results({"backups": self._generate_backup_list_output(backups)})
else:
@@ -540,9 +540,9 @@ def apply_api_config_if_needed(self) -> None:
# (3) based on the response, set the message status
if state != BackupServiceState.SUCCESS:
logger.error(f"Failed to setup backup service with state {state}")
self.charm.status.set(BlockedStatus(BackupSetupFailed))
self.charm.status.set(BlockedStatus(BackupSetupFailed), app=True)
return
self.charm.status.clear(BackupSetupFailed)
self.charm.status.clear(BackupSetupFailed, app=True)
self.charm.status.clear(BackupConfigureStart)

def _on_s3_broken(self, event: EventBase) -> None: # noqa: C901
92 changes: 70 additions & 22 deletions tests/integration/ha/helpers.py
@@ -2,13 +2,15 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import asyncio
import json
import logging
import subprocess
import time
from typing import Dict, List, Optional

from charms.opensearch.v0.models import Node
from charms.opensearch.v0.opensearch_backups import S3_REPOSITORY
from pytest_operator.plugin import OpsTest
from tenacity import (
RetryError,
@@ -23,6 +25,7 @@
get_application_unit_ids,
get_application_unit_ids_hostnames,
get_application_unit_ids_ips,
get_leader_unit_ip,
http_request,
juju_version_major,
run_action,
@@ -217,7 +220,7 @@ async def assert_continuous_writes_increasing(
) -> None:
"""Asserts that the continuous writes are increasing."""
writes_count = await c_writes.count()
time.sleep(5)
await asyncio.sleep(20)
more_writes = await c_writes.count()
assert more_writes > writes_count, "Writes not continuing to DB"

@@ -227,6 +230,7 @@ async def assert_continuous_writes_consistency(
) -> None:
"""Continuous writes checks."""
result = await c_writes.stop()
logger.info(f"Continuous writes result: {result}")
assert result.max_stored_id == result.count - 1
assert result.max_stored_id == result.last_expected_id

@@ -457,28 +461,23 @@ async def print_logs(ops_test: OpsTest, app: str, unit_id: int, msg: str) -> str
return msg


async def wait_for_backup(ops_test, leader_id):
async def wait_for_backup_system_to_settle(ops_test: OpsTest, leader_id: int, unit_ip: str):
"""Waits the backup to finish and move to the finished state or throws a RetryException."""
for attempt in Retrying(stop=stop_after_attempt(8), wait=wait_fixed(15)):
with attempt:
# First, check if current backups are finished
action = await run_action(
ops_test, leader_id, "list-backups", params={"output": "json"}
)
# Expected format:
# namespace(status='completed', response={'return-code': 0, 'backups': '{"1": ...}'})
backups = json.loads(action.response["backups"])
logger.debug(f"Backups recovered: {backups}")
if action.status == "completed" and len(backups) > 0:
logger.debug(f"list-backups output: {action}")
return

raise Exception("Backup not finished yet")

if action.status != "completed" or len(backups) == 0:
raise Exception("Failed to retrieve backup list or list is empty")

async def wait_restore_finish(ops_test, unit_ip):
"""Waits the backup to finish and move to the finished state or throws a RetryException."""
for attempt in Retrying(stop=stop_after_attempt(8), wait=wait_fixed(15)):
with attempt:
logger.debug(f"list-backups output: {action}")
# Now, check if we have finished the restore
indices_status = await http_request(
ops_test,
"GET",
@@ -488,7 +487,18 @@ async def wait_restore_finish(ops_test, unit_ip):
# Now, check the status of each shard
for shard in info["shards"]:
if shard["type"] == "SNAPSHOT" and shard["stage"] != "DONE":
raise Exception()
raise Exception(f"Recovery failed for shard {shard}")


async def delete_backup(ops_test: OpsTest, backup_id: int) -> None:
"""Deletes a backup."""
# Now, check if we have finished the restore
unit_ip = await get_leader_unit_ip(ops_test)
await http_request(
ops_test,
"DELETE",
f"https://{unit_ip}:9200/_snapshot/{S3_REPOSITORY}/{backup_id}",
)


async def start_and_check_continuous_writes(ops_test: OpsTest, unit_ip: str, app: str) -> bool:
@@ -505,24 +515,62 @@ async def start_and_check_continuous_writes(ops_test: OpsTest, unit_ip: str, app
)
writer = ContinuousWrites(ops_test, app, initial_count=initial_count)
await writer.start()
time.sleep(10)
result = await writer.stop()
return result.count > initial_count
time.sleep(60)
# Ensure we have writes happening and the index is consistent at the end
await assert_continuous_writes_increasing(writer)
await assert_continuous_writes_consistency(ops_test, writer, app)
# Clear the writer manually, as we are not using the conftest c_writes_runner to do so
await writer.clear()


async def backup_cluster(ops_test: OpsTest, leader_id: int) -> int:
async def create_backup(ops_test: OpsTest, leader_id: int, unit_ip: str) -> int:
"""Runs the backup of the cluster."""
action = await run_action(ops_test, leader_id, "create-backup")
assert action.status == "completed"
logger.debug(f"create-backup output: {action}")

await wait_for_backup(ops_test, leader_id)
await wait_for_backup_system_to_settle(ops_test, leader_id, unit_ip)
assert action.status == "completed"
assert action.response["status"] == "Backup is running."
return int(action.response["backup-id"])


async def restore_cluster(ops_test: OpsTest, backup_id: int, unit_ip: str, leader_id: int) -> bool:
action = await run_action(ops_test, leader_id, "restore", params={"backup-id": backup_id})
async def restore(ops_test: OpsTest, backup_id: int, unit_ip: str, leader_id: int) -> bool:
"""Restores a backup."""
id = backup_id
if not isinstance(backup_id, int):
id = int(backup_id)
action = await run_action(ops_test, leader_id, "restore", params={"backup-id": id})
logger.debug(f"restore output: {action}")

await wait_restore_finish(ops_test, unit_ip)
await wait_for_backup_system_to_settle(ops_test, leader_id, unit_ip)
return action.status == "completed"


async def list_backups(ops_test: OpsTest, leader_id: int) -> Dict[str, str]:
action = await run_action(ops_test, leader_id, "list-backups", params={"output": "json"})
assert action.status == "completed"
return json.loads(action.response["backups"])


async def assert_restore_indices_and_compare_consistency(
ops_test: OpsTest, app: str, leader_id: int, unit_ip: str, backup_id: int
) -> None:
"""Ensures that continuous writes index has at least the value below.
assert new_count >= <current-doc-count> * (1 - loss) documents.
"""
original_count = await index_docs_count(ops_test, app, unit_ip, ContinuousWrites.INDEX_NAME)
# As stated on: https://discuss.elastic.co/t/how-to-parse-snapshot-dat-file/218888,
# the only way to discover the documents in a backup is to recover it and check
# on opensearch.
# The logic below will run over each backup id, restore it and ensure continuous writes
# index loss is within the "loss" parameter.
assert await restore(ops_test, backup_id, unit_ip, leader_id)
new_count = await index_docs_count(ops_test, app, unit_ip, ContinuousWrites.INDEX_NAME)
logger.info(
f"Testing restore for {ContinuousWrites.INDEX_NAME} - "
f"original count pre-restore: {original_count}, and now, new count: {new_count}"
)
# We expect that new_count has a loss of documents and the numbers are different.
# Check if we have data but not all of it.
assert new_count > 0 and new_count < original_count
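
A hedged usage note, not part of the diff: since the comment above talks about running over each backup id, a test can iterate the ids returned by `list_backups` and validate each restore in turn, for example:

```python
# Assumed fixtures: ops_test, app, leader_id, unit_ip; helper signatures match the diff above.
backups = await list_backups(ops_test, leader_id)  # keys are backup ids as strings
for backup_id in backups:
    await assert_restore_indices_and_compare_consistency(
        ops_test, app, leader_id, unit_ip, int(backup_id)
    )
```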
9 changes: 7 additions & 2 deletions tests/integration/ha/helpers_data.py
@@ -257,12 +257,17 @@ async def index_docs_count(
retries: int = 15,
) -> int:
"""Returns the number of documents in an index."""
endpoint = f"https://{unit_ip}:9200/{index_name}/_count"
endpoint = f"https://{unit_ip}:9200/{index_name}/"
for attempt in Retrying(
stop=stop_after_attempt(retries), wait=wait_fixed(wait=5) + wait_random(0, 5)
):
with attempt: # Raises RetryError if failed after "retries"
resp = await http_request(ops_test, "GET", endpoint, app=app)
# We need to refresh and then count the docs
resp = await http_request(ops_test, "POST", endpoint + "_refresh", app=app)
logger.debug(f"Index refresh response: {resp}")

resp = await http_request(ops_test, "GET", endpoint + "_count", app=app)
logger.debug(f"Index count response: {resp['count']}")
if isinstance(resp["count"], int):
return resp["count"]
return int(resp["count"])