Merge pull request #24174 from bharathv/coordinator_snapshots
datalake/coordinator: snapshots
bharathv authored Nov 21, 2024
2 parents 45b8529 + 884324c commit b567269
Showing 15 changed files with 515 additions and 64 deletions.
8 changes: 8 additions & 0 deletions src/v/config/configuration.cc
@@ -3706,6 +3706,14 @@ configuration::configuration()
"Iceberg is enabled, do not change this value.",
{.needs_restart = needs_restart::yes, .visibility = visibility::user},
"redpanda-iceberg-catalog")
, datalake_coordinator_snapshot_max_delay_secs(
*this,
"datalake_coordinator_snapshot_max_delay_secs",
"Maximum amount of time the coordinator waits to snapshot after a "
"command appears in the log.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::chrono::seconds(15min),
{.min = 10s})
, iceberg_catalog_type(
*this,
"iceberg_catalog_type",
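
Note: the new property is a live tunable (needs_restart::no, minimum 10s, default 15 minutes). A minimal consumer sketch, assuming the usual config::binding call/watch API; only the property name comes from this change, the surrounding timer code is hypothetical:

// Hypothetical consumer; only the property name is taken from this commit.
auto snapshot_delay = config::shard_local_cfg()
                        .datalake_coordinator_snapshot_max_delay_secs.bind();
snapshot_timer.arm(snapshot_delay()); // a binding is callable to read the value
// Because the property does not require a restart, a watch() callback lets the
// owner pick up runtime changes and re-arm the timer with the new delay.
snapshot_delay.watch([&] {
    snapshot_timer.cancel();
    snapshot_timer.arm(snapshot_delay());
});
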
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
@@ -703,6 +703,8 @@ struct configuration final : public config_store {
bounded_property<std::chrono::milliseconds>
iceberg_catalog_commit_interval_ms;
property<ss::sstring> iceberg_catalog_base_location;
bounded_property<std::chrono::seconds>
datalake_coordinator_snapshot_max_delay_secs;

// datalake catalog configuration
enum_property<datalake_catalog_type> iceberg_catalog_type;
29 changes: 15 additions & 14 deletions src/v/datalake/coordinator/BUILD
@@ -184,42 +184,43 @@ redpanda_cc_library(
)

redpanda_cc_library(
name = "model",
name = "state",
srcs = [
"types.cc",
"state.cc",
],
hdrs = [
"types.h",
"state.h",
],
include_prefix = "datalake/coordinator",
visibility = [":__subpackages__"],
deps = [
":translated_offset_range",
"//src/v/datalake:types",
"//src/v/container:chunked_hash_map",
"//src/v/model",
"//src/v/serde",
"//src/v/serde:enum",
# todo: split writer further once it evolves
"//src/v/datalake:writer",
"//src/v/container:fragmented_vector",
"//src/v/utils:to_string",
],
)

redpanda_cc_library(
name = "state",
name = "model",
srcs = [
"state.cc",
"types.cc",
],
hdrs = [
"state.h",
"types.h",
],
include_prefix = "datalake/coordinator",
visibility = [":__subpackages__"],
deps = [
":translated_offset_range",
"//src/v/container:chunked_hash_map",
"//src/v/datalake:types",
"//src/v/datalake/coordinator:state",
"//src/v/model",
"//src/v/serde",
"//src/v/serde:enum",
# todo: split writer further once it evolves
"//src/v/datalake:writer",
"//src/v/container:fragmented_vector",
"//src/v/utils:to_string",
],
)

31 changes: 31 additions & 0 deletions src/v/datalake/coordinator/state.cc
@@ -11,6 +11,28 @@

namespace datalake::coordinator {

pending_entry pending_entry::copy() const {
return {.data = data.copy(), .added_pending_at = added_pending_at};
}

partition_state partition_state::copy() const {
partition_state result;
result.last_committed = last_committed;
for (const auto& entry : pending_entries) {
result.pending_entries.push_back(entry.copy());
}
return result;
}

topic_state topic_state::copy() const {
topic_state result;
result.pid_to_pending_files.reserve(pid_to_pending_files.size());
for (const auto& [id, state] : pid_to_pending_files) {
result.pid_to_pending_files[id] = state.copy();
}
return result;
}

std::optional<std::reference_wrapper<const partition_state>>
topics_state::partition_state(const model::topic_partition& tp) const {
auto state_iter = topic_to_state.find(tp.topic);
@@ -25,4 +47,13 @@ topics_state::partition_state(const model::topic_partition& tp) const {
return prt_iter->second;
}

topics_state topics_state::copy() const {
topics_state result;
result.topic_to_state.reserve(topic_to_state.size());
for (const auto& [id, state] : topic_to_state) {
result.topic_to_state[id] = state.copy();
}
return result;
}

} // namespace datalake::coordinator
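
The copy() helpers above give the state machine a deep, independent copy of its state so a snapshot can be serialized off the hot path. An illustrative sketch (not from the commit):

using namespace datalake::coordinator;

topics_state live;                 // state owned by the state machine
// ... commands are applied, pending entries accumulate ...
topics_state frozen = live.copy(); // independent deep copy for a snapshot

// Later mutations of `live` (new pending entries, advanced committed offsets)
// are not visible in `frozen`, which is what allows take_local_snapshot() to
// release its units before serializing.
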
8 changes: 8 additions & 0 deletions src/v/datalake/coordinator/state.h
@@ -31,6 +31,8 @@ struct pending_entry
// Offset of the control topic partition at which this data entry was added
// to the state machine as a pending entry.
model::offset added_pending_at;

pending_entry copy() const;
};

// State tracked per Kafka partition. Groups of files get added to this
@@ -69,6 +71,8 @@ struct partition_state
//
// Is nullopt iff we have never committed any files to the table.
std::optional<kafka::offset> last_committed;

partition_state copy() const;
};

// Tracks the state managed for each Kafka partition. Since data workers are
@@ -82,6 +86,8 @@ struct topic_state
// Map from Redpanda partition id to the files pending per partition.
chunked_hash_map<model::partition_id, partition_state> pid_to_pending_files;

topic_state copy() const;

// TODO: add table-wide metadata like Kafka schema id, Iceberg table uuid,
// etc.
};
@@ -96,6 +102,8 @@ struct topics_state
// files per partition.
chunked_hash_map<model::topic, topic_state> topic_to_state;

topics_state copy() const;

// Returns the state for the given partition.
std::optional<std::reference_wrapper<const partition_state>>
partition_state(const model::topic_partition&) const;
90 changes: 75 additions & 15 deletions src/v/datalake/coordinator/state_machine.cc
@@ -44,13 +44,19 @@ void maybe_log_update_error(
}
} // namespace

coordinator_stm::coordinator_stm(ss::logger& logger, raft::consensus* raft)
: raft::persisted_stm<>("datalake_coordinator_stm.snapshot", logger, raft) {}
coordinator_stm::coordinator_stm(
ss::logger& logger,
raft::consensus* raft,
config::binding<std::chrono::seconds> snapshot_delay)
: coordinator_stm_base("datalake_coordinator_stm.snapshot", logger, raft)
, snapshot_delay_secs_(std::move(snapshot_delay)) {
snapshot_timer_.set_callback([this] { write_snapshot_async(); });
}

ss::future<checked<model::term_id, coordinator_stm::errc>>
coordinator_stm::sync(model::timeout_clock::duration timeout) {
auto sync_res = co_await ss::coroutine::as_future(
raft::persisted_stm<>::sync(timeout));
coordinator_stm_base::sync(timeout));
if (sync_res.failed()) {
auto eptr = sync_res.get_exception();
auto msg = fmt::format("Exception caught while syncing: {}", eptr);
@@ -125,27 +131,77 @@ ss::future<> coordinator_stm::do_apply(const model::record_batch& b) {
key,
b.header());
}
rearm_snapshot_timer();
}

model::offset coordinator_stm::max_collectible_offset() { return {}; }

ss::future<>
coordinator_stm::apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) {
co_return;
ss::future<> coordinator_stm::apply_local_snapshot(
raft::stm_snapshot_header, iobuf&& snapshot_buf) {
auto parser = iobuf_parser(std::move(snapshot_buf));
auto snapshot = co_await serde::read_async<stm_snapshot>(parser);
state_ = std::move(snapshot.topics);
}

ss::future<raft::stm_snapshot>
coordinator_stm::take_local_snapshot(ssx::semaphore_units) {
// temporarily ignore snapshots, to be fixed later.
// throwing here results in uncaught exceptions, so a dummy snapshot
// at offset 0 avoids that.
co_return raft::stm_snapshot::create(0, model::offset{0}, iobuf{});
coordinator_stm::take_local_snapshot(ssx::semaphore_units units) {
auto snapshot_offset = last_applied_offset();
auto snapshot = make_snapshot();
units.return_all();
iobuf snapshot_buf;
co_await serde::write_async(snapshot_buf, std::move(snapshot));
co_return raft::stm_snapshot::create(
0, snapshot_offset, std::move(snapshot_buf));
}

ss::future<> coordinator_stm::apply_raft_snapshot(const iobuf&) { co_return; }
ss::future<> coordinator_stm::apply_raft_snapshot(const iobuf& snapshot_buf) {
auto parser = iobuf_parser(snapshot_buf.copy());
auto snapshot = co_await serde::read_async<stm_snapshot>(parser);
state_ = std::move(snapshot.topics);
}

ss::future<iobuf> coordinator_stm::take_snapshot(model::offset) {
co_return iobuf{};
ss::future<iobuf> coordinator_stm::take_snapshot() {
iobuf snapshot_buf;
co_await serde::write_async(snapshot_buf, make_snapshot());
co_return std::move(snapshot_buf);
}

stm_snapshot coordinator_stm::make_snapshot() const {
return {.topics = state_.copy()};
}

ss::future<> coordinator_stm::stop() {
snapshot_timer_.cancel();
return coordinator_stm_base::stop();
}

ss::future<> coordinator_stm::maybe_write_snapshot() {
if (_raft->last_snapshot_index() >= last_applied()) {
co_return;
}
auto snapshot = co_await _raft->stm_manager()->take_snapshot();
vlog(
_log.debug,
"creating snapshot at offset: {}",
snapshot.last_included_offset);
co_await _raft->write_snapshot(raft::write_snapshot_cfg(
snapshot.last_included_offset, std::move(snapshot.data)));
}

void coordinator_stm::write_snapshot_async() {
ssx::background = ssx::spawn_with_gate_then(
_gate, [this] { return maybe_write_snapshot(); })
.handle_exception([this](const std::exception_ptr& e) {
vlog(_log.warn, "failed to write snapshot: {}", e);
})
.finally([holder = _gate.hold()] {});
}

void coordinator_stm::rearm_snapshot_timer() {
if (_gate.is_closed() || snapshot_timer_.armed()) {
return;
}
snapshot_timer_.arm(snapshot_delay_secs_());
}

bool stm_factory::is_applicable_for(const storage::ntp_config& config) const {
@@ -156,7 +212,11 @@ bool stm_factory::is_applicable_for(const storage::ntp_config& config) const {

void stm_factory::create(
raft::state_machine_manager_builder& builder, raft::consensus* raft) {
auto stm = builder.create_stm<coordinator_stm>(datalake_log, raft);
auto stm = builder.create_stm<coordinator_stm>(
datalake_log,
raft,
config::shard_local_cfg()
.datalake_coordinator_snapshot_max_delay_secs.bind());
raft->log()->stm_manager()->add_stm(stm);
}

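
Both snapshot paths above serialize the same coordinator stm_snapshot envelope built from a copy of the in-memory state. A condensed round-trip sketch, assuming stm_snapshot is the coordinator's serde envelope with a topics field (distinct from raft::stm_snapshot); the helper function itself is illustrative only:

// Illustrative round-trip of the snapshot encoding used above.
ss::future<> snapshot_roundtrip_sketch(const topics_state& state) {
    // take_snapshot() / take_local_snapshot(): copy the state and serialize it.
    iobuf buf;
    co_await serde::write_async(buf, stm_snapshot{.topics = state.copy()});

    // apply_raft_snapshot() / apply_local_snapshot(): parse and install it.
    auto parser = iobuf_parser(std::move(buf));
    auto restored = co_await serde::read_async<stm_snapshot>(parser);
    // restored.topics now mirrors `state` as of the copy.
}
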
19 changes: 16 additions & 3 deletions src/v/datalake/coordinator/state_machine.h
@@ -15,7 +15,9 @@

namespace datalake::coordinator {

class coordinator_stm final : public raft::persisted_stm<> {
using coordinator_stm_base = raft::persisted_stm_no_snapshot_at_offset<>;

class coordinator_stm final : public coordinator_stm_base {
public:
static constexpr std::string_view name = "datalake_coordinator_stm";
enum class errc {
@@ -25,7 +27,8 @@ class coordinator_stm final : public raft::persisted_stm<> {
shutting_down,
};

explicit coordinator_stm(ss::logger&, raft::consensus*);
explicit coordinator_stm(
ss::logger&, raft::consensus*, config::binding<std::chrono::seconds>);
raft::consensus* raft() { return _raft; }

// Syncs the STM such that we're guaranteed that it has applied all records
@@ -46,6 +49,10 @@ class coordinator_stm final : public raft::persisted_stm<> {
const topics_state& state() const { return state_; }

protected:
ss::future<> stop() override;

stm_snapshot make_snapshot() const;

ss::future<> do_apply(const model::record_batch&) override;

model::offset max_collectible_offset() override;
@@ -58,11 +65,17 @@

ss::future<> apply_raft_snapshot(const iobuf&) final;

ss::future<iobuf> take_snapshot(model::offset) final;
ss::future<iobuf> take_snapshot() final;

private:
void rearm_snapshot_timer();
void write_snapshot_async();
ss::future<> maybe_write_snapshot();

// The deterministic state managed by this STM.
topics_state state_;
config::binding<std::chrono::seconds> snapshot_delay_secs_;
ss::timer<ss::lowres_clock> snapshot_timer_;
};
class stm_factory : public cluster::state_machine_factory {
public:
19 changes: 19 additions & 0 deletions src/v/datalake/coordinator/tests/BUILD
@@ -11,6 +11,7 @@ redpanda_test_cc_library(
include_prefix = "datalake/coordinator/tests",
deps = [
"//src/v/container:fragmented_vector",
"//src/v/datalake/coordinator:file_committer",
"//src/v/datalake/coordinator:state",
"//src/v/datalake/coordinator:translated_offset_range",
"//src/v/model",
@@ -89,3 +90,21 @@ redpanda_cc_gtest(
"@googletest//:gtest",
],
)

redpanda_cc_gtest(
name = "state_machine_test",
timeout = "short",
srcs = [
"state_machine_test.cc",
],
cpu = 1,
deps = [
":state_test_utils",
"//src/v/datalake/coordinator",
"//src/v/datalake/coordinator:stm",
"//src/v/raft/tests:stm_raft_fixture",
"//src/v/storage",
"@googletest//:gtest",
"@seastar",
],
)
1 change: 1 addition & 0 deletions src/v/datalake/coordinator/tests/CMakeLists.txt
@@ -15,6 +15,7 @@ rp_test(
SOURCES
coordinator_test.cc
state_update_test.cc
state_machine_test.cc
LIBRARIES
v::datalake_coordinator
v::datalake_coordinator_test_utils
