diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 57633a9172..f8f3ad2b23 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -36,7 +36,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 29 +LIBPATCH = 30 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -383,6 +383,16 @@ def _generate_database_privileges_statements( ) return statements + def get_last_archived_wal(self) -> str: + """Get the name of the last archived WAL for the current PostgreSQL cluster (None if no WAL file was archived yet, as pg_stat_archiver.last_archived_wal is NULL in that case).""" + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute("SELECT last_archived_wal FROM pg_stat_archiver;") + return cursor.fetchone()[0] + except psycopg2.Error as e: + logger.error(f"Failed to get PostgreSQL last archived WAL: {e}") + raise PostgreSQLGetPostgreSQLVersionError()  # FIXME(review): copy-pasted from the version getter — define and raise a dedicated last-archived-WAL error; also the `-> str` annotation is narrower than the removed `-> str | None` + def get_postgresql_text_search_configs(self) -> Set[str]: """Returns the PostgreSQL available text search configs. @@ -629,16 +639,3 @@ def validate_date_style(self, date_style: str) -> bool: return True except psycopg2.Error: return False - - def get_last_archived_wal(self) -> str | None: - """ - Returns: - Name of the last archived wal for the current PostgreSQL cluster. None, if there is no information available. 
- """ - try: - with self._connect_to_database() as connection, connection.cursor() as cursor: - cursor.execute("SELECT last_archived_wal FROM pg_stat_archiver;") - return cursor.fetchone()[0] - except psycopg2.Error as e: - logger.error(f"Failed to get PostgreSQL last archived WAL: {e}") - raise PostgreSQLGetPostgreSQLVersionError() diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 27b9367ede..d9b491e59a 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -37,6 +37,7 @@ DATABASE_APP_NAME = METADATA["name"] STORAGE_PATH = METADATA["storage"]["pgdata"]["location"] APPLICATION_NAME = "postgresql-test-app" +MOVE_RESTORED_CLUSTER_TO_ANOTHER_BUCKET = "Move restored cluster to another S3 bucket" logger = logging.getLogger(__name__) @@ -1212,7 +1213,11 @@ async def backup_operations( # Wait for the restore to complete. async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle(status="active", timeout=1000) + await ops_test.model.block_until( + lambda: remaining_unit.workload_status_message + == MOVE_RESTORED_CLUSTER_TO_ANOTHER_BUCKET, + timeout=1000, + ) # Check that the backup was correctly restored by having only the first created table. logger.info("checking that the backup was correctly restored") @@ -1257,7 +1262,11 @@ async def backup_operations( # Wait for the restore to complete. async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle(status="active", timeout=1000) + await ops_test.model.block_until( + lambda: remaining_unit.workload_status_message + == MOVE_RESTORED_CLUSTER_TO_ANOTHER_BUCKET, + timeout=1000, + ) # Check that the backup was correctly restored by having only the first created table. 
primary = await get_primary(ops_test, remaining_unit.name) diff --git a/tests/integration/test_backups.py b/tests/integration/test_backups.py index 36c546bcfb..c3c62e75a1 100644 --- a/tests/integration/test_backups.py +++ b/tests/integration/test_backups.py @@ -14,6 +14,7 @@ from .helpers import ( CHARM_SERIES, DATABASE_APP_NAME, + MOVE_RESTORED_CLUSTER_TO_ANOTHER_BUCKET, backup_operations, construct_endpoint, db_connect, @@ -55,7 +56,7 @@ @pytest.fixture(scope="module") -async def cloud_configs(ops_test: OpsTest, github_secrets) -> None: +async def cloud_configs(github_secrets) -> None: # Define some configurations and credentials. configs = { AWS: { @@ -123,7 +124,6 @@ async def test_backup_aws(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], c await ops_test.model.applications[database_app_name].remove_relation( f"{database_app_name}:certificates", f"{tls_certificates_app_name}:certificates" ) - await ops_test.model.wait_for_idle(apps=[database_app_name], status="active", timeout=1000) new_unit_name = f"{database_app_name}/2" diff --git a/tests/integration/test_backups_pitr.py b/tests/integration/test_backups_pitr.py new file mode 100644 index 0000000000..77dcf917c3 --- /dev/null +++ b/tests/integration/test_backups_pitr.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +import logging +import uuid +from typing import Dict, Tuple + +import boto3 +import pytest as pytest +from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_attempt, wait_exponential + +from . 
import architecture +from .helpers import ( + CHARM_SERIES, + DATABASE_APP_NAME, + MOVE_RESTORED_CLUSTER_TO_ANOTHER_BUCKET, + construct_endpoint, + db_connect, + get_password, + get_primary, + get_unit_address, +) +from .juju_ import juju_major_version + +CANNOT_RESTORE_PITR = "cannot restore PITR, juju debug-log for details" +S3_INTEGRATOR_APP_NAME = "s3-integrator" +if juju_major_version < 3: + TLS_CERTIFICATES_APP_NAME = "tls-certificates-operator" + if architecture.architecture == "arm64": + TLS_CHANNEL = "legacy/edge" + else: + TLS_CHANNEL = "legacy/stable" + TLS_CONFIG = {"generate-self-signed-certificates": "true", "ca-common-name": "Test CA"} +else: + TLS_CERTIFICATES_APP_NAME = "self-signed-certificates" + if architecture.architecture == "arm64": + TLS_CHANNEL = "latest/edge" + else: + TLS_CHANNEL = "latest/stable" + TLS_CONFIG = {"ca-common-name": "Test CA"} + +logger = logging.getLogger(__name__) + +AWS = "AWS" +GCP = "GCP" + + +@pytest.fixture(scope="module") +async def cloud_configs(github_secrets) -> None: + # Define some configurations and credentials. + configs = { + AWS: { + "endpoint": "https://s3.amazonaws.com", + "bucket": "data-charms-testing", + "path": f"/postgresql-vm/{uuid.uuid1()}", + "region": "us-east-1", + }, + GCP: { + "endpoint": "https://storage.googleapis.com", + "bucket": "data-charms-testing", + "path": f"/postgresql-vm/{uuid.uuid1()}", + "region": "", + }, + } + credentials = { + AWS: { + "access-key": github_secrets["AWS_ACCESS_KEY"], + "secret-key": github_secrets["AWS_SECRET_KEY"], + }, + GCP: { + "access-key": github_secrets["GCP_ACCESS_KEY"], + "secret-key": github_secrets["GCP_SECRET_KEY"], + }, + } + yield configs, credentials + # Delete the previously created objects. 
+ logger.info("deleting the previously created backups") + for cloud, config in configs.items(): + session = boto3.session.Session( + aws_access_key_id=credentials[cloud]["access-key"], + aws_secret_access_key=credentials[cloud]["secret-key"], + region_name=config["region"], + ) + s3 = session.resource( + "s3", endpoint_url=construct_endpoint(config["endpoint"], config["region"]) + ) + bucket = s3.Bucket(config["bucket"]) + # GCS doesn't support batch delete operation, so delete the objects one by one. + for bucket_object in bucket.objects.filter(Prefix=config["path"].lstrip("/")): + bucket_object.delete() + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_pitr_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm) -> None: + """Build, deploy two units of PostgreSQL and do backup. Then, write new data into DB, switch WAL file and test point-in-time-recovery restore action.""" + # Deploy S3 Integrator and TLS Certificates Operator. + await ops_test.model.deploy(S3_INTEGRATOR_APP_NAME) + await ops_test.model.deploy(TLS_CERTIFICATES_APP_NAME, config=TLS_CONFIG, channel=TLS_CHANNEL) + + for cloud, config in cloud_configs[0].items(): + # Deploy and relate PostgreSQL to S3 integrator (one database app for each cloud for now + # as archive_mode is disabled after restoring the backup) and to TLS Certificates Operator + # (to be able to create backups from replicas). + database_app_name = f"{DATABASE_APP_NAME}-{cloud.lower()}" + await ops_test.model.deploy( + charm, + application_name=database_app_name, + num_units=2, + series=CHARM_SERIES, + config={"profile": "testing"}, + ) + await ops_test.model.relate(database_app_name, S3_INTEGRATOR_APP_NAME) + await ops_test.model.relate(database_app_name, TLS_CERTIFICATES_APP_NAME) + + # Configure and set access and secret keys. 
+ logger.info(f"configuring S3 integrator for {cloud}") + await ops_test.model.applications[S3_INTEGRATOR_APP_NAME].set_config(config) + action = await ops_test.model.units.get(f"{S3_INTEGRATOR_APP_NAME}/0").run_action( + "sync-s3-credentials", + **cloud_configs[1][cloud], + ) + await action.wait() + async with ops_test.fast_forward(fast_interval="60s"): + await ops_test.model.wait_for_idle( + apps=[database_app_name, S3_INTEGRATOR_APP_NAME], status="active", timeout=1500 + ) + + primary = await get_primary(ops_test, f"{database_app_name}/0") + for unit in ops_test.model.applications[database_app_name].units: + if unit.name != primary: + replica = unit.name + break + + # Write some data. + password = await get_password(ops_test, primary) + address = get_unit_address(ops_test, primary) + logger.info("creating a table in the database") + with db_connect(host=address, password=password) as connection: + connection.autocommit = True + connection.cursor().execute( + "CREATE TABLE IF NOT EXISTS backup_table_1 (test_column INT);" + ) + connection.close() + + # Run the "create backup" action. + logger.info("creating a backup") + action = await ops_test.model.units.get(replica).run_action("create-backup") + await action.wait() + backup_status = action.results.get("backup-status") + assert backup_status, "backup hasn't succeeded" + await ops_test.model.wait_for_idle( + apps=[database_app_name, S3_INTEGRATOR_APP_NAME], status="active", timeout=1000 + ) + + # Run the "list backups" action. + logger.info("listing the available backups") + action = await ops_test.model.units.get(replica).run_action("list-backups") + await action.wait() + backups = action.results.get("backups") + assert backups, "backups not outputted" + await ops_test.model.wait_for_idle(status="active", timeout=1000) + + # Write some data. 
+ logger.info("creating after-backup data in the database") + with db_connect(host=address, password=password) as connection: + connection.autocommit = True + connection.cursor().execute( + "INSERT INTO backup_table_1 (test_column) VALUES (1), (2), (3), (4), (5);" + ) + connection.close() + with db_connect( + host=address, password=password + ) as connection, connection.cursor() as cursor: + cursor.execute("SELECT current_timestamp;") + after_backup_ts = str(cursor.fetchone()[0]) + connection.close() + with db_connect(host=address, password=password) as connection: + connection.autocommit = True + connection.cursor().execute( + "CREATE TABLE IF NOT EXISTS backup_table_2 (test_column INT);" + ) + connection.close() + with db_connect(host=address, password=password) as connection: + connection.autocommit = True + connection.cursor().execute("SELECT pg_switch_wal();") + connection.close() + + # Scale down to be able to restore. + async with ops_test.fast_forward(): + await ops_test.model.destroy_unit(replica) + await ops_test.model.block_until( + lambda: len(ops_test.model.applications[database_app_name].units) == 1 + ) + + for unit in ops_test.model.applications[database_app_name].units: + remaining_unit = unit + break + + most_recent_backup = backups.split("\n")[-1] + backup_id = most_recent_backup.split()[0] + # Wrong timestamp pointing to one year ahead + wrong_ts = after_backup_ts.replace( + after_backup_ts[:4], str(int(after_backup_ts[:4]) + 1), 1 + ) + + # Run the "restore backup" action with bad PITR parameter. + logger.info("restoring the backup with bad restore-to-time parameter") + action = await remaining_unit.run_action( + "restore", **{"backup-id": backup_id, "restore-to-time": "bad data"} + ) + await action.wait() + assert ( + action.status == "failed" + ), "action must fail with bad restore-to-time parameter, but it succeeded" + + # Run the "restore backup" action with unreachable PITR parameter. 
+ logger.info("restoring the backup with unreachable restore-to-time parameter") + action = await remaining_unit.run_action( + "restore", **{"backup-id": backup_id, "restore-to-time": wrong_ts} + ) + await action.wait() + logger.info("waiting for the database charm to become blocked") + async with ops_test.fast_forward(): + await ops_test.model.block_until( + lambda: remaining_unit.workload_status_message == CANNOT_RESTORE_PITR, + timeout=1000, + ) + logger.info( + "database charm become in blocked state, as supposed to be with unreachable PITR parameter" + ) + + # Run the "restore backup" action. + for attempt in Retrying( + stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30) + ): + with attempt: + logger.info("restoring the backup") + action = await remaining_unit.run_action( + "restore", **{"backup-id": backup_id, "restore-to-time": after_backup_ts} + ) + await action.wait() + restore_status = action.results.get("restore-status") + assert restore_status, "restore hasn't succeeded" + + # Wait for the restore to complete. + async with ops_test.fast_forward(): + await ops_test.model.block_until( + lambda: remaining_unit.workload_status_message + == MOVE_RESTORED_CLUSTER_TO_ANOTHER_BUCKET, + timeout=1000, + ) + + # Check that the backup was correctly restored. 
+ primary = await get_primary(ops_test, remaining_unit.name) + address = get_unit_address(ops_test, primary) + logger.info("checking that the backup was correctly restored") + with db_connect( + host=address, password=password + ) as connection, connection.cursor() as cursor: + cursor.execute( + "SELECT EXISTS (SELECT FROM information_schema.tables" + " WHERE table_schema = 'public' AND table_name = 'backup_table_1');" + ) + assert cursor.fetchone()[ + 0 + ], "backup wasn't correctly restored: table 'backup_table_1' doesn't exist" + cursor.execute("SELECT COUNT(1) FROM backup_table_1;") + assert ( + int(cursor.fetchone()[0]) == 5 + ), "backup wasn't correctly restored: table 'backup_table_1' doesn't have 5 rows" + cursor.execute( + "SELECT EXISTS (SELECT FROM information_schema.tables" + " WHERE table_schema = 'public' AND table_name = 'backup_table_2');" + ) + assert not cursor.fetchone()[ + 0 + ], "backup wasn't correctly restored: table 'backup_table_2' exists" + connection.close() + + # Remove the database app. + await ops_test.model.remove_application(database_app_name, block_until_done=True) + # Remove the TLS operator. + await ops_test.model.remove_application(TLS_CERTIFICATES_APP_NAME, block_until_done=True)