Skip to content

Commit

Permalink
Merge pull request #14026 from abhijat/issue/13843/filter-replaced-se…
Browse files Browse the repository at this point in the history
…gments

ducktape: Exclude replaced segments when checking size
  • Loading branch information
piyushredpanda authored Oct 9, 2023
2 parents 6cb4c6c + 22b12a9 commit 4750d19
Showing 1 changed file with 46 additions and 7 deletions.
53 changes: 46 additions & 7 deletions tests/rptest/tests/topic_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,38 @@
import json
import os
import pprint
import re
import time
from collections import defaultdict, deque, namedtuple
from collections import defaultdict, deque
from queue import Queue
from threading import Thread
from typing import Callable, NamedTuple, Optional, Sequence

import requests
from ducktape.mark import ok_to_fail
from ducktape.cluster.cluster import ClusterNode
from ducktape.mark import matrix
from ducktape.tests.test import TestContext
from ducktape.utils.util import wait_until

from rptest.archival.s3_client import S3Client
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.rp_storage_tool import RpStorageTool
from rptest.clients.rpk import RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.admin import Admin
from rptest.services.cluster import cluster
from rptest.services.redpanda import (CloudStorageType, FileToChecksumSize,
RedpandaService, SISettings,
get_cloud_storage_type)
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.redpanda import (FileToChecksumSize, RedpandaService,
SISettings, get_cloud_storage_type)
from rptest.services.rpk_producer import RpkProducer
from rptest.tests.redpanda_test import RedpandaTest
from rptest.util import wait_until_result
from rptest.utils.si_utils import (
EMPTY_SEGMENT_SIZE, MISSING_DATA_ERRORS, NTP, TRANSIENT_ERRORS,
PathMatcher, BucketView, SegmentReader, default_log_segment_size,
get_expected_ntp_restored_size, get_on_disk_size_per_ntp, is_close_size,
parse_s3_manifest_path, parse_s3_segment_path, verify_file_layout)
from rptest.services.kgo_verifier_services import KgoVerifierProducer
parse_s3_manifest_path, parse_s3_segment_path, verify_file_layout, NTPR,
gen_local_path_from_remote)

CLOUD_STORAGE_SEGMENT_MAX_UPLOAD_INTERVAL_SEC = 10

Expand Down Expand Up @@ -1110,6 +1112,21 @@ def _wipe_data(self):
f"All data will be removed from node {node.account.hostname}")
self.redpanda.remove_local_data(node)

def _collect_replaced_segments(self, replaced: dict[NTPR, list],
                               manifest_key: str):
    """Record the segments one partition manifest lists as replaced.

    The object stored under `manifest_key` in `self.s3_bucket` is
    downloaded and decoded according to its extension: '.bin' manifests
    go through RpStorageTool.decode_partition_manifest, '.json'
    manifests through json.loads.  When the decoded manifest carries a
    non-empty 'replaced' entry, the keys of that entry are stored in
    `replaced` under the value returned by
    parse_s3_manifest_path(manifest_key).

    `replaced` is mutated in place and nothing is returned.  An existing
    entry for the same manifest is overwritten when the manifest still
    lists replaced segments; it is left untouched when the manifest has
    none.

    Raises AssertionError if the key has neither known extension or the
    decoded manifest is None.
    """
    payload = self.cloud_storage_client.get_object_data(
        self.s3_bucket, manifest_key)

    # Pick the decoder from the object key's extension.
    if manifest_key.endswith('.bin'):
        decoded = RpStorageTool(
            self.logger).decode_partition_manifest(payload)
    elif manifest_key.endswith('.json'):
        decoded = json.loads(payload)
    else:
        decoded = None
    assert decoded is not None, f"failed to load manifest from path {manifest_key}"

    superseded = decoded.get('replaced')
    if not superseded:
        # Missing or empty 'replaced' section: nothing to record.
        return
    replaced[parse_s3_manifest_path(manifest_key)] = list(
        superseded.keys())

def _wait_for_data_in_s3(self,
expected_topics,
timeout=datetime.timedelta(minutes=1)):
Expand All @@ -1121,7 +1138,19 @@ def _wait_for_data_in_s3(self,
total_partitions = sum([t.partition_count for t in expected_topics])
tolerance = default_log_segment_size * total_partitions
path_matcher = PathMatcher(expected_topics)
replaced_segments = {}

def is_replaced(segment):
    """Tell whether `segment` is listed as replaced in a collected manifest.

    `segment.key` is converted with gen_local_path_from_remote, the
    numeric suffix following 'log.' is dropped, and the result is parsed
    with parse_s3_segment_path.  The segment counts as replaced when some
    entry of the enclosing `replaced_segments` mapping has a key equal to
    the parsed `ntpr` and a value containing the parsed `name`.

    Returns True for a replaced segment, False otherwise.
    """
    # The dot is escaped so that only a literal '.log.<digits>' style
    # suffix is rewritten.  An unescaped '.' matches any character and
    # would also mangle text such as 'log-1' or 'log_42' appearing
    # elsewhere in the path (e.g. inside a topic name).
    # NOTE(review): the stripped digits are presumably the archiver term
    # appended to uploaded segment names -- confirm against si_utils.
    local_path = re.sub(r'log\.\d+', 'log',
                        gen_local_path_from_remote(segment.key))
    parsed = parse_s3_segment_path(local_path)
    return any(parsed.name in names and parsed.ntpr == ntpr
               for ntpr, names in replaced_segments.items())

"""Wait until all topics are uploaded to S3"""

def verify():
manifests = []
topic_manifests = []
Expand All @@ -1133,11 +1162,21 @@ def verify():
self.logger.debug(f'checking S3 object: {obj.key}')
if path_matcher.is_partition_manifest(obj):
manifests.append(obj)
self._collect_replaced_segments(replaced_segments, obj.key)
elif path_matcher.is_topic_manifest(obj):
topic_manifests.append(obj)
elif path_matcher.is_segment(obj):
segments.append(obj)

for segment in segments:
if is_replaced(segment):
self.logger.debug(
f'removing replaced segment {segment} from segments')

segments = [
segment for segment in segments if not is_replaced(segment)
]

if len(expected_topics) != len(topic_manifests):
self.logger.info(
f"can't find enough topic_manifest.json objects, "
Expand Down

0 comments on commit 4750d19

Please sign in to comment.