diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 152259f49e87..1e2757dfff7d 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -3706,6 +3706,14 @@ configuration::configuration()
       "Iceberg is enabled, do not change this value.",
       {.needs_restart = needs_restart::yes, .visibility = visibility::user},
       "redpanda-iceberg-catalog")
+  , datalake_coordinator_snapshot_max_delay_secs(
+      *this,
+      "datalake_coordinator_snapshot_max_delay_secs",
+      "Maximum amount of time the coordinator waits to snapshot after a "
+      "command appears in the log.",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      std::chrono::seconds(15min),
+      {.min = 10s})
   , iceberg_catalog_type(
       *this,
       "iceberg_catalog_type",
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index d83e078b8b92..4d0c96d6fed8 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -703,6 +703,8 @@ struct configuration final : public config_store {
     bounded_property<std::chrono::milliseconds>
       iceberg_catalog_commit_interval_ms;
     property<ss::sstring> iceberg_catalog_base_location;
+    bounded_property<std::chrono::seconds>
+      datalake_coordinator_snapshot_max_delay_secs;
 
     // datalake catalog configuration
     enum_property<datalake::catalog_type> iceberg_catalog_type;
diff --git a/src/v/datalake/coordinator/BUILD b/src/v/datalake/coordinator/BUILD
index 53c93be5ea1f..97804c096f07 100644
--- a/src/v/datalake/coordinator/BUILD
+++ b/src/v/datalake/coordinator/BUILD
@@ -184,42 +184,43 @@ redpanda_cc_library(
 )
 
 redpanda_cc_library(
-    name = "model",
+    name = "state",
     srcs = [
-        "types.cc",
+        "state.cc",
     ],
     hdrs = [
-        "types.h",
+        "state.h",
     ],
     include_prefix = "datalake/coordinator",
-    visibility = [":__subpackages__"],
     deps = [
         ":translated_offset_range",
-        "//src/v/datalake:types",
+        "//src/v/container:chunked_hash_map",
         "//src/v/model",
         "//src/v/serde",
-        "//src/v/serde:enum",
-        # todo: split writer further once it evolves
-        "//src/v/datalake:writer",
-        "//src/v/container:fragmented_vector",
-        "//src/v/utils:to_string",
     ],
 )
 
 redpanda_cc_library(
-    name = "state",
+    name = "model",
     srcs = [
-        "state.cc",
+        "types.cc",
     ],
     hdrs = [
-        "state.h",
+        "types.h",
     ],
     include_prefix = "datalake/coordinator",
+    visibility = [":__subpackages__"],
     deps = [
         ":translated_offset_range",
-        "//src/v/container:chunked_hash_map",
+        "//src/v/datalake:types",
+        "//src/v/datalake/coordinator:state",
        "//src/v/model",
         "//src/v/serde",
+        "//src/v/serde:enum",
+        # todo: split writer further once it evolves
+        "//src/v/datalake:writer",
+        "//src/v/container:fragmented_vector",
+        "//src/v/utils:to_string",
     ],
 )
diff --git a/src/v/datalake/coordinator/state.cc b/src/v/datalake/coordinator/state.cc
index f2aaee67643f..9e2ed767e266 100644
--- a/src/v/datalake/coordinator/state.cc
+++ b/src/v/datalake/coordinator/state.cc
@@ -11,6 +11,28 @@
 
 namespace datalake::coordinator {
 
+pending_entry pending_entry::copy() const {
+    return {.data = data.copy(), .added_pending_at = added_pending_at};
+}
+
+partition_state partition_state::copy() const {
+    partition_state result;
+    result.last_committed = last_committed;
+    for (const auto& entry : pending_entries) {
+        result.pending_entries.push_back(entry.copy());
+    }
+    return result;
+}
+
+topic_state topic_state::copy() const {
+    topic_state result;
+    result.pid_to_pending_files.reserve(pid_to_pending_files.size());
+    for (const auto& [id, state] : pid_to_pending_files) {
+        result.pid_to_pending_files[id] = state.copy();
+    }
+    return result;
+}
+
 std::optional<std::reference_wrapper<const partition_state>>
 topics_state::partition_state(const model::topic_partition& tp) const {
     auto state_iter = topic_to_state.find(tp.topic);
@@ -25,4 +47,13 @@ topics_state::partition_state(const model::topic_partition& tp) const {
     return prt_iter->second;
 }
 
+topics_state topics_state::copy() const {
+    topics_state result;
+    result.topic_to_state.reserve(topic_to_state.size());
+    for (const auto& [id, state] : topic_to_state) {
+        result.topic_to_state[id] = state.copy();
+    }
+    return result;
+}
+
 } // namespace datalake::coordinator
diff --git a/src/v/datalake/coordinator/state.h b/src/v/datalake/coordinator/state.h
index 695229655b37..e5340c7f9f36 100644
--- a/src/v/datalake/coordinator/state.h
+++ b/src/v/datalake/coordinator/state.h
@@ -31,6 +31,8 @@ struct pending_entry
     // Offset of the control topic partition at which this data entry was added
     // to the state machine as a pending entry.
     model::offset added_pending_at;
+
+    pending_entry copy() const;
 };
 
 // State tracked per Kafka partition. Groups of files get added to this
@@ -69,6 +71,8 @@ struct partition_state
     //
     // Is nullopt iff we have never committed any files to the table.
     std::optional<kafka::offset> last_committed;
+
+    partition_state copy() const;
 };
 
 // Tracks the state managed for each Kafka partition. Since data workers are
@@ -82,6 +86,8 @@ struct topic_state
     // Map from Redpanda partition id to the files pending per partition.
     chunked_hash_map<model::partition_id, partition_state> pid_to_pending_files;
 
+    topic_state copy() const;
+
     // TODO: add table-wide metadata like Kafka schema id, Iceberg table uuid,
     // etc.
 };
@@ -96,6 +102,8 @@ struct topics_state
     // files per partition.
     chunked_hash_map<model::topic, topic_state> topic_to_state;
 
+    topics_state copy() const;
+
     // Returns the state for the given partition.
     std::optional<std::reference_wrapper<const partition_state>>
     partition_state(const model::topic_partition&) const;
diff --git a/src/v/datalake/coordinator/state_machine.cc b/src/v/datalake/coordinator/state_machine.cc
index f0b99c0027f4..1eab93536011 100644
--- a/src/v/datalake/coordinator/state_machine.cc
+++ b/src/v/datalake/coordinator/state_machine.cc
@@ -44,13 +44,19 @@ void maybe_log_update_error(
     }
 }
 } // namespace
 
-coordinator_stm::coordinator_stm(ss::logger& logger, raft::consensus* raft)
-  : raft::persisted_stm<>("datalake_coordinator_stm.snapshot", logger, raft) {}
+coordinator_stm::coordinator_stm(
+  ss::logger& logger,
+  raft::consensus* raft,
+  config::binding<std::chrono::seconds> snapshot_delay)
+  : coordinator_stm_base("datalake_coordinator_stm.snapshot", logger, raft)
+  , snapshot_delay_secs_(std::move(snapshot_delay)) {
+    snapshot_timer_.set_callback([this] { write_snapshot_async(); });
+}
 
 ss::future<checked<model::term_id, coordinator_stm::errc>>
 coordinator_stm::sync(model::timeout_clock::duration timeout) {
     auto sync_res = co_await ss::coroutine::as_future(
-      raft::persisted_stm<>::sync(timeout));
+      coordinator_stm_base::sync(timeout));
     if (sync_res.failed()) {
         auto eptr = sync_res.get_exception();
         auto msg = fmt::format("Exception caught while syncing: {}", eptr);
@@ -125,27 +131,77 @@ ss::future<> coordinator_stm::do_apply(const model::record_batch& b) {
           key,
           b.header());
     }
+    rearm_snapshot_timer();
 }
 
 model::offset coordinator_stm::max_collectible_offset() { return {}; }
 
-ss::future<>
-coordinator_stm::apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) {
-    co_return;
+ss::future<> coordinator_stm::apply_local_snapshot(
+  raft::stm_snapshot_header, iobuf&& snapshot_buf) {
+    auto parser = iobuf_parser(std::move(snapshot_buf));
+    auto snapshot = co_await serde::read_async<stm_snapshot>(parser);
+    state_ = std::move(snapshot.topics);
 }
 
 ss::future<raft::stm_snapshot>
-coordinator_stm::take_local_snapshot(ssx::semaphore_units) {
-    // temporarily ignore snapshots, to be fixed later.
-    // throwing here results in uncaught exceptions, so a dummy snapshot
-    // at offset 0 avoids that.
-    co_return raft::stm_snapshot::create(0, model::offset{0}, iobuf{});
+coordinator_stm::take_local_snapshot(ssx::semaphore_units units) {
+    auto snapshot_offset = last_applied_offset();
+    auto snapshot = make_snapshot();
+    units.return_all();
+    iobuf snapshot_buf;
+    co_await serde::write_async(snapshot_buf, std::move(snapshot));
+    co_return raft::stm_snapshot::create(
+      0, snapshot_offset, std::move(snapshot_buf));
 }
 
-ss::future<> coordinator_stm::apply_raft_snapshot(const iobuf&) { co_return; }
+ss::future<> coordinator_stm::apply_raft_snapshot(const iobuf& snapshot_buf) {
+    auto parser = iobuf_parser(snapshot_buf.copy());
+    auto snapshot = co_await serde::read_async<stm_snapshot>(parser);
+    state_ = std::move(snapshot.topics);
+}
 
-ss::future<iobuf> coordinator_stm::take_snapshot(model::offset) {
-    co_return iobuf{};
+ss::future<iobuf> coordinator_stm::take_snapshot() {
+    iobuf snapshot_buf;
+    co_await serde::write_async(snapshot_buf, make_snapshot());
+    co_return std::move(snapshot_buf);
+}
+
+stm_snapshot coordinator_stm::make_snapshot() const {
+    return {.topics = state_.copy()};
+}
+
+ss::future<> coordinator_stm::stop() {
+    snapshot_timer_.cancel();
+    return coordinator_stm_base::stop();
+}
+
+ss::future<> coordinator_stm::maybe_write_snapshot() {
+    if (_raft->last_snapshot_index() >= last_applied()) {
+        co_return;
+    }
+    auto snapshot = co_await _raft->stm_manager()->take_snapshot();
+    vlog(
+      _log.debug,
+      "creating snapshot at offset: {}",
+      snapshot.last_included_offset);
+    co_await _raft->write_snapshot(raft::write_snapshot_cfg(
+      snapshot.last_included_offset, std::move(snapshot.data)));
+}
+
+void coordinator_stm::write_snapshot_async() {
+    ssx::background = ssx::spawn_with_gate_then(
+                        _gate, [this] { return maybe_write_snapshot(); })
+                        .handle_exception([this](const std::exception_ptr& e) {
+                            vlog(_log.warn, "failed to write snapshot: {}", e);
+                        })
+                        .finally([holder = _gate.hold()] {});
+}
+
+void coordinator_stm::rearm_snapshot_timer() {
+    if (_gate.is_closed() || snapshot_timer_.armed()) {
+        return;
+    }
+    snapshot_timer_.arm(snapshot_delay_secs_());
 }
 
 bool stm_factory::is_applicable_for(const storage::ntp_config& config) const {
@@ -156,7 +212,11 @@ bool stm_factory::is_applicable_for(const storage::ntp_config& config) const {
 void stm_factory::create(
   raft::state_machine_manager_builder& builder, raft::consensus* raft) {
-    auto stm = builder.create_stm<coordinator_stm>(datalake_log, raft);
+    auto stm = builder.create_stm<coordinator_stm>(
+      datalake_log,
+      raft,
+      config::shard_local_cfg()
+        .datalake_coordinator_snapshot_max_delay_secs.bind());
     raft->log()->stm_manager()->add_stm(stm);
 }
diff --git a/src/v/datalake/coordinator/state_machine.h b/src/v/datalake/coordinator/state_machine.h
index e17fa6a9b4e8..b28c71b15a37 100644
--- a/src/v/datalake/coordinator/state_machine.h
+++ b/src/v/datalake/coordinator/state_machine.h
@@ -15,7 +15,9 @@
 
 namespace datalake::coordinator {
 
-class coordinator_stm final : public raft::persisted_stm<> {
+using coordinator_stm_base = raft::persisted_stm_no_snapshot_at_offset<>;
+
+class coordinator_stm final : public coordinator_stm_base {
 public:
     static constexpr std::string_view name = "datalake_coordinator_stm";
     enum class errc {
@@ -25,7 +27,8 @@ class coordinator_stm final : public raft::persisted_stm<> {
         shutting_down,
     };
 
-    explicit coordinator_stm(ss::logger&, raft::consensus*);
+    explicit coordinator_stm(
+      ss::logger&, raft::consensus*, config::binding<std::chrono::seconds>);
 
     raft::consensus* raft() { return _raft; }
 
     // Syncs the STM such that we're guaranteed that it has applied all records
@@ -46,6 +49,10 @@ class coordinator_stm final : public raft::persisted_stm<> {
     const topics_state& state() const { return state_; }
 
 protected:
+    ss::future<> stop() override;
+
+    stm_snapshot make_snapshot() const;
+
     ss::future<> do_apply(const model::record_batch&) override;
 
     model::offset max_collectible_offset() override;
@@ -58,11 +65,17 @@ class coordinator_stm final : public raft::persisted_stm<> {
 
     ss::future<> apply_raft_snapshot(const iobuf&) final;
 
-    ss::future<iobuf> take_snapshot(model::offset) final;
+    ss::future<iobuf> take_snapshot() final;
 
 private:
+    void rearm_snapshot_timer();
+    void write_snapshot_async();
+    ss::future<> maybe_write_snapshot();
+
     // The deterministic state managed by this STM.
     topics_state state_;
+    config::binding<std::chrono::seconds> snapshot_delay_secs_;
+    ss::timer<ss::lowres_clock> snapshot_timer_;
 };
 
 class stm_factory : public cluster::state_machine_factory {
 public:
diff --git a/src/v/datalake/coordinator/tests/BUILD b/src/v/datalake/coordinator/tests/BUILD
index 14f459167eef..1c4446f3b6b7 100644
--- a/src/v/datalake/coordinator/tests/BUILD
+++ b/src/v/datalake/coordinator/tests/BUILD
@@ -11,6 +11,7 @@ redpanda_test_cc_library(
     include_prefix = "datalake/coordinator/tests",
     deps = [
         "//src/v/container:fragmented_vector",
+        "//src/v/datalake/coordinator:file_committer",
         "//src/v/datalake/coordinator:state",
         "//src/v/datalake/coordinator:translated_offset_range",
         "//src/v/model",
@@ -89,3 +90,21 @@ redpanda_cc_gtest(
         "@googletest//:gtest",
     ],
 )
+
+redpanda_cc_gtest(
+    name = "state_machine_test",
+    timeout = "short",
+    srcs = [
+        "state_machine_test.cc",
+    ],
+    cpu = 1,
+    deps = [
+        ":state_test_utils",
+        "//src/v/datalake/coordinator",
+        "//src/v/datalake/coordinator:stm",
+        "//src/v/raft/tests:stm_raft_fixture",
+        "//src/v/storage",
+        "@googletest//:gtest",
+        "@seastar",
+    ],
+)
diff --git a/src/v/datalake/coordinator/tests/CMakeLists.txt b/src/v/datalake/coordinator/tests/CMakeLists.txt
index 363c1196959a..d72700bddf02 100644
--- a/src/v/datalake/coordinator/tests/CMakeLists.txt
+++ b/src/v/datalake/coordinator/tests/CMakeLists.txt
@@ -15,6 +15,7 @@ rp_test(
   SOURCES
     coordinator_test.cc
     state_update_test.cc
+    state_machine_test.cc
   LIBRARIES
     v::datalake_coordinator
     v::datalake_coordinator_test_utils
diff --git a/src/v/datalake/coordinator/tests/coordinator_test.cc b/src/v/datalake/coordinator/tests/coordinator_test.cc
index 3d7cd33f43b4..a179618f3a56 100644
--- a/src/v/datalake/coordinator/tests/coordinator_test.cc
+++ b/src/v/datalake/coordinator/tests/coordinator_test.cc
@@ -39,37 +39,6 @@ class noop_file_committer : public file_committer {
     ~noop_file_committer() override = default;
 };
 
-// Simple committer that returns the set of updates that would mark all the
-// pending files as committed. Doesn't affect any external state.
-class simple_file_committer : public file_committer {
-public:
-    ss::future<checked<chunked_vector<mark_files_committed_update>, errc>>
-    commit_topic_files_to_catalog(
-      model::topic t, const topics_state& state) const override {
-        chunked_vector<mark_files_committed_update> ret;
-        auto t_iter = std::ranges::find(
-          state.topic_to_state,
-          t,
-          &std::pair<const model::topic, topic_state>::first);
-        if (t_iter == state.topic_to_state.end()) {
-            co_return ret;
-        }
-        // Mark the last file in each partition as committed.
-        auto& t_state = t_iter->second;
-        for (const auto& [pid, files] : t_state.pid_to_pending_files) {
-            if (files.pending_entries.empty()) {
-                continue;
-            }
-            model::topic_partition tp(t, pid);
-            auto build_res = mark_files_committed_update::build(
-              state, tp, files.pending_entries.back().data.last_offset);
-            EXPECT_FALSE(build_res.has_error());
-            ret.emplace_back(std::move(build_res.value()));
-        }
-        co_return ret;
-    }
-    ~simple_file_committer() override = default;
-};
 
 const model::topic topic_base{"test_topic"};
 model::topic_partition tp(int t, int pid) {
     return model::topic_partition(
@@ -194,7 +163,9 @@ class CoordinatorTest : public raft::raft_fixture {
         auto* raft = node->raft().get();
         raft::state_machine_manager_builder builder;
         auto stm = builder.create_stm<datalake::coordinator::coordinator_stm>(
-          datalake::datalake_log, raft);
+          datalake::datalake_log,
+          raft,
+          config::mock_binding(1s));
         node->start(std::move(builder)).get();
         if (args.noop_commits) {
             crds.at(id()) = std::make_unique<coordinator>(
diff --git a/src/v/datalake/coordinator/tests/state_machine_test.cc b/src/v/datalake/coordinator/tests/state_machine_test.cc
new file mode 100644
index 000000000000..fa6893a8e806
--- /dev/null
+++ b/src/v/datalake/coordinator/tests/state_machine_test.cc
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2024 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+#include "datalake/coordinator/coordinator.h"
+#include "datalake/coordinator/state_machine.h"
+#include "datalake/coordinator/tests/state_test_utils.h"
+#include "raft/tests/stm_test_fixture.h"
+
+using coordinator = std::unique_ptr<datalake::coordinator::coordinator>;
+using stm = datalake::coordinator::coordinator_stm;
+using stm_ptr = ss::shared_ptr<stm>;
+
+struct coordinator_stm_fixture : stm_raft_fixture<stm> {
+    ss::future<> TearDownAsync() override {
+        for (auto& [_, coordinator] : coordinators) {
+            co_await coordinator->stop_and_wait();
+        }
+        co_return co_await stm_raft_fixture<stm>::TearDownAsync();
+    }
+
+    config::binding<std::chrono::milliseconds> commit_interval() const {
+        return config::mock_binding(500ms);
+    }
+
+    config::binding<std::chrono::seconds> snapshot_interval() const {
+        return config::mock_binding(1s);
+    }
+
+    stm_shptrs_t create_stms(
+      state_machine_manager_builder& builder,
+      raft_node_instance& node) override {
+        return builder.create_stm<stm>(
+          logger(), node.raft().get(), snapshot_interval());
+    }
+
+    ss::future<> initialize() {
+        co_await initialize_state_machines();
+        co_await parallel_for_each_node([this](raft_node_instance& node) {
+            auto stm = get_stm<0>(node);
+            coordinators[node.get_vnode()]
+              = std::make_unique<datalake::coordinator::coordinator>(
+                get_stm<0>(node), file_committer, commit_interval());
+            coordinators[node.get_vnode()]->start();
+            return ss::now();
+        });
+    }
+
+    model::offset last_snapshot_offset() {
+        model::offset result = model::offset::max();
+        for (auto& [_, stms] : node_stms) {
+            auto stm_snapshot_index
+              = std::get<0>(stms)->raft()->last_snapshot_index();
+            result = std::min(result, stm_snapshot_index);
+        }
+        return result;
+    }
+
+    template<class Func>
+    auto retry_with_leader_coordinator(Func&& func) {
+        return retry_with_leader(
+          model::timeout_clock::now() + 5s,
+          [this, func = std::forward<Func>(func)](
+            raft_node_instance& leader) mutable {
+              return func(coordinators[leader.get_vnode()]);
+          });
+    }
+
+    std::vector<model::offset> last_applied_offsets() const {
+        std::vector<model::offset> result;
+        result.reserve(node_stms.size());
+        for (auto& [_, stms] : node_stms) {
+            result.push_back(std::get<0>(stms)->last_applied_offset());
+        }
+        return result;
+    }
+
+    std::vector<std::optional<kafka::offset>>
+    last_committed_offsets(model::topic_partition tp) {
+        std::vector<std::optional<kafka::offset>> result;
+        result.reserve(node_stms.size());
+        for (auto& [_, stms] : node_stms) {
+            const auto& topic_state = std::get<0>(stms)->state();
+            auto it = topic_state.topic_to_state.find(tp.topic);
+            if (it == topic_state.topic_to_state.end()) {
+                result.emplace_back(std::nullopt);
+                continue;
+            }
+            const auto& p_state = it->second.pid_to_pending_files;
+            auto p_it = p_state.find(tp.partition);
+            if (p_it == p_state.end()) {
+                result.emplace_back(std::nullopt);
+                continue;
+            }
+            auto& pending_files = p_it->second.pending_entries;
+            if (!pending_files.empty()) {
+                result.emplace_back(pending_files.back().data.last_offset);
+            } else {
+                result.push_back(p_it->second.last_committed);
+            }
+        }
+        return result;
+    }
+
+    model::topic_partition random_tp() const {
+        return {
+          model::topic{"test"},
+          model::partition_id(
+            random_generators::get_int(0, max_partitions - 1))};
+    }
+
+    static constexpr int32_t max_partitions = 5;
+    model::topic_partition tp{model::topic{"test"}, model::partition_id{0}};
+    datalake::coordinator::simple_file_committer file_committer;
+    absl::flat_hash_map<raft::vnode, coordinator> coordinators;
+};
+
+TEST_F_CORO(coordinator_stm_fixture, test_snapshots) {
+    co_await initialize();
+    co_await wait_for_leader(5s);
+
+    // populate some data until the state machine is snapshotted
+    // a few times
+    constexpr auto max_snapshots = 5;
+    auto completed_snapshots = 0;
+    auto prev_snapshot_offset = last_snapshot_offset();
+    while (completed_snapshots != max_snapshots) {
+        // mock a translator
+        auto add_files_result = co_await retry_with_leader_coordinator(
+          [&, this](coordinator& coordinator) mutable {
+              auto tp = random_tp();
+              return coordinator->sync_get_last_added_offset(tp).then(
+                [&, tp](auto result) {
+                    if (!result) {
+                        return ss::make_ready_future<bool>(false);
+                    }
+                    auto last_committed_offset = kafka::offset_cast(
+                      result.value().value_or(kafka::offset{-1}));
+                    std::vector<std::pair<int64_t, int64_t>> offset_pairs;
+                    offset_pairs.reserve(5);
+                    auto next_offset = last_committed_offset() + 1;
+                    for (int i = 0; i < 5; i++) {
+                        offset_pairs.emplace_back(next_offset, next_offset + 5);
+                        next_offset = next_offset + 6;
+                    }
+                    return coordinator
+                      ->sync_add_files(
+                        tp,
+                        datalake::coordinator::make_pending_files(offset_pairs))
+                      .then([](auto result) {
+                          return ss::make_ready_future<bool>(
+                            result.has_value());
+                      });
+                });
+          });
+        ASSERT_TRUE_CORO(add_files_result) << "Timed out waiting to add files";
+        auto snapshot_offset = last_snapshot_offset();
+        vlog(logger().debug, "last snapshot offset: {}", snapshot_offset);
+        if (snapshot_offset > prev_snapshot_offset) {
+            completed_snapshots++;
+        }
+        prev_snapshot_offset = snapshot_offset;
+    }
+
+    // Add a new raft group member to hydrate from snapshot.
+    auto new_node_id = model::node_id{static_cast<int32_t>(node_stms.size())};
+    auto& node = add_node(new_node_id, model::revision_id{10});
+    co_await start_node(node);
+    co_await with_leader(
+      10s, [vn = node.get_vnode()](raft_node_instance& node) {
+          return node.raft()->add_group_member(vn, model::revision_id{10});
+      });
+    // Wait until all group members converge and there are no further
+    // updates.
+    RPTEST_REQUIRE_EVENTUALLY_CORO(5s, [this]() {
+        auto offsets = last_applied_offsets();
+        return ss::make_ready_future<bool>(
+          std::equal(offsets.begin() + 1, offsets.end(), offsets.begin()));
+    });
+
+    ASSERT_GT_CORO(node.raft()->start_offset(), model::offset{0})
+      << "New node not seeded with snapshot";
+
+    for (int32_t pid = 0; pid < max_partitions; pid++) {
+        auto committed_offsets = last_committed_offsets(
+          {model::topic{"test"}, model::partition_id{pid}});
+        vlog(logger().info, "committed offsets: {}", committed_offsets);
+        ASSERT_TRUE_CORO(std::equal(
+          committed_offsets.begin() + 1,
+          committed_offsets.end(),
+          committed_offsets.begin()))
+          << "Topic state mismatch across replicas for partition: " << pid
+          << ", offsets: " << committed_offsets;
+    }
+}
diff --git a/src/v/datalake/coordinator/tests/state_test_utils.h b/src/v/datalake/coordinator/tests/state_test_utils.h
index f812c0bdf4e0..9474849b5edf 100644
--- a/src/v/datalake/coordinator/tests/state_test_utils.h
+++ b/src/v/datalake/coordinator/tests/state_test_utils.h
@@ -10,6 +10,7 @@
 #pragma once
 
 #include "container/fragmented_vector.h"
+#include "datalake/coordinator/file_committer.h"
 #include "datalake/coordinator/state.h"
 #include "datalake/coordinator/translated_offset_range.h"
 #include "model/fundamental.h"
@@ -20,6 +21,38 @@
 
 namespace datalake::coordinator {
 
+// Simple committer that returns the set of updates that would mark all the
+// pending files as committed. Doesn't affect any external state.
+class simple_file_committer : public file_committer {
+public:
+    ss::future<checked<chunked_vector<mark_files_committed_update>, errc>>
+    commit_topic_files_to_catalog(
+      model::topic t, const topics_state& state) const override {
+        chunked_vector<mark_files_committed_update> ret;
+        auto t_iter = std::ranges::find(
+          state.topic_to_state,
+          t,
+          &std::pair<const model::topic, topic_state>::first);
+        if (t_iter == state.topic_to_state.end()) {
+            co_return ret;
+        }
+        // Mark the last file in each partition as committed.
+        auto& t_state = t_iter->second;
+        for (const auto& [pid, files] : t_state.pid_to_pending_files) {
+            if (files.pending_entries.empty()) {
+                continue;
+            }
+            model::topic_partition tp(t, pid);
+            auto build_res = mark_files_committed_update::build(
+              state, tp, files.pending_entries.back().data.last_offset);
+            EXPECT_FALSE(build_res.has_error());
+            ret.emplace_back(std::move(build_res.value()));
+        }
+        co_return ret;
+    }
+    ~simple_file_committer() override = default;
+};
+
 // Utility methods for generating and operating on coordinator state.
 
 // Returns file entries corresponding to the given offset ranges.
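Aside on the `copy()` chain introduced in state.h/state.cc and exercised by the tests above: `take_local_snapshot()` has to serialize a stable, point-in-time view of `topics_state` while `do_apply()` keeps mutating the live instance, and the chunked containers appear to be deliberately non-copyable, so each level of the state clones itself explicitly. A minimal, self-contained sketch of the same pattern (`std::` containers stand in for `chunked_vector`/`chunked_hash_map`; the names and types here are illustrative, not the real ones):

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

// Stand-in for the real pending_entry; std::vector replaces chunked_vector.
struct pending_entry {
    int64_t last_offset{0};
    pending_entry copy() const { return {last_offset}; }
};

struct partition_state {
    std::optional<int64_t> last_committed;
    std::vector<pending_entry> pending_entries;

    // Deliberate, visible deep copy, mirroring partition_state::copy() in
    // state.cc: scalar members are assigned, entries cloned one by one.
    partition_state copy() const {
        partition_state result;
        result.last_committed = last_committed;
        result.pending_entries.reserve(pending_entries.size());
        for (const auto& e : pending_entries) {
            result.pending_entries.push_back(e.copy());
        }
        return result;
    }
};

int main() {
    partition_state live;
    live.pending_entries.push_back({.last_offset = 42});
    auto snapshot = live.copy(); // point-in-time view
    live.pending_entries.push_back({.last_offset = 48});
    // The snapshot is unaffected by later mutations: prints "1 vs 2".
    std::cout << snapshot.pending_entries.size() << " vs "
              << live.pending_entries.size() << "\n";
    return 0;
}
```

Making the deep copy an explicit method rather than a copy constructor keeps accidental O(n) copies from sneaking in at call sites; a snapshot is the one place where that cost is intended.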
diff --git a/src/v/datalake/coordinator/types.h b/src/v/datalake/coordinator/types.h
index bb4e8d3ed019..27ec5e7cf33c 100644
--- a/src/v/datalake/coordinator/types.h
+++ b/src/v/datalake/coordinator/types.h
@@ -10,6 +10,7 @@
 #pragma once
 
 #include "container/fragmented_vector.h"
+#include "datalake/coordinator/state.h"
 #include "datalake/coordinator/translated_offset_range.h"
 #include "datalake/errors.h"
 #include "model/fundamental.h"
@@ -137,4 +138,12 @@ struct fetch_latest_translated_offset_request
     auto serde_fields() { return std::tie(tp); }
 };
 
+struct stm_snapshot
+  : public serde::
+      envelope<stm_snapshot, serde::version<0>, serde::compat_version<0>> {
+    topics_state topics;
+
+    auto serde_fields() { return std::tie(topics); }
+};
+
 } // namespace datalake::coordinator
diff --git a/src/v/raft/persisted_stm.h b/src/v/raft/persisted_stm.h
index a42f6a95dc1a..a1f8e32f2147 100644
--- a/src/v/raft/persisted_stm.h
+++ b/src/v/raft/persisted_stm.h
@@ -264,6 +264,10 @@ class persisted_stm_base
 
 template<supported_stm_snapshot T = file_backed_stm_snapshot>
 using persisted_stm = persisted_stm_base<T, snapshot_at_offset_support::yes>;
+
+template<supported_stm_snapshot T = file_backed_stm_snapshot>
+using persisted_stm_no_snapshot_at_offset
+  = persisted_stm_base<T, snapshot_at_offset_support::no>;
 /**
  * Helper to copy persisted_stm kvstore snapshot from the source kvstore to
  * target shard
diff --git a/tests/rptest/tests/datalake/coordinator_retention_test.py b/tests/rptest/tests/datalake/coordinator_retention_test.py
new file mode 100644
index 000000000000..e126c7058c38
--- /dev/null
+++ b/tests/rptest/tests/datalake/coordinator_retention_test.py
@@ -0,0 +1,91 @@
+# Copyright 2024 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from rptest.services.kgo_verifier_services import KgoVerifierProducer
+from rptest.tests.datalake.datalake_services import DatalakeServices
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.services.redpanda import SISettings
+from rptest.tests.datalake.utils import supported_storage_types
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from rptest.services.cluster import cluster
+
+LOG_ALLOW_LIST = [r'Error cluster::errc:16 processing partition state for ntp']
+
+
+class CoordinatorRetentionTest(RedpandaTest):
+    def __init__(self, test_ctx, *args, **kwargs):
+        super(CoordinatorRetentionTest, self).__init__(
+            test_ctx,
+            num_brokers=3,
+            si_settings=SISettings(test_context=test_ctx),
+            extra_rp_conf={
+                "iceberg_enabled": "true",
+                "iceberg_catalog_commit_interval_ms": 5000,
+                "datalake_coordinator_snapshot_max_delay_secs": 10,
+            },
+            *args,
+            **kwargs)
+        self.test_ctx = test_ctx
+        self.topic_name = "test"
+
+    def wait_until_coordinator_snapshots(self):
+        try:
+            replica_last_snapshot_offsets = []
+            for pid in range(0, 3):
+                state = self.redpanda._admin.get_partition_state(
+                    "kafka_internal", "datalake_coordinator", pid)
+                for r in state["replicas"]:
+                    if r["raft_state"]["is_leader"]:
+                        replica_last_snapshot_offsets.append(
+                            r["raft_state"]["last_snapshot_index"] > 0)
+            # Only one of the partitions is expected to snapshot, namely
+            # the one coordinating for the test topic.
+            return len(replica_last_snapshot_offsets
+                       ) == 3 and replica_last_snapshot_offsets.count(
+                           True) == 1
+        except:
+            self.redpanda.logger.debug("Exception querying snapshot states",
+                                       exc_info=True)
+            return False
+
+    def do_test_retention(self, dl: DatalakeServices):
+        dl.create_iceberg_enabled_topic(self.topic_name,
+                                        partitions=10,
+                                        replicas=3)
+        producer = KgoVerifierProducer(self.test_context, self.redpanda,
+                                       self.topic_name, 1024, 10000000)
+        producer.start()
+        for pid in range(0, 3):
+            self.redpanda._admin.await_stable_leader(
+                namespace="kafka_internal",
+                topic="datalake_coordinator",
+                partition=pid,
+                timeout_s=30,
+                backoff_s=5)
+        try:
+            wait_until(
+                self.wait_until_coordinator_snapshots,
+                timeout_sec=20,
+                backoff_sec=3,
+                err_msg=
+                "Timed out waiting for coordinator partitions to snapshot.")
+        finally:
+            producer.stop()
+            producer.clean()
+            producer.free()
+
+    @cluster(num_nodes=5, log_allow_list=LOG_ALLOW_LIST)
+    @matrix(cloud_storage_type=supported_storage_types())
+    def test_retention(self, cloud_storage_type):
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=False,
+                              include_query_engines=[]) as dl:
+            self.do_test_retention(dl)
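The scheduling added in state_machine.cc is a debounce: `do_apply()` calls `rearm_snapshot_timer()`, which arms the timer only when it is not already armed, so a local snapshot lands at most `datalake_coordinator_snapshot_max_delay_secs` after the first command applied since the previous snapshot; steady traffic cannot push it out indefinitely. Below is a toy, virtual-time model of that behavior (plain C++ with a hand-rolled stand-in for `ss::timer`; simplified and single-threaded, not the real implementation):

```cpp
#include <iostream>
#include <optional>

// Hand-rolled, virtual-time stand-in for ss::timer<>: one-shot, fires once
// "now" reaches the deadline (in whole seconds).
struct fake_timer {
    std::optional<int> deadline;
    bool armed() const { return deadline.has_value(); }
    void arm(int now, int delay) { deadline = now + delay; }
};

// Toy model of the STM's snapshot debounce. Names mirror the patch but the
// mechanics are deliberately simplified.
struct stm_sim {
    fake_timer timer;
    int snapshot_delay = 15; // datalake_coordinator_snapshot_max_delay_secs
    int last_applied = 0;    // offset of the last applied command
    int last_snapshot = 0;   // offset covered by the last snapshot

    // Mirrors do_apply() -> rearm_snapshot_timer(): arm only if not pending.
    void apply(int now) {
        ++last_applied;
        if (!timer.armed()) {
            timer.arm(now, snapshot_delay);
        }
    }

    // Mirrors the timer callback -> maybe_write_snapshot(): skip the write
    // when nothing new has been applied since the last snapshot.
    void tick(int now) {
        if (timer.armed() && now >= *timer.deadline) {
            timer.deadline.reset();
            if (last_snapshot < last_applied) {
                last_snapshot = last_applied;
                std::cout << "t=" << now << "s: snapshot through offset "
                          << last_snapshot << "\n";
            }
        }
    }
};

int main() {
    stm_sim stm;
    // One command per virtual second: snapshots still land every ~15s
    // instead of being deferred forever by the ongoing traffic.
    for (int t = 0; t < 40; ++t) {
        stm.apply(t);
        stm.tick(t);
    }
    return 0;
}
```

With one command per virtual second the simulation prints a snapshot roughly every 15 seconds, which is the bound the config name promises and the ducktape test above waits for.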