From a2f2c99c87015c0dedf2b8d64feb58b930c0157d Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Wed, 10 Jul 2024 13:12:01 -0700 Subject: [PATCH 1/5] archival: bump offset if seeking undershoots convert_begin_offset_to_file_pos can undershoot, in which case the metadata returned by the archival policy won't perfectly align with the caller's expectations. This could result in an offset_overlap when applying the upload metadata. It's unclear exactly why there are missing batches, given this code is used for the non-compacted uploader. But the fix is simple enough, and is one that we use in the compacted upload policy already[1]. [1] https://github.com/redpanda-data/redpanda/blob/4b8135ea5dbdca322ccf8efc85424413a20c5ade/src/v/archival/segment_reupload.cc#L516-L522 (cherry picked from commit 8b616df65bfbddb5dc526bb1e9313ee8d125e81f) --- src/v/archival/archival_policy.cc | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/v/archival/archival_policy.cc b/src/v/archival/archival_policy.cc index 27541aa31bc2..c24c55e1aeaf 100644 --- a/src/v/archival/archival_policy.cc +++ b/src/v/archival/archival_policy.cc @@ -306,6 +306,25 @@ static ss::future> get_file_range( co_return seek_result.error(); } auto seek = seek_result.value(); + vlog( + archival_log.debug, + "Found offset {} when looking for target {}", + seek.offset, + begin_inclusive); + if (seek.offset < begin_inclusive) { + // `convert_begin_offset_to_file_pos` may return a lower value than + // the target, e.g. if the target was compacted away. + // + // [...][10, 20][40, 50][...] + // Target offset: 30 + // Seek result offset: 21 + // + // If so, the upload will still logically contain offset 30 if we + // return bytes starting at offset 21, but we need to lie about the + // offsets because the caller expects the returned metadata to + // align with the target. + seek.offset = begin_inclusive; + } upl->starting_offset = seek.offset; upl->file_offset = seek.bytes; upl->base_timestamp = seek.ts; From 002ee58788c6a1b98a8ef4faae49ab6ff87c4cda Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Wed, 10 Jul 2024 13:16:16 -0700 Subject: [PATCH 2/5] storage: add comments to offset_to_filepos header The headers are often a go-to place for information about the behavior callers should expect from a method. Adds some details explaining at a high level what to expect from a couple of methods. (cherry picked from commit ff120d0d74544d9de9a8dd37ebb63cf37260b52b) --- src/v/storage/offset_to_filepos.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/v/storage/offset_to_filepos.h b/src/v/storage/offset_to_filepos.h index 6f03f33aac46..6b1301756235 100644 --- a/src/v/storage/offset_to_filepos.h +++ b/src/v/storage/offset_to_filepos.h @@ -68,6 +68,10 @@ struct offset_to_file_pos_result { using should_fail_on_missing_offset = ss::bool_class; +// Returns the highest batch start offset that is <= the target offset. +// The batch corresponding to this start offset may not actually exist, e.g. if +// it was compacted away. This method may return a lower offset than the target +// offset. ss::future> convert_begin_offset_to_file_pos( model::offset begin_inclusive, ss::lw_shared_ptr segment, @@ -76,6 +80,7 @@ ss::future> convert_begin_offset_to_file_pos( should_fail_on_missing_offset fail_on_missing_offset = should_fail_on_missing_offset::yes); +// Returns the highest batch end offset that is <= the target offset. ss::future> convert_end_offset_to_file_pos( model::offset end_inclusive, ss::lw_shared_ptr segment, From 3aea139e11011e18d964ab3d9ef5b6caa4c30fef Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Mon, 28 Oct 2024 16:59:07 -0400 Subject: [PATCH 3/5] `archival`: move `get_file_range()` declaration to header (cherry picked from commit 5e99303170c9f81fd890b7e63fc0548eb2f59270) --- src/v/archival/archival_policy.cc | 6 +----- src/v/archival/archival_policy.h | 11 +++++++++++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/v/archival/archival_policy.cc b/src/v/archival/archival_policy.cc index c24c55e1aeaf..42dc058e637d 100644 --- a/src/v/archival/archival_policy.cc +++ b/src/v/archival/archival_policy.cc @@ -266,11 +266,7 @@ archival_policy::lookup_result archival_policy::find_segment( return {.segment = *it, .ntp_conf = &ntp_conf, .forced = force_upload}; } -/// This function computes offsets for the upload (inc. file offets) -/// If the full segment is uploaded the segment is not scanned. -/// If the upload is partial, the partial scan will be performed if -/// the segment has the index and full scan otherwise. -static ss::future> get_file_range( +ss::future> get_file_range( model::offset begin_inclusive, std::optional end_inclusive, ss::lw_shared_ptr segment, diff --git a/src/v/archival/archival_policy.h b/src/v/archival/archival_policy.h index 0468d36526d7..9ceae8928559 100644 --- a/src/v/archival/archival_policy.h +++ b/src/v/archival/archival_policy.h @@ -134,4 +134,15 @@ class archival_policy { ss::io_priority_class _io_priority; }; +/// This function computes offsets for the upload (inc. file offets) +/// If the full segment is uploaded the segment is not scanned. +/// If the upload is partial, the partial scan will be performed if +/// the segment has the index and full scan otherwise. +ss::future> get_file_range( + model::offset begin_inclusive, + std::optional end_inclusive, + ss::lw_shared_ptr segment, + ss::lw_shared_ptr upl, + ss::io_priority_class io_priority); + } // namespace archival From e16988d87292c0a1beed988e87b90697d7dde2cf Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Mon, 28 Oct 2024 17:00:26 -0400 Subject: [PATCH 4/5] `archival`: add `archival_policy_test.cc` (cherry picked from commit 5f18ba97c54f66793e7fbd82365e5216ea71d3ca) --- src/v/archival/tests/CMakeLists.txt | 16 +++++ src/v/archival/tests/archival_policy_test.cc | 67 ++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 src/v/archival/tests/archival_policy_test.cc diff --git a/src/v/archival/tests/CMakeLists.txt b/src/v/archival/tests/CMakeLists.txt index c852cc958c18..e4dc431b96c0 100644 --- a/src/v/archival/tests/CMakeLists.txt +++ b/src/v/archival/tests/CMakeLists.txt @@ -77,4 +77,20 @@ else() LABELS archival ARGS "-- -c 1" ) + + rp_test( + UNIT_TEST + GTEST + BINARY_NAME gtest_archival_policy + SOURCES + archival_policy_test.cc + LIBRARIES + v::cluster + v::storage + v::storage_test_utils + v::gtest_main + LABELS archival + ARGS "-- -c 1" + ) + endif() diff --git a/src/v/archival/tests/archival_policy_test.cc b/src/v/archival/tests/archival_policy_test.cc new file mode 100644 index 000000000000..c775808c9728 --- /dev/null +++ b/src/v/archival/tests/archival_policy_test.cc @@ -0,0 +1,67 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/archival/archival_policy.h" +#include "storage/tests/utils/disk_log_builder.h" +#include "test_utils/tmp_dir.h" + +#include + +#include + +TEST(ArchivalPolicyTest, TestGetFileRangeSeekOffsetBelowBeginInclusive) { + temporary_dir tmp_dir("archival_policy_test"); + auto data_path = tmp_dir.get_path(); + + auto b = storage::disk_log_builder{storage::log_config{ + data_path.c_str(), + 4_KiB, + ss::default_priority_class(), + storage::make_sanitized_file_config()}}; + b + | storage::start( + storage::ntp_config{{"test_ns", "test_tpc", 0}, {data_path}}); + auto defer = ss::defer([&b] { b.stop().get(); }); + b | storage::add_segment(0); + auto ts = model::timestamp::now(); + + // Produce batches [0,9],[10,19],[30,39] + for (auto i : {0, 10, 30}) { + model::test::record_batch_spec spec{ + .offset = model::offset{i}, + .count = 10, + .records = 10, + .timestamp = ts, + }; + b.add_random_batch(spec).get(); + ts = model::timestamp{ts.value() + 1}; + } + + auto segment = b.get_log_segments().back(); + auto upl = ss::make_lw_shared( + {.sources = {segment}}); + + for (int i = 20; i < 30; ++i) { + // The seek result found in the provided segment will be less than + // begin_inclusive_offset, due to missing offsets [20,29]. Assert that + // the starting offset for the upload candidate is adjusted to be equal + // to begin_inclusive_offset. + auto begin_inclusive_offset = model::offset{i}; + auto result = archival::get_file_range( + begin_inclusive_offset, + std::nullopt, + segment, + upl, + ss::default_priority_class()) + .get(); + + ASSERT_FALSE(result.has_value()); + ASSERT_EQ(upl->starting_offset, begin_inclusive_offset); + } +} From 4aa22ad61db61fe72af61f233ce274db2a1879eb Mon Sep 17 00:00:00 2001 From: Eugene Lazin <4lazin@gmail.com> Date: Wed, 13 Nov 2024 12:35:27 +0100 Subject: [PATCH 5/5] cs: Fix include in "archival_policy_test.cc" --- src/v/archival/tests/archival_policy_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/archival/tests/archival_policy_test.cc b/src/v/archival/tests/archival_policy_test.cc index c775808c9728..7c7e948caefc 100644 --- a/src/v/archival/tests/archival_policy_test.cc +++ b/src/v/archival/tests/archival_policy_test.cc @@ -7,7 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 -#include "cluster/archival/archival_policy.h" +#include "archival/archival_policy.h" #include "storage/tests/utils/disk_log_builder.h" #include "test_utils/tmp_dir.h"