Skip to content

Commit

Permalink
[DPE-4462] Add Incremental+Differential backup support (#479)
Browse files Browse the repository at this point in the history
* add diff+incr backups

* fix integration test

* remove await from get_unit_address
  • Loading branch information
lucasgameiroborges authored Jun 5, 2024
1 parent fabce25 commit 8c47bae
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 32 deletions.
8 changes: 8 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@

create-backup:
description: Creates a backup to s3 storage.
params:
type:
type: string
description: The backup type, the default value is 'full'.
Full backup is a full copy of all data.
Differential backup is a copy only of changed data since the last full backup.
Incremental backup is a copy only of changed data since the last backup (any type).
Possible values - full, differential, incremental.
get-primary:
description: Get the unit which is the primary/leader in the replication.
get-password:
Expand Down
93 changes: 77 additions & 16 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from constants import (
BACKUP_ID_FORMAT,
BACKUP_TYPE_OVERRIDES,
BACKUP_USER,
PATRONI_CONF_PATH,
PGBACKREST_BACKUP_ID_FORMAT,
Expand Down Expand Up @@ -331,22 +332,20 @@ def _generate_backup_list_output(self) -> str:

backups = json.loads(output)[0]["backup"]
for backup in backups:
backup_id = datetime.strftime(
datetime.strptime(backup["label"][:-1], PGBACKREST_BACKUP_ID_FORMAT),
BACKUP_ID_FORMAT,
)
backup_id, backup_type = self._parse_backup_id(backup["label"])
error = backup["error"]
backup_status = "finished"
if error:
backup_status = f"failed: {error}"
backup_list.append((backup_id, "physical", backup_status))
backup_list.append((backup_id, backup_type, backup_status))
return self._format_backup_list(backup_list)

def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
def _list_backups(self, show_failed: bool, parse=True) -> OrderedDict[str, str]:
"""Retrieve the list of backups.
Args:
show_failed: whether to also return the failed backups.
parse: whether to convert backup labels to their IDs or not.
Returns:
a dict of previously created backups (id + stanza name) or an empty list
Expand All @@ -371,16 +370,35 @@ def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
stanza_name = repository_info["name"]
return OrderedDict[str, str](
(
datetime.strftime(
datetime.strptime(backup["label"][:-1], PGBACKREST_BACKUP_ID_FORMAT),
BACKUP_ID_FORMAT,
),
self._parse_backup_id(backup["label"])[0] if parse else backup["label"],
stanza_name,
)
for backup in backups
if show_failed or not backup["error"]
)

def _parse_backup_id(self, label) -> Tuple[str, str]:
    """Parse a pgBackRest backup label into a backup id and a backup type.

    pgBackRest labels end with a one-letter type marker: "F" (full),
    "D" (differential) or "I" (incremental). Differential/incremental labels
    are prefixed with their parent backup label, separated by "_"
    (e.g. "20240101-000000F_20240102-000000D").

    Args:
        label: the pgBackRest backup label.

    Returns:
        a tuple of (backup id reformatted with BACKUP_ID_FORMAT, backup type).

    Raises:
        ValueError: if the label doesn't end with a known type marker.
    """
    if label[-1] == "F":
        timestamp = label
        backup_type = "full"
    elif label[-1] == "D":
        # Differential labels look like "<full-label>_<timestamp>D".
        timestamp = label.split("_")[1]
        backup_type = "differential"
    elif label[-1] == "I":
        # Incremental labels look like "<parent-label>_<timestamp>I".
        timestamp = label.split("_")[1]
        backup_type = "incremental"
    else:
        # Bug fix: ValueError doesn't lazily interpolate "%s" args the way
        # logging calls do — the old code raised with a (format, arg) tuple
        # as the message. Build the message explicitly instead.
        raise ValueError(f"Unknown label format for backup ID: {label}")

    return (
        datetime.strftime(
            # Strip the trailing type marker before parsing the timestamp.
            datetime.strptime(timestamp[:-1], PGBACKREST_BACKUP_ID_FORMAT),
            BACKUP_ID_FORMAT,
        ),
        backup_type,
    )

def _initialise_stanza(self) -> None:
"""Initialize the stanza.
Expand Down Expand Up @@ -557,8 +575,16 @@ def _on_s3_credential_gone(self, _) -> None:
if self.charm.is_blocked and self.charm.unit.status.message in S3_BLOCK_MESSAGES:
self.charm.unit.status = ActiveStatus()

def _on_create_backup_action(self, event) -> None:
def _on_create_backup_action(self, event) -> None: # noqa: C901
"""Request that pgBackRest creates a backup."""
backup_type = event.params.get("type", "full")
if backup_type not in BACKUP_TYPE_OVERRIDES:
error_message = f"Invalid backup type: {backup_type}. Possible values: {', '.join(BACKUP_TYPE_OVERRIDES.keys())}."
logger.error(f"Backup failed: {error_message}")
event.fail(error_message)
return

logger.info(f"A {backup_type} backup has been requested on unit")
can_unit_perform_backup, validation_message = self._can_unit_perform_backup()
if not can_unit_perform_backup:
logger.error(f"Backup failed: {validation_message}")
Expand Down Expand Up @@ -600,7 +626,7 @@ def _on_create_backup_action(self, event) -> None:
# (reference: https://github.com/pgbackrest/pgbackrest/issues/2007)
self.charm.update_config(is_creating_backup=True)

self._run_backup(event, s3_parameters, datetime_backup_requested)
self._run_backup(event, s3_parameters, datetime_backup_requested, backup_type)

if not self.charm.is_primary:
# Remove the rule that marks the cluster as in a creating backup state
Expand All @@ -611,14 +637,18 @@ def _on_create_backup_action(self, event) -> None:
self.charm.unit.status = ActiveStatus()

def _run_backup(
self, event: ActionEvent, s3_parameters: Dict, datetime_backup_requested: str
self,
event: ActionEvent,
s3_parameters: Dict,
datetime_backup_requested: str,
backup_type: str,
) -> None:
command = [
PGBACKREST_EXECUTABLE,
PGBACKREST_CONFIGURATION_FILE,
f"--stanza={self.stanza_name}",
"--log-level-console=debug",
"--type=full",
f"--type={BACKUP_TYPE_OVERRIDES[backup_type]}",
"backup",
]
if self.charm.is_primary:
Expand All @@ -638,7 +668,7 @@ def _run_backup(
else:
# Generate a backup id from the current date and time if the backup failed before
# generating the backup label (our backup id).
backup_id = datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SF")
backup_id = self._generate_fake_backup_id(backup_type)

# Upload the logs to S3.
logs = f"""Stdout:
Expand Down Expand Up @@ -750,7 +780,7 @@ def _on_restore_action(self, event):
# Mark the cluster as in a restoring backup state and update the Patroni configuration.
logger.info("Configuring Patroni to restore the backup")
self.charm.app_peer_data.update({
"restoring-backup": f"{datetime.strftime(datetime.strptime(backup_id, BACKUP_ID_FORMAT), PGBACKREST_BACKUP_ID_FORMAT)}F",
"restoring-backup": self._fetch_backup_from_id(backup_id),
"restore-stanza": backups[backup_id],
})
self.charm.update_config()
Expand Down Expand Up @@ -780,6 +810,37 @@ def _on_restore_action(self, event):

event.set_results({"restore-status": "restore started"})

def _generate_fake_backup_id(self, backup_type: str) -> str:
"""Creates a backup id for failed backup operations (to store log file)."""
if backup_type == "full":
return datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SF")
if backup_type == "differential":
backups = self._list_backups(show_failed=False, parse=False).keys()
last_full_backup = None
for label in backups[::-1]:
if label.endswith("F"):
last_full_backup = label
break

if last_full_backup is None:
raise TypeError("Differential backup requested but no previous full backup")
return f'{last_full_backup}_{datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SD")}'
if backup_type == "incremental":
backups = self._list_backups(show_failed=False, parse=False).keys()
if not backups:
raise TypeError("Incremental backup requested but no previous successful backup")
return f'{backups[-1]}_{datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SI")}'

def _fetch_backup_from_id(self, backup_id: str) -> str:
"""Fetches backup's pgbackrest label from backup id."""
timestamp = f'{datetime.strftime(datetime.strptime(backup_id, "%Y-%m-%dT%H:%M:%SZ"), "%Y%m%d-%H%M%S")}'
backups = self._list_backups(show_failed=False, parse=False).keys()
for label in backups:
if timestamp in label:
return label

return None

def _pre_restore_checks(self, event: ActionEvent) -> bool:
"""Run some checks before starting the restore.
Expand Down
1 change: 0 additions & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ def __init__(self, *args):
self.framework.observe(self.on.get_primary_action, self._on_get_primary)
self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed)
self.framework.observe(self.on.secret_changed, self._on_peer_relation_changed)
self.framework.observe(self.on.secret_remove, self._on_peer_relation_changed)
self.framework.observe(self.on[PEER].relation_departed, self._on_peer_relation_departed)
self.framework.observe(self.on.pgdata_storage_detaching, self._on_pgdata_storage_detaching)
self.framework.observe(self.on.start, self._on_start)
Expand Down
1 change: 1 addition & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@
ENDPOINT_SIMULTANEOUSLY_BLOCKING_MESSAGE = (
"Please choose one endpoint to use. No need to relate all of them simultaneously!"
)
BACKUP_TYPE_OVERRIDES = {"full": "full", "differential": "diff", "incremental": "incr"}
3 changes: 3 additions & 0 deletions templates/pgbackrest.conf.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ lock-path=/tmp
log-path={{ log_path }}
repo1-retention-full-type=time
repo1-retention-full={{ retention_full }}
repo1-retention-history=365
repo1-type=s3
repo1-path={{ path }}
repo1-s3-region={{ region }}
Expand All @@ -12,6 +13,8 @@ repo1-s3-bucket={{ bucket }}
repo1-s3-uri-style={{ s3_uri_style }}
repo1-s3-key={{ access_key }}
repo1-s3-key-secret={{ secret_key }}
repo1-block=y
repo1-bundle=y
start-fast=y
{%- if enable_tls %}
tls-server-address=*
Expand Down
89 changes: 85 additions & 4 deletions tests/integration/test_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
action = await ops_test.model.units.get(replica).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
assert backups, "backups not outputted"
# 2 lines for header output, 1 backup line ==> 3 total lines
assert len(backups.split("\n")) == 3, "full backup is not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Write some data.
Expand All @@ -175,6 +176,32 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
connection.cursor().execute("CREATE TABLE backup_table_2 (test_collumn INT );")
connection.close()

# Run the "create backup" action.
logger.info("creating a backup")
action = await ops_test.model.units.get(replica).run_action(
"create-backup", **{"type": "differential"}
)
await action.wait()
backup_status = action.results.get("backup-status")
assert backup_status, "backup hasn't succeeded"
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Run the "list backups" action.
logger.info("listing the available backups")
action = await ops_test.model.units.get(replica).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
# 2 lines for header output, 2 backup lines ==> 4 total lines
assert len(backups.split("\n")) == 4, "differential backup is not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Write some data.
logger.info("creating a second table in the database")
with db_connect(host=address, password=password) as connection:
connection.autocommit = True
connection.cursor().execute("CREATE TABLE backup_table_3 (test_collumn INT );")
connection.close()
# Scale down to be able to restore.
async with ops_test.fast_forward():
await ops_test.model.destroy_unit(replica)
Expand All @@ -186,14 +213,61 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
remaining_unit = unit
break

# Run the "restore backup" action.
# Run the "restore backup" action for differential backup.
for attempt in Retrying(
stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
logger.info("restoring the backup")
most_recent_backup = backups.split("\n")[-1]
backup_id = most_recent_backup.split()[0]
last_diff_backup = backups.split("\n")[-1]
backup_id = last_diff_backup.split()[0]
action = await remaining_unit.run_action("restore", **{"backup-id": backup_id})
await action.wait()
restore_status = action.results.get("restore-status")
assert restore_status, "restore hasn't succeeded"

# Wait for the restore to complete.
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Check that the backup was correctly restored by having only the first created table.
logger.info("checking that the backup was correctly restored")
primary = await get_primary(ops_test, remaining_unit.name)
address = get_unit_address(ops_test, primary)
with db_connect(
host=address, password=password
) as connection, connection.cursor() as cursor:
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_1');"
)
assert cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_1' doesn't exist"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_2');"
)
assert cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_2' doesn't exist"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_3');"
)
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_3' exists"
connection.close()

# Run the "restore backup" action for full backup.
for attempt in Retrying(
stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
logger.info("restoring the backup")
last_full_backup = backups.split("\n")[-2]
backup_id = last_full_backup.split()[0]
action = await remaining_unit.run_action("restore", **{"backup-id": backup_id})
await action.wait()
restore_status = action.results.get("restore-status")
Expand Down Expand Up @@ -224,6 +298,13 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict], charm
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_2' exists"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_3');"
)
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_3' exists"
connection.close()

# Run the following steps only in one cloud (it's enough for those checks).
Expand Down
Loading

0 comments on commit 8c47bae

Please sign in to comment.