From 3acd04254ac3de84b3e33193762d248e73e01180 Mon Sep 17 00:00:00 2001 From: Heavybullets8 Date: Thu, 30 May 2024 11:24:57 -0600 Subject: [PATCH] Snapshot volumes (#171) Create backups of snapshots for ix_volumes and Persistent Volumes. Allow configuration for the maximum size for those two. --- .default.config.ini | 6 + functions/backup_restore/backup/backup.py | 148 +++++--- functions/backup_restore/backup/export_.py | 19 - functions/backup_restore/backup_manager.py | 85 ++--- functions/backup_restore/base_manager.py | 54 ++- functions/backup_restore/charts/api_fetch.py | 21 +- .../backup_restore/charts/backup_fetch.py | 34 +- functions/backup_restore/database/base.py | 2 +- functions/backup_restore/database/restore.py | 4 +- .../backup_restore/kube/resources_restore.py | 102 +++--- .../backup_restore/restore/restore_all.py | 20 +- .../backup_restore/restore/restore_base.py | 145 +++++++- functions/backup_restore/restore_manager.py | 3 +- functions/backup_restore/zfs/cache.py | 57 ++- functions/backup_restore/zfs/lifecycle.py | 35 +- functions/backup_restore/zfs/snapshot.py | 344 +++++++++++------- heavy_script.sh | 2 +- utils/update_config.py | 88 +++-- 18 files changed, 735 insertions(+), 434 deletions(-) diff --git a/.default.config.ini b/.default.config.ini index 7b2585f6..9ceea6ac 100644 --- a/.default.config.ini +++ b/.default.config.ini @@ -54,7 +54,13 @@ ignore= ## true/false options ## export_enabled=true full_backup_enabled=true +backup_snapshot_streams=false ## String options ## # Uncomment the following line to specify a custom dataset location for backups # custom_dataset_location= + +# Maximum size of a backup stream, be careful when setting this higher +# Especially considering PV's for plex, sonarr, radarr, etc. 
can be quite large +# Example: max_stream_size=10G, max_stream_size=20K, max_stream_size=1T +max_stream_size=1G \ No newline at end of file diff --git a/functions/backup_restore/backup/backup.py b/functions/backup_restore/backup/backup.py index 49a600c3..21a38d32 100644 --- a/functions/backup_restore/backup/backup.py +++ b/functions/backup_restore/backup/backup.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone +from configobj import ConfigObj from pathlib import Path from collections import defaultdict @@ -49,7 +50,7 @@ def __init__(self, backup_dir: Path, retention_number: int = 15): self.backup_dataset_parent = self.backup_dir.relative_to("/mnt") self.backup_dataset = str(self.backup_dataset_parent) - self._create_backup_dataset(self.backup_dataset) + self._create_backup_dataset() self.chart_collection = APIChartCollection() self.all_chart_names = self.chart_collection.all_chart_names @@ -57,13 +58,28 @@ def __init__(self, backup_dir: Path, retention_number: int = 15): self.kube_pvc_fetcher = KubePVCFetcher() - def _create_backup_dataset(self, dataset): + # Read configuration settings + config_file_path = str(Path(__file__).parent.parent.parent.parent / 'config.ini') + config = ConfigObj(config_file_path, encoding='utf-8', list_values=False) + + self.backup_snapshot_streams = config['BACKUP'].as_bool('backup_snapshot_streams') + self.max_stream_size_str = config['BACKUP'].get('max_stream_size', '10G') + self.max_stream_size_bytes = self._size_str_to_bytes(self.max_stream_size_str) + + def _create_backup_dataset(self): """ - Create a ZFS dataset for backups if it doesn't already exist. + Create a ZFS dataset for backups. 
""" - if not self.lifecycle_manager.dataset_exists(dataset): - if not self.lifecycle_manager.create_dataset(dataset): - raise RuntimeError(f"Failed to create backup dataset: {dataset}") + if not self.lifecycle_manager.dataset_exists(self.backup_dataset): + if not self.lifecycle_manager.create_dataset( + self.backup_dataset, + options={ + "atime": "off", + "compression": "zstd-19", + "recordsize": "1M" + } + ): + raise RuntimeError(f"Failed to create backup dataset: {self.backup_dataset}") def backup_all(self): """ @@ -144,14 +160,78 @@ def backup_all(self): dataset_paths = self.kube_pvc_fetcher.get_volume_paths_by_namespace(f"ix-{app_name}") if dataset_paths: - self.logger.info(f"Backing up {app_name} PVCs...") - snapshot_errors = self.snapshot_manager.create_snapshots(self.snapshot_name, dataset_paths, self.retention_number) - if snapshot_errors: - failures[app_name].extend(snapshot_errors) + for dataset_path in dataset_paths: + pvc_name = dataset_path.split('/')[-1] + self.logger.info(f"Snapshotting PVC: {pvc_name}...") + + # Check to see if dataset exists + if not self.lifecycle_manager.dataset_exists(dataset_path): + error_msg = f"Dataset {dataset_path} does not exist." 
+ self.logger.error(error_msg) + failures[app_name].append(error_msg) + continue + + # Create the snapshot for the current dataset + snapshot_result = self.snapshot_manager.create_snapshot(self.snapshot_name, dataset_path) + if not snapshot_result["success"]: + failures[app_name].append(snapshot_result["message"]) + continue + + self.logger.debug(f"backup_snapshot_streams: {self.backup_snapshot_streams}") + self.logger.debug(f"max_stream_size_str: {self.max_stream_size_str}") + self.logger.debug(f"max_stream_size_bytes: {self.max_stream_size_bytes}") + + if self.backup_snapshot_streams: + snapshot = f"{dataset_path}@{self.snapshot_name}" + snapshot_refer_size = self.snapshot_manager.get_snapshot_refer_size(snapshot) + self.logger.debug(f"snapshot_refer_size: {snapshot_refer_size}") + + if snapshot_refer_size <= self.max_stream_size_bytes: + # Send the snapshot to the backup directory + self.logger.info(f"Sending PV snapshot stream to backup file...") + snapshot = f"{dataset_path}@{self.snapshot_name}" + backup_path = app_backup_dir / "snapshots" / f"{snapshot.replace('/', '%%')}.zfs" + backup_path.parent.mkdir(parents=True, exist_ok=True) + send_result = self.snapshot_manager.zfs_send(snapshot, backup_path, compress=True) + if not send_result["success"]: + failures[app_name].append(send_result["message"]) + else: + self.logger.warning(f"Snapshot refer size {snapshot_refer_size} exceeds the maximum configured size {self.max_stream_size_bytes}") + else: + self.logger.debug("Backup snapshot streams are disabled in the configuration.") + + # Handle ix_volumes_dataset separately + if chart_info.ix_volumes_dataset: + snapshot = chart_info.ix_volumes_dataset + "@" + self.snapshot_name + if self.backup_snapshot_streams: + snapshot_refer_size = self.snapshot_manager.get_snapshot_refer_size(snapshot) + self.logger.debug(f"ix_volumes_dataset snapshot_refer_size: {snapshot_refer_size}") + + if snapshot_refer_size <= self.max_stream_size_bytes: + self.logger.info(f"Sending 
ix_volumes snapshot stream to backup file...") + backup_path = app_backup_dir / "snapshots" / f"{snapshot.replace('/', '%%')}.zfs" + backup_path.parent.mkdir(parents=True, exist_ok=True) + send_result = self.snapshot_manager.zfs_send(snapshot, backup_path, compress=True) + if not send_result["success"]: + failures[app_name].append(send_result["message"]) + else: + self.logger.warning(f"ix_volumes_dataset snapshot refer size {snapshot_refer_size} exceeds the maximum configured size {self.max_stream_size_bytes}") + else: + self.logger.debug("Backup snapshot streams are disabled in the configuration.") self._create_backup_snapshot() self._log_failures(failures) - self._cleanup_old_backups() + + def _size_str_to_bytes(self, size_str): + size_units = {"K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4} + try: + if size_str[-1] in size_units: + return int(float(size_str[:-1]) * size_units[size_str[-1]]) + else: + return int(size_str) + except ValueError: + self.logger.error(f"Invalid size string: {size_str}") + return 0 def _log_failures(self, failures): """ @@ -175,34 +255,14 @@ def _create_backup_snapshot(self): Create a snapshot of the backup dataset after all backups are completed. """ self.logger.info(f"\nCreating snapshot for backup: {self.backup_dataset}") - if self.snapshot_manager.create_snapshots(self.snapshot_name, [self.backup_dataset], self.retention_number): - self.logger.error("Failed to create snapshot for backup dataset.") - else: - self.logger.info("Snapshot created successfully for backup dataset.") + snapshot_result = self.snapshot_manager.create_snapshot(self.snapshot_name, self.backup_dataset) - def _cleanup_old_backups(self): - """ - Cleanup old backups and their associated snapshots if the number of backups exceeds the retention limit. 
- """ - backup_datasets = sorted( - (ds for ds in self.lifecycle_manager.list_datasets() if ds.startswith(f"{self.backup_dataset_parent}/HeavyScript--")), - key=lambda ds: datetime.strptime(ds.replace(f"{self.backup_dataset_parent}/HeavyScript--", ""), '%Y-%m-%d_%H:%M:%S') - ) - - if len(backup_datasets) > self.retention_number: - for old_backup_dataset in backup_datasets[:-self.retention_number]: - snapshot_name = old_backup_dataset.split("/")[-1] - self.logger.info(f"Deleting oldest backup due to retention limit: {snapshot_name}") - try: - self.lifecycle_manager.delete_dataset(old_backup_dataset) - self.logger.debug(f"Removed old backup: {old_backup_dataset}") - except Exception as e: - self.logger.error(f"Failed to delete old backup dataset {old_backup_dataset}: {e}", exc_info=True) - - self.logger.debug(f"Deleting snapshots for: {snapshot_name}") - snapshot_errors = self.snapshot_manager.delete_snapshots(snapshot_name) - if snapshot_errors: - self.logger.error(f"Failed to delete snapshots for {snapshot_name}: {snapshot_errors}") + if snapshot_result.get("success"): + self.logger.info("Snapshot created successfully for backup dataset.") + else: + self.logger.error("Failed to create snapshot for backup dataset.") + for error in snapshot_result.get("errors", []): + self.logger.error(error) def _backup_application_datasets(self): """ @@ -212,12 +272,12 @@ def _backup_application_datasets(self): - applications_dataset (str): The root dataset under which Kubernetes operates. 
""" datasets_to_ignore = KubeUtils().to_ignore_datasets_on_backup(self.kubeconfig.dataset) - all_datasets = self.lifecycle_manager.list_datasets() - datasets_to_backup = [ds for ds in all_datasets if ds.startswith(self.kubeconfig.dataset) and ds not in datasets_to_ignore] + datasets_to_backup = [ds for ds in self.lifecycle_manager.datasets if ds.startswith(self.kubeconfig.dataset) and ds not in datasets_to_ignore] self.logger.debug(f"Snapshotting datasets: {datasets_to_backup}") - snapshot_errors = self.snapshot_manager.create_snapshots(self.snapshot_name, datasets_to_backup, self.retention_number) - if snapshot_errors: - self.logger.error(f"Failed to create snapshots for application datasets: {snapshot_errors}") - + for dataset in datasets_to_backup: + # Create snapshot for each dataset + snapshot_result = self.snapshot_manager.create_snapshot(self.snapshot_name, dataset) + if not snapshot_result.get("success"): + self.logger.error(f"Failed to create snapshot for dataset {dataset}: {snapshot_result['message']}") \ No newline at end of file diff --git a/functions/backup_restore/backup/export_.py b/functions/backup_restore/backup/export_.py index b558bbdc..75d5e47d 100644 --- a/functions/backup_restore/backup/export_.py +++ b/functions/backup_restore/backup/export_.py @@ -66,25 +66,6 @@ def export(self): self._convert_json_to_yaml(chart_info_dir / 'values.json') self.logger.info("Chart information export completed.") - self._cleanup_old_exports() - - def _cleanup_old_exports(self): - """ - Cleanup old exports if the number of exports exceeds the retention limit. 
- """ - export_dirs = sorted( - (d for d in self.export_dir.iterdir() if d.is_dir() and d.name.startswith("Export--")), - key=lambda d: datetime.strptime(d.name.replace("Export--", ""), '%Y-%m-%d_%H:%M:%S') - ) - - if len(export_dirs) > self.retention_number: - for old_export_dir in export_dirs[:-self.retention_number]: - self.logger.info(f"Deleting oldest export due to retention limit: {old_export_dir.name}") - try: - shutil.rmtree(old_export_dir) - self.logger.debug(f"Removed old export: {old_export_dir}") - except Exception as e: - self.logger.error(f"Failed to delete old export directory {old_export_dir}: {e}", exc_info=True) def _convert_json_to_yaml(self, json_file: Path): """ diff --git a/functions/backup_restore/backup_manager.py b/functions/backup_restore/backup_manager.py index 1ddf0668..2d1b8fa7 100644 --- a/functions/backup_restore/backup_manager.py +++ b/functions/backup_restore/backup_manager.py @@ -4,14 +4,12 @@ from base_manager import BaseManager from backup.backup import Backup from backup.export_ import ChartInfoExporter -from zfs.snapshot import ZFSSnapshotManager from utils.logger import get_logger class BackupManager(BaseManager): def __init__(self, backup_abs_path: Path): super().__init__(backup_abs_path) self.logger = get_logger() - self.snapshot_manager = ZFSSnapshotManager() self.logger.info(f"BackupManager initialized for {self.backup_abs_path}") def backup_all(self, retention=None): @@ -20,9 +18,9 @@ def backup_all(self, retention=None): backup = Backup(self.backup_abs_path) backup.backup_all() self.logger.info("Backup completed successfully") - self.cleanup_dangling_snapshots() if retention is not None: self.delete_old_backups(retention) + self.cleanup_dangling_snapshots() def export_chart_info(self, retention=None): """Export chart information with optional retention.""" @@ -30,34 +28,17 @@ def export_chart_info(self, retention=None): exporter = ChartInfoExporter(self.backup_abs_path) exporter.export() self.logger.info("Chart 
information export completed successfully") - self.cleanup_dangling_snapshots() if retention is not None: self.delete_old_exports(retention) def delete_backup_by_name(self, backup_name: str): """Delete a specific backup by name.""" self.logger.info(f"Attempting to delete backup: {backup_name}") - full_backups, export_dirs = self.list_backups() - - for backup in full_backups: - if backup.endswith(backup_name): - self.logger.info(f"Deleting full backup: {backup}") - self.lifecycle_manager.delete_dataset(backup) - self.snapshot_manager.delete_snapshots(backup_name) - self.logger.info(f"Deleted full backup: {backup} and associated snapshots") - self.cleanup_dangling_snapshots() - return True - - for export in export_dirs: - if export.name == backup_name: - self.logger.info(f"Deleting export: {export}") - shutil.rmtree(export) - self.logger.info(f"Deleted export: {export}") - self.cleanup_dangling_snapshots() - return True - - self.logger.info(f"Backup {backup_name} not found") - return False + result = self.delete_backup(backup_name) + if result: + self.logger.info(f"Deleted backup: {backup_name}") + else: + self.logger.info(f"Backup {backup_name} not found") def delete_backup_by_index(self, backup_index: int): """Delete a specific backup by index.""" @@ -67,30 +48,20 @@ def delete_backup_by_index(self, backup_index: int): if 0 <= backup_index < len(all_backups): backup = all_backups[backup_index] - if backup in full_backups: - backup_name = Path(backup).name - self.logger.info(f"Deleting full backup: {backup_name}") - self.lifecycle_manager.delete_dataset(backup) - self.snapshot_manager.delete_snapshots(backup_name) - self.logger.info(f"Deleted full backup: {backup_name} and associated snapshots") - elif backup in export_dirs: - self.logger.info(f"Deleting export: {backup.name}") - shutil.rmtree(backup) - self.logger.info(f"Deleted export: {backup.name}") - self.cleanup_dangling_snapshots() - return True - - self.logger.info(f"Invalid backup index: {backup_index}") - 
return False + backup_name = Path(backup).name + self.logger.info(f"Deleting backup: {backup_name}") + self.delete_backup(backup_name) + self.logger.info(f"Deleted backup: {backup_name}") + else: + self.logger.info(f"Invalid backup index: {backup_index}") def interactive_delete_backup(self): """Offer an interactive selection to delete backups.""" self.logger.info("Starting interactive backup deletion") selected_backup = self.interactive_select_backup() if selected_backup: - all_backups = self.list_backups()[0] + self.list_backups()[1] - backup_index = all_backups.index(selected_backup) - self.delete_backup_by_index(backup_index) + backup_name = Path(selected_backup).name + self.delete_backup_by_name(backup_name) def display_backups(self): """Display all backups without deleting them.""" @@ -118,31 +89,19 @@ def cleanup_dangling_snapshots(self): full_backups, _ = self.list_backups() full_backup_names = {Path(backup).name for backup in full_backups} - all_snapshots = self.snapshot_manager.list_snapshots() pattern = re.compile(r'HeavyScript--\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2}') - deleted_snapshots = set() - for snapshot in all_snapshots: + for snapshot in self.snapshot_manager.snapshots: match = pattern.search(snapshot) if match: snapshot_name = match.group() - if snapshot_name not in full_backup_names and snapshot_name not in deleted_snapshots: - self.logger.info(f"Deleting dangling snapshot: {snapshot_name}") - self.snapshot_manager.delete_snapshots(snapshot_name) - self.logger.info(f"Deleted snapshot: {snapshot_name}") - deleted_snapshots.add(snapshot_name) - - def delete_old_backups(self, retention): - """Delete backups that exceed the retention limit.""" - self.logger.debug(f"Deleting old backups exceeding retention: {retention}") - full_backups, _ = self.list_backups() - if len(full_backups) > retention: - for backup in full_backups[retention:]: - backup_name = Path(backup).name - self.logger.info(f"Deleting old backup: {backup_name}") - 
self.lifecycle_manager.delete_dataset(backup) - self.snapshot_manager.delete_snapshots(backup_name) - self.logger.info(f"Deleted old backup: {backup_name} and associated snapshots") + if snapshot_name not in full_backup_names: + self.logger.info(f"Deleting dangling snapshot: {snapshot}") + delete_result = self.snapshot_manager.delete_snapshot(snapshot) + if delete_result["success"]: + self.logger.info(f"Deleted snapshot: {snapshot}") + else: + self.logger.error(f"Failed to delete snapshot {snapshot}: {delete_result['message']}") def delete_old_exports(self, retention): """Delete exports that exceed the retention limit.""" diff --git a/functions/backup_restore/base_manager.py b/functions/backup_restore/base_manager.py index c83610cc..095a6556 100644 --- a/functions/backup_restore/base_manager.py +++ b/functions/backup_restore/base_manager.py @@ -1,3 +1,5 @@ +import re +import shutil from datetime import datetime from pathlib import Path from zfs.lifecycle import ZFSLifecycleManager @@ -14,7 +16,14 @@ def __init__(self, backup_abs_path: Path): self.logger.debug(f"Initializing BaseManager for path: {self.backup_abs_path}") if not self.lifecycle_manager.dataset_exists(self.backup_dataset_parent): - self.lifecycle_manager.create_dataset(self.backup_dataset_parent) + self.lifecycle_manager.create_dataset( + self.backup_dataset_parent, + options={ + "atime": "off", + "compression": "zstd-19", + "recordsize": "1M" + } + ) self.logger.debug(f"Created dataset: {self.backup_dataset_parent}") def _derive_dataset_parent(self): @@ -27,7 +36,7 @@ def list_backups(self): """List all backups in the parent dataset, separated into full backups and exports.""" self.logger.debug("Listing all backups") full_backups = sorted( - (ds for ds in self.lifecycle_manager.list_datasets() if ds.startswith(f"{self.backup_dataset_parent}/HeavyScript--")), + (ds for ds in self.lifecycle_manager.datasets if ds.startswith(f"{self.backup_dataset_parent}/HeavyScript--")), key=lambda ds: 
datetime.strptime(ds.split('/')[-1].replace("HeavyScript--", ""), '%Y-%m-%d_%H:%M:%S'), reverse=True ) @@ -41,6 +50,47 @@ def list_backups(self): self.logger.debug(f"Found {len(full_backups)} full backups and {len(export_dirs)} export directories") return full_backups, export_dirs + def _list_snapshots_for_backup(self, backup_name: str): + """List all snapshots matching a specific backup name.""" + self.logger.debug(f"Listing snapshots for backup: {backup_name}") + pattern = re.compile(r'HeavyScript--\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2}') + matching_snapshots = [snap for snap in self.snapshot_manager.snapshots if pattern.search(snap) and snap.endswith(f"@{backup_name}")] + self.logger.debug(f"Found {len(matching_snapshots)} snapshots for backup: {backup_name}") + return matching_snapshots + + def delete_backup(self, backup_name: str): + """Delete a specific backup and its associated snapshots by name.""" + full_backups, export_dirs = self.list_backups() + + for backup in full_backups: + if backup.endswith(backup_name): + self.logger.info(f"Deleting full backup: {backup}") + self.lifecycle_manager.delete_dataset(backup) + snapshots = self._list_snapshots_for_backup(backup_name) + for snapshot in snapshots: + self.snapshot_manager.delete_snapshot(snapshot) + self.logger.info(f"Deleted full backup: {backup} and associated snapshots") + return True + + for export in export_dirs: + if export.name == backup_name: + self.logger.info(f"Deleting export: {export}") + shutil.rmtree(export) + self.logger.info(f"Deleted export: {export}") + return True + + self.logger.info(f"Backup {backup_name} not found") + return False + + def delete_old_backups(self, retention: int): + """Delete backups that exceed the retention limit.""" + self.logger.debug(f"Deleting old backups exceeding retention: {retention}") + full_backups, _ = self.list_backups() + if len(full_backups) > retention: + for backup in full_backups[retention:]: + backup_name = Path(backup).name + 
self.delete_backup(backup_name) + def interactive_select_backup(self, backup_type="all"): """ Offer an interactive selection of backups. diff --git a/functions/backup_restore/charts/api_fetch.py b/functions/backup_restore/charts/api_fetch.py index 04d8adbc..670924fa 100644 --- a/functions/backup_restore/charts/api_fetch.py +++ b/functions/backup_restore/charts/api_fetch.py @@ -1,6 +1,7 @@ import threading +from pathlib import Path from abc import ABC, abstractmethod -from typing import Dict, List +from typing import Dict, List, Union from utils.logger import get_logger from utils.type_check import type_check from utils.singletons import MiddlewareClientManager @@ -254,6 +255,24 @@ def has_pvc(self) -> bool: self.logger.debug(f"Has PVCs for app {self.app_name}: {has_pvc}") return has_pvc + @property + def ix_volumes_dataset(self) -> Union[str, None]: + """ + Get the ixVolumes dataset path for a given application. + + Returns: + - str: The ixVolumes dataset path if it exists, else None. + """ + ix_volumes = self.chart_config.get("ixVolumes", []) + if ix_volumes: + host_path = ix_volumes[0].get("hostPath") + if host_path: + if host_path.startswith("/mnt/"): + host_path = host_path[5:] + dataset_path = str(Path(host_path).parent) + return dataset_path + return None + class APIChartCollection(ChartObserver): @type_check def __init__(self, refresh_on_update: bool = False): diff --git a/functions/backup_restore/charts/backup_fetch.py b/functions/backup_restore/charts/backup_fetch.py index 333774f0..a65e572b 100644 --- a/functions/backup_restore/charts/backup_fetch.py +++ b/functions/backup_restore/charts/backup_fetch.py @@ -57,6 +57,9 @@ def _parse_chart(self, chart_base_dir: Path, app_name: str) -> Dict[str, Union[s 'dataset': '', 'is_cnpg': False, }, + 'config': { + 'ixVolumes': [] + }, 'files': { 'database': None, 'namespace': None, @@ -66,6 +69,7 @@ def _parse_chart(self, chart_base_dir: Path, app_name: str) -> Dict[str, Union[s 'secrets': [], 'crds': [], 
'pv_zfs_volumes': [], + 'snapshots': [] } } @@ -73,6 +77,7 @@ def _parse_chart(self, chart_base_dir: Path, app_name: str) -> Dict[str, Union[s kubernetes_objects_dir = chart_base_dir / 'kubernetes_objects' database_dir = chart_base_dir / 'database' versions_dir = chart_base_dir / 'chart_versions' + snapshots_dir = chart_base_dir / 'snapshots' # Parse metadata and config metadata = self._parse_metadata(chart_info_dir) @@ -86,6 +91,8 @@ def _parse_chart(self, chart_base_dir: Path, app_name: str) -> Dict[str, Union[s chart_info['metadata']['dataset'] = metadata.get('dataset', '') chart_info['metadata']['is_cnpg'] = self._is_cnpg(config) + chart_info['config']['ixVolumes'] = config.get('ixVolumes', []) + # Add files chart_info['files']['database'] = self._get_database_file(database_dir, app_name) chart_info['files']['namespace'] = self._get_file(kubernetes_objects_dir / 'namespace' / 'namespace.yaml') @@ -96,6 +103,7 @@ def _parse_chart(self, chart_base_dir: Path, app_name: str) -> Dict[str, Union[s chart_info['files']['crds'] = self._get_files(kubernetes_objects_dir / 'crds') chart_info['files']['pv_zfs_volumes'] = self._get_files(kubernetes_objects_dir / 'pv_zfs_volumes') chart_info['files']['cnpg_pvcs_to_delete'] = self._get_file(kubernetes_objects_dir / 'cnpg_pvcs_to_delete.txt') + chart_info['files']['snapshots'] = self._get_files(snapshots_dir) return chart_info @@ -359,6 +367,26 @@ def get_dataset(self, app_name: str) -> str: """ return self.charts_info.get(app_name, {}).get('metadata', {}).get('dataset', '') + @type_check + def get_ix_volumes_dataset(self, app_name: str) -> Union[str, None]: + """ + Get the ixVolumes dataset path for a given application. + + Returns: + - str: The ixVolumes dataset path if it exists, else None. 
+ """ + ix_volumes = self.charts_info.get(app_name, {}).get("config", {}).get("ixVolumes", []) + if ix_volumes: + host_path = ix_volumes[0].get("hostPath") + if host_path: + # Remove the "/mnt/" prefix + if host_path.startswith("/mnt/"): + host_path = host_path[5:] + # Remove the last directory to get the dataset path + dataset_path = str(Path(host_path).parent) + return dataset_path + return None + def handle_critical_failure(self, app_name: str) -> None: """ Remove the application from all_releases and other relevant lists. @@ -390,6 +418,10 @@ def _serialize_charts_info(self) -> dict: k: str(v) if isinstance(v, Path) else ( [str(i) for i in v] if v is not None else 'None' ) for k, v in info['files'].items() + }, + 'config': { + k: v if not isinstance(v, Path) else str(v) + for k, v in info.get('config', {}).items() } } for app, info in self.charts_info.items() - } + } \ No newline at end of file diff --git a/functions/backup_restore/database/base.py b/functions/backup_restore/database/base.py index b93e1e5f..bd42def5 100644 --- a/functions/backup_restore/database/base.py +++ b/functions/backup_restore/database/base.py @@ -33,7 +33,7 @@ def __init__(self, app_name: str): self.dump_command = None self.error = None - # Fetch database name and user if needed + # # Fetch database name and user if needed if self.chart_info.chart_name != "immich": self.database_name = self.fetch_database_name() self.database_user = self.fetch_database_user() or self.database_name diff --git a/functions/backup_restore/database/restore.py b/functions/backup_restore/database/restore.py index 069021ee..b25b4cd3 100644 --- a/functions/backup_restore/database/restore.py +++ b/functions/backup_restore/database/restore.py @@ -201,7 +201,7 @@ def _execute_restore_command(self, retries=3, wait=5) -> Dict[str, str]: return result # Check for deadlock and retry if detected - if 'deadlock detected' in stderr: + if b'deadlock detected' in stderr: message = f"Deadlock detected. 
Retrying {attempt + 1}/{retries}..." self.logger.warning(message) result["message"] = f"{result['message']} {message}" @@ -211,4 +211,4 @@ def _execute_restore_command(self, retries=3, wait=5) -> Dict[str, str]: result["message"] = f"{result['message']} Restore failed after retrying." self.logger.error(result["message"]) - return result + return result \ No newline at end of file diff --git a/functions/backup_restore/kube/resources_restore.py b/functions/backup_restore/kube/resources_restore.py index fa688a84..39c0ee79 100644 --- a/functions/backup_restore/kube/resources_restore.py +++ b/functions/backup_restore/kube/resources_restore.py @@ -93,69 +93,69 @@ def restore_namespace(self, namespace_file: Path) -> bool: return False @type_check - def restore_secrets(self, secret_files: List[Path]) -> list: + def restore_secret(self, secret_file: Path) -> dict: """ - Restore secrets for the application from its backup directory. + Restore a single secret for the application from its backup directory. Parameters: - - secret_files (List[Path]): List of secret file paths to restore. + - secret_file (Path): Path of the secret file to restore. Returns: - - list: List of files that failed to restore. If everything succeeds, returns an empty list. + - dict: Result containing status and message. 
""" - self.logger.debug("Restoring secrets from provided file list...") - failures = [] + result = { + "success": False, + "message": "" + } - if not secret_files: - self.logger.warning("No secret files provided.") - return [] - - for secret_file in secret_files: - self.logger.debug(f"Restoring secret from file: {secret_file}") - try: - with open(secret_file, 'r') as f: - secret_body = yaml.safe_load(f) - secret_body['metadata'].pop('resourceVersion', None) - secret_body['metadata'].pop('uid', None) - secret_body['metadata']['annotations'] = secret_body['metadata'].get('annotations', {}) - secret_body['metadata']['annotations']['kubectl.kubernetes.io/last-applied-configuration'] = yaml.dump(secret_body) - with open(secret_file, 'w') as f: - yaml.dump(secret_body, f) - restoreResult = run_command(f"k3s kubectl apply -f \"{secret_file}\" --validate=false") - if restoreResult.is_success(): - self.logger.debug(f"Restored {secret_file.name}") - else: - self.logger.error(f"Failed to restore {secret_file.name}: {restoreResult.get_error()}") - failures.append(secret_file.name) - except Exception as e: - self.logger.error(f"Error processing secret file {secret_file}: {e}") - failures.append(secret_file.name) - return failures + self.logger.debug(f"Restoring secret from file: {secret_file}") + try: + with open(secret_file, 'r') as f: + secret_body = yaml.safe_load(f) + secret_body['metadata'].pop('resourceVersion', None) + secret_body['metadata'].pop('uid', None) + secret_body['metadata']['annotations'] = secret_body['metadata'].get('annotations', {}) + secret_body['metadata']['annotations']['kubectl.kubernetes.io/last-applied-configuration'] = yaml.dump(secret_body) + with open(secret_file, 'w') as f: + yaml.dump(secret_body, f) + restore_result = run_command(f"k3s kubectl apply -f \"{secret_file}\" --validate=false") + if restore_result.is_success(): + self.logger.debug(f"Restored {secret_file.name}") + result["success"] = True + result["message"] = f"Restored 
{secret_file.name} successfully." + else: + self.logger.error(f"Failed to restore {secret_file.name}: {restore_result.get_error()}") + result["message"] = f"Failed to restore {secret_file.name}: {restore_result.get_error()}" + except Exception as e: + self.logger.error(f"Error processing secret file {secret_file}: {e}") + result["message"] = f"Error processing secret file {secret_file}: {e}" + + return result @type_check - def restore_crd(self, crd_files: List[Path]) -> list: + def restore_crd(self, crd_file: Path) -> dict: """ - Restore CRDs for the application from its backup directory. + Restore a single CRD for the application from its backup directory. Parameters: - - crd_files (List[Path]): List of CRD file paths to restore. + - crd_file (Path): Path of the CRD file to restore. Returns: - - list: List of files that failed to restore. If everything succeeds, returns an empty list. + - dict: Result containing status and message. """ - self.logger.debug("Restoring CRDs from provided file list...") - failures = [] - - if not crd_files: - self.logger.warning("No CRD files provided.") - return [] - - for file in crd_files: - self.logger.debug(f"Restoring CRD from file: {file}") - restoreResult = run_command(f"k3s kubectl apply -f \"{file}\" --validate=false") - if restoreResult.is_success(): - self.logger.debug(f"Restored {file.name}") - else: - self.logger.error(f"Failed to restore {file.name}: {restoreResult.get_error()}") - failures.append(file.name) - return failures \ No newline at end of file + result = { + "success": False, + "message": "" + } + + self.logger.debug(f"Restoring CRD from file: {crd_file}") + restore_result = run_command(f"k3s kubectl apply -f \"{crd_file}\" --validate=false") + if restore_result.is_success(): + self.logger.debug(f"Restored {crd_file.name}") + result["success"] = True + result["message"] = f"Restored {crd_file.name} successfully." 
+        else:
+            self.logger.error(f"Failed to restore {crd_file.name}: {restore_result.get_error()}")
+            result["message"] = f"Failed to restore {crd_file.name}: {restore_result.get_error()}"
+
+        return result
\ No newline at end of file
diff --git a/functions/backup_restore/restore/restore_all.py b/functions/backup_restore/restore/restore_all.py
index d7bca1a7..103ca416 100644
--- a/functions/backup_restore/restore/restore_all.py
+++ b/functions/backup_restore/restore/restore_all.py
@@ -11,6 +11,10 @@ def __init__(self, backup_dir: Path):
         super().__init__(backup_dir)
 
     def restore(self):
         """Perform the entire restore process."""
+        if not self.chart_info.all_releases:
+            self.logger.error("No releases found in backup directory.")
+            return
+
         self.logger.info("Building Restore Plan\n"
                          "----------------------")
@@ -20,10 +24,6 @@ def restore(self):
             self.logger.error(str(e))
             return
 
-        if not self.chart_info.all_releases:
-            self.logger.error("No releases found in backup directory.")
-            return
-
         self.logger.info("Performing Initial Kubernetes Operations\n"
                          "----------------------------------------")
         try:
@@ -40,7 +40,7 @@ def restore(self):
                 self._rollback_volumes(app_name)
             except Exception as e:
                 self.logger.error(f"Failed to rollback snapshots for {app_name}: {e}\n")
-                self.failures[app_name].append(f"Failed to rollback volume snapshots: {e}")
+                self.failures.setdefault(app_name, []).append(f"Failed to rollback volume snapshots: {e}")
 
         self.logger.info("\nStarting Kubernetes Services\n"
                          "----------------------------")
@@ -57,7 +57,7 @@ def restore(self):
                 CatalogRestoreManager(self.catalog_dir).restore()
             except Exception as e:
                 self.logger.warning(f"Failed to restore catalog: {e}")
-                self.failures["Catalog"].append(f"Restoration failed: {e}")
+                self.failures.setdefault("Catalog", []).append(f"Restoration failed: {e}")
 
         if self.chart_info.apps_with_crds:
             self.logger.info("\nRestoring Custom Resource Definitions\n"
                              "-------------------------------------")
@@ -90,7 +90,7 @@ def restore(self):
                 continue
             except Exception as e:
self.logger.error(f"Failed to restore {app_name}: {e}\n") - self.failures[app_name].append(f"Restoration failed: {e}") + self.failures.setdefault(app_name, []).append(f"Restoration failed: {e}") continue self.logger.info("") @@ -115,12 +115,12 @@ def restore(self): db_manager = RestoreCNPGDatabase(app_name, self.chart_info.get_file(app_name, "database")) result = db_manager.restore() if not result["success"]: - self.failures[app_name].append(result["message"]) + self.failures.setdefault(app_name, []).append(result["message"]) else: self.logger.info(result["message"]) except Exception as e: self.logger.error(f"Failed to restore database for {app_name}: {e}") - self.failures[app_name].append(f"Database restore failed: {e}") + self.failures.setdefault(app_name, []).append(f"Database restore failed: {e}") self._log_failures() @@ -157,7 +157,7 @@ def _initial_kubernetes_setup(self): try: self.logger.info(f"Rolling back snapshots under {self.kube_config_reader.dataset}") - self.snapshot_manager.rollback_all_snapshots(self.snapshot_name, self.kube_config_reader.dataset) + self.snapshot_manager.rollback_all_snapshots(self.snapshot_name, self.kube_config_reader.dataset, recursive=True, force=True) except Exception as e: self.logger.error(f"Failed to rollback snapshots: {e}") raise Exception("Initial Kubernetes setup failed.") diff --git a/functions/backup_restore/restore/restore_base.py b/functions/backup_restore/restore/restore_base.py index 6a8ab70c..3139aa6a 100644 --- a/functions/backup_restore/restore/restore_base.py +++ b/functions/backup_restore/restore/restore_base.py @@ -1,4 +1,5 @@ import os +import yaml from pathlib import Path from collections import defaultdict from app.app_manager import AppManager @@ -9,6 +10,7 @@ from kube.resources_restore import KubeRestoreResources from zfs.snapshot import ZFSSnapshotManager from zfs.lifecycle import ZFSLifecycleManager +from utils.shell import run_command from utils.logger import setup_global_logger, set_logger from 
utils.singletons import MiddlewareClientManager from utils.type_check import type_check @@ -33,9 +35,9 @@ def __init__(self, backup_dir: Path): self.backup_chart_dir = self.backup_dir / "charts" self.catalog_dir = self.backup_dir / "catalog" - print("Rolling back snapshot for backup dataset, ensuring integrity...") + self.logger.info("Rolling back snapshot for backup dataset, ensuring integrity...") self.snapshot_manager = ZFSSnapshotManager() - self.snapshot_manager.rollback_all_snapshots(self.snapshot_name, self.backup_dataset) + self.snapshot_manager.rollback_all_snapshots(self.snapshot_name, self.backup_dataset, recursive=True, force=True) self.middleware = MiddlewareClientManager.fetch() self.kubernetes_config_file = self.backup_dir / "kubernetes_config" / "kubernetes_config.json" @@ -82,31 +84,132 @@ def _log_failures(self): def _handle_critical_failure(self, app_name: str, error: str): """Handle a critical failure that prevents further restoration.""" self.logger.error(f"Critical error for {app_name}: {error}") - self.failures[app_name].append(error) + self.failures.setdefault(app_name, []).append(error) if app_name not in self.critical_failures: self.critical_failures.append(app_name) self.chart_info.handle_critical_failure(app_name) def _restore_crds(self, app_name): + """ + Restore CRDs for the specified application. + + Parameters: + - app_name (str): The name of the application to restore CRDs for. 
+ """ self.logger.info(f"Restoring CRDs for {app_name}...") - crd_failures = self.restore_resources.restore_crd(self.chart_info.get_file(app_name, "crds")) - if crd_failures: - self.failures[app_name].extend(crd_failures) + crd_files = self.chart_info.get_file(app_name, "crds") + + for crd_file in crd_files: + self.logger.debug(f"Restoring CRD from file: {crd_file}") + restore_result = self.restore_resources.restore_crd(crd_file) + if not restore_result["success"]: + self.failures.setdefault(app_name, []).append(restore_result["message"]) + self.logger.error(f"Failed to restore CRD from {crd_file}: {restore_result['message']}") @type_check def _rollback_volumes(self, app_name: str): - """Rollback persistent volumes.""" + """ + Rollback persistent volumes or restore from backups if necessary. + + Parameters: + - app_name (str): The name of the application to restore volumes for. + """ + self.logger.debug(f"Starting rollback process for {app_name}...") + + def set_mountpoint_legacy(dataset_path): + command = f"/sbin/zfs set mountpoint=legacy \"{dataset_path}\"" + result = run_command(command, suppress_output=True) + if result.is_success(): + self.logger.debug(f"Set mountpoint to legacy for {dataset_path}") + else: + message = f"Failed to set mountpoint to legacy for {dataset_path}: {result.get_error()}" + self.failures.setdefault(app_name, []).append(message) + self.logger.error(message) + + def rollback_snapshot(snapshot: str, name: str, volume_type: str): + self.logger.info(f"{app_name}: rolling back {volume_type} {name}...") + rollback_result = self.snapshot_manager.rollback_snapshot(snapshot, recursive=True, force=True) + if not rollback_result.get("success", False): + self.failures.setdefault(app_name, []).append(rollback_result.get("message", "Unknown error")) + self.logger.error(rollback_result.get("message", "Unknown error")) + + def restore_snapshot(snapshot: str, name: str, volume_type: str): + self.logger.info(f"{app_name}: restoring {volume_type} {name} 
from backup...") + snapshot_files = self.chart_info.get_file(app_name, "snapshots") + if snapshot_files: + for snapshot_file in snapshot_files: + snapshot_file_path = snapshot_file + snapshot_file_name = snapshot_file.stem.replace('%%', '/') + dataset_path, _ = snapshot_file_name.split('@', 1) + parent_dataset_path = '/'.join(dataset_path.split('/')[:-1]) + + if not self.zfs_manager.dataset_exists(parent_dataset_path): + self.logger.debug(f"Parent dataset {parent_dataset_path} does not exist. Creating it...") + if not self.zfs_manager.create_dataset(parent_dataset_path): + message = f"Failed to create parent dataset {parent_dataset_path}" + self.failures.setdefault(app_name, []).append(message) + self.logger.error(message) + continue + + if snapshot_file_name == snapshot: + restore_result = self.snapshot_manager.zfs_receive(snapshot_file_path, dataset_path, decompress=True) + if not restore_result["success"]: + self.failures.setdefault(app_name, []).append(restore_result["message"]) + self.logger.error(f"Failed to restore snapshot from {snapshot_file_path} for {app_name}: {restore_result['message']}") + else: + message = f"No snapshot files found for {app_name}" + self.failures.setdefault(app_name, []).append(message) + self.logger.error(message) + + # Process PV files pv_files = self.chart_info.get_file(app_name, "pv_zfs_volumes") + self.logger.debug(f"Found PV files for {app_name}: {pv_files}") pv_only_files = [file for file in pv_files if file.name.endswith('-pv.yaml')] - if pv_only_files: - self.logger.info(f"Rolling back ZFS snapshots for {app_name}...") - for pv_file in pv_only_files: - result = self.snapshot_manager.rollback_persistent_volume(self.snapshot_name, pv_file) - if not result["success"]: - self.failures[app_name].append(result["message"]) - self.logger.error(f"Failed to rollback {pv_file} for {app_name}: {result['message']}") + + for pv_file in pv_only_files: + try: + with pv_file.open('r') as file: + pv_data = yaml.safe_load(file) + 
self.logger.debug(f"Loaded PV data from {pv_file}: {pv_data}") + pool_name = pv_data['spec']['csi']['volumeAttributes']['openebs.io/poolname'] + volume_handle = pv_data['spec']['csi']['volumeHandle'] + dataset_path = f"{pool_name}/{volume_handle}" + snapshot = f"{dataset_path}@{self.snapshot_name}" + pv_name = pv_file.stem + + self.logger.debug(f"Constructed snapshot path: {snapshot}") + + if self.snapshot_manager.snapshot_exists(snapshot): + rollback_snapshot(snapshot, pv_name, "PVC") + elif any(snap.stem.replace('%%', '/') == snapshot for snap in self.chart_info.get_file(app_name, "snapshots") or []): + restore_snapshot(snapshot, pv_name, "PVC") else: - self.logger.debug(result["message"]) + message = f"Snapshot {snapshot} for PVC {pv_name} cannot be rolled back or restored from backup." + self.failures.setdefault(app_name, []).append(message) + self.logger.error(message) + continue + + set_mountpoint_legacy(dataset_path) + except Exception as e: + message = f"Failed to process PV file {pv_file}: {e}" + self.logger.error(message, exc_info=True) + self.failures.setdefault(app_name, []).append(message) + + # Process ix_volumes + ix_volumes_dataset = self.chart_info.get_ix_volumes_dataset(app_name) + if ix_volumes_dataset: + self.logger.debug(f"Found ix_volumes dataset for {app_name}: {ix_volumes_dataset}") + snapshot = f"{ix_volumes_dataset}@{self.snapshot_name}" + self.logger.debug(f"Constructed ix_volumes snapshot path: {snapshot}") + + if self.snapshot_manager.snapshot_exists(snapshot): + rollback_snapshot(snapshot, "ix_volumes", "ix_volumes") + elif any(snap.stem.replace('%%', '/') == snapshot for snap in self.chart_info.get_file(app_name, "snapshots") or []): + restore_snapshot(snapshot, "ix_volumes", "ix_volumes") + else: + message = f"Snapshot {snapshot} for ix_volumes cannot be rolled back or restored from backup." 
+ self.failures.setdefault(app_name, []).append(message) + self.logger.error(message) @type_check def _restore_application(self, app_name: str) -> bool: @@ -155,12 +258,14 @@ def _restore_application(self, app_name: str) -> bool: self.logger.error(f"Critical failure in restoring {app_name}, skipping further processing.\n") return False - app_secrets = self.chart_info.get_file(app_name, "secrets") - if app_secrets: + secret_files = self.chart_info.get_file(app_name, "secrets") + if secret_files: self.logger.info(f"Restoring secrets for {app_name}...") - secret_failures = self.restore_resources.restore_secrets(app_secrets) - if secret_failures: - self.failures[app_name].extend(secret_failures) + for secret_file in secret_files: + secret_result = self.restore_resources.restore_secret(secret_file) + if not secret_result.get("success", False): + self.failures.setdefault(app_name, []).append(secret_result.get("message")) + self.logger.error(secret_result.get("message")) if app_name not in self.create_list: try: diff --git a/functions/backup_restore/restore_manager.py b/functions/backup_restore/restore_manager.py index b09c2e85..fb98e4f2 100644 --- a/functions/backup_restore/restore_manager.py +++ b/functions/backup_restore/restore_manager.py @@ -39,8 +39,7 @@ def remove_newer_backups(self, backup_name: str): for backup in newer_backups: newer_backup_name = Path(backup).name self.logger.info(f"Deleting newer backup due to restore: {newer_backup_name}") - self.lifecycle_manager.delete_dataset(backup) - self.snapshot_manager.delete_snapshots(newer_backup_name) + self.delete_backup(newer_backup_name) self.logger.info(f"Deleted backup: {newer_backup_name} and associated snapshots.") return True diff --git a/functions/backup_restore/zfs/cache.py b/functions/backup_restore/zfs/cache.py index bf158e25..d75e383c 100644 --- a/functions/backup_restore/zfs/cache.py +++ b/functions/backup_restore/zfs/cache.py @@ -12,7 +12,7 @@ class ZFSCache: _instance = None _lock = threading.Lock() 
_datasets = set() - _snapshots = set() + _snapshots = {} def __new__(cls): if not cls._instance: @@ -47,22 +47,39 @@ def _load_datasets(self) -> set: self.logger.error("Failed to load datasets.") return set() - def _load_snapshots(self) -> set: + def _load_snapshots(self) -> dict: """ - Load all ZFS snapshots. + Load all ZFS snapshots and their refer sizes. Returns: - set: A set of all ZFS snapshots. + dict: A dictionary of all ZFS snapshots with their details. """ - command = "/sbin/zfs list -H -t snapshot -o name" + command = "/sbin/zfs list -H -t snapshot -o name,refer" result = run_command(command, suppress_output=True) if result.is_success(): - snapshots = set(result.get_output().split('\n')) + snapshots = {} + for line in result.get_output().split('\n'): + if line: + parts = line.rsplit('\t', 1) + snapshot_name = parts[0] + refer_size = self._convert_size_to_bytes(parts[1]) + snapshots[snapshot_name] = {"refer": refer_size} self.logger.debug(f"Loaded {len(snapshots)} snapshots.") return snapshots else: self.logger.error("Failed to load snapshots.") - return set() + return {} + + def _convert_size_to_bytes(self, size_str): + size_units = {"K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4} + try: + if size_str[-1] in size_units: + return int(float(size_str[:-1]) * size_units[size_str[-1]]) + else: + return int(size_str) + except ValueError: + self.logger.error(f"Invalid size string: {size_str}") + return 0 def hard_refresh(self): """ @@ -71,7 +88,7 @@ def hard_refresh(self): ZFSCache._datasets = self._load_datasets() ZFSCache._snapshots = self._load_snapshots() - def get_snapshots_for_dataset(self, dataset: str) -> set: + def get_snapshots_for_dataset(self, dataset: str) -> dict: """ Get all snapshots associated with a specific dataset. @@ -79,10 +96,10 @@ def get_snapshots_for_dataset(self, dataset: str) -> set: dataset (str): The name of the dataset. Returns: - set: A set of snapshots associated with the dataset. 
+ dict: A dictionary of snapshots associated with the dataset. """ with self._lock: - return {snap for snap in ZFSCache._snapshots if snap.startswith(dataset + '@')} + return {snap: details for snap, details in ZFSCache._snapshots.items() if snap.startswith(dataset + '@')} @property def datasets(self) -> set: @@ -107,23 +124,23 @@ def datasets(self, value: set): ZFSCache._datasets = value @property - def snapshots(self) -> set: + def snapshots(self) -> dict: """ Get the current set of snapshots. Returns: - set: The current set of snapshots. + dict: The current set of snapshots. """ with self._lock: return ZFSCache._snapshots @snapshots.setter - def snapshots(self, value: set): + def snapshots(self, value: dict): """ Set the current set of snapshots. Parameters: - value (set): The new set of snapshots. + value (dict): The new set of snapshots. """ with self._lock: ZFSCache._snapshots = value @@ -153,16 +170,17 @@ def remove_dataset(self, dataset: str): self.logger.debug(f"Removed dataset: {dataset}") @type_check - def add_snapshot(self, snapshot: str): + def add_snapshot(self, snapshot: str, refer_size: int): """ Add a snapshot to the cache. Parameters: snapshot (str): The snapshot to add. + refer_size (int): The refer size of the snapshot. """ with self._lock: - ZFSCache._snapshots.add(snapshot) - self.logger.debug(f"Added snapshot: {snapshot}") + ZFSCache._snapshots[snapshot] = {"refer": refer_size} + self.logger.debug(f"Added snapshot: {snapshot} with refer size: {refer_size}") @type_check def remove_snapshot(self, snapshot: str): @@ -173,5 +191,6 @@ def remove_snapshot(self, snapshot: str): snapshot (str): The snapshot to remove. 
""" with self._lock: - ZFSCache._snapshots.discard(snapshot) - self.logger.debug(f"Removed snapshot: {snapshot}") + if snapshot in ZFSCache._snapshots: + del ZFSCache._snapshots[snapshot] + self.logger.debug(f"Removed snapshot: {snapshot}") \ No newline at end of file diff --git a/functions/backup_restore/zfs/lifecycle.py b/functions/backup_restore/zfs/lifecycle.py index fd0b75cb..76a12db1 100644 --- a/functions/backup_restore/zfs/lifecycle.py +++ b/functions/backup_restore/zfs/lifecycle.py @@ -1,17 +1,13 @@ from zfs.cache import ZFSCache from utils.shell import run_command from utils.type_check import type_check -from utils.logger import get_logger +from .cache import ZFSCache -class ZFSLifecycleManager: +class ZFSLifecycleManager(ZFSCache): """ Class responsible for lifecycle operations of ZFS datasets, such as checking existence, creating, and deleting datasets. """ - def __init__(self): - self.logger = get_logger() - self.cache = ZFSCache() - @type_check def dataset_exists(self, dataset: str) -> bool: """ @@ -23,7 +19,7 @@ def dataset_exists(self, dataset: str) -> bool: Returns: - bool: True if the dataset exists, False otherwise. """ - exists = dataset in self.cache.datasets + exists = dataset in self.datasets self.logger.debug(f"Dataset \"{dataset}\" exists: {exists}") return exists @@ -51,7 +47,7 @@ def create_dataset(self, dataset: str, options: dict = None) -> bool: result = run_command(command, suppress_output=True) if result.is_success(): - self.cache.add_dataset(dataset) + self.add_dataset(dataset) self.logger.debug(f"Dataset \"{dataset}\" created successfully.") return True else: @@ -60,17 +56,26 @@ def create_dataset(self, dataset: str, options: dict = None) -> bool: @type_check def delete_dataset(self, dataset: str) -> bool: + """ + Delete a ZFS dataset, including all its snapshots. + + Parameters: + - dataset (str): The name of the dataset to delete. + + Returns: + - bool: True if the dataset was successfully deleted, False otherwise. 
+ """ if not self.dataset_exists(dataset): self.logger.warning(f"Dataset \"{dataset}\" does not exist. Cannot delete.") return False # Delete all associated snapshots first - snapshots_to_delete = self.cache.get_snapshots_for_dataset(dataset) + snapshots_to_delete = self.get_snapshots_for_dataset(dataset) for snapshot in snapshots_to_delete: command = f"/sbin/zfs destroy \"{snapshot}\"" result = run_command(command, suppress_output=True) if result.is_success(): - self.cache.remove_snapshot(snapshot) + self.remove_snapshot(snapshot) self.logger.debug(f"Snapshot \"{snapshot}\" deleted successfully.") else: self.logger.error(f"Failed to delete snapshot \"{snapshot}\": {result.get_error()}") @@ -80,21 +85,21 @@ def delete_dataset(self, dataset: str) -> bool: command = f"/sbin/zfs destroy -r \"{dataset}\"" result = run_command(command, suppress_output=True) if result.is_success(): - self.cache.remove_dataset(dataset) + self.remove_dataset(dataset) self.logger.debug(f"Dataset \"{dataset}\" deleted successfully.") return True else: self.logger.error(f"Failed to delete dataset \"{dataset}\": {result.get_error()}") return False - @type_check - def list_datasets(self) -> list: + @property + def datasets(self) -> list: """ - List all cached ZFS datasets. + Property to get the current list of cached ZFS datasets. Returns: - list: A list of all dataset names. 
""" - datasets = list(self.cache.datasets) + datasets = list(self._datasets) self.logger.debug(f"Listing all datasets: {datasets}") return datasets diff --git a/functions/backup_restore/zfs/snapshot.py b/functions/backup_restore/zfs/snapshot.py index 251492d3..cb41c92b 100644 --- a/functions/backup_restore/zfs/snapshot.py +++ b/functions/backup_restore/zfs/snapshot.py @@ -1,134 +1,112 @@ -import re -import yaml from pathlib import Path -from datetime import datetime - -from zfs.cache import ZFSCache from utils.shell import run_command from utils.type_check import type_check -from utils.logger import get_logger +from .cache import ZFSCache -class ZFSSnapshotManager: +class ZFSSnapshotManager(ZFSCache): """ Class responsible for managing ZFS snapshots, including creation, deletion, and rollback operations. """ @type_check - def __init__(self): - """ - Initialize the ZFSSnapshotManager class. - """ - self.logger = get_logger() - self.cache = ZFSCache() - - @type_check - def _cleanup_snapshots(self, dataset_paths: list, retention_number: int) -> list: + def create_snapshot(self, snapshot_name: str, dataset: str) -> dict: """ - Cleanup older snapshots, retaining only a specified number of the most recent ones. + Create a single ZFS snapshot for the specified dataset. Parameters: - - dataset_paths (list): List of paths to datasets. - - retention_number (int): Number of recent snapshots to retain. + - snapshot_name (str): Name of the snapshot. + - dataset (str): Dataset to create the snapshot for. Returns: - - list: A list of error messages, if any. - """ - errors = [] - for path in dataset_paths: - if path not in self.cache.datasets: - error_msg = f"Dataset {path} does not exist." 
- self.logger.error(error_msg) - errors.append(error_msg) - continue - - matching_snapshots = [snap for snap in self.cache.snapshots if snap.startswith(f"{path}@HeavyScript--")] - matching_snapshots.sort(key=lambda x: datetime.strptime(re.search(r'HeavyScript--\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2}', x).group(), "HeavyScript--%Y-%m-%d_%H:%M:%S")) - - self.logger.debug(f"Found {len(matching_snapshots)} snapshots for dataset path {path}.") - - if len(matching_snapshots) > retention_number: - snapshots_to_delete = matching_snapshots[:-retention_number] - for snapshot in snapshots_to_delete: - delete_command = f"/sbin/zfs destroy \"{snapshot}\"" - delete_result = run_command(delete_command) - if delete_result.is_success(): - self.cache.remove_snapshot(snapshot) - self.logger.debug(f"Deleted snapshot: {snapshot}") - else: - error_msg = f"Failed to delete snapshot {snapshot}: {delete_result.get_error()}" - self.logger.error(error_msg) - errors.append(error_msg) - return errors + - dict: Result containing status and message. + """ + result = { + "success": False, + "message": "" + } + + if dataset not in self.datasets: + result["message"] = f"Dataset {dataset} does not exist." + self.logger.error(result["message"]) + return result + + snapshot_full_name = f"{dataset}@{snapshot_name}" + command = f"/sbin/zfs snapshot \"{snapshot_full_name}\"" + snapshot_result = run_command(command) + if snapshot_result.is_success(): + refer_size = self.get_snapshot_refer_size(snapshot_full_name) + self.add_snapshot(snapshot_full_name, refer_size) + self.logger.debug(f"Created snapshot: {snapshot_full_name} with refer size: {refer_size}") + result["success"] = True + result["message"] = f"Snapshot {snapshot_full_name} created successfully." 
+ else: + result["message"] = f"Failed to create snapshot for {snapshot_full_name}: {snapshot_result.get_error()}" + self.logger.error(result["message"]) + + return result @type_check - def create_snapshots(self, snapshot_name, dataset_paths: list, retention_number: int) -> list: + def get_snapshot_refer_size(self, snapshot: str) -> int: """ - Create snapshots for specified ZFS datasets and cleanup old snapshots. + Get the refer size of a ZFS snapshot. Parameters: - - snapshot_name (str): Name of the snapshot. - - dataset_paths (list): List of paths to create snapshots for. - - retention_number (int): Number of recent snapshots to retain. + - snapshot (str): The name of the snapshot. Returns: - - list: A list of error messages, if any. - """ - errors = [] - for path in dataset_paths: - if path not in self.cache.datasets: - error_msg = f"Dataset {path} does not exist." - self.logger.error(error_msg) - errors.append(error_msg) - continue - - snapshot_full_name = f"{path}@{snapshot_name}" - command = f"/sbin/zfs snapshot \"{snapshot_full_name}\"" - result = run_command(command) + - int: The refer size of the snapshot in bytes. 
+ """ + try: + result = run_command(f"/sbin/zfs list -H -o refer \"{snapshot}\"") if result.is_success(): - self.cache.add_snapshot(snapshot_full_name) - self.logger.debug(f"Created snapshot: {snapshot_full_name}") + size_str = result.get_output() + size = self._convert_size_to_bytes(size_str) + return size else: - error_msg = f"Failed to create snapshot for {snapshot_full_name}: {result.get_error()}" - self.logger.error(error_msg) - errors.append(error_msg) - - cleanup_errors = self._cleanup_snapshots(dataset_paths, retention_number) - errors.extend(cleanup_errors) - return errors + self.logger.error(f"Failed to get refer size for snapshot {snapshot}: {result.get_error()}") + return 0 + except Exception as e: + self.logger.error(f"Exception occurred while getting refer size for snapshot {snapshot}: {e}") + return 0 @type_check - def delete_snapshots(self, snapshot_name: str) -> list: + def delete_snapshot(self, snapshot: str) -> dict: """ - Delete all snapshots matching a specific name. + Delete a single ZFS snapshot. Parameters: - - snapshot_name (str): The name of the snapshot to delete. + - snapshot (str): The name of the snapshot to delete. Returns: - - list: A list of error messages, if any. - """ - errors = [] - matching_snapshots = [snap for snap in self.cache.snapshots if snap.endswith(f"@{snapshot_name}")] - for snapshot in matching_snapshots: - delete_command = f"/sbin/zfs destroy \"{snapshot}\"" - delete_result = run_command(delete_command) - if delete_result.is_success(): - self.cache.remove_snapshot(snapshot) - self.logger.debug(f"Deleted snapshot: {snapshot}") - else: - error_msg = f"Failed to delete snapshot {snapshot}: {delete_result.get_error()}" - self.logger.error(error_msg) - errors.append(error_msg) - return errors + - dict: Result containing status and message. 
+ """ + result = { + "success": False, + "message": "" + } + + delete_command = f"/sbin/zfs destroy \"{snapshot}\"" + delete_result = run_command(delete_command) + if delete_result.is_success(): + self.remove_snapshot(snapshot) + self.logger.debug(f"Deleted snapshot: {snapshot}") + result["success"] = True + result["message"] = f"Snapshot {snapshot} deleted successfully." + else: + result["message"] = f"Failed to delete snapshot {snapshot}: {delete_result.get_error()}" + self.logger.error(result["message"]) + + return result @type_check - def rollback_persistent_volume(self, snapshot_name: str, pv_file: Path) -> dict: + def rollback_snapshot(self, snapshot: str, recursive: bool = False, force: bool = False) -> dict: """ - Restore a PV from a backup YAML file and rollback to a specified snapshot. + Rollback a single ZFS snapshot. Parameters: - - snapshot_name (str): Name of the snapshot to rollback to. - - pv_file (Path): Path to the PV file. + - snapshot (str): The name of the snapshot to rollback. + - recursive (bool): Whether to rollback recursively. Default is False. + - force (bool): Whether to force the rollback. Default is False. Returns: - dict: Result containing status and message. @@ -138,72 +116,162 @@ def rollback_persistent_volume(self, snapshot_name: str, pv_file: Path) -> dict: "message": "" } - try: - with pv_file.open('r') as file: - pv_data = yaml.safe_load(file) - - pool_name = pv_data['spec']['csi']['volumeAttributes']['openebs.io/poolname'] - volume_handle = pv_data['spec']['csi']['volumeHandle'] - dataset_path = f"{pool_name}/{volume_handle}" + dataset_path, snapshot_name = snapshot.split('@', 1) + if dataset_path not in self.datasets: + result["message"] = f"Dataset {dataset_path} does not exist. Cannot restore snapshot." + self.logger.warning(result["message"]) + return result - if dataset_path not in self.cache.datasets: - message = f"Dataset {dataset_path} does not exist. Cannot restore snapshot." 
- self.logger.warning(message) - result["message"] = message - return result - - rollback_command = f"/sbin/zfs rollback -r -f \"{dataset_path}@{snapshot_name}\"" - rollback_result = run_command(rollback_command) - if rollback_result.is_success(): - message = f"Successfully rolled back {dataset_path} to snapshot {snapshot_name}." - self.logger.debug(message) - result["success"] = True - result["message"] = message - else: - message = f"Failed to rollback {dataset_path} to snapshot {snapshot_name}: {rollback_result.get_error()}" - self.logger.error(message) - result["message"] = message - except Exception as e: - message = f"Failed to process PV file {pv_file}: {e}" - self.logger.error(message, exc_info=True) - result["message"] = message + rollback_command = f"/sbin/zfs rollback" + if recursive: + rollback_command += " -r" + if force: + rollback_command += " -f" + rollback_command += f" \"{snapshot}\"" + + rollback_result = run_command(rollback_command) + if rollback_result.is_success(): + result["success"] = True + result["message"] = f"Successfully rolled back {dataset_path} to snapshot {snapshot_name}." + self.logger.debug(result["message"]) + else: + result["message"] = f"Failed to rollback {dataset_path} to snapshot {snapshot_name}: {rollback_result.get_error()}" + self.logger.error(result["message"]) return result - @type_check - def list_snapshots(self) -> list: + @property + def snapshots(self) -> list: """ List all cached ZFS snapshots. Returns: - list: A list of all snapshot names. """ - snapshots = list(self.cache.snapshots) - self.logger.debug(f"Listing all snapshots: {snapshots}") + snapshots = list(self._snapshots.keys()) return snapshots @type_check - def rollback_all_snapshots(self, snapshot_name: str, dataset_path: str) -> None: + def snapshot_exists(self, snapshot_name: str) -> bool: + """ + Check if a snapshot exists in the cache. + + Parameters: + - snapshot_name (str): The name of the snapshot to check. 
+ + Returns: + - bool: True if the snapshot exists, False otherwise. + """ + return snapshot_name in self.snapshots + + @type_check + def rollback_all_snapshots(self, snapshot_name: str, dataset_path: str, recursive: bool = False, force: bool = False) -> None: """ Rollback all snapshots under a given path recursively that match the snapshot name. Parameters: - snapshot_name (str): The name of the snapshot to rollback to. - dataset_path (str): The path of the dataset to rollback snapshots for. + - recursive (bool): Whether to rollback recursively. Default is False. + - force (bool): Whether to force the rollback. Default is False. """ - if dataset_path not in self.cache.datasets: + if dataset_path not in self.datasets: self.logger.error(f"Dataset {dataset_path} does not exist. Cannot rollback snapshots.") return try: - all_snapshots = [snap for snap in self.cache.snapshots if snap.startswith(dataset_path)] + all_snapshots = [snap for snap in self.snapshots if snap.startswith(dataset_path)] matching_snapshots = [snap for snap in all_snapshots if snap.endswith(f"@{snapshot_name}")] for snapshot in matching_snapshots: - rollback_command = f"/sbin/zfs rollback -r -f \"{snapshot}\"" - rollback_result = run_command(rollback_command) - if rollback_result.is_success(): - self.logger.debug(f"Successfully rolled back {snapshot}.") - else: - self.logger.error(f"Failed to rollback {snapshot}: {rollback_result.get_error()}") + self.rollback_snapshot(snapshot, recursive, force) except Exception as e: - self.logger.error(f"Failed to rollback snapshots for {dataset_path}: {e}", exc_info=True) \ No newline at end of file + self.logger.error(f"Failed to rollback snapshots for {dataset_path}: {e}", exc_info=True) + + @type_check + def zfs_send(self, source: str, destination: Path, compress: bool = False) -> dict: + """ + Send a ZFS snapshot to a destination file, with optional gzip compression. + + Parameters: + - source (str): The source ZFS snapshot to send. 
+ - destination (Path): The destination file to send the snapshot to. + - compress (bool): Whether to use gzip compression. Default is False. + + Returns: + - dict: Result containing status and message. + """ + result = { + "success": False, + "message": "" + } + + try: + if compress: + command = f"/sbin/zfs send \"{source}\" | gzip > \"{destination}\"" + else: + command = f"/sbin/zfs send \"{source}\" > \"{destination}\"" + + send_result = run_command(command) + if send_result.is_success(): + self.logger.debug(f"Successfully sent snapshot {source} to {destination}") + result["success"] = True + result["message"] = f"Successfully sent snapshot {source} to {destination}" + else: + result["message"] = f"Failed to send snapshot {source}: {send_result.get_error()}" + self.logger.error(result["message"]) + except Exception as e: + result["message"] = f"Exception occurred while sending snapshot {source}: {e}" + self.logger.error(result["message"], exc_info=True) + + return result + + @type_check + def zfs_receive(self, snapshot_file: Path, dataset_path: str, decompress: bool = False) -> dict: + """ + Receive a ZFS snapshot from a file and restore it to the specified dataset path. + + Parameters: + - snapshot_file (Path): The path to the snapshot file. + - dataset_path (str): The ZFS dataset path to restore to. + - decompress (bool): Whether the snapshot file is gzip compressed. Default is False. + + Returns: + - dict: Result containing status and message. 
+ """ + result = { + "success": False, + "message": "" + } + + try: + dataset_snapshots = [snap for snap in self.snapshots if snap.startswith(f"{dataset_path}@")] + + delete_errors = [] + for snapshot in dataset_snapshots: + delete_result = self.delete_snapshot(snapshot) + if not delete_result["success"]: + delete_errors.append(delete_result["message"]) + + if delete_errors: + result["message"] = f"Failed to destroy existing snapshots: {delete_errors}" + self.logger.error(result["message"]) + return result + + receive_command = f'/sbin/zfs recv -F "{dataset_path}"' + if decompress: + command = f'gunzip < "{snapshot_file}" | {receive_command}' + else: + command = f'cat "{snapshot_file}" | {receive_command}' + + self.logger.debug(f"Executing command: {command}") + receive_result = run_command(command) + if receive_result.is_success(): + result["success"] = True + result["message"] = f"Successfully restored snapshot from {snapshot_file} to {dataset_path}" + else: + result["message"] = receive_result.get_error() + except Exception as e: + result["message"] = f"Exception occurred while restoring snapshot from {snapshot_file}: {e}" + self.logger.error(result["message"], exc_info=True) + + return result diff --git a/heavy_script.sh b/heavy_script.sh index 72899dba..7189c249 100644 --- a/heavy_script.sh +++ b/heavy_script.sh @@ -56,7 +56,7 @@ fi # generate the config.ini file if it does not exist generate_config_ini -python3 utils/update_config.py "$script_path/config.ini" +python3 utils/update_config.py # Separate bundled short options args=() diff --git a/utils/update_config.py b/utils/update_config.py index 5444174e..ccc4ef0c 100644 --- a/utils/update_config.py +++ b/utils/update_config.py @@ -1,51 +1,49 @@ -import sys from pathlib import Path -import configparser +from configobj import ConfigObj -def update_config(config_file_path): - config_file_path = Path(config_file_path) - - # Read the original content preserving comments - with config_file_path.open('r') as file: 
+def update_config(): + config_file_path = str(Path(__file__).parent.parent / 'config.ini') + default_config_path = str(Path(__file__).parent.parent / '.default.config.ini') + + # Load the existing config and default config + current_config = ConfigObj(config_file_path, encoding='utf-8', list_values=False) + default_config = ConfigObj(default_config_path, encoding='utf-8', list_values=False) + + # Remove sections from current config that are not in the default config + for section in list(current_config.keys()): + if section not in default_config: + del current_config[section] + + # Update sections and keys from the default config + for section, default_options in default_config.items(): + if section not in current_config: + current_config[section] = default_options + current_config.comments[section] = default_config.comments.get(section, []) + else: + # Remove keys not present in the default config + for key in list(current_config[section].keys()): + if key not in default_options: + del current_config[section][key] + # Add keys from the default config + for key, value in default_options.items(): + if key not in current_config[section]: + current_config[section][key] = value + current_config[section].comments[key] = default_config[section].comments.get(key, []) + if key in default_options.inline_comments: + current_config[section].inline_comments[key] = default_options.inline_comments[key] + + # Write the updated config back to the file + current_config.write() + + # Ensure new lines before new sections + with open(config_file_path, 'r', encoding='utf-8') as file: lines = file.readlines() - # Create a new config parser object - config = configparser.ConfigParser(allow_no_value=True) - config.read(config_file_path) - - # Remove the [databases] section if it exists - if 'databases' in config: - config.remove_section('databases') - - # Prepare the new content - new_content = [] - in_databases_section = False - - for line in lines: - if line.strip().lower() == 
'[databases]': - in_databases_section = True - continue - if line.startswith('[') and in_databases_section: - in_databases_section = False - if not in_databases_section: - new_content.append(line) - - # Ensure the [BACKUP] section is added if it does not exist - if 'BACKUP' not in config: - new_content.append('\n[BACKUP]\n') - new_content.append('export_enabled=true\n') - new_content.append('full_backup_enabled=true\n') - new_content.append('# Uncomment the following line to specify a custom dataset location for backups\n') - new_content.append('# custom_dataset_location=\n') - - # Write the new content back to the config file - with config_file_path.open('w') as file: - file.writelines(new_content) + with open(config_file_path, 'w', encoding='utf-8') as file: + for i, line in enumerate(lines): + if line.startswith('[') and i != 0 and lines[i-1].strip() != '': + file.write('\n') + file.write(line) if __name__ == "__main__": - if len(sys.argv) != 2: - print("Usage: python update_config.py ") - sys.exit(1) - - config_file_path = sys.argv[1] - update_config(config_file_path) + update_config()