
Commit

Merge branch 'main' into dpe-4816-missing-jinja
dragomirp committed Jul 10, 2024
2 parents 15b5253 + 7299748 commit 54084df
Showing 9 changed files with 212 additions and 47 deletions.
25 changes: 24 additions & 1 deletion lib/charms/data_platform_libs/v0/data_interfaces.py
@@ -331,7 +331,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
-LIBPATCH = 37
+LIBPATCH = 38

PYDEPS = ["ops>=2.0.0"]

@@ -2606,6 +2606,14 @@ def set_version(self, relation_id: int, version: str) -> None:
"""
self.update_relation_data(relation_id, {"version": version})

+    def set_subordinated(self, relation_id: int) -> None:
+        """Raises the subordinated flag in the application relation databag.
+
+        Args:
+            relation_id: the identifier for a particular relation.
+        """
+        self.update_relation_data(relation_id, {"subordinated": "true"})
+

class DatabaseProviderEventHandlers(EventHandlers):
"""Provider-side of the database relation handlers."""
@@ -2842,6 +2850,21 @@ def _on_relation_created_event(self, event: RelationCreatedEvent) -> None:

    def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
        """Event emitted when the database relation has changed."""
+        is_subordinate = False
+        remote_unit_data = None
+        for key in event.relation.data.keys():
+            if isinstance(key, Unit) and not key.name.startswith(self.charm.app.name):
+                remote_unit_data = event.relation.data[key]
+            elif isinstance(key, Application) and key.name != self.charm.app.name:
+                is_subordinate = event.relation.data[key].get("subordinated") == "true"
+
+        if is_subordinate:
+            if not remote_unit_data:
+                return
+
+            if remote_unit_data.get("state") != "ready":
+                return
+
        # Check which data has changed to emit customs events.
        diff = self._diff(event)

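For context, a minimal, self-contained sketch of the requirer-side gate this hunk adds to `_on_relation_changed_event`. It simulates the relation databags with plain dicts instead of the ops `Relation`/`Unit`/`Application` objects, so the dict shapes below are illustrative assumptions, not the ops API:

```python
from typing import Optional


def should_process_relation_changed(
    remote_app_data: dict, remote_unit_data: Optional[dict]
) -> bool:
    """Mirror the new guard: once the provider raises the `subordinated`
    flag, only handle the event when a remote unit reports state "ready"."""
    is_subordinate = remote_app_data.get("subordinated") == "true"
    if is_subordinate:
        if not remote_unit_data:
            return False
        if remote_unit_data.get("state") != "ready":
            return False
    return True


# Provider flagged the relation as subordinated, remote unit still starting:
# the changed event is skipped until the unit is ready.
assert not should_process_relation_changed({"subordinated": "true"}, {"state": "starting"})
assert should_process_relation_changed({"subordinated": "true"}, {"state": "ready"})
# Relations that never raise the flag are unaffected by the new gate.
assert should_process_relation_changed({}, None)
```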
6 changes: 5 additions & 1 deletion lib/charms/postgresql_k8s/v0/postgresql.py
@@ -79,6 +79,10 @@ class PostgreSQLEnableDisableExtensionError(Exception):
    """Exception raised when enabling/disabling an extension fails."""


+class PostgreSQLGetLastArchivedWALError(Exception):
+    """Exception raised when retrieving last archived WAL fails."""
+
+
class PostgreSQLGetPostgreSQLVersionError(Exception):
    """Exception raised when retrieving PostgreSQL version fails."""

@@ -391,7 +395,7 @@ def get_last_archived_wal(self) -> str:
                return cursor.fetchone()[0]
        except psycopg2.Error as e:
            logger.error(f"Failed to get PostgreSQL last archived WAL: {e}")
-            raise PostgreSQLGetPostgreSQLVersionError()
+            raise PostgreSQLGetLastArchivedWALError()

    def get_postgresql_text_search_configs(self) -> Set[str]:
        """Returns the PostgreSQL available text search configs.
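A hedged, caller-side sketch of how the corrected exception can be consumed. The wrapper below is not part of the library, and `postgresql` is assumed to be an instance of the `PostgreSQL` class this file defines:

```python
from typing import Optional

# Import path taken from the file header above; importing outside a charm
# environment that vendors this lib will fail.
from charms.postgresql_k8s.v0.postgresql import PostgreSQLGetLastArchivedWALError


def last_archived_wal_or_none(postgresql) -> Optional[str]:
    """Return the last archived WAL, or None when the lookup fails.

    Before this fix, callers had to catch PostgreSQLGetPostgreSQLVersionError
    here even though no version lookup was involved.
    """
    try:
        return postgresql.get_last_archived_wal()
    except PostgreSQLGetLastArchivedWALError:
        return None
```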
76 changes: 69 additions & 7 deletions src/backups.py
@@ -342,11 +342,43 @@ def result():

    def _format_backup_list(self, backup_list) -> str:
        """Formats provided list of backups as a table."""
-        backups = ["{:<21s} | {:<12s} | {:s}".format("backup-id", "backup-type", "backup-status")]
-        backups.append("-" * len(backups[0]))
-        for backup_id, backup_type, backup_status in backup_list:
+        s3_parameters, _ = self._retrieve_s3_parameters()
+        backups = [
+            "Storage bucket name: {:s}".format(s3_parameters["bucket"]),
+            "Backups base path: {:s}/backup/\n".format(s3_parameters["path"]),
+            "{:<20s} | {:<12s} | {:<8s} | {:<20s} | {:<23s} | {:<20s} | {:<20s} | {:s}".format(
+                "backup-id",
+                "type",
+                "status",
+                "reference-backup-id",
+                "LSN start/stop",
+                "start-time",
+                "finish-time",
+                "backup-path",
+            ),
+        ]
+        backups.append("-" * len(backups[2]))
+        for (
+            backup_id,
+            backup_type,
+            backup_status,
+            reference,
+            lsn_start_stop,
+            start,
+            stop,
+            path,
+        ) in backup_list:
            backups.append(
-                "{:<21s} | {:<12s} | {:s}".format(backup_id, backup_type, backup_status)
+                "{:<20s} | {:<12s} | {:<8s} | {:<20s} | {:<23s} | {:<20s} | {:<20s} | {:s}".format(
+                    backup_id,
+                    backup_type,
+                    backup_status,
+                    reference,
+                    lsn_start_stop,
+                    start,
+                    stop,
+                    path,
+                )
            )
        return "\n".join(backups)

@@ -368,11 +400,29 @@ def _generate_backup_list_output(self) -> str:
        backups = json.loads(output)[0]["backup"]
        for backup in backups:
            backup_id, backup_type = self._parse_backup_id(backup["label"])
+            backup_reference = "None"
+            if backup["reference"]:
+                backup_reference, _ = self._parse_backup_id(backup["reference"][-1])
+            lsn_start_stop = f'{backup["lsn"]["start"]} / {backup["lsn"]["stop"]}'
+            time_start, time_stop = (
+                datetime.strftime(datetime.fromtimestamp(stamp), "%Y-%m-%dT%H:%M:%SZ")
+                for stamp in backup["timestamp"].values()
+            )
+            backup_path = f'/{self.stanza_name}/{backup["label"]}'
            error = backup["error"]
            backup_status = "finished"
            if error:
                backup_status = f"failed: {error}"
-            backup_list.append((backup_id, backup_type, backup_status))
+            backup_list.append((
+                backup_id,
+                backup_type,
+                backup_status,
+                backup_reference,
+                lsn_start_stop,
+                time_start,
+                time_stop,
+                backup_path,
+            ))
        return self._format_backup_list(backup_list)

def _list_backups(self, show_failed: bool, parse=True) -> OrderedDict[str, str]:
@@ -594,12 +644,13 @@ def _on_s3_credential_changed(self, event: CredentialsChangedEvent):
            event.defer()
            return

+        if self.charm.unit.is_leader():
+            self.charm.app_peer_data.pop("require-change-bucket-after-restore", None)
+
        # Verify the s3 relation only on the primary.
        if not self.charm.is_primary:
            return

-        self.charm.app_peer_data.pop("require-change-bucket-after-restore", None)
-
        try:
            self._create_bucket_if_not_exists()
        except (ClientError, ValueError):
@@ -633,6 +684,17 @@ def _on_create_backup_action(self, event) -> None: # noqa: C901
            event.fail(error_message)
            return

+        if (
+            backup_type in ["differential", "incremental"]
+            and len(self._list_backups(show_failed=False)) == 0
+        ):
+            error_message = (
+                f"Invalid backup type: {backup_type}. No previous full backup to reference."
+            )
+            logger.error(f"Backup failed: {error_message}")
+            event.fail(error_message)
+            return
+
        logger.info(f"A {backup_type} backup has been requested on unit")
        can_unit_perform_backup, validation_message = self._can_unit_perform_backup()
        if not can_unit_perform_backup:
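To make the widened `list-backups` table easier to picture, here is a standalone sketch of the output shape. Every value (bucket, path, stanza, timestamps, LSNs) is an invented example; only the column layout mirrors the format strings in `_format_backup_list` above. Note the header now spans five lines (bucket, base path, a blank line from the trailing `\n`, column names, separator), which is what drives the 3-to-6 and 4-to-7 line-count changes in `tests/integration/helpers.py` further down:

```python
# All values below are made-up examples; only the layout follows the diff.
header = "{:<20s} | {:<12s} | {:<8s} | {:<20s} | {:<23s} | {:<20s} | {:<20s} | {:s}".format(
    "backup-id",
    "type",
    "status",
    "reference-backup-id",
    "LSN start/stop",
    "start-time",
    "finish-time",
    "backup-path",
)
lines = [
    "Storage bucket name: example-bucket",  # s3_parameters["bucket"]
    "Backups base path: /example/path/backup/\n",  # s3_parameters["path"] + "/backup/"
    header,
    "-" * len(header),
    "{:<20s} | {:<12s} | {:<8s} | {:<20s} | {:<23s} | {:<20s} | {:<20s} | {:s}".format(
        "2024-07-10T12:00:00Z",
        "full",
        "finished",
        "None",
        "0/2000028 / 0/2000138",
        "2024-07-10T12:00:00Z",
        "2024-07-10T12:00:05Z",
        "/example-stanza/20240710-120000F",
    ),
]
print("\n".join(lines))
assert len("\n".join(lines).split("\n")) == 6  # 5 header lines + 1 backup row
```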
6 changes: 6 additions & 0 deletions src/charm.py
@@ -1410,6 +1410,12 @@ def _set_primary_status_message(self) -> None:
"""Display 'Primary' in the unit status message if the current unit is the primary."""
try:
if "require-change-bucket-after-restore" in self.app_peer_data:
if self.unit.is_leader():
self.app_peer_data.update({
"restoring-backup": "",
"restore-stanza": "",
"restore-to-time": "",
})
self.unit.status = BlockedStatus(MOVE_RESTORED_CLUSTER_TO_ANOTHER_BUCKET)
return
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
7 changes: 5 additions & 2 deletions tests/integration/ha_tests/helpers.py
@@ -59,7 +59,7 @@ class ProcessRunningError(Exception):
"""Raised when a process is running when it is not expected to be."""


async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:
async def are_all_db_processes_down(ops_test: OpsTest, process: str, signal: str) -> bool:
"""Verifies that all units of the charm do not have the DB process running."""
app = await app_name(ops_test)
if "/" in process:
@@ -68,7 +68,7 @@ async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:
        pgrep_cmd = ("pgrep", "-x", process)

    try:
-        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
+        for attempt in Retrying(stop=stop_after_delay(400), wait=wait_fixed(3)):
with attempt:
for unit in ops_test.model.applications[app].units:
_, processes, _ = await ops_test.juju("ssh", unit.name, *pgrep_cmd)
@@ -79,6 +79,9 @@ async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:

                    # If something was returned, there is a running process.
                    if len(processes) > 0:
+                        logger.info("Unit %s not yet down" % unit.name)
+                        # Try to rekill the unit
+                        await send_signal_to_process(ops_test, unit.name, process, signal)
                        raise ProcessRunningError
except RetryError:
return False
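The helper now pairs the longer 400-second window with an active re-kill on every failed probe. A minimal, self-contained illustration of that tenacity pattern, where `check_down` and `rekill` are hypothetical stand-ins for the `pgrep` probe and the `send_signal_to_process` call:

```python
from typing import Callable

from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed


def wait_until_down(check_down: Callable[[], bool], rekill: Callable[[], None]) -> bool:
    """Probe until the process is gone, re-sending the kill signal each time it survives."""
    try:
        for attempt in Retrying(stop=stop_after_delay(400), wait=wait_fixed(3)):
            with attempt:
                if not check_down():
                    rekill()  # the process survived; try to kill it again
                    raise ProcessLookupError("process still running")
    except RetryError:
        return False  # still running after the whole window
    return True
```

Raising inside the `with attempt:` block is what tells tenacity to retry; once `stop_after_delay` trips, the loop surfaces a `RetryError`, matching the `return False` path above.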
10 changes: 9 additions & 1 deletion tests/integration/ha_tests/test_self_healing.py
@@ -95,6 +95,7 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:


@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
async def test_storage_re_use(ops_test, continuous_writes):
"""Verifies that database units with attached storage correctly repurpose storage.
Expand Down Expand Up @@ -142,6 +143,7 @@ async def test_storage_re_use(ops_test, continuous_writes):


@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
@pytest.mark.parametrize("process", DB_PROCESSES)
async def test_kill_db_process(
ops_test: OpsTest, process: str, continuous_writes, primary_start_timeout
@@ -170,6 +172,7 @@ async def test_kill_db_process(


@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
@pytest.mark.parametrize("process", DB_PROCESSES)
async def test_freeze_db_process(
ops_test: OpsTest, process: str, continuous_writes, primary_start_timeout
@@ -208,6 +211,7 @@ async def test_freeze_db_process(


@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
@pytest.mark.parametrize("process", DB_PROCESSES)
async def test_restart_db_process(
ops_test: OpsTest, process: str, continuous_writes, primary_start_timeout
@@ -236,6 +240,7 @@ async def test_restart_db_process(


@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
@pytest.mark.parametrize("process", DB_PROCESSES)
@pytest.mark.parametrize("signal", ["SIGTERM", "SIGKILL"])
async def test_full_cluster_restart(
@@ -272,7 +277,7 @@ async def test_full_cluster_restart(
# of all replicas being down at the same time.
try:
assert await are_all_db_processes_down(
-            ops_test, process
+            ops_test, process, signal
), "Not all units down at the same time."
finally:
if process == PATRONI_PROCESS:
@@ -304,6 +309,7 @@ async def test_full_cluster_restart(


@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
@pytest.mark.unstable
async def test_forceful_restart_without_data_and_transaction_logs(
ops_test: OpsTest,
@@ -380,6 +386,7 @@ async def test_forceful_restart_without_data_and_transaction_logs(


@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
async def test_network_cut(ops_test: OpsTest, continuous_writes, primary_start_timeout):
"""Completely cut and restore network."""
# Locate primary unit.
@@ -468,6 +475,7 @@ async def test_network_cut(ops_test: OpsTest, continuous_writes, primary_start_timeout):


@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
async def test_network_cut_without_ip_change(
ops_test: OpsTest, continuous_writes, primary_start_timeout
):
8 changes: 4 additions & 4 deletions tests/integration/helpers.py
@@ -1150,8 +1150,8 @@ async def backup_operations(
action = await ops_test.model.units.get(replica).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
-    # 2 lines for header output, 1 backup line ==> 3 total lines
-    assert len(backups.split("\n")) == 3, "full backup is not outputted"
+    # 5 lines for header output, 1 backup line ==> 6 total lines
+    assert len(backups.split("\n")) == 6, "full backup is not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Write some data.
@@ -1177,8 +1177,8 @@ async def backup_operations(
action = await ops_test.model.units.get(replica).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
-    # 2 lines for header output, 2 backup lines ==> 4 total lines
-    assert len(backups.split("\n")) == 4, "differential backup is not outputted"
+    # 5 lines for header output, 2 backup lines ==> 7 total lines
+    assert len(backups.split("\n")) == 7, "differential backup is not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Write some data.
2 changes: 1 addition & 1 deletion tests/integration/test_backups.py
@@ -287,7 +287,7 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, github_secrets, charm)
async with ops_test.fast_forward():
unit = ops_test.model.units.get(f"{database_app_name}/0")
await ops_test.model.block_until(
-            lambda: unit.workload_status_message == ANOTHER_CLUSTER_REPOSITORY_ERROR_MESSAGE
+            lambda: unit.workload_status_message == MOVE_RESTORED_CLUSTER_TO_ANOTHER_BUCKET
)

# Check that the backup was correctly restored by having only the first created table.