Skip to content

Commit

Permalink
Add !!pb database migrate_hash_method command to migrate to another…
Browse files Browse the repository at this point in the history
… hash method

no need to stick to one hash method forever
  • Loading branch information
Fallen-Breath committed Dec 11, 2023
1 parent b995754 commit f645fed
Show file tree
Hide file tree
Showing 17 changed files with 296 additions and 45 deletions.
13 changes: 13 additions & 0 deletions lang/en_us.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ prime_backup:
pause.hover: Click to pause job {}
resume: resume
resume.hover: Click to resume job {}
db_migrate_hash_method:
name: migrate hash method
hash_method_unchanged: The hash method is already {}
missing_library:
Failed to import the target hasher, please make sure you have installed the required python library for {}.
Search hash_method in the document {} for more help
show_whats_going_on: Prepare for file pool hash method migration, from {} to {}
confirm_target: migrate
no_confirm: No choice has been made, migrate hash method task aborted
aborted: Migrate hash method task aborted
done: Migrated the hash method from {} to {}
db_overview:
name: overview database
title: Database overview
Expand Down Expand Up @@ -238,8 +249,10 @@ prime_backup:
§7{prefix} database overview§r: Report an overview of the database
§7{prefix} database validate §a<part>§r: Validate the correctness of contents in the database. Might take a long time
§7{prefix} database vacuum§r: Compact the SQLite database manually, to reduce the size of the database file
§7{prefix} database migrate_hash_method <hash_method>§r: Migrate the currently used hash method to another. Affects all data, might take a long time
{scheduled_compact_notes}
§d[Arguments]§r
§d<hash_method>§r: Available options: {hash_methods}
§a<part>§r:
- §ablobs§r: Validate the correctness of blobs, e.g. data size, hash value
- §afiles§r: Validate the correctness of file objects, e.g. the association between files and blobs
Expand Down
13 changes: 13 additions & 0 deletions lang/zh_cn.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ prime_backup:
pause.hover: 点击以暂停运行作业{}
resume: 继续
resume.hover: 点击以继续运行作业{}
db_migrate_hash_method:
name: 哈希算法迁移
hash_method_unchanged: 哈希算法已经是{}了
missing_library:
无法导入目标哈希算法, 请确保你已经安装了算法{}所需的Python依赖库。
在文档{}中搜索hash_method以获得更多帮助
show_whats_going_on: 准备把文件池所使用的哈希算法从{}迁移至{}
confirm_target: 迁移
no_confirm: 未做出选择, 哈希算法迁移任务中止
aborted: 哈希算法迁移任务中止
done: 已将哈希算法从{}迁移至{}
db_overview:
name: 概览数据库
title: 数据库概览
Expand Down Expand Up @@ -238,8 +249,10 @@ prime_backup:
§7{prefix} database overview§r: 查看数据库信息概览
§7{prefix} database validate §a<组件>§r: 验证数据库内容的正确性。耗时可能较长
§7{prefix} database vacuum§r: 手动执行SQLite数据库的精简操作,减少数据库文件的体积
§7{prefix} database migrate_hash_method <哈希算法>§r: 将当前使用的哈希算法迁移至另一种算法。这将影响所有数据,耗时可能较长
{scheduled_compact_notes}
§d【参数帮助】§r
§d<哈希算法>§r: 可用选项: {hash_methods}
§a<组件>§r:
- §ablobs§r: 验证数据对象的正确性,如数据大小、哈希值
- §afiles§r: 验证文件对象的正确性,如文件与数据的关联
Expand Down
6 changes: 1 addition & 5 deletions prime_backup/action/export_backup_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@
from prime_backup.db import schema
from prime_backup.db.access import DbAccess
from prime_backup.db.session import DbSession
from prime_backup.exceptions import PrimeBackupError
from prime_backup.exceptions import PrimeBackupError, VerificationError
from prime_backup.types.backup_meta import BackupMeta
from prime_backup.types.export_failure import ExportFailures
from prime_backup.types.tar_format import TarFormat
from prime_backup.utils import file_utils, blob_utils, misc_utils, hash_utils
from prime_backup.utils.bypass_io import BypassReader


class VerificationError(PrimeBackupError):
pass


class _ExportInterrupted(PrimeBackupError):
pass

Expand Down
98 changes: 98 additions & 0 deletions prime_backup/action/migrate_hash_method_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import shutil
import time
from pathlib import Path
from typing import List, Dict, Set

from prime_backup.action import Action
from prime_backup.compressors import Compressor
from prime_backup.db.access import DbAccess
from prime_backup.db.session import DbSession
from prime_backup.exceptions import PrimeBackupError
from prime_backup.types.hash_method import HashMethod
from prime_backup.utils import blob_utils, hash_utils, collection_utils


class HashCollisionError(PrimeBackupError):
"""
Same hash value, between 2 hash methods
"""
pass


class MigrateHashMethodAction(Action[None]):
def __init__(self, new_hash_method: HashMethod):
super().__init__()
self.new_hash_method = new_hash_method

def __migrate_blobs(self, session: DbSession, blob_hashes: List[str], old_hashes: Set[str], processed_hash_mapping: Dict[str, str]):
hash_mapping: Dict[str, str] = {}
blobs = list(session.get_blobs(blob_hashes).values())

# calc blob hashes
for blob in blobs:
blob_path = blob_utils.get_blob_path(blob.hash)
with Compressor.create(blob.compress).open_decompressed(blob_path) as f:
sah = hash_utils.calc_reader_size_and_hash(f, hash_method=self.new_hash_method)
hash_mapping[blob.hash] = sah.hash
if sah.hash in old_hashes:
raise HashCollisionError(sah.hash)

# update the objects
for blob in blobs:
old_hash, new_hash = blob.hash, hash_mapping[blob.hash]
old_path = blob_utils.get_blob_path(old_hash)
new_path = blob_utils.get_blob_path(new_hash)
old_path.rename(new_path)

processed_hash_mapping[old_hash] = new_hash
blob.hash = new_hash

for file in session.get_file_by_blob_hashes(list(hash_mapping.keys())):
file.blob_hash = hash_mapping[file.blob_hash]

def __replace_blob_store(self, old_store: Path, new_store: Path):
trash_bin = self.config.storage_path / 'temp' / 'old_blobs'
trash_bin.parent.mkdir(parents=True, exist_ok=True)

old_store.rename(trash_bin)
new_store.rename(old_store)
shutil.rmtree(trash_bin)

def run(self):
processed_hash_mapping: Dict[str, str] = {} # old -> new
try:
t = time.time()
with DbAccess.open_session() as session:
meta = session.get_db_meta()
if meta.hash_method == self.new_hash_method.name:
self.logger.info('Hash method of the database is already {}, no need to migrate'.format(self.new_hash_method.name))
return

total_blob_count = session.get_blob_count()
all_hashes = session.get_all_blob_hashes()
all_hash_set = set(all_hashes)
cnt = 0
for blob_hashes in collection_utils.slicing_iterate(all_hashes, 1000):
blob_hashes: List[str] = list(blob_hashes)
cnt += len(blob_hashes)
self.logger.info('Migrating blobs {} / {}'.format(cnt, total_blob_count))

self.__migrate_blobs(session, blob_hashes, all_hash_set, processed_hash_mapping)
session.flush_and_expunge_all()

meta = session.get_db_meta() # get the meta again, cuz expunge_all() was called
meta.hash_method = self.new_hash_method.name

self.logger.info('Syncing config and variables')
DbAccess.sync_hash_method()
self.config.backup.hash_method = self.new_hash_method.name

self.logger.info('Migration done, cost {}s'.format(round(time.time() - t, 2)))

except Exception:
self.logger.info('Error occurs during migration, applying rollback')
for old_hash, new_hash in processed_hash_mapping.items():
old_path = blob_utils.get_blob_path(old_hash)
new_path = blob_utils.get_blob_path(new_hash)
new_path.rename(old_path)
raise
4 changes: 2 additions & 2 deletions prime_backup/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ def get(cls) -> 'Config':
return cls.__get_default()
return _config

@functools.cached_property
@property
def storage_path(self) -> Path:
return Path(self.storage_root)

@functools.cached_property
@property
def source_path(self) -> Path:
return Path(self.backup.source_root)

Expand Down
16 changes: 10 additions & 6 deletions prime_backup/db/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,23 @@ def init(cls, auto_migrate: bool = True):
else:
migration.ensure_version()

with cls.open_session() as session:
hash_method_str = str(session.get_db_meta().hash_method)
try:
cls.__hash_method = HashMethod[hash_method_str]
except KeyError:
raise ValueError('invalid hash method {!r} in db meta'.format(hash_method_str)) from None
cls.sync_hash_method()

@classmethod
def shutdown(cls):
if (logger := db_logger.get()) is not None:
for hdr in list(logger.handlers):
logger.removeHandler(hdr)

@classmethod
def sync_hash_method(cls):
with cls.open_session() as session:
hash_method_str = str(session.get_db_meta().hash_method)
try:
cls.__hash_method = HashMethod[hash_method_str]
except KeyError:
raise ValueError('invalid hash method {!r} in db meta'.format(hash_method_str)) from None

@classmethod
def __ensure_not_none(cls, value):
if value is None:
Expand Down
41 changes: 40 additions & 1 deletion prime_backup/db/session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import contextlib
import time
from typing import Optional, Sequence, Dict, ContextManager
from typing import Optional, Sequence, Dict, ContextManager, Iterator
from typing import TypeVar, List

from sqlalchemy import select, delete, desc, func, Select, JSON, text
Expand Down Expand Up @@ -40,9 +40,22 @@ def __init__(self, session: Session):
def add(self, obj: schema.Base):
self.session.add(obj)

def expunge(self, obj: schema.Base):
self.session.expunge(obj)

def expunge_all(self):
self.session.expunge_all()

def flush(self):
self.session.flush()

def flush_and_expunge_all(self):
self.flush()
self.expunge_all()

def commit(self):
self.session.commit()

@contextlib.contextmanager
def no_auto_flush(self) -> ContextManager[None]:
with self.session.no_autoflush:
Expand Down Expand Up @@ -94,6 +107,14 @@ def list_blobs(self, limit: Optional[int] = None, offset: Optional[int] = None)
s = s.offset(offset)
return _list_it(self.session.execute(s).scalars().all())

def iterate_blob_batch(self, *, batch_size: int = 3000) -> Iterator[List[schema.Blob]]:
limit, offset = batch_size, 0
while True:
blobs = self.list_blobs(limit=limit, offset=offset)
if len(blobs) == 0:
break
yield blobs

def get_all_blob_hashes(self) -> List[str]:
return _list_it(self.session.execute(select(schema.Blob.hash)).scalars().all())

Expand Down Expand Up @@ -161,6 +182,16 @@ def get_file(self, backup_id: int, path: str) -> schema.File:
def get_file_raw_size_sum(self) -> int:
return _int_or_0(self.session.execute(func.sum(schema.File.blob_raw_size).select()).scalar_one())

def get_file_by_blob_hashes(self, hashes: List[str]) -> List[schema.File]:
hashes = collection_utils.deduplicated_list(hashes)
result = []
for view in collection_utils.slicing_iterate(hashes, self.__safe_var_limit):
result.extend(self.session.execute(
select(schema.File).
where(schema.File.blob_hash.in_(view))
).scalars().all())
return result

def get_file_count_by_blob_hashes(self, hashes: List[str]) -> int:
cnt = 0
for view in collection_utils.slicing_iterate(hashes, self.__safe_var_limit):
Expand All @@ -179,6 +210,14 @@ def list_files(self, limit: Optional[int] = None, offset: Optional[int] = None)
s = s.offset(offset)
return _list_it(self.session.execute(s).scalars().all())

def iterate_file_batch(self, *, batch_size: int = 3000) -> Iterator[List[schema.File]]:
limit, offset = batch_size, 0
while True:
files = self.list_files(limit=limit, offset=offset)
if len(files) == 0:
break
yield files

def delete_file(self, file: schema.File):
self.session.delete(file)

Expand Down
4 changes: 4 additions & 0 deletions prime_backup/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ def __init__(self, backup_id: int, path: str):
class UnsupportedFileFormat(PrimeBackupError):
def __init__(self, mode: int):
self.mode = mode


class VerificationError(PrimeBackupError):
pass
29 changes: 20 additions & 9 deletions prime_backup/mcdr/command/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from prime_backup.mcdr.task.crontab.list_crontab_task import ListCrontabJobTask
from prime_backup.mcdr.task.crontab.operate_crontab_task import OperateCrontabJobTask
from prime_backup.mcdr.task.crontab.show_crontab_task import ShowCrontabJobTask
from prime_backup.mcdr.task.db.migrate_hash_method_task import MigrateHashMethodTask
from prime_backup.mcdr.task.db.show_db_overview_task import ShowDbOverviewTask
from prime_backup.mcdr.task.db.vacuum_sqlite_task import VacuumSqliteTask
from prime_backup.mcdr.task.db.validate_db_task import ValidateDbTask, ValidateParts
Expand All @@ -31,6 +32,7 @@
from prime_backup.mcdr.task_manager import TaskManager
from prime_backup.types.backup_filter import BackupFilter
from prime_backup.types.backup_tags import BackupTagName
from prime_backup.types.hash_method import HashMethod
from prime_backup.types.operator import Operator
from prime_backup.types.standalone_backup_format import StandaloneBackupFormat
from prime_backup.utils import misc_utils
Expand Down Expand Up @@ -72,6 +74,10 @@ def cmd_db_validate(self, source: CommandSource, _: CommandContext, parts: Valid
def cmd_db_vacuum(self, source: CommandSource, _: CommandContext):
self.task_manager.add_task(VacuumSqliteTask(source))

def cmd_db_migrate_hash_method(self, source: CommandSource, context: CommandContext):
new_hash_method = context['hash_method']
self.task_manager.add_task(MigrateHashMethodTask(source, new_hash_method))

def cmd_make(self, source: CommandSource, context: CommandContext):
def callback(_, err):
if err is None:
Expand Down Expand Up @@ -217,9 +223,13 @@ def create_backup_id(arg: str = 'backup_id', clazz: Type[Integer] = Integer) ->

builder = SimpleCommandBuilder()

# help

builder.command('help', self.cmd_help)
builder.command('help <what>', self.cmd_help)

builder.arg('what', Text).suggests(lambda: ShowHelpTask.COMMANDS_WITH_DETAILED_HELP)

# backup
builder.command('make', self.cmd_make)
builder.command('make <comment>', self.cmd_make)
Expand All @@ -228,32 +238,33 @@ def create_backup_id(arg: str = 'backup_id', clazz: Type[Integer] = Integer) ->
builder.command('delete_range <backup_id_range>', self.cmd_delete_range)
builder.command('prune', self.cmd_prune)

builder.arg('backup_id', create_backup_id)
builder.arg('backup_id_range', IdRangeNode)
builder.arg('comment', GreedyText)

# crontab
builder.command('crontab', self.cmd_crontab_show)
builder.command('crontab <job_id>', self.cmd_crontab_show)
builder.command('crontab <job_id> pause', self.cmd_crontab_pause)
builder.command('crontab <job_id> resume', self.cmd_crontab_resume)

builder.arg('job_id', lambda n: Enumeration(n, CrontabJobId))

# db
builder.command('database overview', self.cmd_db_overview)
builder.command('database validate all', functools.partial(self.cmd_db_validate, parts=ValidateParts.all()))
builder.command('database validate blobs', functools.partial(self.cmd_db_validate, parts=ValidateParts.blobs))
builder.command('database validate files', functools.partial(self.cmd_db_validate, parts=ValidateParts.files))
builder.command('database vacuum', self.cmd_db_vacuum)
builder.command('database migrate_hash_method <hash_method>', self.cmd_db_migrate_hash_method)

builder.arg('hash_method', lambda n: Enumeration(n, HashMethod))

# operations
builder.command('confirm', self.cmd_confirm)
builder.command('abort', self.cmd_abort)

# node defs
builder.arg('backup_id', create_backup_id)
builder.arg('backup_id_range', IdRangeNode)
builder.arg('comment', GreedyText)
builder.arg('job_id', lambda n: Enumeration(n, CrontabJobId))
builder.arg('page', lambda n: Integer(n).at_min(1))
builder.arg('per_page', lambda n: Integer(n).at_min(1))
builder.arg('what', Text).suggests(lambda: ShowHelpTask.COMMANDS_WITH_DETAILED_HELP)

# subcommand permissions
for name, level in permissions.items():
builder.literal(name).requires(get_permission_checker(name), get_permission_denied_text)

Expand Down
Loading

0 comments on commit f645fed

Please sign in to comment.