Skip to content

Commit

Permalink
Merge pull request #173 from Aiven-Open/sebinsunny-add-snapshotter-ca…
Browse files Browse the repository at this point in the history
…llback

feat: Add progress_callback to snapshot method
  • Loading branch information
RommelLayco authored Mar 13, 2024
2 parents 02f5ae6 + 65cd4ec commit 3bcc58f
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 5 deletions.
20 changes: 19 additions & 1 deletion rohmu/delta/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from multiprocessing.dummy import Pool
from pathlib import Path
from pydantic import BaseModel, Field
from rohmu.dates import now
from rohmu.typing import AnyPath, HasRead
from types import TracebackType
from typing import Any, Callable, Iterable, List, Optional, Sequence, Type, TypeVar
from typing import Any, Callable, Iterable, List, Optional, Sequence, Type, TypedDict, TypeVar

import functools
import hashlib
Expand All @@ -28,6 +29,20 @@
logger = logging.getLogger(__name__)


class ProgressMetrics(TypedDict, total=True):
handled: int
failed: int
total: int
final: bool


class ProgressStep(Enum):
CREATING_MISSING_DIRECTORIES = "creating_missing_directories"
REMOVING_EXTRA_FILES = "removing_extra_files"
ADDING_MISSING_FILES = "adding_missing_files"
PROCESSING_AND_HASHING_SNAPSHOT_FILES = "processing_and_hashing_snapshot_files"


def hash_hexdigest_readable(f: HasRead, *, read_buffer: int = 1_000_000) -> str:
h = _hash()
while True:
Expand Down Expand Up @@ -254,6 +269,9 @@ def add_success(self, n: int = 1, *, info: str = "add_success") -> None:
logger.debug("%s %r -> %r", info, n, self)
assert not self.final

def progress_metrics(self) -> ProgressMetrics:
return {"handled": self.handled, "failed": self.failed, "total": self.total, "final": self.final}

def download_success(self, size: int) -> None:
self.add_success(size, info="download_success")

Expand Down
34 changes: 31 additions & 3 deletions rohmu/delta/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
increase_worth_reporting,
parallel_map_to,
Progress,
ProgressMetrics,
ProgressStep,
SnapshotFile,
SnapshotHash,
SnapshotState,
Expand Down Expand Up @@ -177,13 +179,25 @@ def _snapshot_create_missing_directories(self, *, src_dirs: Sequence[Path], dst_
changes += 1
return changes

def _snapshot_remove_extra_files(self, *, src_files: Sequence[Path], dst_files: Sequence[Path]) -> int:
def _snapshot_remove_extra_files(
self,
*,
src_files: Sequence[Path],
dst_files: Sequence[Path],
progress: Optional[Progress] = None,
progress_callback: Optional[Callable[[ProgressStep, ProgressMetrics], None]] = None,
) -> int:
if progress is None:
progress = Progress()
changes = 0
for i, relative_path in enumerate(set(dst_files).difference(src_files), 1):
dst_path = self.dst / relative_path
snapshotfile = self.relative_path_to_snapshotfile.get(relative_path)
if snapshotfile:
self._remove_snapshotfile(snapshotfile)
if progress_callback:
progress.add_success()
progress_callback(ProgressStep.REMOVING_EXTRA_FILES, progress.progress_metrics())
dst_path.unlink()
if increase_worth_reporting(i):
logger.debug("#%d. extra file: %r", i, relative_path)
Expand Down Expand Up @@ -218,7 +232,13 @@ def _snapshot_add_missing_files(self, *, src_files: Sequence[Path], dst_files: S
changes += 1
return changes

def snapshot(self, *, progress: Optional[Progress] = None, reuse_old_snapshotfiles: bool = True) -> int:
def snapshot(
self,
*,
progress: Optional[Progress] = None,
reuse_old_snapshotfiles: bool = True,
progress_callback: Optional[Callable[[ProgressStep, ProgressMetrics], None]] = None,
) -> int:
assert self.lock.locked()

if progress is None:
Expand Down Expand Up @@ -258,14 +278,20 @@ def snapshot(self, *, progress: Optional[Progress] = None, reuse_old_snapshotfil
# Create missing directories
changes = self._snapshot_create_missing_directories(src_dirs=src_dirs, dst_dirs=dst_dirs)
progress.add_success()
if progress_callback:
progress_callback(ProgressStep.CREATING_MISSING_DIRECTORIES, progress.progress_metrics())

# Remove extra files
changes += self._snapshot_remove_extra_files(src_files=src_files, dst_files=dst_files)
changes += self._snapshot_remove_extra_files(
src_files=src_files, dst_files=dst_files, progress=progress, progress_callback=progress_callback
)
progress.add_success()

# Add missing files
changes += self._snapshot_add_missing_files(src_files=src_files, dst_files=dst_files)
progress.add_success()
if progress_callback:
progress_callback(ProgressStep.ADDING_MISSING_FILES, progress.progress_metrics())

# We COULD also remove extra directories, but it is not
# probably really worth it and due to ignored files it
Expand Down Expand Up @@ -294,6 +320,8 @@ def _result_cb(*, map_in: Any, map_out: SnapshotFile) -> bool:
self._add_snapshotfile(map_out)
assert progress is not None
progress.add_success()
if progress_callback:
progress_callback(ProgressStep.PROCESSING_AND_HASHING_SNAPSHOT_FILES, progress.progress_metrics())
return True

changes += len(snapshotfiles)
Expand Down
57 changes: 56 additions & 1 deletion test/delta/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@
from __future__ import annotations

from pathlib import Path
from rohmu.delta.common import BackupPath, EMBEDDED_FILE_SIZE, Progress, SizeLimitedFile, SnapshotFile, SnapshotHash
from rohmu.delta.common import (
BackupPath,
EMBEDDED_FILE_SIZE,
Progress,
ProgressMetrics,
ProgressStep,
SizeLimitedFile,
SnapshotFile,
SnapshotHash,
)
from rohmu.typing import AnyPath
from test.conftest import SnapshotterWithDefaults
from typing import Any, Callable, Union
Expand Down Expand Up @@ -202,3 +211,49 @@ def fake_open_for_reading(self: SnapshotFile, path: Path) -> SizeLimitedFile:
with patch("rohmu.delta.snapshot.Path.stat", side_effect=FileNotFoundError):
with pytest.raises(FileNotFoundError):
snapshotter.snapshot(progress=Progress(), reuse_old_snapshotfiles=True)


@pytest.mark.timeout(2)
def test_snapshot_with_callback(snapshotter_creator: Callable[..., SnapshotterWithDefaults]) -> None:
snapshotter = snapshotter_creator()
callback_messages: list[str] = []

def progress_callback(message: ProgressStep, progress_metrics: ProgressMetrics) -> None:
callback_message = f"{message.value}: {progress_metrics['handled']}"
callback_messages.append(callback_message)

samples: dict[Union[str, Path], str] = {
"foo": "foobar",
"foo2": "foobar",
"foobig": "foobar" * EMBEDDED_FILE_SIZE,
"foobig2": "foobar" * EMBEDDED_FILE_SIZE,
}

src = snapshotter.src
for file_name, body in samples.items():
(src / file_name).write_text(body)

with snapshotter.lock:
progress = Progress()
changes_detected = snapshotter.snapshot(progress=progress, progress_callback=progress_callback)
assert changes_detected > 0

ss1 = snapshotter.get_snapshot_state()

progress = Progress()
assert snapshotter.snapshot(progress=progress, progress_callback=progress_callback) == 0

ss2 = snapshotter.get_snapshot_state()
assert ss1 == ss2
expected_messages = [
"creating_missing_directories: 1",
"adding_missing_files: 3",
"processing_and_hashing_snapshot_files: 4",
"processing_and_hashing_snapshot_files: 5",
"processing_and_hashing_snapshot_files: 6",
"processing_and_hashing_snapshot_files: 7",
"creating_missing_directories: 1",
"adding_missing_files: 3",
]

assert callback_messages == expected_messages

0 comments on commit 3bcc58f

Please sign in to comment.