Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-4114] Test: Scale to zero units #347

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9d7bfed
deployment test "zero-units"
BalabaDmitri Feb 6, 2024
938035f
deployment test "zero-units"
BalabaDmitri Mar 4, 2024
7dc328b
Zero-units: continuous writes ON, deploy 3 units, check, scale to 0 u…
BalabaDmitri Mar 12, 2024
b762ec8
Zero-units: continuous writes ON, deploy 3 units, check, scale to 0 u…
BalabaDmitri Mar 12, 2024
0ca9740
Zero-units: continuous writes ON, deploy 3 units, check, scale to 0 u…
BalabaDmitri Mar 12, 2024
04bc51c
run format & lint
BalabaDmitri Mar 12, 2024
171b53f
reduce time out
BalabaDmitri Mar 12, 2024
27c97f4
merge from remote main
BalabaDmitri Mar 12, 2024
8382d0d
remove replication storage list
BalabaDmitri Mar 12, 2024
d467d8c
checking after scale to 2 and checking after scale up to 3
BalabaDmitri Mar 12, 2024
526357b
checking after scale to 2 and checking after scale up to 3
BalabaDmitri Mar 12, 2024
4b64ce9
checking after scale to 2 and checking after scale up to 3
BalabaDmitri Mar 12, 2024
927ad24
run format & lint
BalabaDmitri Mar 12, 2024
a18b1d3
handle error: storage belongs to different cluster
BalabaDmitri Apr 3, 2024
18211ed
handle error: storage belongs to different cluster
BalabaDmitri Apr 4, 2024
d917d88
handling different versions of Postgres of unit
BalabaDmitri Apr 12, 2024
0a0486f
fix unit fixed setting postgresql version into app_peer_data
BalabaDmitri Apr 17, 2024
ab160f3
merge canonical/main
BalabaDmitri Apr 18, 2024
263a1ef
format
Apr 18, 2024
a1b24dd
fix record of postgres version in databags
BalabaDmitri Apr 27, 2024
19574bd
Merge remote-tracking branch 'canorigin/main' into deployment-zero-units
BalabaDmitri Apr 29, 2024
6873326
format & lint
BalabaDmitri Apr 29, 2024
6716eaf
merge canonical/postgresql-operator
BalabaDmitri May 7, 2024
41bfc2f
Merge remote-tracking branch 'canorigin/main' into deployment-zero-units
BalabaDmitri Jun 10, 2024
2b7db14
checking blocked status based using blocking message
BalabaDmitri Jun 11, 2024
ef84bf6
Merge branch 'main' of https://github.com/canonical/postgresql-operat…
BalabaDmitri Jun 11, 2024
e670781
Merge branch 'canonical_main' into deployment-zero-units
BalabaDmitri Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@

PRIMARY_NOT_REACHABLE_MESSAGE = "waiting for primary to be reachable from this unit"
EXTENSIONS_DEPENDENCY_MESSAGE = "Unsatisfied plugin dependencies. Please check the logs"
DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE = "Please select the correct version of postgresql to use. No need to use different versions of postgresql."
marceloneppel marked this conversation as resolved.
Show resolved Hide resolved

Scopes = Literal[APP_SCOPE, UNIT_SCOPE]

Expand Down Expand Up @@ -491,6 +492,11 @@ def _on_peer_relation_changed(self, event: HookEvent):
try:
# Update the members of the cluster in the Patroni configuration on this unit.
self.update_config()
if self._patroni.cluster_system_id_mismatch(unit_name=self.unit.name):
self.unit.status = BlockedStatus(
"Failed to start postgresql. The storage belongs to a third-party cluster"
)
return
except RetryError:
self.unit.status = BlockedStatus("failed to update cluster members on member")
return
Expand Down Expand Up @@ -527,6 +533,8 @@ def _on_peer_relation_changed(self, event: HookEvent):

self._update_new_unit_status()

self._validate_database_version()

# Split off into separate function, because of complexity _on_peer_relation_changed
def _start_stop_pgbackrest_service(self, event: HookEvent) -> None:
# Start or stop the pgBackRest TLS server service when TLS certificate change.
Expand Down Expand Up @@ -1588,6 +1596,20 @@ def client_relations(self) -> List[Relation]:
relations.append(relation)
return relations

def _validate_database_version(self):
    """Check that this unit runs the PostgreSQL version recorded for the cluster.

    The reference version is kept under the ``database-version`` key of the
    application peer data. While that key is empty the leader records its own
    version there; no unit is blocked in that situation because there is
    nothing to compare against yet. Once the key is set, a unit whose version
    differs is put into blocked status.
    """
    peer_db_version = self.app_peer_data.get("database-version")

    if peer_db_version is None:
        # No reference version has been published yet. Only the leader may
        # publish it; any other unit simply waits instead of comparing its
        # version with None (which would always look like a mismatch).
        if self.unit.is_leader():
            unit_db_version = self._patroni.get_postgresql_version()
            if unit_db_version is not None:
                self.app_peer_data.update({"database-version": unit_db_version})
        return

    if peer_db_version != self._patroni.get_postgresql_version():
        self.unit.status = BlockedStatus(DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE)


if __name__ == "__main__":
main(PostgresqlOperatorCharm)
37 changes: 37 additions & 0 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

"""Helper class used to manage cluster lifecycle."""

import glob
import logging
import os
import pwd
Expand Down Expand Up @@ -647,3 +648,39 @@ def update_synchronous_node_count(self, units: int = None) -> None:
# Check whether the update was unsuccessful.
if r.status_code != 200:
raise UpdateSyncNodeCountError(f"received {r.status_code}")

def cluster_system_id_mismatch(self, unit_name: str) -> bool:
    """Check whether Patroni reported that the storage belongs to another cluster.

    Looks in the content of the last Patroni log file for the critical
    "system ID mismatch" entry that names this unit.

    Args:
        unit_name: name of the unit; every "/" is replaced by "-" to build the
            node name that is searched for in the log entry.

    Returns:
        True if the last Patroni log contains the mismatch entry for this unit,
        False otherwise (including when no log content is available).
    """
    node_name = unit_name.replace("/", "-")
    mismatch_entry = (
        f" CRITICAL: system ID mismatch, node {node_name} belongs to a different cluster:"
    )
    return mismatch_entry in self._last_patroni_log_file()

def _last_patroni_log_file(self) -> str:
    """Get the content of the last log file of the Patroni service.

    The last file is the ``*.log`` file under ``PATRONI_LOGS_PATH`` with the
    most recent modification time.

    Returns:
        Content of that log file, or an empty string when there are no log
        files or the file cannot be inspected or read.
    """
    log_files = glob.glob(f"{PATRONI_LOGS_PATH}/*.log")
    if not log_files:
        return ""
    try:
        # The mtime lookup is kept inside the try block: a file listed by glob
        # may no longer exist by the time it is inspected, and getmtime then
        # raises OSError just like open() would.
        latest_file = max(log_files, key=os.path.getmtime)
        with open(latest_file) as last_log_file:
            return last_log_file.read()
    except OSError:
        # logger.exception already attaches the active exception's traceback.
        logger.exception("Failed to read last patroni log file")
        return ""
58 changes: 56 additions & 2 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ def storage_id(ops_test, unit_name):
return line.split()[1]


async def add_unit_with_storage(ops_test, app, storage):
async def add_unit_with_storage(ops_test, app, storage, is_blocked: bool = False):
"""Adds unit with storage.

Note: this function exists as a temporary solution until this issue is resolved:
Expand All @@ -820,7 +820,14 @@ async def add_unit_with_storage(ops_test, app, storage):
return_code, _, _ = await ops_test.juju(*add_unit_cmd)
assert return_code == 0, "Failed to add unit with storage"
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=2000)
if is_blocked:
application = ops_test.model.applications[app]
await ops_test.model.block_until(
lambda: "blocked" in {unit.workload_status for unit in application.units},
timeout=1500,
)
else:
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1500)
assert (
len(ops_test.model.applications[app].units) == expected_units
), "New unit not added to model"
Expand Down Expand Up @@ -862,3 +869,50 @@ async def reused_full_cluster_recovery_storage(ops_test: OpsTest, unit_name) ->
"/var/snap/charmed-postgresql/common/var/log/patroni/patroni.log*",
)
return True


async def get_db_connection(
    ops_test: OpsTest, dbname: str, is_primary: bool = True, replica_unit_name: str = ""
) -> tuple:
    """Build a connection string for the ``operator`` user of the database.

    Args:
        ops_test: the pytest-operator fixture giving access to the model.
        dbname: name of the database to connect to.
        is_primary: when True (default) the connection targets the primary unit.
        replica_unit_name: unit to target when ``is_primary`` is False. If it
            is empty the primary unit is used instead.

    Returns:
        A ``(connection_string, unit_name)`` tuple of two strings, where
        ``unit_name`` is the unit whose address is in the connection string.
    """
    if is_primary or replica_unit_name == "":
        unit_name = await get_primary(ops_test, APP_NAME)
    else:
        unit_name = replica_unit_name
    password = await get_password(ops_test, APP_NAME)
    # Resolve the address only after the target unit is known, so that a
    # replica connection really points at the replica and not at the primary.
    address = get_unit_address(ops_test, unit_name)
    connection_string = (
        f"dbname='{dbname}' user='operator'"
        f" host='{address}' password='{password}' connect_timeout=10"
    )
    return connection_string, unit_name


async def validate_test_data(connection_string):
    """Assert that the first row of table ``test`` holds the value "some data".

    Args:
        connection_string: psycopg2 connection string of the unit to query.

    Raises:
        AssertionError: if the table has no rows or the value differs.
    """
    connection = psycopg2.connect(connection_string)
    try:
        connection.autocommit = True
        with connection.cursor() as cursor:
            cursor.execute("SELECT data FROM test;")
            data = cursor.fetchone()
        assert data is not None, "table test contains no rows"
        assert data[0] == "some data"
    finally:
        # Close even when the query or an assertion fails; a psycopg2
        # connection used as a context manager is not closed on exit.
        connection.close()


async def create_test_data(connection_string):
    """Recreate table ``test`` with a single "some data" row and read it back.

    Args:
        connection_string: psycopg2 connection string of the unit to write to.

    Raises:
        AssertionError: if the row that was just written cannot be read back.
    """
    connection = psycopg2.connect(connection_string)
    try:
        connection.autocommit = True
        with connection.cursor() as cursor:
            # Check that it's possible to write and read data from the database that
            # was created for the application.
            cursor.execute("DROP TABLE IF EXISTS test;")
            cursor.execute("CREATE TABLE test(data TEXT);")
            cursor.execute("INSERT INTO test(data) VALUES('some data');")
            cursor.execute("SELECT data FROM test;")
            data = cursor.fetchone()
        assert data is not None, "table test contains no rows"
        assert data[0] == "some data"
    finally:
        # Close even when a statement or an assertion fails; a psycopg2
        # connection used as a context manager is not closed on exit.
        connection.close()


async def get_last_added_unit(ops_test, app, prev_units):
    """Return the unit of ``app`` that was not present in ``prev_units``.

    Args:
        ops_test: the pytest-operator fixture giving access to the model.
        app: name of the application whose units are inspected.
        prev_units: names of the units that existed before scaling.

    Returns:
        The first unit, in the model's unit order, whose name is not in
        ``prev_units``.

    Raises:
        IndexError: if every current unit is already listed in ``prev_units``.
    """
    known_units = set(prev_units)
    for unit in ops_test.model.applications[app].units:
        if unit.name not in known_units:
            return unit
    raise IndexError(f"no new unit found in application {app}")
34 changes: 17 additions & 17 deletions tests/integration/ha_tests/test_restore_cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import asyncio
import logging

import pytest
Expand Down Expand Up @@ -37,24 +38,23 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
charm = await ops_test.build_charm(".")
async with ops_test.fast_forward():
# Deploy the first cluster with reusable storage
await ops_test.model.deploy(
charm,
application_name=FIRST_APPLICATION,
num_units=3,
series=CHARM_SERIES,
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
config={"profile": "testing"},
await asyncio.gather(
ops_test.model.deploy(
charm,
application_name=FIRST_APPLICATION,
num_units=3,
series=CHARM_SERIES,
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
config={"profile": "testing"},
),
ops_test.model.deploy(
charm,
application_name=SECOND_APPLICATION,
num_units=1,
series=CHARM_SERIES,
config={"profile": "testing"},
),
)

# Deploy the second cluster
await ops_test.model.deploy(
charm,
application_name=SECOND_APPLICATION,
num_units=1,
series=CHARM_SERIES,
config={"profile": "testing"},
)

await ops_test.model.wait_for_idle(status="active", timeout=1500)

# TODO have a better way to bootstrap clusters with existing storage
Expand Down
131 changes: 131 additions & 0 deletions tests/integration/ha_tests/test_self_healing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging

import pytest
from pip._vendor import requests
from pytest_operator.plugin import OpsTest
from tenacity import Retrying, stop_after_delay, wait_fixed

Expand All @@ -15,6 +16,7 @@
get_password,
get_unit_address,
run_command_on_unit,
scale_application,
)
from .conftest import APPLICATION_NAME
from .helpers import (
Expand All @@ -27,10 +29,13 @@
change_patroni_setting,
change_wal_settings,
check_writes,
create_test_data,
cut_network_from_unit,
cut_network_from_unit_without_ip_change,
fetch_cluster_members,
get_controller_machine,
get_db_connection,
get_last_added_unit,
get_patroni_setting,
get_primary,
get_unit_ip,
Expand All @@ -49,8 +54,10 @@
storage_id,
storage_type,
update_restart_condition,
validate_test_data,
wait_network_restore,
)
from .test_restore_cluster import SECOND_APPLICATION
marceloneppel marked this conversation as resolved.
Show resolved Hide resolved

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -78,6 +85,14 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
config={"profile": "testing"},
)
await ops_test.model.deploy(
charm,
num_units=1,
application_name=SECOND_APPLICATION,
series=CHARM_SERIES,
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
config={"profile": "testing"},
)
# Deploy the continuous writes application charm if it wasn't already deployed.
if not await app_name(ops_test, APPLICATION_NAME):
wait_for_apps = True
Expand Down Expand Up @@ -543,3 +558,119 @@ async def test_network_cut_without_ip_change(
), "Connection is not possible after network restore"

await is_cluster_updated(ops_test, primary_name, use_ip_from_inside=True)


@pytest.mark.group(1)
async def test_deploy_zero_units(ops_test: OpsTest, charm):
    """Scale the database to zero units and scale up again.

    Steps:
        1. Start continuous writes and create a test table.
        2. Remember the unit IPs and the storage of the leader unit of both
           applications, then scale both applications to zero units.
        3. Check that the Patroni API (port 8008) no longer answers with 200.
        4. Add a unit re-using the saved storage and validate the data.
        5. Add a unit with the storage of the second application and expect it
           to be blocked, then remove it.
        6. Scale up to two and three units, validating data and storage re-use.
    """
    app = await app_name(ops_test)
    dbname = f"{APPLICATION_NAME.replace('-', '_')}_first_database"
    connection_string, _ = await get_db_connection(ops_test, dbname=dbname)

    # Start an application that continuously writes data to the database.
    await start_continuous_writes(ops_test, app)

    logger.info("checking whether writes are increasing")
    await are_writes_increasing(ops_test)

    # Connect to the database and create the test data.
    logger.info("connect to DB and create test table")
    await create_test_data(connection_string)

    # TODO: once a charm revision with another PostgreSQL version is released,
    # add a test that checks the use of different PostgreSQL versions.

    unit_ip_addresses = []
    primary_storage = ""
    for unit in ops_test.model.applications[app].units:
        # Save IP addresses of units.
        unit_ip_addresses.append(await get_unit_ip(ops_test, unit.name))

        # Save the ID of the storage that will be detached from the leader unit.
        if await unit.is_leader_from_status():
            primary_storage = storage_id(ops_test, unit.name)

    logger.info(f"get storage id app: {SECOND_APPLICATION}")
    second_storage = ""
    for unit in ops_test.model.applications[SECOND_APPLICATION].units:
        if await unit.is_leader_from_status():
            second_storage = storage_id(ops_test, unit.name)
            break

    # Scale the database to zero units.
    logger.info("scaling database to zero units")
    await scale_application(ops_test, app, 0)
    await scale_application(ops_test, SECOND_APPLICATION, 0)

    # Check that the units are shut down.
    for unit_ip in unit_ip_addresses:
        patroni_url = f"http://{unit_ip}:8008"
        try:
            # A timeout keeps the test from hanging on an address that no
            # longer answers at all.
            resp = requests.get(patroni_url, timeout=10)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # Expected outcome: nothing is serving on the removed unit.
            continue
        except Exception as e:
            pytest.fail(f"{e} unit host = {patroni_url}, something went wrong")
        assert (
            resp.status_code != 200
        ), f"status code = {resp.status_code}, message = {resp.text}"

    # Scale up to one unit.
    logger.info("scaling database to one unit")
    await add_unit_with_storage(ops_test, app=app, storage=primary_storage)
    await ops_test.model.wait_for_idle(
        apps=[APP_NAME, APPLICATION_NAME], status="active", timeout=1500
    )

    connection_string, _ = await get_db_connection(ops_test, dbname=dbname)
    logger.info("checking whether writes are increasing")
    await are_writes_increasing(ops_test)

    logger.info("check test database data")
    await validate_test_data(connection_string)

    logger.info("database scaling up to two units using third-party cluster storage")
    new_unit = await add_unit_with_storage(
        ops_test, app=app, storage=second_storage, is_blocked=True
    )
    # NOTE(review): assumes add_unit_with_storage returns the newly added unit
    # (its name is already used below) — confirm against the helper.
    assert (
        new_unit.workload_status_message
        == "Failed to start postgresql. The storage belongs to a third-party cluster"
    ), f"unit {new_unit.name} is blocked for an unexpected reason"

    logger.info(f"remove unit {new_unit.name} with storage from application {SECOND_APPLICATION}")
    await ops_test.model.destroy_units(new_unit.name)

    await are_writes_increasing(ops_test)

    logger.info("check test database data")
    await validate_test_data(connection_string)

    # Scale up to two and then to three units; each new unit must serve the
    # test data from re-used storage.
    for count, label in ((2, "two"), (3, "three")):
        logger.info(f"scaling database to {label} unit")
        prev_units = [unit.name for unit in ops_test.model.applications[app].units]
        await scale_application(ops_test, application_name=app, count=count)
        unit = await get_last_added_unit(ops_test, app, prev_units)

        logger.info(f"check test database data of unit name {unit.name}")
        connection_string, _ = await get_db_connection(
            ops_test, dbname=dbname, is_primary=False, replica_unit_name=unit.name
        )
        await validate_test_data(connection_string)
        assert await reused_replica_storage(
            ops_test, unit_name=unit.name
        ), "attached storage not properly re-used by Postgresql."

    await check_writes(ops_test)
Loading
Loading