class NodeFeatures(Enum):
    """
    Optional capabilities a node reports to the coordinator.

    The node metadata endpoint returns the ``value`` of every member, and the
    coordinator looks for a given value in that list before it uses the
    corresponding request type or endpoint.
    """

    # Added on 2022-11-29, this can be assumed to be supported everywhere after 1 or 2 years
    validate_file_hashes = "validate_file_hashes"
    # Added on 2023-06-07
    snapshot_groups = "snapshot_groups"
    # Added on 2023-10-16
    release_snapshot_files = "release_snapshot_files"
snapshot_groups + ], + ) + # This is a lossy backward compatibility since the extra options are not passed + return SnapshotRequest( + root_globs=[group.root_glob for group in snapshot_groups], + ) + + class SnapshotHash(AstacusModel): """ This class represents something that is to be stored in the object storage. @@ -226,6 +257,7 @@ class CassandraRestoreSSTablesRequest(NodeRequest): table_glob: str keyspaces_to_skip: Sequence[str] match_tables_by: CassandraTableMatching + expect_empty_target: bool # coordinator.api diff --git a/astacus/coordinator/plugins/base.py b/astacus/coordinator/plugins/base.py index 113a5fb6..376e7321 100644 --- a/astacus/coordinator/plugins/base.py +++ b/astacus/coordinator/plugins/base.py @@ -15,7 +15,6 @@ from astacus.coordinator.cluster import Cluster, Result from astacus.coordinator.config import CoordinatorNode from astacus.coordinator.manifest import download_backup_manifest -from astacus.node.api import Features from collections import Counter from typing import Any, Counter as TCounter, Dict, Generic, List, Optional, Sequence, Set, Type, TypeVar @@ -98,27 +97,14 @@ class SnapshotStep(Step[List[ipc.SnapshotResult]]): snapshot_groups: Sequence[SnapshotGroup] snapshot_request: str = "snapshot" + nodes_to_snapshot: Optional[List[CoordinatorNode]] = None async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.SnapshotResult]: nodes_metadata = await get_nodes_metadata(cluster) - if all(Features.snapshot_groups.value in node_metadata.features for node_metadata in nodes_metadata): - req: ipc.NodeRequest = ipc.SnapshotRequestV2( - groups=[ - ipc.SnapshotRequestGroup( - root_glob=group.root_glob, - excluded_names=group.excluded_names, - embedded_file_size_max=group.embedded_file_size_max, - ) - for group in self.snapshot_groups - ], - ) - else: - # This is a lossy backward compatibility since the extra options are not passed - req = ipc.SnapshotRequest( - root_globs=[group.root_glob for group in self.snapshot_groups], 
- ) + cluster_features = set.intersection(*(set(n.features) for n in nodes_metadata)) + req = ipc.create_snapshot_request(self.snapshot_groups, node_features=cluster_features) start_results = await cluster.request_from_nodes( - self.snapshot_request, method="post", caller="SnapshotStep", req=req + self.snapshot_request, method="post", caller="SnapshotStep", req=req, nodes=self.nodes_to_snapshot ) return await cluster.wait_successful_results(start_results=start_results, result_class=ipc.SnapshotResult) @@ -203,7 +189,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.No snapshot_results = context.get_result(SnapshotStep) nodes_metadata = await get_nodes_metadata(cluster) all_nodes_have_release_feature = nodes_metadata and all( - Features.release_snapshot_files.value in n.features for n in nodes_metadata + ipc.NodeFeatures.release_snapshot_files.value in n.features for n in nodes_metadata ) if not all_nodes_have_release_feature: logger.info("Skipped SnapshotReleaseStep because some nodes don't support it, node features: %s", nodes_metadata) @@ -382,6 +368,160 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> set[str]: return set(b for b in await self.json_storage.list_jsons() if b.startswith(magic.JSON_BACKUP_PREFIX)) +@dataclasses.dataclass +class DeltaManifestsStep(Step[List[ipc.BackupManifest]]): + """ + Download and parse all delta manifests necessary for restore. + + Includes only the deltas created after the base backup selected for restore. + Returns manifests sorted by start time. + """ + + json_storage: AsyncJsonStorage + + async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.BackupManifest]: + backup_manifest = context.get_result(BackupManifestStep) + # Right now does not really matter whether it's end or start, since backup and + # delta operations are mutually exclusive. 
@dataclasses.dataclass
class RestoreDeltasStep(Step[None]):
    """
    Restore the delta backups: download and apply to the node.

    The manifests come from the result of ``delta_manifests_step`` and are handled
    one at a time, ordered by their start time. For each delta three requests are
    sent to the matching nodes: "delta/download", the plugin-specific restore
    request (``restore_delta_url`` / ``restore_delta_request``) and "delta/clear".

    Raises StepFailedError when any of these requests cannot be initiated.
    """

    json_storage: AsyncJsonStorage
    storage_name: str
    # Delta restore is plugin-dependent, allow to customize it.
    restore_delta_url: str
    restore_delta_request: ipc.NodeRequest
    partial_restore_nodes: Optional[List[ipc.PartialRestoreRequestNode]] = None
    # Step whose result is the list of delta manifests to restore.
    delta_manifests_step: Type[Step[List[ipc.BackupManifest]]] = DeltaManifestsStep

    async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
        """Download, apply and clear every delta manifest, oldest first."""
        deltas_to_restore = sorted(context.get_result(self.delta_manifests_step), key=lambda m: m.start)

        for delta_manifest in deltas_to_restore:
            delta_name = delta_manifest.filename
            # Since deltas can be uploaded from a different set of nodes than the base backup,
            # explicitly re-match current nodes to delta manifest nodes on each restore.
            if self.partial_restore_nodes and not self.contains_partial_restore_hostnames(delta_manifest):
                logger.info("Skipped %s, because no data from it maps to partial restore nodes", delta_name)
                continue
            node_to_backup_index = get_node_to_backup_index(
                partial_restore_nodes=self.partial_restore_nodes,
                snapshot_results=delta_manifest.snapshot_results,
                nodes=cluster.nodes,
            )
            # Only nodes that have a matching snapshot result in this delta take part.
            nodes = [
                cluster.nodes[node_index]
                for node_index, backup_index in enumerate(node_to_backup_index)
                if backup_index is not None
            ]
            await self.download_delta(
                delta_name,
                nodes=nodes,
                cluster=cluster,
                node_to_backup_index=node_to_backup_index,
                delta_manifest=delta_manifest,
            )
            await self.restore_delta(delta_name, nodes=nodes, cluster=cluster)
            await self.clear_delta(
                delta_name,
                nodes=nodes,
                cluster=cluster,
                node_to_backup_index=node_to_backup_index,
                delta_manifest=delta_manifest,
            )

    def contains_partial_restore_hostnames(self, manifest: ipc.BackupManifest) -> bool:
        """Tell whether any snapshot result of the manifest comes from a partial-restore backup hostname."""
        assert self.partial_restore_nodes is not None
        partial_restore_hostnames = {pr.backup_hostname for pr in self.partial_restore_nodes}
        return any(sr.hostname in partial_restore_hostnames for sr in manifest.snapshot_results)

    async def download_delta(
        self,
        delta_name: str,
        *,
        nodes: List[CoordinatorNode],
        cluster: Cluster,
        node_to_backup_index: List[Optional[int]],
        delta_manifest: ipc.BackupManifest,
    ) -> None:
        """Send one download request per mapped node and wait for all of them to succeed."""
        # One request per non-None index, in the same order as `nodes` was built by the caller.
        reqs: List[ipc.NodeRequest] = []
        for backup_index in node_to_backup_index:
            if backup_index is not None:
                snapshot_result = delta_manifest.snapshot_results[backup_index]
                assert snapshot_result.state is not None
                reqs.append(
                    ipc.SnapshotDownloadRequest(
                        storage=self.storage_name,
                        backup_name=delta_name,
                        snapshot_index=backup_index,
                        root_globs=snapshot_result.state.root_globs,
                    )
                )
        start_results = await cluster.request_from_nodes(
            "delta/download",
            method="post",
            caller="restore_deltas",
            reqs=reqs,
            nodes=nodes,
        )
        if not start_results:
            raise StepFailedError(f"Initiating delta {delta_name} download failed")
        await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)

    async def restore_delta(self, delta_name: str, *, nodes: List[CoordinatorNode], cluster: Cluster) -> None:
        """Send the plugin-specific restore request to the nodes and wait for all of them to succeed."""
        start_results = await cluster.request_from_nodes(
            self.restore_delta_url,
            method="post",
            caller="restore_deltas",
            req=self.restore_delta_request,
            nodes=nodes,
        )
        if not start_results:
            raise StepFailedError(f"Initiating delta {delta_name} restore failed")
        await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)

    async def clear_delta(
        self,
        delta_name: str,
        *,
        nodes: List[CoordinatorNode],
        cluster: Cluster,
        node_to_backup_index: List[Optional[int]],
        delta_manifest: ipc.BackupManifest,
    ) -> None:
        """Send one clear request per mapped node (same root globs as the download) and wait for success."""
        reqs: List[ipc.NodeRequest] = []
        for backup_index in node_to_backup_index:
            if backup_index is not None:
                snapshot_result = delta_manifest.snapshot_results[backup_index]
                assert snapshot_result.state is not None
                reqs.append(ipc.SnapshotClearRequest(root_globs=snapshot_result.state.root_globs))
        start_results = await cluster.request_from_nodes(
            "delta/clear",
            method="post",
            caller="restore_deltas",
            reqs=reqs,
            nodes=nodes,
        )
        if not start_results:
            raise StepFailedError(f"Initiating delta {delta_name} clear failed")
        await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)
async def get_nodes_metadata(
    cluster: Cluster, *, nodes: Sequence[CoordinatorNode] | None = None
) -> list[ipc.MetadataResult]:
    """
    Fetch the "metadata" result from nodes of the cluster.

    ``nodes`` is forwarded as-is to ``cluster.request_from_nodes``. Every response
    yields one entry in the returned list: a ``None`` response becomes a placeholder
    with an empty version and no features, anything else is parsed as
    ``ipc.MetadataResult``.
    """
    responses = await cluster.request_from_nodes("metadata", caller="get_nodes_metadata", method="get", nodes=nodes)
    results: list[ipc.MetadataResult] = []
    for response in responses:
        if response is None:
            results.append(ipc.MetadataResult(version="", features=[]))
        else:
            results.append(ipc.MetadataResult.parse_obj(response))
    return results
self.nodes or [CassandraConfigurationNode(listen_address=self.client.get_listen_address())] client = CassandraClient(self.client) - # first *: keyspace name; second *: table name - snapshot_groups = [ - SnapshotGroup(root_glob=f"data/*/*/snapshots/{SNAPSHOT_NAME}/*.db"), - SnapshotGroup(root_glob=f"data/*/*/snapshots/{SNAPSHOT_NAME}/*.txt"), - ] return [ ValidateConfigurationStep(nodes=nodes), @@ -94,7 +88,7 @@ def get_backup_steps(self, *, context: OperationContext) -> List[Step]: CassandraSubOpStep(op=ipc.CassandraSubOp.remove_snapshot), CassandraSubOpStep(op=ipc.CassandraSubOp.take_snapshot), backup_steps.AssertSchemaUnchanged(), - base.SnapshotStep(snapshot_groups=snapshot_groups), + base.SnapshotStep(snapshot_groups=snapshot_groups()), base.ListHexdigestsStep(hexdigest_storage=context.hexdigest_storage), base.UploadBlocksStep(storage_name=context.storage_name), CassandraSubOpStep(op=ipc.CassandraSubOp.remove_snapshot), @@ -108,11 +102,6 @@ def get_backup_steps(self, *, context: OperationContext) -> List[Step]: def get_delta_backup_steps(self, *, context: OperationContext) -> List[Step]: nodes = self.nodes or [CassandraConfigurationNode(listen_address=self.client.get_listen_address())] - # first *: keyspace name; second *: table name - delta_snapshot_groups = [ - SnapshotGroup(root_glob="data/*/*/backups/*.db"), - SnapshotGroup(root_glob="data/*/*/backups/*.txt"), - ] @dataclasses.dataclass class SkipHexdigestsListStep(Step[Set[str]]): @@ -121,7 +110,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Set[str]: return [ ValidateConfigurationStep(nodes=nodes), - base.SnapshotStep(snapshot_groups=delta_snapshot_groups, snapshot_request="delta/snapshot"), + base.SnapshotStep(snapshot_groups=delta_snapshot_groups(), snapshot_request="delta/snapshot"), SkipHexdigestsListStep(), base.UploadBlocksStep( storage_name=context.storage_name, @@ -162,12 +151,25 @@ def get_restore_schema_from_snapshot_steps(self, *, context: OperationContext, r 
table_glob=SNAPSHOT_GLOB, keyspaces_to_skip=[ks for ks in SYSTEM_KEYSPACES if ks != "system_schema"], match_tables_by=ipc.CassandraTableMatching.cfid, + expect_empty_target=True, ) return [ base.RestoreStep(storage_name=context.storage_name, partial_restore_nodes=req.partial_restore_nodes), CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.restore_sstables, req=restore_sstables_req), + base.DeltaManifestsStep(json_storage=context.json_storage), + restore_steps.RestoreCassandraDeltasStep( + json_storage=context.json_storage, + storage_name=context.storage_name, + partial_restore_nodes=req.partial_restore_nodes, + ), restore_steps.StopReplacedNodesStep(partial_restore_nodes=req.partial_restore_nodes, cassandra_nodes=self.nodes), + restore_steps.UploadFinalDeltaStep(json_storage=context.json_storage, storage_name=context.storage_name), + restore_steps.RestoreFinalDeltasStep( + json_storage=context.json_storage, + storage_name=context.storage_name, + partial_restore_nodes=req.partial_restore_nodes, + ), restore_steps.StartCassandraStep(replace_backup_nodes=True, override_tokens=True, cassandra_nodes=self.nodes), restore_steps.WaitCassandraUpStep(duration=self.restore_start_timeout), ] @@ -179,6 +181,7 @@ def get_restore_schema_from_manifest_steps(self, *, context: OperationContext, r table_glob=SNAPSHOT_GLOB, keyspaces_to_skip=SYSTEM_KEYSPACES, match_tables_by=ipc.CassandraTableMatching.cfname, + expect_empty_target=True, ) return [ # Start cassandra with backed up token distribution + set schema + stop it @@ -186,7 +189,10 @@ def get_restore_schema_from_manifest_steps(self, *, context: OperationContext, r restore_steps.WaitCassandraUpStep(duration=self.restore_start_timeout), restore_steps.RestorePreDataStep(client=client), CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.stop_cassandra), - # Restore snapshot + # Restore snapshot. 
Restoring deltas is not possible in this scenario, + # because once we've created our own system_schema keyspace and written data to it, + # we've started a new sequence of sstables that might clash with the sequence from the node + # we took the backup from (e.g. the old node had nb-1, the new node has nb-1, unclear how to proceed). base.RestoreStep(storage_name=context.storage_name, partial_restore_nodes=req.partial_restore_nodes), CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.restore_sstables, req=restore_sstables_req), # restart cassandra and do the final actions with data available diff --git a/astacus/coordinator/plugins/cassandra/restore_steps.py b/astacus/coordinator/plugins/cassandra/restore_steps.py index dbbc8072..6f77bf35 100644 --- a/astacus/coordinator/plugins/cassandra/restore_steps.py +++ b/astacus/coordinator/plugins/cassandra/restore_steps.py @@ -6,16 +6,28 @@ """ from .model import CassandraConfigurationNode, CassandraManifest, CassandraManifestNode -from .utils import get_schema_hash, run_subop +from .utils import delta_snapshot_groups, get_schema_hash, run_subop from astacus.common import ipc, utils +from astacus.common.asyncstorage import AsyncJsonStorage from astacus.common.cassandra.client import CassandraClient +from astacus.common.cassandra.config import BACKUP_GLOB from astacus.common.cassandra.schema import CassandraKeyspace +from astacus.common.cassandra.utils import SYSTEM_KEYSPACES +from astacus.common.magic import JSON_DELTA_PREFIX from astacus.coordinator.cluster import Cluster from astacus.coordinator.config import CoordinatorNode -from astacus.coordinator.plugins.base import BackupManifestStep, MapNodesStep, Step, StepFailedError, StepsContext +from astacus.coordinator.plugins.base import ( + BackupManifestStep, + get_nodes_metadata, + MapNodesStep, + RestoreDeltasStep, + Step, + StepFailedError, + StepsContext, +) from cassandra import metadata as cm -from dataclasses import dataclass -from typing import Iterable, List, 
@dataclass
class UploadFinalDeltaStep(Step[List[ipc.BackupManifest]]):
    """
    Snapshot, upload and record one last delta from the node stopped for replacement.

    The node is taken from the result of StopReplacedNodesStep; exactly one node is
    supported. Returns a single-item list containing the stored delta manifest.

    Raises StepFailedError when there is not exactly one replaced node, when the
    snapshot does not yield exactly one result, or when that result has no hashes.
    """

    json_storage: AsyncJsonStorage
    storage_name: str

    async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.BackupManifest]:
        replaced_nodes = context.get_result(StopReplacedNodesStep)
        # "None at all" and "too many" are different situations, report them separately.
        if not replaced_nodes:
            raise StepFailedError("No replaced node found, cannot take the final delta")
        if len(replaced_nodes) > 1:
            raise StepFailedError(f"Multiple node replacement not supported: {replaced_nodes}")
        replaced_node = replaced_nodes[0]

        snapshot_results = await self.snapshot_delta(replaced_node, cluster=cluster, context=context)
        if len(snapshot_results) != 1:
            raise StepFailedError(f"Unexpected snapshot results (expected only 1): {snapshot_results}")
        final_delta_hashes = snapshot_results[0].hashes
        if final_delta_hashes is None:
            raise StepFailedError("Final delta hashes are missing (None)")
        upload_results = await self.upload_delta(replaced_node, final_delta_hashes, cluster=cluster)
        return await self.upload_manifest(snapshot_results, upload_results, context=context)

    async def snapshot_delta(
        self, node: CoordinatorNode, *, cluster: Cluster, context: StepsContext
    ) -> List[ipc.SnapshotResult]:
        """Request a delta snapshot from ``node`` only, using the request type its features allow."""
        nodes_metadata = await get_nodes_metadata(cluster, nodes=[node])
        if len(nodes_metadata) != 1:
            raise StepFailedError(f"Unexpected node metadata results (expected only 1): {nodes_metadata}")
        node_features = set(nodes_metadata[0].features)
        req = ipc.create_snapshot_request(delta_snapshot_groups(), node_features=node_features)
        start_results = await cluster.request_from_nodes(
            "delta/snapshot", method="post", caller="UploadFinalDeltaStep", req=req, nodes=[node]
        )
        return await cluster.wait_successful_results(start_results=start_results, result_class=ipc.SnapshotResult)

    async def upload_delta(
        self, node: CoordinatorNode, hashes: List[ipc.SnapshotHash], *, cluster: Cluster
    ) -> List[ipc.SnapshotUploadResult]:
        """Ask ``node`` to upload the files behind ``hashes`` to ``self.storage_name``."""
        # NOTE(review): the V20221129 request is sent unconditionally here, without looking at
        # the node's validate_file_hashes feature - confirm every node serving delta/upload supports it.
        upload_req: ipc.NodeRequest = ipc.SnapshotUploadRequestV20221129(
            hashes=hashes, storage=self.storage_name, validate_file_hashes=False
        )
        upload_start_results = await cluster.request_from_nodes(
            "delta/upload", caller="upload_final_delta", method="post", req=upload_req, nodes=[node]
        )
        return await cluster.wait_successful_results(
            start_results=upload_start_results, result_class=ipc.SnapshotUploadResult
        )

    async def upload_manifest(
        self,
        snapshot_results: List[ipc.SnapshotResult],
        upload_results: List[ipc.SnapshotUploadResult],
        *,
        context: StepsContext,
    ) -> List[ipc.BackupManifest]:
        """Store a delta manifest named after the attempt start time and return it in a list."""
        iso = context.attempt_start.isoformat(timespec="seconds")
        backup_name = f"{JSON_DELTA_PREFIX}{iso}"
        manifest = ipc.BackupManifest(
            attempt=context.attempt,
            start=context.attempt_start,
            snapshot_results=snapshot_results,
            upload_results=upload_results,
            plugin=ipc.Plugin.cassandra,
            plugin_data={},
            filename=backup_name,
        )
        logger.info("Storing backup manifest %s", backup_name)
        await self.json_storage.upload_json(backup_name, manifest)
        return [manifest]
def snapshot_groups() -> List[SnapshotGroup]:
    """Return the snapshot groups matching the .db, .txt and .crc32 files of the astacus snapshot."""
    # first *: keyspace name; second *: table name
    snapshot_dir = f"data/*/*/snapshots/{SNAPSHOT_NAME}"
    return [SnapshotGroup(root_glob=f"{snapshot_dir}/*.{extension}") for extension in ("db", "txt", "crc32")]


def delta_snapshot_groups() -> List[SnapshotGroup]:
    """Return the snapshot groups matching the .db, .txt and .crc32 files of the backups directories."""
    # first *: keyspace name; second *: table name
    backups_dir = "data/*/*/backups"
    return [SnapshotGroup(root_glob=f"{backups_dir}/*.{extension}") for extension in ("db", "txt", "crc32")]
def ks_table_from_snapshot_path(p: Path) -> Tuple[str, str]:
    """
    Return the (keyspace, table) path components of a snapshot directory path.

    The path is expected to look like /.../keyspace/table/snapshots/astacus,
    i.e. the keyspace is the 4th and the table the 3rd component from the end.
    """
    components = p.parts
    keyspace = components[-4]
    table = components[-3]
    return keyspace, table


def ks_table_from_backup_path(p: Path) -> Tuple[str, str]:
    """
    Return the (keyspace, table) path components of a backups directory path.

    The path is expected to look like /.../keyspace/table/backups,
    i.e. the keyspace is the 3rd and the table the 2nd component from the end.
    """
    components = p.parts
    keyspace = components[-3]
    table = components[-2]
    return keyspace, table
def _ensure_target_is_empty(self, *, keyspace_name: str, table_path: Path) -> None:
    """
    Verify that the restore destination holds no plain files.

    Directories directly under ``table_path`` (e.g. backups/) are ignored. For the
    system_auth keyspace existing files are deleted instead of being reported,
    since it gets populated when the schema is restored.

    Raises RuntimeError listing the files found for any other keyspace.
    """
    leftovers = list(filter(Path.is_file, table_path.glob("*")))
    if keyspace_name == "system_auth":
        for stale_file in leftovers:
            stale_file.unlink()
        leftovers = []
    if not leftovers:
        return
    raise RuntimeError(f"Files found in {table_path.name}: {leftovers}")
run_subop.assert_awaited_once_with( cluster, ipc.CassandraSubOp.stop_cassandra, diff --git a/tests/unit/coordinator/plugins/test_base.py b/tests/unit/coordinator/plugins/test_base.py index a2013868..88dc31ae 100644 --- a/tests/unit/coordinator/plugins/test_base.py +++ b/tests/unit/coordinator/plugins/test_base.py @@ -5,13 +5,16 @@ """ from astacus.common import ipc, utils from astacus.common.asyncstorage import AsyncJsonStorage +from astacus.common.ipc import Plugin from astacus.common.op import Op from astacus.common.progress import Progress from astacus.common.utils import now from astacus.coordinator.cluster import Cluster from astacus.coordinator.config import CoordinatorNode from astacus.coordinator.plugins.base import ( + BackupManifestStep, ComputeKeptBackupsStep, + DeltaManifestsStep, ListBackupsStep, ListHexdigestsStep, SnapshotReleaseStep, @@ -21,7 +24,6 @@ UploadBlocksStep, UploadManifestStep, ) -from astacus.node.api import Features from astacus.node.snapshotter import hash_hexdigest_readable from http import HTTPStatus from io import BytesIO @@ -30,6 +32,7 @@ from typing import AbstractSet, Callable, List, Optional, Sequence from unittest import mock +import dataclasses import datetime import httpx import json @@ -102,7 +105,7 @@ def check_request(request: httpx.Request) -> httpx.Response: ipc.SnapshotUploadRequest(result_url="", hashes=get_sample_hashes(), storage="fake"), ), ( - [Features.validate_file_hashes], + [ipc.NodeFeatures.validate_file_hashes], ipc.SnapshotUploadRequestV20221129( result_url="", hashes=get_sample_hashes(), storage="fake", validate_file_hashes=True ), @@ -111,7 +114,7 @@ def check_request(request: httpx.Request) -> httpx.Response: ids=["no_feature", "validate_file_hashes"], ) async def test_upload_step_uses_new_request_if_supported( - node_features: Sequence[Features], + node_features: Sequence[ipc.NodeFeatures], expected_request: ipc.SnapshotUploadRequest, single_node_cluster: Cluster, context: StepsContext, @@ -250,13 
+253,13 @@ async def test_upload_manifest_step_generates_correct_backup_name( [ ([], None), ( - [Features.release_snapshot_files], + [ipc.NodeFeatures.release_snapshot_files], ipc.SnapshotReleaseRequest(hexdigests=["aaa", "bbb"]), ), ], ) async def test_snapshot_release_step( - node_features: Sequence[Features], + node_features: Sequence[ipc.NodeFeatures], expected_request: Optional[ipc.SnapshotReleaseRequest], single_node_cluster: Cluster, context: StepsContext, @@ -269,7 +272,7 @@ async def test_snapshot_release_step( metadata_request = respx.get("http://node_1/metadata").respond( json=ipc.MetadataResult(version="0.1", features=[feature.value for feature in node_features]).jsondict() ) - if Features.release_snapshot_files in node_features: + if ipc.NodeFeatures.release_snapshot_files in node_features: assert expected_request is not None release_request = respx.post("http://node_1/release").mock( side_effect=make_request_check(expected_request.jsondict(), "release") @@ -283,6 +286,82 @@ async def test_snapshot_release_step( ) await release_step.run_step(cluster=single_node_cluster, context=context) assert metadata_request.call_count == 1 - if Features.release_snapshot_files in node_features: + if ipc.NodeFeatures.release_snapshot_files in node_features: assert release_request.call_count == 1 assert status_request.called + + +def make_manifest(start: str, end: str) -> ipc.BackupManifest: + manifest = ipc.BackupManifest( + start=datetime.datetime.fromisoformat(start), + end=datetime.datetime.fromisoformat(end), + attempt=1, + snapshot_results=[], + upload_results=[], + plugin=Plugin.files, + ) + return manifest + + +@dataclasses.dataclass +class TestListDeltasParam: + test_id: str + basebackup_manifest: ipc.BackupManifest + stored_jsons: dict[str, str] + expected_deltas: list[str] + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "p", + [ + TestListDeltasParam( + test_id="empty_storage", + basebackup_manifest=make_manifest(start="1970-01-01T00:00", 
end="1970-01-01T00:30"), + stored_jsons={}, + expected_deltas=[], + ), + TestListDeltasParam( + test_id="single_delta", + basebackup_manifest=make_manifest(start="1970-01-01T00:00", end="1970-01-01T00:30"), + stored_jsons={ + "backup-base": make_manifest(start="1970-01-01T00:00", end="1970-01-01T00:30").json(), + "delta-one": make_manifest(start="1970-01-01T01:00", end="1970-01-01T01:05").json(), + }, + expected_deltas=["delta-one"], + ), + TestListDeltasParam( + test_id="deltas_older_than_backup_are_not_listed", + basebackup_manifest=make_manifest(start="2000-01-01T00:00", end="2000-01-01T00:30"), + stored_jsons={ + "backup-old": make_manifest(start="1970-01-01T00:00", end="1970-01-01T00:30").json(), + "delta-old": make_manifest(start="1970-01-01T01:00", end="1970-01-01T01:05").json(), + "backup-one": make_manifest(start="2000-01-01T00:00", end="2000-01-01T00:30").json(), + "delta-one": make_manifest(start="2000-01-01T01:00", end="2000-01-01T01:05").json(), + "delta-two": make_manifest(start="2000-01-01T12:00", end="1970-01-01T12:05").json(), + "backup-two": make_manifest(start="2000-01-02T00:00", end="2000-01-02T00:30").json(), + "delta-three": make_manifest(start="2000-01-02T12:00", end="2000-01-02T12:05").json(), + }, + expected_deltas=["delta-one", "delta-two", "delta-three"], + ), + TestListDeltasParam( + test_id="relies_on_start_time_in_case_of_intersections", + basebackup_manifest=make_manifest(start="2000-01-01T00:00", end="2000-01-01T00:30"), + stored_jsons={ + "delta-old": make_manifest(start="1999-12-31T23:59", end="2000-01-01T00:04").json(), + "backup-one": make_manifest(start="2000-01-01T00:00", end="2000-01-01T00:30").json(), + "delta-one": make_manifest(start="2000-01-01T00:05", end="2000-01-01T00:10").json(), + }, + expected_deltas=["delta-one"], + ), + ], + ids=lambda p: p.test_id, +) +async def test_list_delta_backups(p: TestListDeltasParam) -> None: + async_json_storage = AsyncJsonStorage(MemoryJsonStorage(p.stored_jsons)) + step = 
DeltaManifestsStep(async_json_storage) + cluster = Cluster(nodes=[CoordinatorNode(url="http://node_1")]) + context = StepsContext() + context.set_result(BackupManifestStep, p.basebackup_manifest) + backup_names = [b.filename for b in await step.run_step(cluster=cluster, context=context)] + assert backup_names == p.expected_deltas diff --git a/tests/unit/node/test_node_cassandra.py b/tests/unit/node/test_node_cassandra.py index c67ac5d1..74d0c4be 100644 --- a/tests/unit/node/test_node_cassandra.py +++ b/tests/unit/node/test_node_cassandra.py @@ -100,7 +100,7 @@ def test_api_cassandra_subop(app, ctenv, mocker, subop): assert (ctenv.root / "data").exists() assert (ctenv.root / "data" / "dummyks").exists() assert (ctenv.root / "data" / "dummyks" / "dummytable-123").exists() - assert [p.name for p in (ctenv.root / "data" / "dummyks" / "dummytable-123").iterdir()] == ["snapshots"] + assert {p.name for p in (ctenv.root / "data" / "dummyks" / "dummytable-123").iterdir()} == {"backups", "snapshots"} assert not (ctenv.root / "data" / "dummyks" / "dummytable-234").exists() elif subop == ipc.CassandraSubOp.start_cassandra: subprocess_run.assert_any_call(ctenv.cassandra_node_config.start_command + ["tempfilename"], check=True) @@ -154,6 +154,7 @@ class DefaultedRestoreSSTablesRequest(ipc.CassandraRestoreSSTablesRequest): table_glob: str = astacus_node_cassandra.SNAPSHOT_GLOB keyspaces_to_skip: Sequence[str] = list(SYSTEM_KEYSPACES) match_tables_by: ipc.CassandraTableMatching = ipc.CassandraTableMatching.cfname + expect_empty_target: bool = True return DefaultedRestoreSSTablesRequest @@ -183,6 +184,12 @@ def test_matches_tables_by_id_when_told_to(self, ctenv, make_sstables_request) - assert (ctenv.root / "data" / "dummyks" / "dummytable-123" / "asdf").read_text() == "foobar" assert not (ctenv.root / "data" / "dummyks" / "dummytable-234" / "asdf").exists() + def test_allows_existing_files_when_told_to(self, ctenv, make_sstables_request) -> None: + req = 
make_sstables_request(expect_empty_target=False) + (ctenv.root / "data" / "dummyks" / "dummytable-123" / "existing_file").write_text("exists") + + self.assert_request_succeeded(ctenv, req) + def assert_request_succeeded(self, ctenv, req: ipc.CassandraRestoreSSTablesRequest) -> None: response = ctenv.post(subop=ipc.CassandraSubOp.restore_sstables, json=req.dict()) status = ctenv.get_status(response)