Skip to content

Commit

Permalink
[DPE-4408] Add differential Backup support (#476)
Browse files Browse the repository at this point in the history
* Add Differential backups support

* adapt name mapping logic

* fix linting

* fix unit tests

* add suggestions + adapt integration tests

* pick correct backup in integration test

* add docstrings + fine tuning tests

* make model deployment wait for idle before relate

* revert fine-tune changes

* reload patroni after restart

* fix unit test

* revert last 2 commits

---------

Co-authored-by: Mykola Marzhan <[email protected]>
  • Loading branch information
lucasgameiroborges and delgod authored May 21, 2024
1 parent 2853224 commit 1a25c39
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 31 deletions.
7 changes: 7 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@

create-backup:
description: Creates a backup to s3 storage in AWS.
params:
type:
type: string
description: The backup type, the default value is 'full'.
Full backup is a full copy of all data.
Differential backup is a copy only of changed data since the last full backup.
Possible values - full, differential.
get-primary:
description: Get the unit which is the primary/leader in the replication.
get-password:
Expand Down
78 changes: 66 additions & 12 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,20 @@ def _generate_backup_list_output(self) -> str:
output, _ = self._execute_command(["pgbackrest", "info", "--output=json"])
backups = json.loads(output)[0]["backup"]
for backup in backups:
backup_id = datetime.strftime(
datetime.strptime(backup["label"][:-1], "%Y%m%d-%H%M%S"), "%Y-%m-%dT%H:%M:%SZ"
)
backup_id, backup_type = self._parse_backup_id(backup["label"])
error = backup["error"]
backup_status = "finished"
if error:
backup_status = f"failed: {error}"
backup_list.append((backup_id, "physical", backup_status))
backup_list.append((backup_id, backup_type, backup_status))
return self._format_backup_list(backup_list)

def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
def _list_backups(self, show_failed: bool, parse=True) -> OrderedDict[str, str]:
"""Retrieve the list of backups.
Args:
show_failed: whether to also return the failed backups.
parse: whether to convert backup labels to their IDs or not.
Returns:
a dict of previously created backups (id + stanza name) or an empty list
Expand All @@ -286,15 +285,34 @@ def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
stanza_name = repository_info["name"]
return OrderedDict[str, str](
(
datetime.strftime(
datetime.strptime(backup["label"][:-1], "%Y%m%d-%H%M%S"), "%Y-%m-%dT%H:%M:%SZ"
),
self._parse_backup_id(backup["label"])[0] if parse else backup["label"],
stanza_name,
)
for backup in backups
if show_failed or not backup["error"]
)

def _parse_backup_id(self, label) -> Tuple[str, str]:
"""Parse backup ID as a timestamp."""
if label[-1] == "F":
timestamp = label
backup_type = "full"
elif label[-1] == "D":
timestamp = label.split("_")[1]
backup_type = "differential"
elif label[-1] == "I":
timestamp = label.split("_")[1]
backup_type = "incremental"
else:
raise ValueError("Unknown label format for backup ID: %s", label)

return (
datetime.strftime(
datetime.strptime(timestamp[:-1], "%Y%m%d-%H%M%S"), "%Y-%m-%dT%H:%M:%SZ"
),
backup_type,
)

def _initialise_stanza(self) -> None:
"""Initialize the stanza.
Expand Down Expand Up @@ -454,8 +472,18 @@ def _on_s3_credential_changed(self, event: CredentialsChangedEvent):

self._initialise_stanza()

def _on_create_backup_action(self, event) -> None:
def _on_create_backup_action(self, event) -> None: # noqa: C901
"""Request that pgBackRest creates a backup."""
backup_type = event.params.get("type", "full").lower()[:4]
if backup_type not in ["full", "diff"]:
error_message = (
f"Invalid backup type: {backup_type}. Possible values: full, differential."
)
logger.error(f"Backup failed: {error_message}")
event.fail(error_message)
return

logger.info(f"A {backup_type} backup has been requested on unit")
can_unit_perform_backup, validation_message = self._can_unit_perform_backup()
if not can_unit_perform_backup:
logger.error(f"Backup failed: {validation_message}")
Expand Down Expand Up @@ -502,7 +530,7 @@ def _on_create_backup_action(self, event) -> None:
"pgbackrest",
f"--stanza={self.stanza_name}",
"--log-level-console=debug",
"--type=full",
f"--type={backup_type}",
"backup",
]
if self.charm.is_primary:
Expand All @@ -523,7 +551,7 @@ def _on_create_backup_action(self, event) -> None:
else:
# Generate a backup id from the current date and time if the backup failed before
# generating the backup label (our backup id).
backup_id = datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SF")
backup_id = self._generate_fake_backup_id(backup_type)

# Upload the logs to S3.
logs = f"""Stdout:
Expand Down Expand Up @@ -664,7 +692,7 @@ def _on_restore_action(self, event):
# Mark the cluster as in a restoring backup state and update the Patroni configuration.
logger.info("Configuring Patroni to restore the backup")
self.charm.app_peer_data.update({
"restoring-backup": f'{datetime.strftime(datetime.strptime(backup_id, "%Y-%m-%dT%H:%M:%SZ"), "%Y%m%d-%H%M%S")}F',
"restoring-backup": self._fetch_backup_from_id(backup_id),
"restore-stanza": backups[backup_id],
})
self.charm.update_config()
Expand All @@ -675,6 +703,32 @@ def _on_restore_action(self, event):

event.set_results({"restore-status": "restore started"})

def _generate_fake_backup_id(self, backup_type: str) -> str:
"""Creates a backup id for failed backup operations (to store log file)."""
if backup_type == "F":
return datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SF")
if backup_type == "D":
backups = self._list_backups(show_failed=False, parse=False).keys()
last_full_backup = None
for label in backups[::-1]:
if label.endswith("F"):
last_full_backup = label
break

if last_full_backup is None:
raise TypeError("Differential backup requested but no previous full backup")
return f'{last_full_backup}_{datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SD")}'

def _fetch_backup_from_id(self, backup_id: str) -> str:
"""Fetches backup's pgbackrest label from backup id."""
timestamp = f'{datetime.strftime(datetime.strptime(backup_id, "%Y-%m-%dT%H:%M:%SZ"), "%Y%m%d-%H%M%S")}'
backups = self._list_backups(show_failed=False, parse=False).keys()
for label in backups:
if timestamp in label:
return label

return None

def _pre_restore_checks(self, event: ActionEvent) -> bool:
"""Run some checks before starting the restore.
Expand Down
3 changes: 3 additions & 0 deletions templates/pgbackrest.conf.j2
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[global]
backup-standby=y
repo1-retention-full=9999999
repo1-retention-history=365
repo1-type=s3
repo1-path={{ path }}
repo1-s3-region={{ region }}
Expand All @@ -9,6 +10,8 @@ repo1-s3-bucket={{ bucket }}
repo1-s3-uri-style={{ s3_uri_style }}
repo1-s3-key={{ access_key }}
repo1-s3-key-secret={{ secret_key }}
repo1-block=y
repo1-bundle=y
start-fast=y
{%- if enable_tls %}
tls-server-address=*
Expand Down
99 changes: 91 additions & 8 deletions tests/integration/test_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
# (to be able to create backups from replicas).
database_app_name = f"{DATABASE_APP_NAME}-{cloud.lower()}"
await build_and_deploy(
ops_test, 2, database_app_name=database_app_name, wait_for_idle=False
ops_test, 2, database_app_name=database_app_name, wait_for_idle=True
)
await ops_test.model.relate(database_app_name, S3_INTEGRATOR_APP_NAME)
await ops_test.model.relate(database_app_name, TLS_CERTIFICATES_APP_NAME)
Expand Down Expand Up @@ -156,7 +156,8 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
action = await ops_test.model.units.get(replica).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
assert backups, "backups not outputted"
# 2 lines for header output, 1 backup line ==> 3 total lines
assert len(backups.split("\n")) == 3, "full backup is not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Write some data.
Expand All @@ -166,18 +167,93 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
connection.cursor().execute("CREATE TABLE backup_table_2 (test_collumn INT );")
connection.close()

# Run the "create backup" action.
logger.info("creating a backup")
action = await ops_test.model.units.get(replica).run_action(
"create-backup", **{"type": "diff"}
)
await action.wait()
backup_status = action.results.get("backup-status")
assert backup_status, "backup hasn't succeeded"
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Run the "list backups" action.
logger.info("listing the available backups")
action = await ops_test.model.units.get(replica).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
# 2 lines for header output, 2 backup lines ==> 4 total lines
assert len(backups.split("\n")) == 4, "differential backup is not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Write some data.
logger.info("creating a second table in the database")
with db_connect(host=address, password=password) as connection:
connection.autocommit = True
connection.cursor().execute("CREATE TABLE backup_table_3 (test_collumn INT );")
connection.close()
# Scale down to be able to restore.
async with ops_test.fast_forward(fast_interval="60s"):
await scale_application(ops_test, database_app_name, 1)

# Run the "restore backup" action.
# Run the "restore backup" action for differential backup.
for attempt in Retrying(
stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
logger.info("restoring the backup")
most_recent_backup = backups.split("\n")[-1]
backup_id = most_recent_backup.split()[0]
last_diff_backup = backups.split("\n")[-1]
backup_id = last_diff_backup.split()[0]
action = await ops_test.model.units.get(f"{database_app_name}/0").run_action(
"restore", **{"backup-id": backup_id}
)
await action.wait()
restore_status = action.results.get("restore-status")
assert restore_status, "restore hasn't succeeded"

# Wait for the restore to complete.
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Check that the backup was correctly restored by having only the first created table.
logger.info("checking that the backup was correctly restored")
primary = await get_primary(ops_test, database_app_name)
address = await get_unit_address(ops_test, primary)
with db_connect(
host=address, password=password
) as connection, connection.cursor() as cursor:
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_1');"
)
assert cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_1' doesn't exist"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_2');"
)
assert cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_2' doesn't exist"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_3');"
)
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_3' exists"
connection.close()

# Run the "restore backup" action for full backup.
for attempt in Retrying(
stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
logger.info("restoring the backup")
last_full_backup = backups.split("\n")[-2]
backup_id = last_full_backup.split()[0]
action = await ops_test.model.units.get(f"{database_app_name}/0").run_action(
"restore", **{"backup-id": backup_id}
)
Expand Down Expand Up @@ -210,6 +286,13 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_2' exists"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_3');"
)
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_3' exists"
connection.close()

# Run the following steps only in one cloud (it's enough for those checks).
Expand All @@ -225,7 +308,7 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
)

# Scale up to be able to test primary and leader being different.
async with ops_test.fast_forward():
async with ops_test.fast_forward(fast_interval="60s"):
await scale_application(ops_test, database_app_name, 2)

logger.info("ensuring that the replication is working correctly")
Expand Down Expand Up @@ -283,9 +366,9 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, github_secrets) -> None
previous_database_app_name = f"{DATABASE_APP_NAME}-gcp"
database_app_name = f"new-{DATABASE_APP_NAME}"
await build_and_deploy(
ops_test, 1, database_app_name=previous_database_app_name, wait_for_idle=False
ops_test, 1, database_app_name=previous_database_app_name, wait_for_idle=True
)
await build_and_deploy(ops_test, 1, database_app_name=database_app_name, wait_for_idle=False)
await build_and_deploy(ops_test, 1, database_app_name=database_app_name, wait_for_idle=True)
await ops_test.model.relate(previous_database_app_name, S3_INTEGRATOR_APP_NAME)
await ops_test.model.relate(database_app_name, S3_INTEGRATOR_APP_NAME)
async with ops_test.fast_forward():
Expand Down
Loading

0 comments on commit 1a25c39

Please sign in to comment.