diff --git a/prime_backup/action/export_backup_action.py b/prime_backup/action/export_backup_action.py
index 5b8f63c..156a49d 100644
--- a/prime_backup/action/export_backup_action.py
+++ b/prime_backup/action/export_backup_action.py
@@ -17,18 +17,25 @@
 from prime_backup.db import schema
 from prime_backup.db.access import DbAccess
 from prime_backup.db.session import DbSession
+from prime_backup.exceptions import PrimeBackupError
 from prime_backup.types.backup_meta import BackupMeta
 from prime_backup.types.export_failure import ExportFailures
 from prime_backup.types.tar_format import TarFormat
-from prime_backup.utils import file_utils, blob_utils, misc_utils
+from prime_backup.utils import file_utils, blob_utils, misc_utils, hash_utils
+from prime_backup.utils.bypass_io import BypassReader
+
+
+class VerificationError(PrimeBackupError):
+	pass
 
 
 class _ExportBackupActionBase(Action[ExportFailures], ABC):
-	def __init__(self, backup_id: int, output_path: Path, *, fail_soft: bool = False):
+	def __init__(self, backup_id: int, output_path: Path, *, fail_soft: bool = False, verify_blob: bool = True):
 		super().__init__()
 		self.backup_id = misc_utils.ensure_type(backup_id, int)
 		self.output_path = output_path
 		self.fail_soft = fail_soft
+		self.verify_blob = verify_blob
 
 	def run(self) -> ExportFailures:
 		with DbAccess.open_session() as session:
@@ -50,9 +57,17 @@ def _create_meta_buf(cls, backup: schema.Backup) -> bytes:
 		meta = BackupMeta.from_backup(backup)
 		return json.dumps(meta.to_dict(), indent=2, ensure_ascii=False).encode('utf8')
 
-	def _on_unsupported_file_mode(self, file: schema.File):
+	@classmethod
+	def _on_unsupported_file_mode(cls, file: schema.File):
 		raise NotImplementedError('file at {!r} with mode={} ({} or {}) is not supported yet'.format(file.path, file.mode, hex(file.mode), oct(file.mode)))
 
+	@classmethod
+	def _verify_exported_blob(cls, file: schema.File, written_size: int, written_hash: str):
+		if written_size != file.blob_raw_size:
+			raise VerificationError('raw size mismatched for {}, expected {}, actual written {}'.format(file.path, file.blob_raw_size, written_size))
+		if written_hash != file.blob_hash:
+			raise VerificationError('hash mismatched for {}, expected {}, actual written {}'.format(file.path, file.blob_hash, written_hash))
+
 
 def _i_am_root():
 	# reference: tarfile.TarFile.chown
@@ -86,6 +101,57 @@ def __set_attrs(cls, file: schema.File, file_path: Path):
 		if file.atime_ns is not None and file.mtime_ns is not None:
 			os.utime(file_path, (file.atime_ns / 1e9, file.mtime_ns / 1e9))
 
+	def __export_file(self, file: schema.File, directories_store: list):
+		if self.child_to_export is not None:
+			try:
+				rel_path = Path(file.path).relative_to(self.child_to_export)
+			except ValueError:
+				return
+			if rel_path != Path('.') and not self.recursively_export_child:
+				return
+			file_path = self.output_path / self.child_to_export.name / rel_path
+			if rel_path == Path('.'):
+				self.logger.info('Exporting child {!r} to {!r}'.format(file.path, file_path.as_posix()))
+		else:
+			file_path = self.output_path / file.path
+
+		if stat.S_ISREG(file.mode):
+			self.logger.debug('write file {}'.format(file.path))
+			file_path.parent.mkdir(parents=True, exist_ok=True)
+			blob_path = blob_utils.get_blob_path(file.blob_hash)
+			compressor = Compressor.create(file.blob_compress)
+			if compressor.get_method() == CompressMethod.plain:
+				file_utils.copy_file_fast(blob_path, file_path)
+				if self.verify_blob:
+					sah = hash_utils.calc_file_size_and_hash(file_path)
+					self._verify_exported_blob(file, sah.size, sah.hash)
+			else:
+				with compressor.open_decompressed(blob_path) as f_in:
+					with open(file_path, 'wb') as f_out:
+						if self.verify_blob:
+							reader = BypassReader(f_in, calc_hash=True)
+							shutil.copyfileobj(reader, f_out)
+						else:
+							reader = None
+							shutil.copyfileobj(f_in, f_out)
+				if reader is not None:
+					self._verify_exported_blob(file, reader.get_read_len(), reader.get_hash())
+
+		elif stat.S_ISDIR(file.mode):
+			file_path.mkdir(parents=True, exist_ok=True)
+			self.logger.debug('write dir {}'.format(file.path))
+			directories_store.append((file, file_path))
+
+		elif stat.S_ISLNK(file.mode):
+			link_target = file.content.decode('utf8')
+			os.symlink(link_target, file_path)
+			self.logger.debug('write symbolic link {} -> {}'.format(file_path, link_target))
+		else:
+			self._on_unsupported_file_mode(file)
+
+		if not stat.S_ISDIR(file.mode):
+			self.__set_attrs(file, file_path)
+
 	def _export_backup(self, session, backup: schema.Backup) -> ExportFailures:
 		failures = ExportFailures(self.fail_soft)
 		if self.child_to_export is None:
@@ -103,60 +169,24 @@ def _export_backup(self, session, backup: schema.Backup) -> ExportFailures:
 		else:
 			target_path.unlink(missing_ok=True)
 
-		directories: List[Tuple[schema.File, Path]] = []
-		file: schema.File
-		for file in backup.files:
-			if self.child_to_export is not None:
+		try:
+			directories: List[Tuple[schema.File, Path]] = []
+			for file in backup.files:
 				try:
-					rel_path = Path(file.path).relative_to(self.child_to_export)
-				except ValueError:
-					continue
-				if rel_path != Path('.') and not self.recursively_export_child:
-					continue
-				file_path = self.output_path / self.child_to_export.name / rel_path
-				if rel_path == Path('.'):
-					self.logger.info('Exporting child {!r} to {!r}'.format(file.path, file_path.as_posix()))
-			else:
-				file_path = self.output_path / file.path
-
-			try:
-				if stat.S_ISREG(file.mode):
-					self.logger.debug('write file {}'.format(file.path))
-					file_path.parent.mkdir(parents=True, exist_ok=True)
-					blob_path = blob_utils.get_blob_path(file.blob_hash)
-					compressor = Compressor.create(file.blob_compress)
-					if compressor.get_method() == CompressMethod.plain:
-						file_utils.copy_file_fast(blob_path, file_path)
-					else:
-						with compressor.open_decompressed(blob_path) as f_in:
-							with open(file_path, 'wb') as f_out:
-								shutil.copyfileobj(f_in, f_out)
-
-				elif stat.S_ISDIR(file.mode):
-					file_path.mkdir(parents=True, exist_ok=True)
-					self.logger.debug('write dir {}'.format(file.path))
-					directories.append((file, file_path))
-
-				elif stat.S_ISLNK(file.mode):
-					link_target = file.content.decode('utf8')
-					os.symlink(link_target, file_path)
-					self.logger.debug('write symbolic link {} -> {}'.format(file_path, link_target))
-				else:
-					self._on_unsupported_file_mode(file)
-
-				if not stat.S_ISDIR(file.mode):
-					self.__set_attrs(file, file_path)
-
-			except Exception as e:
-				failures.add_or_raise(file, e)
+					self.__export_file(file, directories)
+				except Exception as e:
+					failures.add_or_raise(file, e)
 
-		# child dir first
-		# reference: tarfile.TarFile.extractall
-		for dir_file, dir_file_path in sorted(directories, key=lambda d: d[0].path, reverse=True):
-			try:
-				self.__set_attrs(dir_file, dir_file_path)
-			except Exception as e:
-				failures.add_or_raise(dir_file, e)
+			# child dir first
+			# reference: tarfile.TarFile.extractall
+			for dir_file, dir_file_path in sorted(directories, key=lambda d: d[0].path, reverse=True):
+				try:
+					self.__set_attrs(dir_file, dir_file_path)
+				except Exception as e:
+					failures.add_or_raise(dir_file, e)
+		except Exception:
+			# TODO: rollback
+			raise
 
 		return failures
 
@@ -174,6 +204,45 @@ def __open_tar(self) -> ContextManager[tarfile.TarFile]:
 			with tarfile.open(fileobj=f_compressed, mode=self.tar_format.value.mode_w) as tar:
 				yield tar
 
+	def __export_file(self, tar: tarfile.TarFile, file: schema.File):
+		info = tarfile.TarInfo(name=file.path)
+		info.mode = file.mode
+
+		if file.uid is not None:
+			info.uid = file.uid
+		if file.gid is not None:
+			info.gid = file.gid
+		if file.mtime_ns is not None:
+			info.mtime = int(file.mtime_ns / 1e9)
+		if stat.S_ISREG(file.mode):
+			self.logger.debug('add file {} to tarfile'.format(file.path))
+			info.type = tarfile.REGTYPE
+			info.size = file.blob_raw_size
+			blob_path = blob_utils.get_blob_path(file.blob_hash)
+
+			with Compressor.create(file.blob_compress).open_decompressed(blob_path) as stream:
+				if self.verify_blob:
+					reader = BypassReader(stream, calc_hash=True)
+					tar.addfile(tarinfo=info, fileobj=reader)
+				else:
+					reader = None
+					tar.addfile(tarinfo=info, fileobj=stream)
+			if reader is not None:
+				self._verify_exported_blob(file, reader.get_read_len(), reader.get_hash())
+
+		elif stat.S_ISDIR(file.mode):
+			self.logger.debug('add dir {} to tarfile'.format(file.path))
+			info.type = tarfile.DIRTYPE
+			tar.addfile(tarinfo=info)
+		elif stat.S_ISLNK(file.mode):
+			self.logger.debug('add symlink {} to tarfile'.format(file.path))
+			link_target = file.content.decode('utf8')
+			info.type = tarfile.SYMTYPE
+			info.linkname = link_target
+			tar.addfile(tarinfo=info)
+		else:
+			self._on_unsupported_file_mode(file)
+
 	def _export_backup(self, session, backup: schema.Backup) -> ExportFailures:
 		failures = ExportFailures(self.fail_soft)
 		if not self.output_path.name.endswith(self.tar_format.value.extension):
@@ -184,106 +253,94 @@
 
 		self.logger.info('Exporting backup {} to tarfile {}'.format(backup, self.output_path))
 		self.output_path.parent.mkdir(parents=True, exist_ok=True)
-		with self.__open_tar() as tar:
-			file: schema.File
-
-			for file in backup.files:
-				try:
-					info = tarfile.TarInfo(name=file.path)
-					info.mode = file.mode
-
-					if file.uid is not None:
-						info.uid = file.uid
-					if file.gid is not None:
-						info.gid = file.gid
-					if file.mtime_ns is not None:
-						info.mtime = int(file.mtime_ns / 1e9)
-					if stat.S_ISREG(file.mode):
-						self.logger.debug('add file {} to tarfile'.format(file.path))
-						info.type = tarfile.REGTYPE
-						info.size = file.blob_raw_size
-						blob_path = blob_utils.get_blob_path(file.blob_hash)
-
-						with Compressor.create(file.blob_compress).open_decompressed(blob_path) as stream:
-							tar.addfile(tarinfo=info, fileobj=stream)
-					elif stat.S_ISDIR(file.mode):
-						self.logger.debug('add dir {} to tarfile'.format(file.path))
-						info.type = tarfile.DIRTYPE
-						tar.addfile(tarinfo=info)
-					elif stat.S_ISLNK(file.mode):
-						self.logger.debug('add symlink {} to tarfile'.format(file.path))
-						link_target = file.content.decode('utf8')
-						info.type = tarfile.SYMTYPE
-						info.linkname = link_target
-						tar.addfile(tarinfo=info)
-					else:
-						self._on_unsupported_file_mode(file)
-
-				except Exception as e:
-					failures.add_or_raise(file, e)
-
-			meta_buf = self._create_meta_buf(backup)
-			info = tarfile.TarInfo(name=BACKUP_META_FILE_NAME)
-			info.mtime = int(time.time())
-			info.size = len(meta_buf)
-			tar.addfile(tarinfo=info, fileobj=BytesIO(meta_buf))
+		try:
+			with self.__open_tar() as tar:
+				for file in backup.files:
+					try:
+						self.__export_file(tar, file)
+					except Exception as e:
+						failures.add_or_raise(file, e)
+
+				meta_buf = self._create_meta_buf(backup)
+				info = tarfile.TarInfo(name=BACKUP_META_FILE_NAME)
+				info.mtime = int(time.time())
+				info.size = len(meta_buf)
+				tar.addfile(tarinfo=info, fileobj=BytesIO(meta_buf))
+		except Exception:
+			with contextlib.suppress(OSError):
+				self.output_path.unlink(missing_ok=True)
+			raise
 
 		return failures
 
 
 class ExportBackupToZipAction(_ExportBackupActionBase):
+	def __export_file(self, zipf: zipfile.ZipFile, file: schema.File):
+		# reference: zipf.writestr -> zipfile.ZipInfo.from_file
+		if file.mtime_ns is not None:
+			date_time = time.localtime(file.mtime_ns / 1e9)
+		else:
+			date_time = time.localtime()
+		arc_name = file.path
+		while len(arc_name) > 0 and arc_name[0] in (os.sep, os.altsep):
+			arc_name = arc_name[1:]
+		if stat.S_ISDIR(file.mode) and not arc_name.endswith('/'):
+			arc_name += '/'
+
+		info = zipfile.ZipInfo(arc_name, date_time[0:6])
+		info.external_attr = (file.mode & 0xFFFF) << 16
+		info.compress_type = zipf.compression
+
+		if stat.S_ISREG(file.mode):
+			self.logger.debug('add file {} to zipfile'.format(file.path))
+			info.file_size = file.blob_raw_size
+			blob_path = blob_utils.get_blob_path(file.blob_hash)
+
+			with Compressor.create(file.blob_compress).open_decompressed(blob_path) as stream:
+				with zipf.open(info, 'w') as zip_item:
+					if self.verify_blob:
+						reader = BypassReader(stream, calc_hash=True)
+						shutil.copyfileobj(reader, zip_item)
+					else:
+						reader = None
+						shutil.copyfileobj(stream, zip_item)
+			if reader is not None:
+				self._verify_exported_blob(file, reader.get_read_len(), reader.get_hash())
+
+		elif stat.S_ISDIR(file.mode):
+			self.logger.debug('add dir {} to zipfile'.format(file.path))
+			info.external_attr |= 0x10
+			zipf.writestr(info, b'')
+		elif stat.S_ISLNK(file.mode):
+			self.logger.debug('add symlink {} to zipfile'.format(file.path))
+			with zipf.open(info, 'w') as zip_item:
+				zip_item.write(file.content)
+		else:
+			self._on_unsupported_file_mode(file)
+
 	def _export_backup(self, session, backup: schema.Backup) -> ExportFailures:
 		failures = ExportFailures(self.fail_soft)
 
 		self.logger.info('Exporting backup {} to zipfile {}'.format(backup, self.output_path))
 		self.output_path.parent.mkdir(parents=True, exist_ok=True)
-		with zipfile.ZipFile(self.output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
-			file: schema.File
-			for file in backup.files:
-				try:
-					# reference: zipf.writestr -> zipfile.ZipInfo.from_file
-					if file.mtime_ns is not None:
-						date_time = time.localtime(file.mtime_ns / 1e9)
-					else:
-						date_time = time.localtime()
-					arc_name = file.path
-					while len(arc_name) > 0 and arc_name[0] in (os.sep, os.altsep):
-						arc_name = arc_name[1:]
-					if stat.S_ISDIR(file.mode) and not arc_name.endswith('/'):
-						arc_name += '/'
-
-					info = zipfile.ZipInfo(arc_name, date_time[0:6])
-					info.external_attr = (file.mode & 0xFFFF) << 16
-					info.compress_type = zipf.compression
-
-					if stat.S_ISREG(file.mode):
-						self.logger.debug('add file {} to zipfile'.format(file.path))
-						info.file_size = file.blob_raw_size
-						blob_path = blob_utils.get_blob_path(file.blob_hash)
-
-						with Compressor.create(file.blob_compress).open_decompressed(blob_path) as stream:
-							with zipf.open(info, 'w') as zip_item:
-								shutil.copyfileobj(stream, zip_item)
-
-					elif stat.S_ISDIR(file.mode):
-						self.logger.debug('add dir {} to zipfile'.format(file.path))
-						info.external_attr |= 0x10
-						zipf.writestr(info, b'')
-					elif stat.S_ISLNK(file.mode):
-						self.logger.debug('add symlink {} to zipfile'.format(file.path))
-						with zipf.open(info, 'w') as zip_item:
-							zip_item.write(file.content)
-					else:
-						self._on_unsupported_file_mode(file)
-
-				except Exception as e:
-					failures.add_or_raise(file, e)
-
-			meta_buf = self._create_meta_buf(backup)
-			info = zipfile.ZipInfo(BACKUP_META_FILE_NAME, time.localtime()[0:6])
-			info.compress_type = zipf.compression
-			info.file_size = len(meta_buf)
-			with zipf.open(info, 'w') as f:
-				f.write(meta_buf)
+		try:
+			with zipfile.ZipFile(self.output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
+				for file in backup.files:
+					try:
+						self.__export_file(zipf, file)
+					except Exception as e:
+						failures.add_or_raise(file, e)
+
+				meta_buf = self._create_meta_buf(backup)
+				info = zipfile.ZipInfo(BACKUP_META_FILE_NAME, time.localtime()[0:6])
+				info.compress_type = zipf.compression
+				info.file_size = len(meta_buf)
+				with zipf.open(info, 'w') as f:
+					f.write(meta_buf)
+
+		except Exception:
+			with contextlib.suppress(OSError):
+				self.output_path.unlink(missing_ok=True)
+			raise
 
 		return failures
diff --git a/prime_backup/compressors.py b/prime_backup/compressors.py
index 7ec2cd4..c95b6e0 100644
--- a/prime_backup/compressors.py
+++ b/prime_backup/compressors.py
@@ -9,7 +9,7 @@
 from typing_extensions import Protocol
 
 from prime_backup.types.common import PathLike
-from prime_backup.utils.bypass_io import ByPassReader, ByPassWriter
+from prime_backup.utils.bypass_io import BypassReader, BypassWriter
 
 
 class Compressor(ABC):
@@ -36,9 +36,12 @@ def get_name(cls) -> str:
 		return cls.get_method().name
 
 	def copy_compressed(self, source_path: PathLike, dest_path: PathLike, *, calc_hash: bool = False) -> CopyCompressResult:
+		"""
+		source --[compressor]--> destination
+		"""
 		with open(source_path, 'rb') as f_in, open(dest_path, 'wb') as f_out:
-			reader = ByPassReader(f_in, calc_hash)
-			writer = ByPassWriter(f_out)
+			reader = BypassReader(f_in, calc_hash=calc_hash)
+			writer = BypassWriter(f_out)
 			self._copy_compressed(reader, writer)
 			return self.CopyCompressResult(reader.get_read_len(), reader.get_hash(), writer.get_write_len())
 
@@ -53,9 +56,12 @@ def open_compressed(self, target_path: PathLike) -> ContextManager[BinaryIO]:
 			yield f_compressed
 
 	@contextlib.contextmanager
-	def open_compressed_bypassed(self, target_path: PathLike) -> ContextManager[Tuple[ByPassWriter, BinaryIO]]:
+	def open_compressed_bypassed(self, target_path: PathLike) -> ContextManager[Tuple[BypassWriter, BinaryIO]]:
+		"""
+		Bypass the receiving-compressed-content file writer
+		"""
 		with open(target_path, 'wb') as f:
-			writer = ByPassWriter(f)
+			writer = BypassWriter(f)
 			with self.compress_stream(writer) as f_compressed:
 				yield writer, f_compressed
 
@@ -66,9 +72,12 @@ def open_decompressed(self, source_path: PathLike) -> ContextManager[BinaryIO]:
 			yield f_decompressed
 
 	@contextlib.contextmanager
-	def open_decompressed_bypassed(self, source_path: PathLike) -> ContextManager[Tuple[ByPassReader, BinaryIO]]:
+	def open_decompressed_bypassed(self, source_path: PathLike) -> ContextManager[Tuple[BypassReader, BinaryIO]]:
+		"""
+		Bypass the raw compressed file reader
+		"""
 		with open(source_path, 'rb') as f:
-			reader = ByPassReader(f, calc_hash=False)  # it's meaningless to calc hash on the compressed file
+			reader = BypassReader(f, calc_hash=False)  # it's meaningless to calc hash on the compressed file
 			with self.decompress_stream(reader) as f_decompressed:
 				yield reader, f_decompressed
 
diff --git a/prime_backup/utils/bypass_io.py b/prime_backup/utils/bypass_io.py
index b33d876..f39a80c 100644
--- a/prime_backup/utils/bypass_io.py
+++ b/prime_backup/utils/bypass_io.py
@@ -2,13 +2,14 @@
 from typing import Union
 
 
-class ByPassReader(io.BytesIO):
+class BypassReader(io.BytesIO):
 	def __init__(self, file_obj, calc_hash: bool):
 		super().__init__()
 		self.file_obj: io.BytesIO = file_obj
+		self.read_len = 0
+
 		from prime_backup.utils import hash_utils
 		self.hasher = hash_utils.create_hasher() if calc_hash else None
-		self.read_len = 0
 
 	def read(self, *args, **kwargs):
 		data = self.file_obj.read(*args, **kwargs)
@@ -44,7 +45,7 @@ def __getattribute__(self, item: str):
 		return self.file_obj.__getattribute__(item)
 
 
-class ByPassWriter(io.BytesIO):
+class BypassWriter(io.BytesIO):
 	def __init__(self, file_obj):
 		super().__init__()
 		self.file_obj: io.BytesIO = file_obj
diff --git a/prime_backup/utils/hash_utils.py b/prime_backup/utils/hash_utils.py
index 175d319..bbdb5e3 100644
--- a/prime_backup/utils/hash_utils.py
+++ b/prime_backup/utils/hash_utils.py
@@ -3,7 +3,7 @@
 
 from prime_backup.config.config import Config
 from prime_backup.types.hash_method import Hasher
-from prime_backup.utils.bypass_io import ByPassReader
+from prime_backup.utils.bypass_io import BypassReader
 
 
 def create_hasher() -> 'Hasher':
@@ -19,7 +19,7 @@ class SizeAndHash(NamedTuple):
 
 
 def calc_reader_size_and_hash(file_obj: IO[bytes], *, buf_size: int = _READ_BUF_SIZE) -> SizeAndHash:
-	reader = ByPassReader(file_obj, True)
+	reader = BypassReader(file_obj, True)
 	while reader.read(buf_size):
 		pass
 	return SizeAndHash(reader.get_read_len(), reader.get_hash())
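
A minimal standalone sketch of the verify-while-exporting pattern that the three exporters above share: wrap the decompressed blob stream in a BypassReader with calc_hash=True, copy it to the destination, then compare the observed size and hash against the values recorded for the blob. The helper name verify_copy and the plain ValueError are illustrative assumptions; in the patch itself the actions call self._verify_exported_blob, which raises VerificationError.

# Illustrative only, not part of the diff above.
# Assumes BypassReader behaves as in prime_backup.utils.bypass_io:
# it counts and (optionally) hashes every byte read through it.
import shutil

from prime_backup.utils.bypass_io import BypassReader


def verify_copy(f_in, f_out, expected_size: int, expected_hash: str) -> None:
	# hypothetical helper; expected_size / expected_hash would come from
	# schema.File.blob_raw_size and schema.File.blob_hash
	reader = BypassReader(f_in, calc_hash=True)
	shutil.copyfileobj(reader, f_out)  # every exported byte flows through the reader
	if reader.get_read_len() != expected_size:
		raise ValueError('raw size mismatched, expected {}, actual written {}'.format(expected_size, reader.get_read_len()))
	if reader.get_hash() != expected_hash:
		raise ValueError('hash mismatched, expected {}, actual written {}'.format(expected_hash, reader.get_hash()))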