Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-4462] Add Incremental+Differential backup support #479

Merged
merged 4 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@

create-backup:
description: Creates a backup to s3 storage.
params:
type:
type: string
description: The backup type. The default value is 'full'.
Full backup is a full copy of all data.
Differential backup is a copy only of changed data since the last full backup.
Incremental backup is a copy only of changed data since the last backup (any type).
Possible values - full, differential, incremental.
get-primary:
description: Get the unit which is the primary/leader in the replication.
get-password:
Expand Down
93 changes: 77 additions & 16 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from constants import (
BACKUP_ID_FORMAT,
BACKUP_TYPE_OVERRIDES,
BACKUP_USER,
PATRONI_CONF_PATH,
PGBACKREST_BACKUP_ID_FORMAT,
Expand Down Expand Up @@ -331,22 +332,20 @@ def _generate_backup_list_output(self) -> str:

backups = json.loads(output)[0]["backup"]
for backup in backups:
backup_id = datetime.strftime(
datetime.strptime(backup["label"][:-1], PGBACKREST_BACKUP_ID_FORMAT),
BACKUP_ID_FORMAT,
)
backup_id, backup_type = self._parse_backup_id(backup["label"])
error = backup["error"]
backup_status = "finished"
if error:
backup_status = f"failed: {error}"
backup_list.append((backup_id, "physical", backup_status))
backup_list.append((backup_id, backup_type, backup_status))
return self._format_backup_list(backup_list)

def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
def _list_backups(self, show_failed: bool, parse=True) -> OrderedDict[str, str]:
"""Retrieve the list of backups.

Args:
show_failed: whether to also return the failed backups.
parse: whether to convert backup labels to their IDs or not.

Returns:
a dict of previously created backups (id + stanza name) or an empty list
Expand All @@ -371,16 +370,35 @@ def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
stanza_name = repository_info["name"]
return OrderedDict[str, str](
(
datetime.strftime(
datetime.strptime(backup["label"][:-1], PGBACKREST_BACKUP_ID_FORMAT),
BACKUP_ID_FORMAT,
),
self._parse_backup_id(backup["label"])[0] if parse else backup["label"],
stanza_name,
)
for backup in backups
if show_failed or not backup["error"]
)

def _parse_backup_id(self, label) -> Tuple[str, str]:
    """Parse a pgBackRest backup label into a charm backup ID and a backup type.

    Args:
        label: pgBackRest backup label, e.g. "20240101-000000F" (full),
            "20240101-000000F_20240102-000000D" (differential) or
            "20240101-000000F_20240102-000000I" (incremental). The trailing
            letter encodes the backup type.

    Returns:
        a tuple of (backup id reformatted with BACKUP_ID_FORMAT, backup type
        as one of "full"/"differential"/"incremental").

    Raises:
        ValueError: if the label does not end with a known type marker.
    """
    if label[-1] == "F":
        # Full backup labels are a single timestamp.
        timestamp = label
        backup_type = "full"
    elif label[-1] == "D":
        # Dependent backups are "<full label>_<own timestamp>"; keep the own one.
        timestamp = label.split("_")[1]
        backup_type = "differential"
    elif label[-1] == "I":
        timestamp = label.split("_")[1]
        backup_type = "incremental"
    else:
        # Bug fix: ValueError doesn't lazy-format "%s" args like logging does;
        # the previous code produced a tuple-valued message.
        raise ValueError(f"Unknown label format for backup ID: {label}")

    return (
        datetime.strftime(
            # Strip the trailing type marker before parsing the timestamp.
            datetime.strptime(timestamp[:-1], PGBACKREST_BACKUP_ID_FORMAT),
            BACKUP_ID_FORMAT,
        ),
        backup_type,
    )

def _initialise_stanza(self) -> None:
"""Initialize the stanza.

Expand Down Expand Up @@ -557,8 +575,16 @@ def _on_s3_credential_gone(self, _) -> None:
if self.charm.is_blocked and self.charm.unit.status.message in S3_BLOCK_MESSAGES:
self.charm.unit.status = ActiveStatus()

def _on_create_backup_action(self, event) -> None:
def _on_create_backup_action(self, event) -> None: # noqa: C901
"""Request that pgBackRest creates a backup."""
backup_type = event.params.get("type", "full")
if backup_type not in BACKUP_TYPE_OVERRIDES:
error_message = f"Invalid backup type: {backup_type}. Possible values: {', '.join(BACKUP_TYPE_OVERRIDES.keys())}."
logger.error(f"Backup failed: {error_message}")
event.fail(error_message)
return

logger.info(f"A {backup_type} backup has been requested on unit")
can_unit_perform_backup, validation_message = self._can_unit_perform_backup()
if not can_unit_perform_backup:
logger.error(f"Backup failed: {validation_message}")
Expand Down Expand Up @@ -600,7 +626,7 @@ def _on_create_backup_action(self, event) -> None:
# (reference: https://github.com/pgbackrest/pgbackrest/issues/2007)
self.charm.update_config(is_creating_backup=True)

self._run_backup(event, s3_parameters, datetime_backup_requested)
self._run_backup(event, s3_parameters, datetime_backup_requested, backup_type)

if not self.charm.is_primary:
# Remove the rule that marks the cluster as in a creating backup state
Expand All @@ -611,14 +637,18 @@ def _on_create_backup_action(self, event) -> None:
self.charm.unit.status = ActiveStatus()

def _run_backup(
self, event: ActionEvent, s3_parameters: Dict, datetime_backup_requested: str
self,
event: ActionEvent,
s3_parameters: Dict,
datetime_backup_requested: str,
backup_type: str,
) -> None:
command = [
PGBACKREST_EXECUTABLE,
PGBACKREST_CONFIGURATION_FILE,
f"--stanza={self.stanza_name}",
"--log-level-console=debug",
"--type=full",
f"--type={BACKUP_TYPE_OVERRIDES[backup_type]}",
"backup",
]
if self.charm.is_primary:
Expand All @@ -638,7 +668,7 @@ def _run_backup(
else:
# Generate a backup id from the current date and time if the backup failed before
# generating the backup label (our backup id).
backup_id = datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SF")
backup_id = self._generate_fake_backup_id(backup_type)

# Upload the logs to S3.
logs = f"""Stdout:
Expand Down Expand Up @@ -750,7 +780,7 @@ def _on_restore_action(self, event):
# Mark the cluster as in a restoring backup state and update the Patroni configuration.
logger.info("Configuring Patroni to restore the backup")
self.charm.app_peer_data.update({
"restoring-backup": f"{datetime.strftime(datetime.strptime(backup_id, BACKUP_ID_FORMAT), PGBACKREST_BACKUP_ID_FORMAT)}F",
"restoring-backup": self._fetch_backup_from_id(backup_id),
"restore-stanza": backups[backup_id],
})
self.charm.update_config()
Expand Down Expand Up @@ -780,6 +810,37 @@ def _on_restore_action(self, event):

event.set_results({"restore-status": "restore started"})

def _generate_fake_backup_id(self, backup_type: str) -> str:
"""Creates a backup id for failed backup operations (to store log file)."""
if backup_type == "full":
return datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SF")
if backup_type == "differential":
backups = self._list_backups(show_failed=False, parse=False).keys()
last_full_backup = None
for label in backups[::-1]:
if label.endswith("F"):
last_full_backup = label
break

if last_full_backup is None:
raise TypeError("Differential backup requested but no previous full backup")
return f'{last_full_backup}_{datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SD")}'
if backup_type == "incremental":
backups = self._list_backups(show_failed=False, parse=False).keys()
if not backups:
raise TypeError("Incremental backup requested but no previous successful backup")
return f'{backups[-1]}_{datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SI")}'

def _fetch_backup_from_id(self, backup_id: str) -> str:
"""Fetches backup's pgbackrest label from backup id."""
timestamp = f'{datetime.strftime(datetime.strptime(backup_id, "%Y-%m-%dT%H:%M:%SZ"), "%Y%m%d-%H%M%S")}'
backups = self._list_backups(show_failed=False, parse=False).keys()
for label in backups:
if timestamp in label:
return label

return None

def _pre_restore_checks(self, event: ActionEvent) -> bool:
"""Run some checks before starting the restore.

Expand Down
1 change: 0 additions & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ def __init__(self, *args):
self.framework.observe(self.on.get_primary_action, self._on_get_primary)
self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed)
self.framework.observe(self.on.secret_changed, self._on_peer_relation_changed)
self.framework.observe(self.on.secret_remove, self._on_peer_relation_changed)
self.framework.observe(self.on[PEER].relation_departed, self._on_peer_relation_departed)
self.framework.observe(self.on.pgdata_storage_detaching, self._on_pgdata_storage_detaching)
self.framework.observe(self.on.start, self._on_start)
Expand Down
1 change: 1 addition & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@
ENDPOINT_SIMULTANEOUSLY_BLOCKING_MESSAGE = (
"Please choose one endpoint to use. No need to relate all of them simultaneously!"
)
BACKUP_TYPE_OVERRIDES = {"full": "full", "differential": "diff", "incremental": "incr"}
3 changes: 3 additions & 0 deletions templates/pgbackrest.conf.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ lock-path=/tmp
log-path={{ log_path }}
repo1-retention-full-type=time
repo1-retention-full={{ retention_full }}
repo1-retention-history=365
taurus-forever marked this conversation as resolved.
Show resolved Hide resolved
repo1-type=s3
repo1-path={{ path }}
repo1-s3-region={{ region }}
Expand All @@ -12,6 +13,8 @@ repo1-s3-bucket={{ bucket }}
repo1-s3-uri-style={{ s3_uri_style }}
repo1-s3-key={{ access_key }}
repo1-s3-key-secret={{ secret_key }}
repo1-block=y
repo1-bundle=y
start-fast=y
{%- if enable_tls %}
tls-server-address=*
Expand Down
89 changes: 85 additions & 4 deletions tests/integration/test_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
action = await ops_test.model.units.get(replica).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
assert backups, "backups not outputted"
# 2 lines for header output, 1 backup line ==> 3 total lines
assert len(backups.split("\n")) == 3, "full backup is not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Write some data.
Expand All @@ -175,6 +176,32 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
connection.cursor().execute("CREATE TABLE backup_table_2 (test_collumn INT );")
connection.close()

# Run the "create backup" action.
logger.info("creating a backup")
action = await ops_test.model.units.get(replica).run_action(
"create-backup", **{"type": "differential"}
)
await action.wait()
backup_status = action.results.get("backup-status")
assert backup_status, "backup hasn't succeeded"
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Run the "list backups" action.
logger.info("listing the available backups")
action = await ops_test.model.units.get(replica).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
# 2 lines for header output, 2 backup lines ==> 4 total lines
assert len(backups.split("\n")) == 4, "differential backup is not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Write some data.
logger.info("creating a second table in the database")
with db_connect(host=address, password=password) as connection:
connection.autocommit = True
connection.cursor().execute("CREATE TABLE backup_table_3 (test_collumn INT );")
connection.close()
# Scale down to be able to restore.
async with ops_test.fast_forward():
await ops_test.model.destroy_unit(replica)
Expand All @@ -186,14 +213,61 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
remaining_unit = unit
break

# Run the "restore backup" action.
# Run the "restore backup" action for differential backup.
for attempt in Retrying(
stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
logger.info("restoring the backup")
most_recent_backup = backups.split("\n")[-1]
backup_id = most_recent_backup.split()[0]
last_diff_backup = backups.split("\n")[-1]
backup_id = last_diff_backup.split()[0]
action = await remaining_unit.run_action("restore", **{"backup-id": backup_id})
await action.wait()
restore_status = action.results.get("restore-status")
assert restore_status, "restore hasn't succeeded"

# Wait for the restore to complete.
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Check that the backup was correctly restored by having only the first created table.
logger.info("checking that the backup was correctly restored")
primary = await get_primary(ops_test, remaining_unit.name)
address = get_unit_address(ops_test, primary)
with db_connect(
host=address, password=password
) as connection, connection.cursor() as cursor:
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_1');"
)
assert cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_1' doesn't exist"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_2');"
)
assert cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_2' doesn't exist"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_3');"
)
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_3' exists"
connection.close()

# Run the "restore backup" action for full backup.
for attempt in Retrying(
stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
logger.info("restoring the backup")
last_full_backup = backups.split("\n")[-2]
backup_id = last_full_backup.split()[0]
action = await remaining_unit.run_action("restore", **{"backup-id": backup_id})
await action.wait()
restore_status = action.results.get("restore-status")
Expand Down Expand Up @@ -224,6 +298,13 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_2' exists"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_3');"
)
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_3' exists"
connection.close()

# Run the following steps only in one cloud (it's enough for those checks).
Expand Down
Loading
Loading