Skip to content

Commit

Permalink
Merge pull request #14839 from vbotbuildovich/backport-pr-14816-v23.2…
Browse files Browse the repository at this point in the history
….x-481

[v23.2.x] archival: start reupload range after log start
  • Loading branch information
andrwng authored Nov 19, 2023
2 parents 8ce99c2 + 88aee86 commit bb3ad60
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/v/archival/segment_reupload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,12 @@ void segment_collector::do_collect(segment_collector_mode mode) {
if (
_segments.empty()
&& mode == segment_collector_mode::collect_compacted) {
_begin_inclusive = result.segment->offsets().base_offset;
// We may have found our first segment, but we can't always use its
// base offset -- it's possible the log has been prefix truncated
// within a segment (e.g. with delete records).
_begin_inclusive = std::max(
_log.offsets().start_offset,
result.segment->offsets().base_offset);
align_begin_offset_to_manifest();
}
_segments.push_back(result.segment);
Expand Down
80 changes: 80 additions & 0 deletions src/v/archival/tests/segment_reupload_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,86 @@ SEASTAR_THREAD_TEST_CASE(test_do_not_reupload_self_concatenated) {
}
}

SEASTAR_THREAD_TEST_CASE(test_do_not_reupload_prefix_truncated) {
auto ntp = model::ntp{"test_ns", "test_tpc", 0};
temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
using namespace storage;

auto b = make_log_builder(data_path.string());

auto o = std::make_unique<ntp_config::default_overrides>();
o->cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction;
b | start(ntp_config{ntp, {data_path}, std::move(o)});
auto defer = ss::defer([&b] { b.stop().get(); });

b | storage::add_segment(0) | storage::add_random_batch(0, 1000)
| storage::add_segment(1000) | storage::add_random_batch(1000, 1000)
| storage::add_segment(2000) | storage::add_random_batch(2000, 1000);

// Set up our manifest to look as if our local data is a compacted version
// of what's in the cloud.
auto seg_size = b.get_segment(0).size_bytes();
cloud_storage::partition_manifest m(ntp, model::initial_revision_id{1});
m.add(
segment_name("0-499-v1.log"),
cloud_storage::segment_meta{
.is_compacted = false,
.size_bytes = seg_size,
.base_offset = model::offset(0),
.committed_offset = model::offset(499),
.delta_offset = model::offset_delta(0),
.delta_offset_end = model::offset_delta(0)});
m.add(
segment_name("500-999-v1.log"),
cloud_storage::segment_meta{
.is_compacted = false,
.size_bytes = seg_size,
.base_offset = model::offset(500),
.committed_offset = model::offset(999),
.delta_offset = model::offset_delta(0),
.delta_offset_end = model::offset_delta(0)});
m.add(
segment_name("1000-1999-v1.log"),
cloud_storage::segment_meta{
.is_compacted = false,
.size_bytes = seg_size,
.base_offset = model::offset(1000),
.committed_offset = model::offset(1999),
.delta_offset = model::offset_delta(0),
.delta_offset_end = model::offset_delta(0)});
m.add(
segment_name("2000-2999-v1.log"),
cloud_storage::segment_meta{
.is_compacted = false,
.size_bytes = seg_size,
.base_offset = model::offset(2000),
.committed_offset = model::offset(2999),
.delta_offset = model::offset_delta(0),
.delta_offset_end = model::offset_delta(0)});

// Mark our local segments compacted, making them eligible for reupload.
for (int i = 0; i < 3; i++) {
b.get_segment(i).mark_as_compacted_segment();
b.get_segment(i).mark_as_finished_self_compaction();
}

// Prefix truncate without aligning to a segment boundary, a la
// delete-records.
b.update_start_offset(model::offset{100}).get();

archival::segment_collector collector{
model::offset{0}, m, b.get_disk_log_impl(), seg_size * 10};

// Since we can't replace offsets starting at 0, the first remote segment
// isn't eligible for reupload and we should start from the next segment.
collector.collect_segments();
BOOST_REQUIRE_EQUAL(collector.begin_inclusive()(), 500);
BOOST_REQUIRE_EQUAL(collector.segments().size(), 3);
BOOST_REQUIRE_EQUAL(collector.segments()[0]->offsets().base_offset(), 0);
BOOST_REQUIRE(collector.should_replace_manifest_segment());
}

SEASTAR_THREAD_TEST_CASE(test_adjacent_segment_collection) {
auto ntp = model::ntp{"test_ns", "test_tpc", 0};
temporary_dir tmp_dir("concat_segment_read");
Expand Down

0 comments on commit bb3ad60

Please sign in to comment.