Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] archival: bump offset if seeking undershoots (manual backport) #24105

Merged
merged 5 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions src/v/archival/archival_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,7 @@ archival_policy::lookup_result archival_policy::find_segment(
return {.segment = *it, .ntp_conf = &ntp_conf, .forced = force_upload};
}

/// This function computes offsets for the upload (inc. file offets)
/// If the full segment is uploaded the segment is not scanned.
/// If the upload is partial, the partial scan will be performed if
/// the segment has the index and full scan otherwise.
static ss::future<std::optional<std::error_code>> get_file_range(
ss::future<std::optional<std::error_code>> get_file_range(
model::offset begin_inclusive,
std::optional<model::offset> end_inclusive,
ss::lw_shared_ptr<storage::segment> segment,
Expand Down Expand Up @@ -306,6 +302,25 @@ static ss::future<std::optional<std::error_code>> get_file_range(
co_return seek_result.error();
}
auto seek = seek_result.value();
vlog(
archival_log.debug,
"Found offset {} when looking for target {}",
seek.offset,
begin_inclusive);
if (seek.offset < begin_inclusive) {
// `convert_begin_offset_to_file_pos` may return a lower value than
// the target, e.g. if the target was compacted away.
//
// [...][10, 20][40, 50][...]
// Target offset: 30
// Seek result offset: 21
//
// If so, the upload will still logically contain offset 30 if we
// return bytes starting at offset 21, but we need to lie about the
// offsets because the caller expects the returned metadata to
// align with the target.
seek.offset = begin_inclusive;
}
upl->starting_offset = seek.offset;
upl->file_offset = seek.bytes;
upl->base_timestamp = seek.ts;
Expand Down
11 changes: 11 additions & 0 deletions src/v/archival/archival_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,15 @@ class archival_policy {
ss::io_priority_class _io_priority;
};

/// This function computes offsets for the upload (inc. file offets)
/// If the full segment is uploaded the segment is not scanned.
/// If the upload is partial, the partial scan will be performed if
/// the segment has the index and full scan otherwise.
ss::future<std::optional<std::error_code>> get_file_range(
model::offset begin_inclusive,
std::optional<model::offset> end_inclusive,
ss::lw_shared_ptr<storage::segment> segment,
ss::lw_shared_ptr<upload_candidate> upl,
ss::io_priority_class io_priority);

} // namespace archival
16 changes: 16 additions & 0 deletions src/v/archival/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,20 @@ else()
LABELS archival
ARGS "-- -c 1"
)

rp_test(
UNIT_TEST
GTEST
BINARY_NAME gtest_archival_policy
SOURCES
archival_policy_test.cc
LIBRARIES
v::cluster
v::storage
v::storage_test_utils
v::gtest_main
LABELS archival
ARGS "-- -c 1"
)

endif()
67 changes: 67 additions & 0 deletions src/v/archival/tests/archival_policy_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "cluster/archival/archival_policy.h"
Lazin marked this conversation as resolved.
Show resolved Hide resolved
#include "storage/tests/utils/disk_log_builder.h"
#include "test_utils/tmp_dir.h"

#include <seastar/util/defer.hh>

#include <gtest/gtest.h>

TEST(ArchivalPolicyTest, TestGetFileRangeSeekOffsetBelowBeginInclusive) {
temporary_dir tmp_dir("archival_policy_test");
auto data_path = tmp_dir.get_path();

auto b = storage::disk_log_builder{storage::log_config{
data_path.c_str(),
4_KiB,
ss::default_priority_class(),
storage::make_sanitized_file_config()}};
b
| storage::start(
storage::ntp_config{{"test_ns", "test_tpc", 0}, {data_path}});
auto defer = ss::defer([&b] { b.stop().get(); });
b | storage::add_segment(0);
auto ts = model::timestamp::now();

// Produce batches [0,9],[10,19],[30,39]
for (auto i : {0, 10, 30}) {
model::test::record_batch_spec spec{
.offset = model::offset{i},
.count = 10,
.records = 10,
.timestamp = ts,
};
b.add_random_batch(spec).get();
ts = model::timestamp{ts.value() + 1};
}

auto segment = b.get_log_segments().back();
auto upl = ss::make_lw_shared<archival::upload_candidate>(
{.sources = {segment}});

for (int i = 20; i < 30; ++i) {
// The seek result found in the provided segment will be less than
// begin_inclusive_offset, due to missing offsets [20,29]. Assert that
// the starting offset for the upload candidate is adjusted to be equal
// to begin_inclusive_offset.
auto begin_inclusive_offset = model::offset{i};
auto result = archival::get_file_range(
begin_inclusive_offset,
std::nullopt,
segment,
upl,
ss::default_priority_class())
.get();

ASSERT_FALSE(result.has_value());
ASSERT_EQ(upl->starting_offset, begin_inclusive_offset);
}
}
5 changes: 5 additions & 0 deletions src/v/storage/offset_to_filepos.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ struct offset_to_file_pos_result {
using should_fail_on_missing_offset
= ss::bool_class<struct should_fail_on_missing_offset_tag>;

// Returns the highest batch start offset that is <= the target offset.
// The batch corresponding to this start offset may not actually exist, e.g. if
// it was compacted away. This method may return a lower offset than the target
// offset.
ss::future<result<offset_to_file_pos_result>> convert_begin_offset_to_file_pos(
model::offset begin_inclusive,
ss::lw_shared_ptr<storage::segment> segment,
Expand All @@ -76,6 +80,7 @@ ss::future<result<offset_to_file_pos_result>> convert_begin_offset_to_file_pos(
should_fail_on_missing_offset fail_on_missing_offset
= should_fail_on_missing_offset::yes);

// Returns the highest batch end offset that is <= the target offset.
ss::future<result<offset_to_file_pos_result>> convert_end_offset_to_file_pos(
model::offset end_inclusive,
ss::lw_shared_ptr<storage::segment> segment,
Expand Down
Loading