Commit

cluster: initialize log with metadata from Raft
Updates partition creation to first load Raft snapshot metadata, which
is then passed to the log upon initialization.
andrwng committed Mar 13, 2024
1 parent 32af1fd commit be24c9f
Showing 10 changed files with 176 additions and 107 deletions.
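
The sketch below is an editor's summary of the new startup ordering, not code from the commit: read the Raft snapshot's metadata first, derive the log-level metadata from it, create the log with that metadata, and only then start the partition (and Raft) with the same metadata. All type and function names are simplified stand-ins for the real Redpanda classes (raft::snapshot_metadata, storage::log_snapshot_metadata, partition, and so on).

// Minimal standalone sketch of the ordering introduced by this commit,
// using simplified stand-in types rather than the real Redpanda classes.
#include <cstdint>
#include <iostream>
#include <optional>

// Stand-in for raft::snapshot_metadata: what the on-disk Raft snapshot records.
struct SnapshotMetadata {
    int64_t last_included_index; // last offset covered by the snapshot
    int64_t log_start_delta;     // offset-translator delta at the log start
};

// Stand-in for storage::log_snapshot_metadata: what the log needs at creation.
struct LogSnapshotMetadata {
    int64_t log_start_offset;
    int64_t log_start_delta;
    bool truncate; // prefix-truncate unless the snapshotted log is kept
};

struct Log {
    int64_t start_offset = 0;
    void init(const std::optional<LogSnapshotMetadata>& md) {
        // New behavior: the log learns the snapshot state at creation time
        // instead of being prefix-truncated later during Raft start().
        if (md && md->truncate) {
            start_offset = md->log_start_offset;
        }
    }
};

struct Partition {
    void start(const std::optional<SnapshotMetadata>& md) {
        if (md) {
            std::cout << "raft resumes from snapshot index "
                      << md->last_included_index << "\n";
        }
    }
};

int main() {
    // 1. Read the Raft snapshot metadata (if a snapshot exists) up front.
    std::optional<SnapshotMetadata> raft_md = SnapshotMetadata{41, 3};

    // 2. Derive the log-level metadata: the log starts one past the
    //    snapshot's last included index.
    std::optional<LogSnapshotMetadata> log_md;
    if (raft_md) {
        log_md = LogSnapshotMetadata{
          raft_md->last_included_index + 1, raft_md->log_start_delta, true};
    }

    // 3. Create the log with that metadata, then start the partition with
    //    the Raft metadata so consensus does not re-read the snapshot.
    Log log;
    log.init(log_md);
    Partition p;
    p.start(raft_md);
    std::cout << "log starts at offset " << log.start_offset << "\n";
}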
6 changes: 4 additions & 2 deletions src/v/cluster/partition.cc
@@ -377,12 +377,14 @@ kafka_stages partition::replicate_in_stages(
std::move(res.request_enqueued), std::move(replicate_finished));
}

ss::future<> partition::start(state_machine_registry& stm_registry) {
ss::future<> partition::start(
state_machine_registry& stm_registry,
std::optional<raft::snapshot_metadata> metadata) {
const auto& ntp = _raft->ntp();
raft::state_machine_manager_builder builder = stm_registry.make_builder_for(
_raft.get());

co_await _raft->start(std::move(builder));
co_await _raft->start(std::move(builder), metadata);
// store rm_stm pointer in partition as this is commonly used stm
_rm_stm = _raft->stm_manager()->get<cluster::rm_stm>();
_log_eviction_stm = _raft->stm_manager()->get<cluster::log_eviction_stm>();
3 changes: 2 additions & 1 deletion src/v/cluster/partition.h
@@ -59,7 +59,8 @@ class partition {
~partition();

raft::group_id group() const { return _raft->group(); }
ss::future<> start(state_machine_registry&);
ss::future<>
start(state_machine_registry&, std::optional<raft::snapshot_metadata>);
ss::future<> stop();

bool should_construct_archiver();
37 changes: 35 additions & 2 deletions src/v/cluster/partition_manager.cc
@@ -23,6 +23,7 @@
#include "cluster/partition_recovery_manager.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "raft/consensus.h"
#include "raft/consensus_utils.h"
@@ -33,6 +34,7 @@
#include "ssx/async-clear.h"
#include "storage/segment_utils.h"
#include "storage/snapshot.h"
#include "storage/types.h"
#include "utils/retry_chain_node.h"

#include <seastar/core/coroutine.hh>
@@ -212,10 +214,41 @@ ss::future<consensus_ptr> partition_manager::manage(
ntp_cfg, manifest, max_offset);
}
}
storage::simple_snapshot_manager snapshot_mgr(
std::filesystem::path(ntp_cfg.work_directory()),
storage::simple_snapshot_manager::default_snapshot_filename,
ss::default_priority_class());
std::optional<raft::snapshot_metadata> raft_metadata;
std::optional<storage::log_snapshot_metadata> log_metadata;
auto snapshot_reader = co_await snapshot_mgr.open_snapshot();
if (snapshot_reader.has_value()) {
std::exception_ptr eptr;
try {
auto buf = co_await snapshot_reader->read_metadata();
auto parser = iobuf_parser(std::move(buf));
raft_metadata = reflection::adl<raft::snapshot_metadata>{}.from(
parser);
log_metadata = {
model::next_offset(raft_metadata->last_included_index),
model::offset_delta(raft_metadata->log_start_delta),
keep_snapshotted_log == raft::keep_snapshotted_log::no,
};
} catch (...) {
eptr = std::current_exception();
}
co_await snapshot_reader->close();
if (eptr) {
rethrow_exception(eptr);
}
}

auto translator_batch_types = raft::offset_translator_batch_types(
ntp_cfg.ntp());
auto log = co_await _storage.log_mgr().manage(
std::move(ntp_cfg), group, std::move(translator_batch_types));
std::move(ntp_cfg),
group,
std::move(translator_batch_types),
log_metadata);
vlog(
clusterlog.debug,
"Log created manage completed, ntp: {}, rev: {}, {} "
@@ -259,7 +292,7 @@ ss::future<consensus_ptr> partition_manager::manage(

_manage_watchers.notify(p->ntp(), p);

co_await p->start(_stm_registry);
co_await p->start(_stm_registry, std::move(raft_metadata));

co_return c;
}
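
One detail worth calling out in the partition_manager::manage() changes above is how the snapshot reader is handled: read_metadata() can throw, but the reader still has to be closed with an awaited call, so the exception is captured, the reader is closed, and only then is the error rethrown. Below is a minimal, non-coroutine sketch of that capture-close-rethrow pattern; FakeReader and load() are hypothetical stand-ins, not the storage::snapshot_reader API.

#include <exception>
#include <iostream>
#include <stdexcept>

// Hypothetical stand-in for a reader that must be closed explicitly.
struct FakeReader {
    void read_metadata() { throw std::runtime_error("corrupt metadata"); }
    void close() { std::cout << "reader closed\n"; } // must always run
};

void load() {
    FakeReader reader;
    std::exception_ptr eptr;
    try {
        reader.read_metadata();
    } catch (...) {
        eptr = std::current_exception(); // remember the failure
    }
    reader.close();                      // close even on failure
    if (eptr) {
        std::rethrow_exception(eptr);    // surface the original error
    }
}

int main() {
    try {
        load();
    } catch (const std::exception& e) {
        std::cout << "load failed: " << e.what() << "\n";
    }
}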
174 changes: 88 additions & 86 deletions src/v/raft/consensus.cc
@@ -38,6 +38,8 @@
#include "ssx/future-util.h"
#include "storage/api.h"
#include "storage/kvstore.h"
#include "storage/snapshot.h"
#include "storage/types.h"

#include <seastar/core/condition-variable.hh>
#include <seastar/core/coroutine.hh>
@@ -1345,14 +1347,16 @@ ss::future<std::error_code> consensus::force_replace_configuration_locally(
}

ss::future<> consensus::start(
std::optional<state_machine_manager_builder> stm_manager_builder) {
std::optional<state_machine_manager_builder> stm_manager_builder,
std::optional<snapshot_metadata> metadata) {
if (stm_manager_builder) {
_stm_manager = std::move(stm_manager_builder.value()).build(this);
}
return ss::try_with_gate(_bg, [this] { return do_start(); });
_bg.hold();
co_await do_start(metadata);
}

ss::future<> consensus::do_start() {
ss::future<> consensus::do_start(std::optional<snapshot_metadata> metadata) {
try {
auto u = co_await _op_lock.get_units();

@@ -1372,9 +1376,19 @@ ss::future<> consensus::do_start() {
"Configuration manager started: {}",
_configuration_manager);

co_await _snapshot_lock.with([this] {
return hydrate_snapshot(/*rebuild_translation_state=*/true);
});
if (metadata) {
load_from_metadata(metadata.value());
co_await _configuration_manager.add(
_last_snapshot_index, std::move(metadata->latest_configuration));
_probe->configuration_update();

// It's expected that the log has already been initialized and
// truncated with metadata from the snapshot. We may still need to
// adjust the log based on additional metadata.
co_await maybe_truncate_log(
metadata.value(), /*rebuild_translation_state=*/true);
_snapshot_size = co_await _snapshot_mgr.get_snapshot_size();
}

vlog(
_ctxlog.debug,
@@ -2123,24 +2137,41 @@ consensus::install_snapshot(install_snapshot_request&& r) {
});
}

ss::future<> consensus::hydrate_snapshot(bool rebuild_translation_state) {
ss::future<> consensus::hydrate_snapshot() {
// Read snapshot, reset state machine using snapshot contents (and load
// snapshot’s cluster configuration) (§7.8)
return _snapshot_mgr.open_snapshot().then(
[this, rebuild_translation_state](
std::optional<storage::snapshot_reader> reader) {
// no snapshot do nothing
if (!reader) {
return ss::now();
}
return ss::do_with(
std::move(*reader),
[this,
rebuild_translation_state](storage::snapshot_reader& reader) {
return do_hydrate_snapshot(reader, rebuild_translation_state)
.finally([&reader] { return reader.close(); });
});
});
auto reader = co_await _snapshot_mgr.open_snapshot();
if (reader) {
co_await do_hydrate_snapshot(reader.value()).finally([&reader] {
return reader->close();
});
}
}

ss::future<> consensus::maybe_truncate_log(
const snapshot_metadata& metadata, bool rebuild_translation_state) {
if (
_keep_snapshotted_log
&& _log->offsets().dirty_offset >= _last_snapshot_index) {
// skip prefix truncating if we want to preserve the log (e.g. for the
// controller partition), but only if there is no gap between old end
// offset and new start offset, otherwise we must still advance the log
// start offset by prefix-truncating.
co_return;
}
auto delta = metadata.log_start_delta;
if (delta < offset_translator_delta(0)) {
delta = offset_translator_delta(
_configuration_manager.offset_delta(_last_snapshot_index));
vlog(
_ctxlog.warn,
"received snapshot without delta field in metadata, "
"falling back to delta obtained from configuration "
"manager: {}",
delta);
}
co_await truncate_to_latest_snapshot(
model::offset_delta{delta}, rebuild_translation_state);
}

ss::future<> consensus::truncate_to_latest_snapshot(
@@ -2165,72 +2196,43 @@ ss::future<> consensus::truncate_to_latest_snapshot(
});
}

ss::future<> consensus::do_hydrate_snapshot(
storage::snapshot_reader& reader, bool rebuild_translation_state) {
return reader.read_metadata()
.then([this, rebuild_translation_state](iobuf buf) {
auto parser = iobuf_parser(std::move(buf));
auto metadata = reflection::adl<snapshot_metadata>{}.from(parser);
vassert(
metadata.last_included_index >= _last_snapshot_index,
"Tried to load stale snapshot. Loaded snapshot last "
"index {}, current snapshot last index {}",
metadata.last_included_index,
_last_snapshot_index);
void consensus::load_from_metadata(const raft::snapshot_metadata& metadata) {
vlog(
_ctxlog.info,
"hydrating snapshot with last included index: {}",
metadata.last_included_index);

vlog(
_ctxlog.info,
"hydrating snapshot with last included index: {}",
metadata.last_included_index);

_last_snapshot_index = metadata.last_included_index;
_last_snapshot_term = metadata.last_included_term;

// TODO: add applying snapshot content to state machine
auto prev_commit_index = _commit_index;
_commit_index = std::max(_last_snapshot_index, _commit_index);
maybe_update_last_visible_index(_commit_index);
if (prev_commit_index != _commit_index) {
_commit_index_updated.broadcast();
_event_manager.notify_commit_index();
}
_last_snapshot_index = metadata.last_included_index;
_last_snapshot_term = metadata.last_included_term;

update_follower_stats(metadata.latest_configuration);
return _configuration_manager
.add(_last_snapshot_index, std::move(metadata.latest_configuration))
.then([this,
delta = metadata.log_start_delta,
rebuild_translation_state]() mutable {
_probe->configuration_update();
if (
_keep_snapshotted_log
&& _log->offsets().dirty_offset >= _last_snapshot_index) {
// skip prefix truncating if we want to preserve the log
// (e.g. for the controller partition), but only if there is
// no gap between old end offset and new start offset,
// otherwise we must still advance the log start offset by
// prefix-truncating.
return ss::now();
}
if (delta < offset_translator_delta(0)) {
// No delta persisted in snapshot: translate the start
// offset.
delta = offset_translator_delta(
_configuration_manager.offset_delta(
_last_snapshot_index));
vlog(
_ctxlog.warn,
"received snapshot without delta field in metadata, "
"falling back to delta obtained from configuration "
"manager: {}",
delta);
}
return truncate_to_latest_snapshot(
model::offset_delta{delta}, rebuild_translation_state);
});
})
.then([this] { return _snapshot_mgr.get_snapshot_size(); })
.then([this](uint64_t size) { _snapshot_size = size; });
// TODO: add applying snapshot content to state machine
auto prev_commit_index = _commit_index;
_commit_index = std::max(_last_snapshot_index, _commit_index);
maybe_update_last_visible_index(_commit_index);
if (prev_commit_index != _commit_index) {
_commit_index_updated.broadcast();
_event_manager.notify_commit_index();
}

update_follower_stats(metadata.latest_configuration);
}

ss::future<> consensus::do_hydrate_snapshot(storage::snapshot_reader& reader) {
auto buf = co_await reader.read_metadata();
auto parser = iobuf_parser(std::move(buf));
auto metadata = reflection::adl<snapshot_metadata>{}.from(parser);
vassert(
metadata.last_included_index >= _last_snapshot_index,
"Tried to load stale snapshot. Loaded snapshot last "
"index {}, current snapshot last index {}",
metadata.last_included_index,
_last_snapshot_index);
load_from_metadata(metadata);
co_await _configuration_manager.add(
_last_snapshot_index, std::move(metadata.latest_configuration));
_probe->configuration_update();
co_await maybe_truncate_log(metadata, /*rebuild_translation_state=*/false);
_snapshot_size = co_await _snapshot_mgr.get_snapshot_size();
}

ss::future<install_snapshot_reply>
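
To make the consensus.cc changes easier to follow: do_hydrate_snapshot() used to do all of the snapshot bookkeeping inline, and this commit splits it into load_from_metadata() and maybe_truncate_log() so the same code serves two callers: start() with metadata pre-read by the partition manager, and do_hydrate_snapshot() on the install-snapshot path. The sketch below models only that call structure with simplified stand-ins, not the real raft::consensus interface.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <optional>

// Stand-in for raft::snapshot_metadata.
struct SnapshotMetadata {
    int64_t last_included_index;
    int64_t last_included_term;
};

class Consensus {
public:
    // Path 1: startup, with metadata already read by the partition manager.
    void start(std::optional<SnapshotMetadata> md) {
        if (md) {
            load_from_metadata(*md);
            maybe_truncate_log(*md, /*rebuild_translation_state=*/true);
        }
    }

    // Path 2: a snapshot installed from the leader at runtime.
    void hydrate_snapshot(const SnapshotMetadata& md) {
        load_from_metadata(md);
        maybe_truncate_log(md, /*rebuild_translation_state=*/false);
    }

private:
    void load_from_metadata(const SnapshotMetadata& md) {
        // Shared bookkeeping: advance the snapshot and commit indexes.
        last_snapshot_index_ = md.last_included_index;
        last_snapshot_term_ = md.last_included_term;
        commit_index_ = std::max(commit_index_, last_snapshot_index_);
    }

    void maybe_truncate_log(const SnapshotMetadata&, bool rebuild) {
        std::cout << "prefix-truncate up to " << last_snapshot_index_
                  << (rebuild ? " (rebuilding translation state)" : "")
                  << "\n";
    }

    int64_t last_snapshot_index_ = -1;
    int64_t last_snapshot_term_ = -1;
    int64_t commit_index_ = -1;
};

int main() {
    Consensus c;
    c.start(SnapshotMetadata{41, 3}); // startup with pre-read metadata
    c.hydrate_snapshot({99, 5});      // later install-snapshot from a leader
}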
17 changes: 11 additions & 6 deletions src/v/raft/consensus.h
@@ -36,6 +36,7 @@
#include "raft/replicate_batcher.h"
#include "raft/state_machine_manager.h"
#include "raft/timeout_jitter.h"
#include "raft/types.h"
#include "ssx/semaphore.h"
#include "storage/log.h"
#include "storage/snapshot.h"
@@ -110,8 +111,9 @@ class consensus {
keep_snapshotted_log = keep_snapshotted_log::no);

/// Initial call. Allow for internal state recovery
ss::future<>
start(std::optional<state_machine_manager_builder> = std::nullopt);
ss::future<> start(
std::optional<state_machine_manager_builder> = std::nullopt,
std::optional<raft::snapshot_metadata> = std::nullopt);

/// Stop all communications.
ss::future<> stop();
@@ -530,7 +532,7 @@ class consensus {
do_append_entries(append_entries_request&&);
ss::future<install_snapshot_reply>
do_install_snapshot(install_snapshot_request r);
ss::future<> do_start();
ss::future<> do_start(std::optional<raft::snapshot_metadata>);

ss::future<result<replicate_result>> dispatch_replicate(
append_entries_request,
@@ -539,13 +541,16 @@
/**
* Hydrate the consensus state with the data from the snapshot
*/
ss::future<> hydrate_snapshot(bool rebuild_translation_state = false);
ss::future<> do_hydrate_snapshot(
storage::snapshot_reader&, bool rebuild_translation_state = false);
ss::future<> hydrate_snapshot();
ss::future<> do_hydrate_snapshot(storage::snapshot_reader&);

void load_from_metadata(const snapshot_metadata&);

/**
* Truncates the log up the last offset stored in the snapshot
*/
ss::future<> maybe_truncate_log(
const snapshot_metadata&, bool rebuild_translation_state);
ss::future<> truncate_to_latest_snapshot(
std::optional<model::offset_delta> force_truncate_delta = std::nullopt,
bool rebuild_translation_state = false);
17 changes: 14 additions & 3 deletions src/v/storage/disk_log_impl.cc
@@ -2984,7 +2984,8 @@ ss::future<ss::shared_ptr<log>> make_disk_backed_log(
kvstore& kvstore,
ss::sharded<features::feature_table>& feature_table,
ss::abort_source& as,
std::vector<model::record_batch_type> translator_batch_types) {
std::vector<model::record_batch_type> translator_batch_types,
std::optional<log_snapshot_metadata> metadata) {
auto disk_log = ss::make_shared<disk_log_impl>(
std::move(cfg),
group,
@@ -2994,11 +2995,21 @@
feature_table,
std::move(translator_batch_types));

// Reset or load the offset translator state, depending on whether this is
// a brand new log.
auto is_new = disk_log->is_new_log();
co_await disk_log->offset_translator().start(
raft::offset_translator::must_reset{is_new});
if (metadata.has_value() && metadata->truncate) {
co_await disk_log->truncate_prefix(truncate_prefix_config(
metadata->log_start_offset,
ss::default_priority_class(),
metadata->log_start_delta,
false));
}
// Reset or load the offset translator state, depending on whether this is
// a brand new log.
if (!is_new) {
co_await disk_log->offset_translator().sync_with_log(*disk_log, as);
}
co_return disk_log;
}
