
Commit

Merge pull request #151 from Aiven-Open/dmitry-potepalov-cassandra-delta-restore

Restore delta backups in Cassandra

kmichel-aiven authored Nov 6, 2023
2 parents 4ea6911 + 054081b commit 59becf1
Showing 12 changed files with 493 additions and 76 deletions.
1 change: 1 addition & 0 deletions astacus/common/cassandra/config.py
@@ -25,6 +25,7 @@

SNAPSHOT_NAME = "astacus-backup"
SNAPSHOT_GLOB = f"data/*/*/snapshots/{SNAPSHOT_NAME}"
BACKUP_GLOB = "data/*/*/backups/"


class CassandraClientConfiguration(AstacusModel):
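
The new BACKUP_GLOB complements SNAPSHOT_GLOB: Cassandra's incremental-backup hard links land in data/<keyspace>/<table>/backups/, which is what the delta flow needs to pick up. A minimal sketch of what the glob matches (illustrative helper, not Astacus code; assumes the standard Cassandra data directory layout):

    from pathlib import Path

    BACKUP_GLOB = "data/*/*/backups/"

    def find_incremental_backup_dirs(cassandra_root: Path) -> list[Path]:
        # Strip the trailing slash: older pathlib releases reject it in glob patterns.
        # Each match is data/<keyspace>/<table>/backups, where Cassandra hard-links
        # freshly flushed SSTables when incremental_backups is enabled.
        return sorted(p for p in cassandra_root.glob(BACKUP_GLOB.rstrip("/")) if p.is_dir())
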
34 changes: 33 additions & 1 deletion astacus/common/ipc.py
@@ -9,10 +9,12 @@
from .magic import DEFAULT_EMBEDDED_FILE_SIZE, StrEnum
from .progress import Progress
from .utils import AstacusModel, now, SizeLimitedFile
+from astacus.common.snapshot import SnapshotGroup
from datetime import datetime
from enum import Enum
from pathlib import Path
from pydantic import Field, root_validator
-from typing import List, Optional, Sequence
+from typing import List, Optional, Sequence, Set

import functools
import socket
@@ -28,6 +30,15 @@ class Plugin(StrEnum):
    flink = "flink"


class NodeFeatures(Enum):
    # Added on 2022-11-29, this can be assumed to be supported everywhere after 1 or 2 years
    validate_file_hashes = "validate_file_hashes"
    # Added on 2023-06-07
    snapshot_groups = "snapshot_groups"
    # Added on 2023-10-16
    release_snapshot_files = "release_snapshot_files"


class Retention(AstacusModel):
    # If set, number of backups to retain always (even beyond days)
    minimum_backups: Optional[int] = None
@@ -108,6 +119,26 @@ class SnapshotRequestV2(NodeRequest):
    root_globs: Sequence[str] = ()


def create_snapshot_request(
    snapshot_groups: Sequence[SnapshotGroup], *, node_features: Set[str]
) -> SnapshotRequestV2 | SnapshotRequest:
    if NodeFeatures.snapshot_groups.value in node_features:
        return SnapshotRequestV2(
            groups=[
                SnapshotRequestGroup(
                    root_glob=group.root_glob,
                    excluded_names=group.excluded_names,
                    embedded_file_size_max=group.embedded_file_size_max,
                )
                for group in snapshot_groups
            ],
        )
    # This is lossy backward compatibility: the extra per-group options are not passed.
    return SnapshotRequest(
        root_globs=[group.root_glob for group in snapshot_groups],
    )


class SnapshotHash(AstacusModel):
    """
    This class represents something that is to be stored in the object storage.
@@ -226,6 +257,7 @@ class CassandraRestoreSSTablesRequest(NodeRequest):
    table_glob: str
    keyspaces_to_skip: Sequence[str]
    match_tables_by: CassandraTableMatching
    expect_empty_target: bool


# coordinator.api
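
The create_snapshot_request helper above centralizes the feature negotiation that SnapshotStep previously inlined. A usage sketch with hypothetical values (assumes SnapshotGroup's optional fields have defaults):

    from astacus.common import ipc
    from astacus.common.snapshot import SnapshotGroup

    groups = [SnapshotGroup(root_glob="data/*/*/snapshots/astacus-backup")]

    # A cluster advertising the feature gets the richer V2 request.
    req = ipc.create_snapshot_request(groups, node_features={"snapshot_groups"})
    assert isinstance(req, ipc.SnapshotRequestV2)

    # An older cluster falls back to the legacy request; excluded_names and
    # embedded_file_size_max are dropped, hence the "lossy" comment.
    req = ipc.create_snapshot_request(groups, node_features=set())
    assert isinstance(req, ipc.SnapshotRequest)
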
186 changes: 164 additions & 22 deletions astacus/coordinator/plugins/base.py
@@ -15,7 +15,6 @@
from astacus.coordinator.cluster import Cluster, Result
from astacus.coordinator.config import CoordinatorNode
from astacus.coordinator.manifest import download_backup_manifest
-from astacus.node.api import Features
from collections import Counter
from typing import Any, Counter as TCounter, Dict, Generic, List, Optional, Sequence, Set, Type, TypeVar

@@ -98,27 +97,14 @@ class SnapshotStep(Step[List[ipc.SnapshotResult]]):

    snapshot_groups: Sequence[SnapshotGroup]
    snapshot_request: str = "snapshot"
+    nodes_to_snapshot: Optional[List[CoordinatorNode]] = None

    async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.SnapshotResult]:
        nodes_metadata = await get_nodes_metadata(cluster)
-        if all(Features.snapshot_groups.value in node_metadata.features for node_metadata in nodes_metadata):
-            req: ipc.NodeRequest = ipc.SnapshotRequestV2(
-                groups=[
-                    ipc.SnapshotRequestGroup(
-                        root_glob=group.root_glob,
-                        excluded_names=group.excluded_names,
-                        embedded_file_size_max=group.embedded_file_size_max,
-                    )
-                    for group in self.snapshot_groups
-                ],
-            )
-        else:
-            # This is a lossy backward compatibility since the extra options are not passed
-            req = ipc.SnapshotRequest(
-                root_globs=[group.root_glob for group in self.snapshot_groups],
-            )
+        cluster_features = set.intersection(*(set(n.features) for n in nodes_metadata))
+        req = ipc.create_snapshot_request(self.snapshot_groups, node_features=cluster_features)
        start_results = await cluster.request_from_nodes(
-            self.snapshot_request, method="post", caller="SnapshotStep", req=req
+            self.snapshot_request, method="post", caller="SnapshotStep", req=req, nodes=self.nodes_to_snapshot
        )
        return await cluster.wait_successful_results(start_results=start_results, result_class=ipc.SnapshotResult)

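Negotiating on the intersection of the nodes' feature sets means a single lagging node downgrades the whole cluster to the legacy request, which keeps the request shape uniform across nodes. The semantics in isolation:

    # Feature names as plain strings, one set per node.
    node_features = [
        {"validate_file_hashes", "snapshot_groups"},  # upgraded node
        {"validate_file_hashes"},                     # node on an older release
    ]
    cluster_features = set.intersection(*node_features)
    assert cluster_features == {"validate_file_hashes"}
    # Without "snapshot_groups", create_snapshot_request returns the legacy SnapshotRequest.
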
@@ -203,7 +189,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.No
        snapshot_results = context.get_result(SnapshotStep)
        nodes_metadata = await get_nodes_metadata(cluster)
        all_nodes_have_release_feature = nodes_metadata and all(
-            Features.release_snapshot_files.value in n.features for n in nodes_metadata
+            ipc.NodeFeatures.release_snapshot_files.value in n.features for n in nodes_metadata
        )
        if not all_nodes_have_release_feature:
            logger.info("Skipped SnapshotReleaseStep because some nodes don't support it, node features: %s", nodes_metadata)
@@ -382,6 +368,160 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> set[str]:
        return set(b for b in await self.json_storage.list_jsons() if b.startswith(magic.JSON_BACKUP_PREFIX))


@dataclasses.dataclass
class DeltaManifestsStep(Step[List[ipc.BackupManifest]]):
    """
    Download and parse all delta manifests necessary for restore.
    Includes only the deltas created after the base backup selected for restore.
    Returns manifests sorted by start time.
    """

    json_storage: AsyncJsonStorage

    async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.BackupManifest]:
        backup_manifest = context.get_result(BackupManifestStep)
        # Right now it does not really matter whether we compare by end or start, since
        # backup and delta operations are mutually exclusive.
        # Theoretically we might allow uploading deltas in parallel with the base backup;
        # in that scenario it makes sense to rely on backup start (because a delta might
        # finish uploading while the base is still being uploaded).
        delta_names = sorted(d for d in await self.json_storage.list_jsons() if d.startswith(magic.JSON_DELTA_PREFIX))
        matching_delta_manifests = []
        for delta_name in delta_names:
            delta_manifest = await download_backup_manifest(self.json_storage, delta_name)
            if delta_manifest.start >= backup_manifest.start:
                matching_delta_manifests.append(delta_manifest)
        return sorted(matching_delta_manifests, key=lambda m: m.start)

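The start >= backup_manifest.start comparison is what scopes the restore: deltas taken before the selected base are ignored, and everything after it is replayed in start-time order. A small illustration with hypothetical timestamps:

    from datetime import datetime, timezone

    base_start = datetime(2023, 11, 1, tzinfo=timezone.utc)
    delta_starts = [
        datetime(2023, 10, 30, tzinfo=timezone.utc),  # predates the base backup: skipped
        datetime(2023, 11, 3, tzinfo=timezone.utc),   # kept
        datetime(2023, 11, 2, tzinfo=timezone.utc),   # kept, and sorted before the 11-03 delta
    ]
    to_restore = sorted(s for s in delta_starts if s >= base_start)
    assert to_restore == [delta_starts[2], delta_starts[1]]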

@dataclasses.dataclass
class RestoreDeltasStep(Step[None]):
    """
    Restore the delta backups: download them and apply them to the nodes.
    """

    json_storage: AsyncJsonStorage
    storage_name: str
    # Delta restore is plugin-dependent; allow customizing it.
    restore_delta_url: str
    restore_delta_request: ipc.NodeRequest
    partial_restore_nodes: Optional[List[ipc.PartialRestoreRequestNode]] = None
    delta_manifests_step: Type[Step[List[ipc.BackupManifest]]] = DeltaManifestsStep

    async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
        deltas_to_restore = sorted(context.get_result(self.delta_manifests_step), key=lambda m: m.start)

        for delta_manifest in deltas_to_restore:
            delta_name = delta_manifest.filename
            # Since deltas can be uploaded from a different set of nodes than the base backup,
            # explicitly re-match current nodes to delta manifest nodes on each restore.
            if self.partial_restore_nodes and not self.contains_partial_restore_hostnames(delta_manifest):
                logger.info("Skipped %s, because no data from it maps to partial restore nodes", delta_name)
                continue
            node_to_backup_index = get_node_to_backup_index(
                partial_restore_nodes=self.partial_restore_nodes,
                snapshot_results=delta_manifest.snapshot_results,
                nodes=cluster.nodes,
            )
            nodes = [
                cluster.nodes[node_index]
                for node_index, backup_index in enumerate(node_to_backup_index)
                if backup_index is not None
            ]
            await self.download_delta(
                delta_name,
                nodes=nodes,
                cluster=cluster,
                node_to_backup_index=node_to_backup_index,
                delta_manifest=delta_manifest,
            )
            await self.restore_delta(delta_name, nodes=nodes, cluster=cluster)
            await self.clear_delta(
                delta_name,
                nodes=nodes,
                cluster=cluster,
                node_to_backup_index=node_to_backup_index,
                delta_manifest=delta_manifest,
            )

    def contains_partial_restore_hostnames(self, manifest: ipc.BackupManifest) -> bool:
        assert self.partial_restore_nodes is not None
        partial_restore_hostnames = {pr.backup_hostname for pr in self.partial_restore_nodes}
        return any(sr.hostname in partial_restore_hostnames for sr in manifest.snapshot_results)

    async def download_delta(
        self,
        delta_name: str,
        *,
        nodes: List[CoordinatorNode],
        cluster: Cluster,
        node_to_backup_index: List[Optional[int]],
        delta_manifest: ipc.BackupManifest,
    ) -> None:
        reqs: List[ipc.NodeRequest] = []
        for backup_index in node_to_backup_index:
            if backup_index is not None:
                snapshot_result = delta_manifest.snapshot_results[backup_index]
                assert snapshot_result.state is not None
                reqs.append(
                    ipc.SnapshotDownloadRequest(
                        storage=self.storage_name,
                        backup_name=delta_name,
                        snapshot_index=backup_index,
                        root_globs=snapshot_result.state.root_globs,
                    )
                )
        start_results = await cluster.request_from_nodes(
            "delta/download",
            method="post",
            caller="restore_deltas",
            reqs=reqs,
            nodes=nodes,
        )
        if not start_results:
            raise StepFailedError(f"Initiating delta {delta_name} download failed")
        await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)

    async def restore_delta(self, delta_name: str, *, nodes: List[CoordinatorNode], cluster: Cluster) -> None:
        start_results = await cluster.request_from_nodes(
            self.restore_delta_url,
            method="post",
            caller="restore_deltas",
            req=self.restore_delta_request,
            nodes=nodes,
        )
        if not start_results:
            raise StepFailedError(f"Initiating delta {delta_name} restore failed")
        await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)

    async def clear_delta(
        self,
        delta_name: str,
        *,
        nodes: List[CoordinatorNode],
        cluster: Cluster,
        node_to_backup_index: List[Optional[int]],
        delta_manifest: ipc.BackupManifest,
    ) -> None:
        reqs: List[ipc.NodeRequest] = []
        for backup_index in node_to_backup_index:
            if backup_index is not None:
                snapshot_result = delta_manifest.snapshot_results[backup_index]
                assert snapshot_result.state is not None
                reqs.append(ipc.SnapshotClearRequest(root_globs=snapshot_result.state.root_globs))
        start_results = await cluster.request_from_nodes(
            "delta/clear",
            method="post",
            caller="restore_deltas",
            reqs=reqs,
            nodes=nodes,
        )
        if not start_results:
            raise StepFailedError(f"Initiating delta {delta_name} clear failed")
        await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)

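node_to_backup_index is re-computed for every delta because each delta may have been uploaded from a different node set than the base backup: one entry per current cluster node, holding the index of that node's data in the delta manifest, or None when the delta has nothing for it. Illustrative values:

    node_to_backup_index = [0, None, 1]  # hypothetical three-node cluster

    participating = [
        node_index
        for node_index, backup_index in enumerate(node_to_backup_index)
        if backup_index is not None
    ]
    assert participating == [0, 2]  # node 1 sits out this delta's download/restore/clear cycle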

@dataclasses.dataclass
class ComputeKeptBackupsStep(Step[set[str]]):
"""
@@ -635,7 +775,7 @@ async def upload_node_index_datas(
    start_results: List[Optional[Result]] = []
    nodes_metadata = await get_nodes_metadata(cluster)
    for data in node_index_datas:
-        if Features.validate_file_hashes.value in nodes_metadata[data.node_index].features:
+        if ipc.NodeFeatures.validate_file_hashes.value in nodes_metadata[data.node_index].features:
            req: ipc.NodeRequest = ipc.SnapshotUploadRequestV20221129(
                hashes=data.sshashes, storage=storage_name, validate_file_hashes=validate_file_hashes
            )
@@ -650,8 +790,10 @@
    return await cluster.wait_successful_results(start_results=start_results, result_class=ipc.SnapshotUploadResult)


-async def get_nodes_metadata(cluster: Cluster) -> list[ipc.MetadataResult]:
-    metadata_responses = await cluster.request_from_nodes("metadata", caller="get_nodes_metadata", method="get")
+async def get_nodes_metadata(
+    cluster: Cluster, *, nodes: Sequence[CoordinatorNode] | None = None
+) -> list[ipc.MetadataResult]:
+    metadata_responses = await cluster.request_from_nodes("metadata", caller="get_nodes_metadata", method="get", nodes=nodes)
    return [
        ipc.MetadataResult(version="", features=[]) if response is None else ipc.MetadataResult.parse_obj(response)
        for response in metadata_responses
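
The new nodes parameter lets callers scope the metadata query to a subset of the cluster, presumably for steps that only touch some nodes (e.g. a SnapshotStep with nodes_to_snapshot set). A hedged usage sketch; note that set.intersection() with no arguments raises TypeError, so an empty metadata list would need guarding:

    # Inside an async coordinator step:
    nodes_metadata = await get_nodes_metadata(cluster, nodes=cluster.nodes[:2])
    cluster_features = (
        set.intersection(*(set(n.features) for n in nodes_metadata)) if nodes_metadata else set()
    )
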

