From 32af1fd9e1811fe02d8f47a90e9d8074e975a1d2 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Sun, 10 Mar 2024 17:55:16 -0700 Subject: [PATCH] storage: rebuild translation state at startup Adds an option to prefix truncation (used when hydrating a snapshot) to also sync the offset translator with the log. This is currently done by Raft at startup. In an effort to encapsulate offset translator state within the storage layer, this commit uses this option at startup. --- src/v/raft/consensus.cc | 35 ++++++++++++------- src/v/raft/consensus.h | 8 +++-- src/v/storage/disk_log_impl.cc | 11 +++--- src/v/storage/offset_translator.cc | 8 ++--- src/v/storage/offset_translator.h | 3 +- .../storage/tests/offset_translator_tests.cc | 4 +-- src/v/storage/types.h | 8 +++-- 7 files changed, 48 insertions(+), 29 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 483165d541110..ceec26e674551 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1372,7 +1372,9 @@ ss::future<> consensus::do_start() { "Configuration manager started: {}", _configuration_manager); - co_await _snapshot_lock.with([this] { return hydrate_snapshot(); }); + co_await _snapshot_lock.with([this] { + return hydrate_snapshot(/*rebuild_translation_state=*/true); + }); vlog( _ctxlog.debug, @@ -2121,25 +2123,29 @@ consensus::install_snapshot(install_snapshot_request&& r) { }); } -ss::future<> consensus::hydrate_snapshot() { +ss::future<> consensus::hydrate_snapshot(bool rebuild_translation_state) { // Read snapshot, reset state machine using snapshot contents (and load // snapshot’s cluster configuration) (§7.8) return _snapshot_mgr.open_snapshot().then( - [this](std::optional reader) { + [this, rebuild_translation_state]( + std::optional reader) { // no snapshot do nothing if (!reader) { return ss::now(); } return ss::do_with( - std::move(*reader), [this](storage::snapshot_reader& reader) { - return do_hydrate_snapshot(reader).finally( - [&reader] { return reader.close(); }); + std::move(*reader), + [this, + rebuild_translation_state](storage::snapshot_reader& reader) { + return do_hydrate_snapshot(reader, rebuild_translation_state) + .finally([&reader] { return reader.close(); }); }); }); } ss::future<> consensus::truncate_to_latest_snapshot( - std::optional force_truncate_delta) { + std::optional force_truncate_delta, + bool rebuild_translation_state) { // we have to prefix truncate config manage at exactly last offset included // in snapshot as this is the offset of configuration included in snapshot // metadata. @@ -2147,7 +2153,8 @@ ss::future<> consensus::truncate_to_latest_snapshot( ->truncate_prefix(storage::truncate_prefix_config( model::next_offset(_last_snapshot_index), _scheduling.default_iopc, - force_truncate_delta)) + force_truncate_delta, + rebuild_translation_state)) .then([this] { return _configuration_manager.prefix_truncate(_last_snapshot_index); }) @@ -2158,9 +2165,10 @@ ss::future<> consensus::truncate_to_latest_snapshot( }); } -ss::future<> consensus::do_hydrate_snapshot(storage::snapshot_reader& reader) { +ss::future<> consensus::do_hydrate_snapshot( + storage::snapshot_reader& reader, bool rebuild_translation_state) { return reader.read_metadata() - .then([this](iobuf buf) { + .then([this, rebuild_translation_state](iobuf buf) { auto parser = iobuf_parser(std::move(buf)); auto metadata = reflection::adl{}.from(parser); vassert( @@ -2190,7 +2198,9 @@ ss::future<> consensus::do_hydrate_snapshot(storage::snapshot_reader& reader) { update_follower_stats(metadata.latest_configuration); return _configuration_manager .add(_last_snapshot_index, std::move(metadata.latest_configuration)) - .then([this, delta = metadata.log_start_delta]() mutable { + .then([this, + delta = metadata.log_start_delta, + rebuild_translation_state]() mutable { _probe->configuration_update(); if ( _keep_snapshotted_log @@ -2215,7 +2225,8 @@ ss::future<> consensus::do_hydrate_snapshot(storage::snapshot_reader& reader) { "manager: {}", delta); } - return truncate_to_latest_snapshot(model::offset_delta{delta}); + return truncate_to_latest_snapshot( + model::offset_delta{delta}, rebuild_translation_state); }); }) .then([this] { return _snapshot_mgr.get_snapshot_size(); }) diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 0dc49495225a4..4f8e85e6d7295 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -539,14 +539,16 @@ class consensus { /** * Hydrate the consensus state with the data from the snapshot */ - ss::future<> hydrate_snapshot(); - ss::future<> do_hydrate_snapshot(storage::snapshot_reader&); + ss::future<> hydrate_snapshot(bool rebuild_translation_state = false); + ss::future<> do_hydrate_snapshot( + storage::snapshot_reader&, bool rebuild_translation_state = false); /** * Truncates the log up the last offset stored in the snapshot */ ss::future<> truncate_to_latest_snapshot( - std::optional force_truncate_delta = std::nullopt); + std::optional force_truncate_delta = std::nullopt, + bool rebuild_translation_state = false); ss::future finish_snapshot(install_snapshot_request, install_snapshot_reply); diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index e4d759dd8d18a..71ebfbc02d060 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -2506,6 +2506,10 @@ ss::future<> disk_log_impl::truncate_prefix(truncate_prefix_config cfg) { // the start offset and thus can still need offset translation info. co_await _offset_translator.prefix_truncate( model::prev_offset(cfg.start_offset), cfg.force_truncate_delta); + + if (cfg.rebuild_translation_state) { + co_await _offset_translator.sync_with_log(*this, _compaction_as); + } } ss::future<> disk_log_impl::do_truncate_prefix(truncate_prefix_config cfg) { @@ -2992,10 +2996,9 @@ ss::future> make_disk_backed_log( // Reset or load the offset translator state, depending on whether this is // a brand new log. - auto& translator = disk_log->offset_translator(); - co_await translator.start( - raft::offset_translator::must_reset{disk_log->is_new_log()}); - co_await translator.sync_with_log(disk_log, as); + auto is_new = disk_log->is_new_log(); + co_await disk_log->offset_translator().start( + raft::offset_translator::must_reset{is_new}); co_return disk_log; } diff --git a/src/v/storage/offset_translator.cc b/src/v/storage/offset_translator.cc index ff71dddc84b22..ccdcb2d83c749 100644 --- a/src/v/storage/offset_translator.cc +++ b/src/v/storage/offset_translator.cc @@ -166,7 +166,7 @@ ss::future<> offset_translator::start(must_reset reset) { } ss::future<> offset_translator::sync_with_log( - ss::shared_ptr log, storage::opt_abort_source_t as) { + storage::log& log, storage::opt_abort_source_t as) { if (_filtered_types.empty()) { co_return; } @@ -176,7 +176,7 @@ ss::future<> offset_translator::sync_with_log( "ntp {}: offset translation state shouldn't be empty", _state->ntp()); - auto log_offsets = log->offsets(); + auto log_offsets = log.offsets(); // Trim the offset2delta map to log dirty_offset (discrepancy can // happen if the offsets map was persisted, but the log wasn't flushed). @@ -189,7 +189,7 @@ ss::future<> offset_translator::sync_with_log( co_await _checkpoint_lock.with([this] { return do_checkpoint(); }); } - // read the log to insert the remaining entries into map + // Read the log to insert the remaining entries into map. model::offset start_offset = model::next_offset(_highest_known_offset); vlog( @@ -201,7 +201,7 @@ ss::future<> offset_translator::sync_with_log( auto reader_cfg = storage::log_reader_config( start_offset, log_offsets.dirty_offset, ss::default_priority_class(), as); - auto reader = co_await log->make_reader(reader_cfg); + auto reader = co_await log.make_reader(reader_cfg); struct log_consumer { explicit log_consumer(offset_translator& self) diff --git a/src/v/storage/offset_translator.h b/src/v/storage/offset_translator.h index 6185d2d70a9fb..eb781bfad9a65 100644 --- a/src/v/storage/offset_translator.h +++ b/src/v/storage/offset_translator.h @@ -73,8 +73,7 @@ class offset_translator { /// Searches for non-data batches up to the tip of the log. After this /// method succeeds, offset translator is usable. - ss::future<> - sync_with_log(ss::shared_ptr, storage::opt_abort_source_t); + ss::future<> sync_with_log(storage::log&, storage::opt_abort_source_t); /// Process the batch and add it to offset translation state if it is not /// a data batch. diff --git a/src/v/storage/tests/offset_translator_tests.cc b/src/v/storage/tests/offset_translator_tests.cc index b7a4243e02711..c4658c4e71507 100644 --- a/src/v/storage/tests/offset_translator_tests.cc +++ b/src/v/storage/tests/offset_translator_tests.cc @@ -324,7 +324,7 @@ struct fuzz_checker { ss::future<> start() { _tr.emplace(_make_offset_translator()); co_await _tr->start(raft::offset_translator::must_reset::yes); - co_await _tr->sync_with_log(_log, std::nullopt); + co_await _tr->sync_with_log(*_log, std::nullopt); } ss::future<> append() { @@ -477,7 +477,7 @@ struct fuzz_checker { co_await _tr->start(raft::offset_translator::must_reset::no); co_await _tr->prefix_truncate_reset( _snapshot_offset, _snapshot_delta); - co_await _tr->sync_with_log(_log, std::nullopt); + co_await _tr->sync_with_log(*_log, std::nullopt); } } diff --git a/src/v/storage/types.h b/src/v/storage/types.h index 7461b1331aea2..c6ea4f062d0f5 100644 --- a/src/v/storage/types.h +++ b/src/v/storage/types.h @@ -282,10 +282,12 @@ struct truncate_prefix_config { truncate_prefix_config( model::offset o, ss::io_priority_class p, - std::optional force_truncate_delta = std::nullopt) + std::optional force_truncate_delta = std::nullopt, + bool rebuild_translation_state = false) : start_offset(o) , prio(p) - , force_truncate_delta(force_truncate_delta) {} + , force_truncate_delta(force_truncate_delta) + , rebuild_translation_state(rebuild_translation_state) {} model::offset start_offset; ss::io_priority_class prio; @@ -297,6 +299,8 @@ struct truncate_prefix_config { // an error. std::optional force_truncate_delta; + bool rebuild_translation_state; + friend std::ostream& operator<<(std::ostream&, const truncate_prefix_config&); };