Skip to content

Commit

Permalink
Add test_smoke test re-open (#480)
Browse files Browse the repository at this point in the history
* Smoke testing. Garbage storage ignorance. Recourses confilcts test.

* Smoke testing. Garbage storage ignorance. Recourses confilcts test.

* Smoke testing. Garbage storage ignorance. Recourses confilcts test.

* Smoke testing. Garbage storage ignorance. Recourses confilcts test.
  • Loading branch information
BON4 authored Jun 7, 2024
1 parent c520bdb commit 52543e5
Show file tree
Hide file tree
Showing 3 changed files with 459 additions and 6 deletions.
238 changes: 235 additions & 3 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.
import asyncio
import json
import os
import string
import subprocess
Expand All @@ -16,8 +17,12 @@
from kubernetes import config
from kubernetes.client.api import core_v1_api
from kubernetes.stream import stream
from lightkube.core.client import Client
from lightkube.resources.core_v1 import Pod
from lightkube.core.client import Client, GlobalResource
from lightkube.resources.core_v1 import (
PersistentVolume,
PersistentVolumeClaim,
Pod,
)
from pytest_operator.plugin import OpsTest
from tenacity import (
RetryError,
Expand All @@ -32,9 +37,12 @@
APPLICATION_NAME,
app_name,
db_connect,
execute_query_on_unit,
get_password,
get_password_on_unit,
get_primary,
get_unit_address,
run_command_on_unit,
)

PORT = 5432
Expand Down Expand Up @@ -477,7 +485,7 @@ async def get_sync_standby(model: Model, application_name: str) -> str:
async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool:
"""Test a connection to a PostgreSQL server."""
app = unit_name.split("/")[0]
password = await get_password(ops_test, database_app_name=app, down_unit=unit_name)
password = await get_password(ops_test, database_app_name=app, unit_name=unit_name)
address = await get_unit_address(ops_test, unit_name)
try:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
Expand Down Expand Up @@ -819,3 +827,227 @@ async def stop_continuous_writes(ops_test: OpsTest) -> int:
)
action = await action.wait()
return int(action.results["writes"])


async def get_storage_id(ops_test: OpsTest, unit_name: str) -> str:
    """Retrieve the storage ID associated with the provided unit.

    Note: this function exists as a temporary solution until this issue is ported to libjuju 2:
    https://github.com/juju/python-libjuju/issues/694

    Args:
        ops_test: The ops test framework instance; only the model name is read from it.
        unit_name: Value expected in the first column of a ``juju storage`` row.

    Returns:
        The second column of the first matching row that is not marked ``detached``.
        ``None`` is returned when no row matches (the annotation is kept as ``str``
        for backward compatibility with existing callers).
    """
    model_name = ops_test.model.info.name
    # Build argv explicitly so the model name is never re-split on whitespace.
    raw_output = subprocess.check_output(["juju", "storage", f"--model={model_name}"])
    for line in raw_output.decode("utf-8").splitlines():
        # Skip the table header and any storage that is no longer attached.
        if "Storage" in line or "detached" in line:
            continue

        columns = line.split()
        # Blank, whitespace-only, or truncated rows carry no storage ID.
        if len(columns) < 2:
            continue

        if columns[0] == unit_name:
            return columns[1]

    return None


def is_pods_exists(ops_test: OpsTest, unit_name: str) -> bool:
    """Report whether the pod backing a unit exists and is in the ``Running`` phase.

    The pod name is the unit name with ``/`` replaced by ``-``. Every pod listed in
    the model's namespace is printed (name, phase, and the name being searched for)
    before it is compared.

    Args:
        ops_test: The ops test framework instance; its model name is used as namespace.
        unit_name: Juju unit name, e.g. ``app/0``.

    Returns:
        True if a pod with the derived name is ``Running``, False otherwise.
    """
    namespace = ops_test.model.name
    expected_pod_name = unit_name.replace("/", "-")

    for pod in Client(namespace=namespace).list(Pod, namespace=namespace):
        pod_name = pod.metadata.name
        phase = pod.status.phase
        print(f"Pod: {pod_name} STATUS: {phase} TAGGED: {expected_pod_name}")
        if pod_name == expected_pod_name and phase == "Running":
            return True

    return False


async def is_storage_exists(ops_test: OpsTest, storage_id: str) -> bool:
    """Return True if ``juju show-storage`` reports the provided storage ID.

    Args:
        ops_test: The ops test framework instance; its controller and model names
            select the model and its ``juju`` coroutine runs the command.
        storage_id: Storage ID passed to ``show-storage`` and searched for in stdout.

    Returns:
        Whether ``storage_id`` occurs in the command's stdout. Exit code 1 is treated
        as a regular answer (decided by the stdout content) rather than a failure.

    Raises:
        Exception: If the command exits with a code other than 0 or 1.
    """
    complete_command = [
        "show-storage",
        "-m",
        f"{ops_test.controller_name}:{ops_test.model.info.name}",
        storage_id,
        "--format=json",
    ]
    return_code, stdout, stderr = await ops_test.juju(*complete_command)
    if return_code not in (0, 1):
        # Interpolate the details; a bare template plus positional args would leave
        # the message unformatted.
        raise Exception(
            f"Expected command {complete_command} to succeed instead it failed: "
            f"{stdout} with err: {stderr} with code: {return_code}"
        )
    return storage_id in str(stdout)


@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True)
async def create_db(ops_test: OpsTest, app: str, db: str) -> None:
    """Create a database with the specified name.

    Connects to the first unit of ``app`` as the ``operator`` user. The whole
    function is retried by the decorator (up to 8 attempts, 15 seconds apart),
    so the connection is always released before an attempt ends.

    Args:
        ops_test: The ops test framework instance.
        app: Name of the database application.
        db: Name of the database to create. It is interpolated into the statement
            verbatim, so it must be a trusted, valid identifier.
    """
    unit = ops_test.model.applications[app].units[0]
    unit_address = await get_unit_address(ops_test, unit.name)
    password = await get_password_on_unit(ops_test, "operator", unit, app)

    conn = db_connect(unit_address, password)
    try:
        # CREATE DATABASE cannot run inside a transaction block.
        conn.autocommit = True
        cursor = conn.cursor()
        try:
            cursor.execute(f"CREATE DATABASE {db};")
        finally:
            cursor.close()
    finally:
        # Close even when the statement fails so retries do not leak connections.
        conn.close()


@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True)
async def check_db(ops_test: OpsTest, app: str, db: str) -> bool:
    """Tell whether a database with the specified name already exists.

    The lookup runs against the first unit of ``app`` as the ``operator`` user and
    is retried by the decorator (up to 8 attempts, 15 seconds apart).

    Args:
        ops_test: The ops test framework instance.
        app: Name of the database application.
        db: Database name to look for in ``pg_catalog.pg_database``.

    Returns:
        True if ``db`` is contained in the query result, False otherwise.

    Raises:
        Exception: If the query result contains ``"ERROR"``.
    """
    first_unit = ops_test.model.applications[app].units[0]
    address = await get_unit_address(ops_test, first_unit.name)
    operator_password = await get_password_on_unit(ops_test, "operator", first_unit, app)

    lookup_sql = f"select datname from pg_catalog.pg_database where datname = '{db}';"
    result = await execute_query_on_unit(address, operator_password, lookup_sql)

    if "ERROR" not in result:
        return db in result

    raise Exception(f"Database check is failed with postgresql err: {result}")


async def get_any_deatached_storage(ops_test: OpsTest) -> str:
    """Return the name of any storage that is currently detached and alive.

    Args:
        ops_test: The ops test framework instance; its controller and model names
            select the model and its ``juju`` coroutine runs ``juju storage``.

    Returns:
        The key of the first entry under ``"storage"`` in the JSON output whose
        ``status.current`` is ``detached`` and whose ``life`` is ``alive``.

    Raises:
        Exception: If the command fails, or if no detached and alive storage is listed.
    """
    return_code, storages_list, stderr = await ops_test.juju(
        "storage", "-m", f"{ops_test.controller_name}:{ops_test.model.info.name}", "--format=json"
    )
    if return_code != 0:
        raise Exception(f"failed to get storages info with error: {stderr}")

    # An output without a "storage" section means there is nothing to pick from.
    storages = json.loads(storages_list).get("storage") or {}
    for storage_name, storage in storages.items():
        # Compare the life value itself; wrapping the comparison result in str()
        # yields "True"/"False", which is always truthy.
        if str(storage["status"]["current"]) == "detached" and str(storage["life"]) == "alive":
            return storage_name

    raise Exception("failed to get deatached storage")


async def check_system_id_mismatch(ops_test: OpsTest, unit_name: str) -> bool:
    """Tell whether a system ID mismatch for the unit is found in the PostgreSQL logs.

    Args:
        ops_test: The ops test framework instance.
        unit_name: Juju unit name; ``/`` is replaced by ``-`` to form the node name
            expected in the log entry.

    Returns:
        True if the critical "system ID mismatch" entry for this node occurs in the
        concatenated contents of ``/var/log/postgresql/*`` on the unit.
    """
    node_name = unit_name.replace("/", "-")
    expected_entry = (
        f"CRITICAL: system ID mismatch, node {node_name} belongs to a different cluster"
    )
    logs = await run_command_on_unit(ops_test, unit_name, "cat /var/log/postgresql/*")
    return expected_entry in str(logs)


def delete_pvc(ops_test: OpsTest, pvc: GlobalResource):
    """Delete the given PersistentVolumeClaim from the model's namespace.

    Args:
        ops_test: The ops test framework instance; its model name is the namespace.
        pvc: The claim to delete; only ``pvc.metadata.name`` is used.
    """
    namespace = ops_test.model.name
    kube_client = Client(namespace=namespace)
    kube_client.delete(PersistentVolumeClaim, name=pvc.metadata.name, namespace=namespace)


def get_pvc(ops_test: OpsTest, unit_name: str):
    """Find the PersistentVolumeClaim that belongs to a unit.

    Args:
        ops_test: The ops test framework instance; its model name is the namespace.
        unit_name: Juju unit name; ``/`` is replaced by ``-`` before matching.

    Returns:
        The first claim in the namespace whose name contains the converted unit
        name, or None if there is no such claim.
    """
    namespace = ops_test.model.name
    claims = Client(namespace=namespace).list(PersistentVolumeClaim, namespace=namespace)
    matching_claims = (
        claim for claim in claims if unit_name.replace("/", "-") in claim.metadata.name
    )
    return next(matching_claims, None)


def get_pv(ops_test: OpsTest, unit_name: str):
    """Find the PersistentVolume that belongs to a unit.

    A volume matches when its ``hostPath`` path contains the unit name with ``/``
    replaced by ``-``.

    Args:
        ops_test: The ops test framework instance; its model name is passed as namespace.
        unit_name: Juju unit name, e.g. ``app/0``.

    Returns:
        The first matching PersistentVolume, or None if there is no match.
    """
    client = Client(namespace=ops_test.model.name)
    path_fragment = unit_name.replace("/", "-")
    for pv in client.list(PersistentVolume, namespace=ops_test.model.name):
        host_path = pv.spec.hostPath if pv.spec is not None else None
        # hostPath is an optional volume source; volumes backed by something else
        # have no path to match, so skip them instead of raising AttributeError.
        if host_path is None:
            continue
        if path_fragment in str(host_path.path):
            return pv
    return None


def change_pv_reclaim_policy(ops_test: OpsTest, pvc_config: PersistentVolumeClaim, policy: str):
    """Patch ``persistentVolumeReclaimPolicy`` on a PersistentVolume.

    NOTE(review): the parameter is annotated as a PersistentVolumeClaim, yet its
    ``metadata.name`` is used as the name of the PersistentVolume to patch — confirm
    against callers which object is actually passed.

    Args:
        ops_test: The ops test framework instance; its model name is passed as namespace.
        pvc_config: Object whose ``metadata.name`` names the volume to patch.
        policy: New reclaim policy value.

    Returns:
        Whatever the client's ``patch`` call returns.
    """
    namespace = ops_test.model.name
    kube_client = Client(namespace=namespace)
    reclaim_policy_patch = {"spec": {"persistentVolumeReclaimPolicy": f"{policy}"}}
    return kube_client.patch(
        PersistentVolume,
        pvc_config.metadata.name,
        reclaim_policy_patch,
        namespace=namespace,
    )


def remove_pv_claimref(ops_test: OpsTest, pv_config: PersistentVolume):
    """Clear the ``claimRef`` of a PersistentVolume by patching it to ``None``.

    Args:
        ops_test: The ops test framework instance; its model name is passed as namespace.
        pv_config: The volume to patch; only ``pv_config.metadata.name`` is used.
    """
    namespace = ops_test.model.name
    kube_client = Client(namespace=namespace)
    claimref_patch = {"spec": {"claimRef": None}}
    kube_client.patch(
        PersistentVolume,
        pv_config.metadata.name,
        claimref_patch,
        namespace=namespace,
    )


def change_pvc_pv_name(
    pvc_config: PersistentVolumeClaim, pv_name_new: str
) -> PersistentVolumeClaim:
    """Point a PersistentVolumeClaim config at a different PersistentVolume.

    The object is modified in place: ``spec.volumeName`` is replaced, the
    ``pv.kubernetes.io/bind-completed`` annotation is removed, and the ``uid``
    attribute is deleted from the metadata.

    Args:
        pvc_config: The claim config to modify.
        pv_name_new: Name of the PersistentVolume the claim should reference.

    Returns:
        The same ``pvc_config`` object, after modification.
    """
    metadata = pvc_config.metadata
    pvc_config.spec.volumeName = pv_name_new
    del metadata.annotations["pv.kubernetes.io/bind-completed"]
    delattr(metadata, "uid")
    return pvc_config


def apply_pvc_config(ops_test: OpsTest, pvc_config: PersistentVolumeClaim):
    """Apply the provided PersistentVolumeClaim config in the model's namespace.

    Args:
        ops_test: The ops test framework instance; its model name is the namespace.
        pvc_config: The claim config to apply. Its ``metadata.managedFields`` is
            reset to ``None`` in place before the apply call.
    """
    namespace = ops_test.model.name
    kube_client = Client(namespace=namespace)
    # NOTE(review): presumably cleared because an apply request must not carry
    # managedFields — confirm against the Kubernetes server-side apply docs.
    pvc_config.metadata.managedFields = None
    kube_client.apply(pvc_config, namespace=namespace, field_manager="lightkube")


async def remove_unit_force(ops_test: OpsTest, num_units: int):
    """Remove units with ``--force --no-wait`` and wait for the model to settle.

    Args:
        ops_test: The ops test framework instance.
        num_units: Number of units to remove, passed to ``--num-units``.

    Raises:
        Exception: If the ``juju remove-unit`` command exits with a non-zero code.
    """
    app_name_str = await app_name(ops_test)
    # Expected unit count after the removal, computed before the command runs.
    scale = len(ops_test.model.applications[app_name_str].units) - num_units
    complete_command = [
        "remove-unit",
        app_name_str,
        "--force",
        "--no-wait",
        "--no-prompt",
        "--num-units",
        # Command-line arguments are strings; convert explicitly.
        str(num_units),
    ]
    return_code, stdout, stderr = await ops_test.juju(*complete_command)
    if return_code != 0:
        # Interpolate the details; a bare template plus positional args would leave
        # the message unformatted.
        raise Exception(
            f"Expected command {complete_command} to succeed instead it failed: "
            f"{stdout} with err: {stderr} with code: {return_code}"
        )

    if scale == 0:
        # With no units left there is nothing to become active/idle, so only wait
        # for the unit list to empty.
        await ops_test.model.block_until(
            lambda: len(ops_test.model.applications[app_name_str].units) == scale,
            timeout=1000,
        )
    else:
        await ops_test.model.wait_for_idle(
            apps=[app_name_str],
            status="active",
            timeout=1000,
            wait_for_exact_units=scale,
        )
Loading

0 comments on commit 52543e5

Please sign in to comment.