Skip to content

Commit

Permalink
Merge pull request #24636 from WillemKauf/e_compaction_alignment_test
Browse files Browse the repository at this point in the history
`storage`: compaction alignment test
  • Loading branch information
Lazin authored Jan 11, 2025
2 parents f7e0746 + 1edec69 commit e9ad492
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 3 deletions.
22 changes: 22 additions & 0 deletions src/v/storage/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -853,3 +853,25 @@ redpanda_cc_gtest(
"@seastar//:testing",
],
)

# Fuzz-style test that compacts randomly arranged segments and verifies the
# observed offset-translation state is consistent across arrangements
# (see compaction_fuzz_test.cc).
redpanda_cc_btest(
    name = "compaction_fuzz_test",
    timeout = "short",
    srcs = [
        "compaction_fuzz_test.cc",
    ],
    deps = [
        ":disk_log_builder",
        "//src/v/base",
        "//src/v/container:fragmented_vector",
        "//src/v/model",
        "//src/v/random:generators",
        "//src/v/storage",
        "//src/v/storage:record_batch_builder",
        "//src/v/test_utils:seastar_boost",
        "@abseil-cpp//absl/container:btree",
        "@boost//:test",
        "@seastar",
        "@seastar//:testing",
    ],
)
1 change: 1 addition & 0 deletions src/v/storage/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rp_test(
file_sanitizer_test.cc
compaction_reducer_test.cc
batch_consumer_utils_test.cc
compaction_fuzz_test.cc
LIBRARIES v::seastar_testing_main v::storage_test_utils v::model_test_utils
LABELS storage
ARGS "-- -c 1"
Expand Down
249 changes: 249 additions & 0 deletions src/v/storage/tests/compaction_fuzz_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "base/vlog.h"
#include "container/fragmented_vector.h"
#include "model/namespace.h"
#include "model/record_batch_types.h"
#include "model/timeout_clock.h"
#include "random/generators.h"
#include "storage/record_batch_builder.h"
#include "storage/tests/utils/disk_log_builder.h"
#include "storage/types.h"

#include <seastar/core/future.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/sleep.hh>
#include <seastar/testing/thread_test_case.hh>

#include <absl/container/btree_map.h>
#include <boost/test/tools/old/interface.hpp>
#include <boost/test/unit_test.hpp>

#include <algorithm>
#include <exception>

using namespace std::chrono_literals;

namespace {
// File-local logger for the compaction fuzz tests.
ss::logger cmp_testlog("cmp-fuzz");
} // anonymous namespace

// Build a single record batch at the given base offset.
//
// \param offset      base offset stamped on the batch
// \param empty       if true, produce a batch with no records
// \param type        record batch type for the header
// \param keys        pool of keys to sample from (nullopt -> null key)
// \param values      pool of values to sample from
// \param num_records number of records to add when not empty
//
// The key/value pools are taken by const reference: this helper is called
// once per generated batch and the pass-by-value signature copied both
// vectors (and every sstring in them) on each call.
static model::record_batch make_random_batch(
  model::offset offset,
  bool empty,
  model::record_batch_type type,
  const std::vector<std::optional<ss::sstring>>& keys,
  const std::vector<std::optional<ss::sstring>>& values,
  int num_records) {
    BOOST_REQUIRE(keys.size() == values.size());
    storage::record_batch_builder builder(type, offset);
    // Convert an optional string into an optional iobuf without mutating
    // the source pool.
    auto to_iobuf = [](const std::optional<ss::sstring>& x) {
        std::optional<iobuf> result;
        if (x.has_value()) {
            iobuf buf;
            buf.append(x->data(), x->size());
            result = std::move(buf);
        }
        return result;
    };
    if (!empty) {
        for (int i = 0; i < num_records; i++) {
            // Keys repeat across batches (small pool), which is the
            // interesting case for compaction.
            const auto& key = random_generators::random_choice(keys);
            const auto& val = random_generators::random_choice(values);
            builder.add_raw_kv(to_iobuf(key), to_iobuf(val));
        }
    }
    return std::move(builder).build();
}

/// Produce `num` record batches with contiguous offsets whose keys and
/// values are drawn from a pool of `cardinality` entries. The first key
/// in the pool is a null key; all other keys and all values are random
/// 20-character alphanumeric strings.
static fragmented_vector<model::record_batch>
generate_random_record_batches(int num, int cardinality) {
    std::vector<model::record_batch_type> batch_types{
      model::record_batch_type::raft_configuration,
      model::record_batch_type::raft_data,
      model::record_batch_type::archival_metadata,
    };
    // Build the key/value pools, with index 0 mapped to a null key.
    std::vector<std::optional<ss::sstring>> key_pool;
    std::vector<std::optional<ss::sstring>> value_pool;
    for (int i = 0; i < cardinality; i++) {
        if (i == 0) {
            key_pool.emplace_back(std::nullopt);
        } else {
            key_pool.emplace_back(
              random_generators::gen_alphanum_string(20, false));
        }
        value_pool.emplace_back(
          random_generators::gen_alphanum_string(20, false));
    }
    // Emit the batches back to back, each starting right after the
    // previous batch's last offset.
    fragmented_vector<model::record_batch> batches;
    model::offset next_base{0};
    for (int i = 0; i < num; i++) {
        auto type = random_generators::random_choice(batch_types);
        auto record_count = random_generators::get_int(1, 10);
        batches.emplace_back(make_random_batch(
          next_base, false, type, key_pool, value_pool, record_count));
        next_base = model::next_offset(batches.back().last_offset());
    }
    return batches;
}

/// Offset translator state observed at some point in time.
///
/// The two deques are parallel: gap_offset[i] is the base offset of the
/// i-th non-data batch seen in the log and gap_length[i] is its record
/// count.
struct ot_state {
    // Base offsets of offset-translation relevant (non-data) batches.
    std::deque<model::offset> gap_offset;
    // Record counts of those batches, i.e. the length of each gap.
    std::deque<int64_t> gap_length;
};

/// Batch consumer that records every offset-translation relevant
/// (non-data) batch it sees into an `ot_state`.
struct ot_state_consumer {
    ss::future<ss::stop_iteration> operator()(model::record_batch rb) {
        static const auto translation_batches
          = model::offset_translator_batch_types();
        auto is_translation_batch = std::find(
                                      translation_batches.begin(),
                                      translation_batches.end(),
                                      rb.header().type)
                                    != translation_batches.end();
        if (is_translation_batch) {
            // Save where the gap starts and how many offsets it spans.
            st->gap_offset.push_back(rb.base_offset());
            st->gap_length.push_back(rb.record_count());
        }
        // Nothing is awaited here, so return a ready future directly
        // instead of paying for a coroutine frame on every batch.
        return ss::make_ready_future<ss::stop_iteration>(
          ss::stop_iteration::no);
    }

    void end_of_stream() {}

    // Output sink; not owned. Must outlive the log consumption.
    ot_state* st;
};

/// Insert data into the log while maintaining a particular segment
/// arrangement, run compaction, and capture the resulting offset
/// translator state.
///
/// The arrangement is defined by the set of segment base offset values:
/// the log is force-rolled each time an appended batch reaches the next
/// offset in `arrangement`.
///
/// \param batches batches to append (copied; the input is not consumed)
/// \param arrangement segment base offsets (sorted internally; empty
///        means no forced rolls)
/// \param simulate_internal_topic_compaction place the log in the kafka
///        internal namespace instead of the kafka namespace
/// \return offset translator state read back after compaction
ss::future<ot_state> arrange_and_compact(
  const fragmented_vector<model::record_batch>& batches,
  std::deque<model::offset> arrangement,
  bool simulate_internal_topic_compaction = false) {
    std::sort(arrangement.begin(), arrangement.end());
    storage::log_config cfg = storage::log_builder_config();
    auto offset_translator_types = model::offset_translator_batch_types();
    auto raft_group_id = raft::group_id{0};
    storage::disk_log_builder b1(cfg, offset_translator_types, raft_group_id);

    // Random topic name so repeated invocations don't collide on disk.
    auto ns = simulate_internal_topic_compaction
                ? model::kafka_internal_namespace
                : model::kafka_namespace;
    model::ntp log_ntp(
      ns,
      model::topic_partition(
        model::topic(random_generators::gen_alphanum_string(8)),
        model::partition_id{0}));
    std::exception_ptr error = nullptr;
    co_await b1.start(log_ntp);

    // Must initialize translator state.
    co_await b1.get_disk_log_impl().start(std::nullopt);

    try {
        for (const auto& b : batches) {
            co_await b1.add_batch(b.copy());
            // Roll a new segment once we have reached the next base
            // offset in the requested arrangement.
            if (
              !arrangement.empty() && b.base_offset() >= arrangement.front()) {
                arrangement.pop_front();
                co_await b1.get_disk_log_impl().force_roll(
                  ss::default_priority_class());
            }
        }
        ss::abort_source as;
        auto compact_cfg = storage::compaction_config(
          batches.back().last_offset(),
          std::nullopt,
          ss::default_priority_class(),
          as);
        // Run sliding window compaction followed by adjacent segment
        // merge compaction.
        std::ignore = co_await b1.apply_sliding_window_compaction(compact_cfg);
        co_await b1.apply_adjacent_merge_compaction(compact_cfg);
    } catch (...) {
        // Defer the failure: the log still has to be read back and
        // stopped cleanly before this function can assert below.
        error = std::current_exception();
    }
    // Read the whole log back and collect the offset translator state.
    auto reader = co_await b1.get_disk_log_impl().make_reader(
      storage::log_reader_config(
        model::offset{0}, model::offset::max(), ss::default_priority_class()));
    ot_state st{};
    co_await std::move(reader).consume(
      ot_state_consumer{.st = &st}, model::no_timeout);
    co_await b1.stop();
    if (error) {
        vlog(
          cmp_testlog.error,
          "Error triggered while appending or compacting: {}",
          error);
    }
    BOOST_REQUIRE(error == nullptr);
    co_return st;
}

/// This function generates random alignment based on the set of batches
/// that will be written into the log.
std::deque<model::offset> generate_random_arrangement(
const fragmented_vector<model::record_batch>& batches, size_t num_segments) {
BOOST_REQUIRE(num_segments <= batches.size());
std::deque<model::offset> arr;
// User reservoir sample to produce num_segments
for (size_t i = 0; i < num_segments; i++) {
arr.push_back(batches[i].base_offset());
}
for (size_t i = num_segments; i < batches.size(); i++) {
auto r = random_generators::get_int<size_t>(0, i);
if (r < num_segments) {
arr[r] = batches[i].base_offset();
}
}
return arr;
}

SEASTAR_THREAD_TEST_CASE(test_compaction_with_different_segment_arrangements) {
// Debug builds are too slow for the full-size input, so shrink it there.
#ifdef NDEBUG
    static constexpr auto num_batches = 1000;
    std::vector<size_t> segment_counts = {10, 100, 1000};
#else
    static constexpr auto num_batches = 10;
    std::vector<size_t> segment_counts = {10};
#endif
    auto batches = generate_random_record_batches(num_batches, 10);
    // Baseline: append everything without forced rolls, then compact.
    auto baseline
      = arrange_and_compact(batches, std::deque<model::offset>{}, false).get();
    // Any random segment alignment has to produce the same offset
    // translation state after compaction as the baseline.
    for (auto segment_count : segment_counts) {
        auto alignment = generate_random_arrangement(batches, segment_count);
        auto observed = arrange_and_compact(batches, alignment, false).get();
        BOOST_REQUIRE(baseline.gap_offset == observed.gap_offset);
        BOOST_REQUIRE(baseline.gap_length == observed.gap_length);
    }
}

SEASTAR_THREAD_TEST_CASE(
  test_compaction_with_different_segment_arrangements_simulate_internal_topic) {
// Debug builds are too slow for the full-size input, so shrink it there.
#ifdef NDEBUG
    static constexpr auto num_batches = 1000;
    std::vector<size_t> segment_counts = {10, 100, 1000};
#else
    static constexpr auto num_batches = 10;
    std::vector<size_t> segment_counts = {10};
#endif
    auto batches = generate_random_record_batches(num_batches, 10);
    // Baseline: single arrangement, compacted as an internal-namespace
    // topic.
    auto baseline
      = arrange_and_compact(batches, std::deque<model::offset>{}, true).get();
    // Random alignments of the internal-namespace log must converge to
    // the same offset translation state as the baseline.
    for (auto segment_count : segment_counts) {
        auto alignment = generate_random_arrangement(batches, segment_count);
        auto observed = arrange_and_compact(batches, alignment, true).get();
        BOOST_REQUIRE(baseline.gap_offset == observed.gap_offset);
        BOOST_REQUIRE(baseline.gap_length == observed.gap_length);
    }
}
2 changes: 1 addition & 1 deletion src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4061,7 +4061,7 @@ FIXTURE_TEST(test_skipping_compaction_below_start_offset, log_builder_fixture) {
auto& first_seg = log.segments().front();
BOOST_REQUIRE_EQUAL(first_seg->finished_self_compaction(), false);

b.apply_compaction(cfg.compact, *new_start_offset).get();
b.apply_adjacent_merge_compaction(cfg.compact, *new_start_offset).get();

BOOST_REQUIRE_EQUAL(first_seg->finished_self_compaction(), false);

Expand Down
7 changes: 6 additions & 1 deletion src/v/storage/tests/utils/disk_log_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,16 @@ disk_log_builder::apply_retention(gc_config cfg) {
return get_disk_log_impl().do_gc(cfg);
}

ss::future<> disk_log_builder::apply_compaction(
// Thin test wrapper over disk_log_impl::adjacent_merge_compact.
ss::future<> disk_log_builder::apply_adjacent_merge_compaction(
  compaction_config cfg, std::optional<model::offset> new_start_offset) {
    return get_disk_log_impl().adjacent_merge_compact(cfg, new_start_offset);
}

// Thin test wrapper over disk_log_impl::sliding_window_compact. Forwards
// its boolean result (presumably whether compaction made progress —
// confirm against disk_log_impl's declaration).
ss::future<bool> disk_log_builder::apply_sliding_window_compaction(
  compaction_config cfg, std::optional<model::offset> new_start_offset) {
    return get_disk_log_impl().sliding_window_compact(cfg, new_start_offset);
}

ss::future<bool>
disk_log_builder::update_start_offset(model::offset start_offset) {
return get_disk_log_impl().update_start_offset(start_offset);
Expand Down
5 changes: 4 additions & 1 deletion src/v/storage/tests/utils/disk_log_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ class disk_log_builder {
model::timestamp collection_upper_bound,
std::optional<size_t> max_partition_retention_size);
ss::future<std::optional<model::offset>> apply_retention(gc_config cfg);
ss::future<> apply_compaction(
ss::future<> apply_adjacent_merge_compaction(
compaction_config cfg,
std::optional<model::offset> new_start_offset = std::nullopt);
ss::future<bool> apply_sliding_window_compaction(
compaction_config cfg,
std::optional<model::offset> new_start_offset = std::nullopt);
ss::future<bool> update_start_offset(model::offset start_offset);
Expand Down

0 comments on commit e9ad492

Please sign in to comment.