From 52543e5815f598dc1a28f2caebafbdf25de1dd1f Mon Sep 17 00:00:00 2001
From: BON4
Date: Fri, 7 Jun 2024 16:43:43 +0300
Subject: [PATCH] Add `test_smoke` test re-open (#480)

* Smoke testing. Garbage storage ignorance. Resources conflicts test.

* Smoke testing. Garbage storage ignorance. Resources conflicts test.

* Smoke testing. Garbage storage ignorance. Resources conflicts test.

* Smoke testing. Garbage storage ignorance. Resources conflicts test.
---
 tests/integration/ha_tests/helpers.py    | 238 ++++++++++++++++++++++-
 tests/integration/ha_tests/test_smoke.py | 218 +++++++++++++++++++++
 tests/integration/helpers.py             |   9 +-
 3 files changed, 459 insertions(+), 6 deletions(-)
 create mode 100644 tests/integration/ha_tests/test_smoke.py

diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py
index 987daa7060..e7d5834fb9 100644
--- a/tests/integration/ha_tests/helpers.py
+++ b/tests/integration/ha_tests/helpers.py
@@ -1,6 +1,7 @@
 # Copyright 2022 Canonical Ltd.
 # See LICENSE file for licensing details.
 import asyncio
+import json
 import os
 import string
 import subprocess
@@ -16,8 +17,12 @@
 from kubernetes import config
 from kubernetes.client.api import core_v1_api
 from kubernetes.stream import stream
-from lightkube.core.client import Client
-from lightkube.resources.core_v1 import Pod
+from lightkube.core.client import Client, GlobalResource
+from lightkube.resources.core_v1 import (
+    PersistentVolume,
+    PersistentVolumeClaim,
+    Pod,
+)
 from pytest_operator.plugin import OpsTest
 from tenacity import (
     RetryError,
@@ -32,9 +37,12 @@
     APPLICATION_NAME,
     app_name,
     db_connect,
+    execute_query_on_unit,
     get_password,
+    get_password_on_unit,
     get_primary,
     get_unit_address,
+    run_command_on_unit,
 )
 
 PORT = 5432
@@ -477,7 +485,7 @@ async def get_sync_standby(model: Model, application_name: str) -> str:
 async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool:
     """Test a connection to a PostgreSQL server."""
     app = unit_name.split("/")[0]
-    password = await get_password(ops_test, database_app_name=app, down_unit=unit_name)
+    password = await get_password(ops_test, database_app_name=app, unit_name=unit_name)
     address = await get_unit_address(ops_test, unit_name)
     try:
         for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
@@ -819,3 +827,227 @@ async def stop_continuous_writes(ops_test: OpsTest) -> int:
     )
     action = await action.wait()
     return int(action.results["writes"])
+
+
+async def get_storage_id(ops_test: OpsTest, unit_name: str) -> str:
+    """Retrieves the storage ID associated with the provided unit.
+
+    Note: this function exists as a temporary solution until this issue is ported to libjuju 2:
+    https://github.com/juju/python-libjuju/issues/694
+    """
+    model_name = ops_test.model.info.name
+    proc = subprocess.check_output(f"juju storage --model={model_name}".split())
+    proc = proc.decode("utf-8")
+    for line in proc.splitlines():
+        if "Storage" in line:
+            continue
+
+        if len(line) == 0:
+            continue
+
+        if "detached" in line:
+            continue
+
+        if line.split()[0] == unit_name:
+            return line.split()[1]
+
+
+def is_pods_exists(ops_test: OpsTest, unit_name: str) -> bool:
+    client = Client(namespace=ops_test.model.name)
+    pods = client.list(Pod, namespace=ops_test.model.name)
+
+    for pod in pods:
+        print(
+            f"Pod: {pod.metadata.name} STATUS: {pod.status.phase} TAGGED: {unit_name.replace('/', '-')}"
+        )
+        if (pod.metadata.name == unit_name.replace("/", "-")) and (pod.status.phase == "Running"):
+            return True
+
+    return False
+
+
+async def is_storage_exists(ops_test: OpsTest, storage_id: str) -> bool:
+    """Returns True if storage with the provided storage ID exists."""
+    complete_command = [
+        "show-storage",
+        "-m",
+        f"{ops_test.controller_name}:{ops_test.model.info.name}",
+        storage_id,
+        "--format=json",
+    ]
+    return_code, stdout, _ = await ops_test.juju(*complete_command)
+    if return_code != 0:
+        if return_code == 1:
+            return storage_id in stdout
+        raise Exception(
+            "Expected command %s to succeed instead it failed: %s with code: ",
+            complete_command,
+            stdout,
+            return_code,
+        )
+    return storage_id in str(stdout)
+
+
+@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True)
+async def create_db(ops_test: OpsTest, app: str, db: str) -> None:
+    """Creates a database with the specified name."""
+    unit = ops_test.model.applications[app].units[0]
+    unit_address = await get_unit_address(ops_test, unit.name)
+    password = await get_password_on_unit(ops_test, "operator", unit, app)
+
+    conn = db_connect(unit_address, password)
+    conn.autocommit = True
+    cursor = conn.cursor()
+    cursor.execute(f"CREATE DATABASE {db};")
+    cursor.close()
+    conn.close()
+
+
+@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True)
+async def check_db(ops_test: OpsTest, app: str, db: str) -> bool:
+    """Returns True if a database with the specified name already exists."""
+    unit = ops_test.model.applications[app].units[0]
+    unit_address = await get_unit_address(ops_test, unit.name)
+    password = await get_password_on_unit(ops_test, "operator", unit, app)
+
+    query = await execute_query_on_unit(
+        unit_address,
+        password,
+        f"select datname from pg_catalog.pg_database where datname = '{db}';",
+    )
+
+    if "ERROR" in query:
+        raise Exception(f"Database check failed with PostgreSQL error: {query}")
+
+    return db in query
+
+
+async def get_any_deatached_storage(ops_test: OpsTest) -> str:
+    """Returns any currently available detached storage."""
+    return_code, storages_list, stderr = await ops_test.juju(
+        "storage", "-m", f"{ops_test.controller_name}:{ops_test.model.info.name}", "--format=json"
+    )
+    if return_code != 0:
+        raise Exception(f"failed to get storages info with error: {stderr}")
+
+    parsed_storages_list = json.loads(storages_list)
+    for storage_name, storage in parsed_storages_list["storage"].items():
+        if (str(storage["status"]["current"]) == "detached") and (str(storage["life"]) == "alive"):
+            return storage_name
+
+    raise Exception("failed to get detached storage")
+
+
+async def check_system_id_mismatch(ops_test: OpsTest, unit_name: str) -> bool:
+    """Returns True if a system ID mismatch is found in the logs."""
+    log_str = f'CRITICAL: system ID mismatch, node {unit_name.replace("/", "-")} belongs to a different cluster'
+    stdout = await run_command_on_unit(
+        ops_test,
+        unit_name,
+        """cat /var/log/postgresql/*""",
+    )
+
+    return log_str in str(stdout)
+
+
+def delete_pvc(ops_test: OpsTest, pvc: GlobalResource):
+    """Deletes PersistentVolumeClaim."""
+    client = Client(namespace=ops_test.model.name)
+    client.delete(PersistentVolumeClaim, namespace=ops_test.model.name, name=pvc.metadata.name)
+
+
+def get_pvc(ops_test: OpsTest, unit_name: str):
+    """Get PersistentVolumeClaim for unit."""
+    client = Client(namespace=ops_test.model.name)
+    pvc_list = client.list(PersistentVolumeClaim, namespace=ops_test.model.name)
+    for pvc in pvc_list:
+        if unit_name.replace("/", "-") in pvc.metadata.name:
+            return pvc
+    return None
+
+
+def get_pv(ops_test: OpsTest, unit_name: str):
+    """Get PersistentVolume for unit."""
+    client = Client(namespace=ops_test.model.name)
+    pv_list = client.list(PersistentVolume, namespace=ops_test.model.name)
+    for pv in pv_list:
+        if unit_name.replace("/", "-") in str(pv.spec.hostPath.path):
+            return pv
+    return None
+
+
+def change_pv_reclaim_policy(ops_test: OpsTest, pv_config: PersistentVolume, policy: str):
+    """Change PersistentVolume reclaim policy config value."""
+    client = Client(namespace=ops_test.model.name)
+    res = client.patch(
+        PersistentVolume,
+        pv_config.metadata.name,
+        {"spec": {"persistentVolumeReclaimPolicy": f"{policy}"}},
+        namespace=ops_test.model.name,
+    )
+    return res
+
+
+def remove_pv_claimref(ops_test: OpsTest, pv_config: PersistentVolume):
+    """Remove claimRef config value for PersistentVolume."""
+    client = Client(namespace=ops_test.model.name)
+    client.patch(
+        PersistentVolume,
+        pv_config.metadata.name,
+        {"spec": {"claimRef": None}},
+        namespace=ops_test.model.name,
+    )
+
+
+def change_pvc_pv_name(
+    pvc_config: PersistentVolumeClaim, pv_name_new: str
+) -> PersistentVolumeClaim:
+    """Change PersistentVolume name config value for PersistentVolumeClaim."""
+    pvc_config.spec.volumeName = pv_name_new
+    del pvc_config.metadata.annotations["pv.kubernetes.io/bind-completed"]
+    del pvc_config.metadata.uid
+    return pvc_config
+
+
+def apply_pvc_config(ops_test: OpsTest, pvc_config: PersistentVolumeClaim):
+    """Apply provided PersistentVolumeClaim config."""
+    client = Client(namespace=ops_test.model.name)
+    pvc_config.metadata.managedFields = None
+    client.apply(pvc_config, namespace=ops_test.model.name, field_manager="lightkube")
+
+
+async def remove_unit_force(ops_test: OpsTest, num_units: int):
+    """Removes unit with --force --no-wait."""
+    app_name_str = await app_name(ops_test)
+    scale = len(ops_test.model.applications[app_name_str].units) - num_units
+    complete_command = [
+        "remove-unit",
+        f"{app_name_str}",
+        "--force",
+        "--no-wait",
+        "--no-prompt",
+        "--num-units",
+        num_units,
+    ]
+    return_code, stdout, stderr = await ops_test.juju(*complete_command)
+    if return_code != 0:
+        raise Exception(
+            "Expected command %s to succeed instead it failed: %s with err: %s with code: %s",
+            complete_command,
+            stdout,
+            stderr,
+            return_code,
+        )
+
+    if scale == 0:
+        await ops_test.model.block_until(
+            lambda: len(ops_test.model.applications[app_name_str].units) == scale,
+            timeout=1000,
+        )
+    else:
+        await ops_test.model.wait_for_idle(
+            apps=[app_name_str],
+            status="active",
+            timeout=1000,
+            wait_for_exact_units=scale,
+        )
diff --git a/tests/integration/ha_tests/test_smoke.py b/tests/integration/ha_tests/test_smoke.py
new file mode 100644
index 0000000000..3fe9904945
--- /dev/null
+++ b/tests/integration/ha_tests/test_smoke.py
@@ -0,0 +1,218 @@
+#!/usr/bin/env python3
+# Copyright 2021 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+import logging
+import os
+from asyncio import TimeoutError
+
+import pytest
+from pytest_operator.plugin import OpsTest
+from tenacity import Retrying, stop_after_delay, wait_fixed
+
+from ..helpers import (
+    CHARM_SERIES,
+    DATABASE_APP_NAME,
+    scale_application,
+)
+from .helpers import (
+    apply_pvc_config,
+    change_pv_reclaim_policy,
+    change_pvc_pv_name,
+    check_db,
+    check_system_id_mismatch,
+    create_db,
+    delete_pvc,
+    get_any_deatached_storage,
+    get_pv,
+    get_pvc,
+    get_storage_id,
+    is_postgresql_ready,
+    is_storage_exists,
+    remove_pv_claimref,
+    remove_unit_force,
+)
+
+TEST_DATABASE_RELATION_NAME = "test_database"
+DUP_DATABASE_APP_NAME = DATABASE_APP_NAME + "2"
+
+logger = logging.getLogger(__name__)
+
+env = os.environ
+env["KUBECONFIG"] = os.path.expanduser("~/.kube/config")
+
+
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_app_force_removal(ops_test: OpsTest):
+    """Remove unit with force while storage is alive."""
+    global primary_pv, primary_pvc
+    # Deploy the charm.
+    async with ops_test.fast_forward():
+        await ops_test.model.deploy(
+            DATABASE_APP_NAME,
+            application_name=DATABASE_APP_NAME,
+            num_units=1,
+            channel="14/stable",
+            series=CHARM_SERIES,
+            trust=True,
+            config={"profile": "testing"},
+        )
+
+        await ops_test.model.wait_for_idle(status="active", timeout=1000)
+
+    assert ops_test.model.applications[DATABASE_APP_NAME].units[0].workload_status == "active"
+
+    primary_name = ops_test.model.applications[DATABASE_APP_NAME].units[0].name
+
+    logger.info("waiting for postgresql")
+    for attempt in Retrying(stop=stop_after_delay(15 * 3), wait=wait_fixed(3), reraise=True):
+        with attempt:
+            assert await is_postgresql_ready(ops_test, primary_name)
+
+    # Create a test database to check that there are no resource conflicts
+    logger.info("creating db")
+    await create_db(ops_test, DATABASE_APP_NAME, TEST_DATABASE_RELATION_NAME)
+
+    assert primary_name
+
+    logger.info(f"get pvc for {primary_name}")
+    primary_pvc = get_pvc(ops_test, primary_name)
+
+    assert primary_pvc
+
+    logger.info(f"get pv for {primary_name}")
+    primary_pv = get_pv(ops_test, primary_name)
+
+    assert primary_pv
+
+    logger.info("get storage id")
+    storage_id = await get_storage_id(ops_test, primary_name)
+
+    assert storage_id
+
+    # Force remove the unit without removing its storage
+    logger.info("scale to 0 with force")
+    await remove_unit_force(ops_test, 1)
+
+    # Storage will remain in detached status
+    logger.info("verifying that storage exists")
+    for attempt in Retrying(stop=stop_after_delay(15 * 3), wait=wait_fixed(3), reraise=True):
+        with attempt:
+            assert await is_storage_exists(ops_test, storage_id)
+
+
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_app_garbage_ignorance(ops_test: OpsTest):
+    """Test charm deploy in a dirty environment with garbage storage."""
+    global primary_pv, primary_pvc
+    async with ops_test.fast_forward():
+        logger.info("checking garbage storage")
+        garbage_storage = None
+        for attempt in Retrying(stop=stop_after_delay(30 * 3), wait=wait_fixed(3), reraise=True):
+            with attempt:
+                garbage_storage = await get_any_deatached_storage(ops_test)
+
+        logger.info("scale to 1")
+        await scale_application(ops_test, DATABASE_APP_NAME, 1)
+
+        # Timeout is increased due to the k8s Init:CrashLoopBackOff status of the postgresql pod
logger.info("waiting for idle") + await ops_test.model.wait_for_idle(status="active", timeout=2000) + assert ops_test.model.applications[DATABASE_APP_NAME].units[0].workload_status == "active" + + logger.info("getting primary") + primary_name = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + + assert primary_name + + logger.info("getting storage id") + storage_id_str = await get_storage_id(ops_test, primary_name) + + assert storage_id_str == garbage_storage + + logger.info("waiting for postgresql") + for attempt in Retrying(stop=stop_after_delay(15 * 3), wait=wait_fixed(3), reraise=True): + with attempt: + assert await is_postgresql_ready(ops_test, primary_name) + + # Check that test database is exists for duplicate application + logger.info("checking db") + assert await check_db(ops_test, DATABASE_APP_NAME, TEST_DATABASE_RELATION_NAME) + + logger.info("scale to 0") + await scale_application(ops_test, DATABASE_APP_NAME, 0) + + logger.info("changing pv reclaim policy") + primary_pv = change_pv_reclaim_policy(ops_test, primary_pv, "Retain") + + logger.info("remove application") + await ops_test.model.remove_application(DATABASE_APP_NAME, block_until_done=True) + + logger.info(f"delete pvc {primary_pvc.metadata.name}") + delete_pvc(ops_test, primary_pvc) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_app_resources_conflicts(ops_test: OpsTest): + """Test application deploy in dirty environment with garbage storage from another application.""" + global primary_pv, primary_pvc + async with ops_test.fast_forward(): + await ops_test.model.deploy( + DATABASE_APP_NAME, + application_name=DUP_DATABASE_APP_NAME, + num_units=1, + channel="14/stable", + series=CHARM_SERIES, + trust=True, + config={"profile": "testing"}, + ) + + logger.info("waiting for idle") + await ops_test.model.wait_for_idle(status="active", timeout=1000) + assert ( + ops_test.model.applications[DUP_DATABASE_APP_NAME].units[0].workload_status == "active" + ) + + dup_primary_name = ops_test.model.applications[DUP_DATABASE_APP_NAME].units[0].name + + assert dup_primary_name + + logger.info(f"get pvc for {dup_primary_name}") + dup_primary_pvc = get_pvc(ops_test, dup_primary_name) + + assert dup_primary_pvc + + logger.info("scale to 0") + await scale_application(ops_test, DUP_DATABASE_APP_NAME, 0) + + logger.info(f"load and change pv-name config for pvc {dup_primary_pvc.metadata.name}") + dup_primary_pvc = change_pvc_pv_name(dup_primary_pvc, primary_pv.metadata.name) + + logger.info(f"delete pvc {dup_primary_pvc.metadata.name}") + delete_pvc(ops_test, dup_primary_pvc) + + logger.info(f"remove claimref from pv {primary_pv.metadata.name}") + remove_pv_claimref(ops_test, primary_pv) + + logger.info(f"apply pvc for {dup_primary_name}") + apply_pvc_config(ops_test, dup_primary_pvc) + + logger.info("scale to 1") + await ops_test.model.applications[DUP_DATABASE_APP_NAME].scale(1) + + logger.info("waiting for duplicate application to be blocked") + try: + await ops_test.model.wait_for_idle( + apps=[DUP_DATABASE_APP_NAME], timeout=500, status="blocked" + ) + except TimeoutError: + logger.info("Application is not in blocked state. 
+
+        # Since the application has a postgresql db in its storage that came from an external application, it should not be able to connect due to the new password
+        logger.info("checking operator password auth")
+        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3), reraise=True):
+            with attempt:
+                assert await check_system_id_mismatch(ops_test, dup_primary_name)
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index 9d88d75af6..29fe3047f2 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -669,10 +669,13 @@ async def run_command_on_unit(ops_test: OpsTest, unit_name: str, command: str) -
         the command output if it succeeds, otherwise raises an exception.
     """
     complete_command = f"ssh --container postgresql {unit_name} {command}"
-    return_code, stdout, _ = await ops_test.juju(*complete_command.split())
+    return_code, stdout, stderr = await ops_test.juju(*complete_command.split())
     if return_code != 0:
         raise Exception(
-            "Expected command %s to succeed instead it failed: %s", command, return_code
+            "Expected command %s to succeed instead it failed: %s. Code: %s",
+            command,
+            stderr,
+            return_code,
         )
     return stdout
 
@@ -693,7 +696,7 @@ async def scale_application(
     await model.applications[application_name].scale(scale)
     if scale == 0:
         await model.block_until(
-            lambda: len(model.applications[DATABASE_APP_NAME].units) == scale,
+            lambda: len(model.applications[application_name].units) == scale,
             timeout=1000,
         )
     else: