Skip to content

Commit

Permalink
storage: rebuild translation state at startup
Browse files Browse the repository at this point in the history
Adds an option to prefix truncation (used when hydrating a snapshot) to
also sync the offset translator with the log. This is currently done by
Raft at startup. In an effort to encapsulate offset translator state
within the storage layer, this commit uses this option at startup.
  • Loading branch information
andrwng committed Mar 11, 2024
1 parent 231e980 commit 32af1fd
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 29 deletions.
35 changes: 23 additions & 12 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1372,7 +1372,9 @@ ss::future<> consensus::do_start() {
"Configuration manager started: {}",
_configuration_manager);

co_await _snapshot_lock.with([this] { return hydrate_snapshot(); });
co_await _snapshot_lock.with([this] {
return hydrate_snapshot(/*rebuild_translation_state=*/true);
});

vlog(
_ctxlog.debug,
Expand Down Expand Up @@ -2121,33 +2123,38 @@ consensus::install_snapshot(install_snapshot_request&& r) {
});
}

ss::future<> consensus::hydrate_snapshot() {
ss::future<> consensus::hydrate_snapshot(bool rebuild_translation_state) {
// Read snapshot, reset state machine using snapshot contents (and load
// snapshot’s cluster configuration) (§7.8)
return _snapshot_mgr.open_snapshot().then(
[this](std::optional<storage::snapshot_reader> reader) {
[this, rebuild_translation_state](
std::optional<storage::snapshot_reader> reader) {
// no snapshot do nothing
if (!reader) {
return ss::now();
}
return ss::do_with(
std::move(*reader), [this](storage::snapshot_reader& reader) {
return do_hydrate_snapshot(reader).finally(
[&reader] { return reader.close(); });
std::move(*reader),
[this,
rebuild_translation_state](storage::snapshot_reader& reader) {
return do_hydrate_snapshot(reader, rebuild_translation_state)
.finally([&reader] { return reader.close(); });
});
});
}

ss::future<> consensus::truncate_to_latest_snapshot(
std::optional<model::offset_delta> force_truncate_delta) {
std::optional<model::offset_delta> force_truncate_delta,
bool rebuild_translation_state) {
// we have to prefix truncate the configuration manager at exactly the last
// offset included in the snapshot, as this is the offset of the configuration
// included in the snapshot metadata.
return _log
->truncate_prefix(storage::truncate_prefix_config(
model::next_offset(_last_snapshot_index),
_scheduling.default_iopc,
force_truncate_delta))
force_truncate_delta,
rebuild_translation_state))
.then([this] {
return _configuration_manager.prefix_truncate(_last_snapshot_index);
})
Expand All @@ -2158,9 +2165,10 @@ ss::future<> consensus::truncate_to_latest_snapshot(
});
}

ss::future<> consensus::do_hydrate_snapshot(storage::snapshot_reader& reader) {
ss::future<> consensus::do_hydrate_snapshot(
storage::snapshot_reader& reader, bool rebuild_translation_state) {
return reader.read_metadata()
.then([this](iobuf buf) {
.then([this, rebuild_translation_state](iobuf buf) {
auto parser = iobuf_parser(std::move(buf));
auto metadata = reflection::adl<snapshot_metadata>{}.from(parser);
vassert(
Expand Down Expand Up @@ -2190,7 +2198,9 @@ ss::future<> consensus::do_hydrate_snapshot(storage::snapshot_reader& reader) {
update_follower_stats(metadata.latest_configuration);
return _configuration_manager
.add(_last_snapshot_index, std::move(metadata.latest_configuration))
.then([this, delta = metadata.log_start_delta]() mutable {
.then([this,
delta = metadata.log_start_delta,
rebuild_translation_state]() mutable {
_probe->configuration_update();
if (
_keep_snapshotted_log
Expand All @@ -2215,7 +2225,8 @@ ss::future<> consensus::do_hydrate_snapshot(storage::snapshot_reader& reader) {
"manager: {}",
delta);
}
return truncate_to_latest_snapshot(model::offset_delta{delta});
return truncate_to_latest_snapshot(
model::offset_delta{delta}, rebuild_translation_state);
});
})
.then([this] { return _snapshot_mgr.get_snapshot_size(); })
Expand Down
8 changes: 5 additions & 3 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,16 @@ class consensus {
/**
* Hydrate the consensus state with the data from the snapshot
*/
ss::future<> hydrate_snapshot();
ss::future<> do_hydrate_snapshot(storage::snapshot_reader&);
ss::future<> hydrate_snapshot(bool rebuild_translation_state = false);
ss::future<> do_hydrate_snapshot(
storage::snapshot_reader&, bool rebuild_translation_state = false);

/**
* Truncates the log up to the last offset stored in the snapshot
*/
ss::future<> truncate_to_latest_snapshot(
std::optional<model::offset_delta> force_truncate_delta = std::nullopt);
std::optional<model::offset_delta> force_truncate_delta = std::nullopt,
bool rebuild_translation_state = false);
ss::future<install_snapshot_reply>
finish_snapshot(install_snapshot_request, install_snapshot_reply);

Expand Down
11 changes: 7 additions & 4 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2506,6 +2506,10 @@ ss::future<> disk_log_impl::truncate_prefix(truncate_prefix_config cfg) {
// the start offset and thus can still need offset translation info.
co_await _offset_translator.prefix_truncate(
model::prev_offset(cfg.start_offset), cfg.force_truncate_delta);

if (cfg.rebuild_translation_state) {
co_await _offset_translator.sync_with_log(*this, _compaction_as);
}
}

ss::future<> disk_log_impl::do_truncate_prefix(truncate_prefix_config cfg) {
Expand Down Expand Up @@ -2992,10 +2996,9 @@ ss::future<ss::shared_ptr<log>> make_disk_backed_log(

// Reset or load the offset translator state, depending on whether this is
// a brand new log.
auto& translator = disk_log->offset_translator();
co_await translator.start(
raft::offset_translator::must_reset{disk_log->is_new_log()});
co_await translator.sync_with_log(disk_log, as);
auto is_new = disk_log->is_new_log();
co_await disk_log->offset_translator().start(
raft::offset_translator::must_reset{is_new});
co_return disk_log;
}

Expand Down
8 changes: 4 additions & 4 deletions src/v/storage/offset_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ ss::future<> offset_translator::start(must_reset reset) {
}

ss::future<> offset_translator::sync_with_log(
ss::shared_ptr<storage::log> log, storage::opt_abort_source_t as) {
storage::log& log, storage::opt_abort_source_t as) {
if (_filtered_types.empty()) {
co_return;
}
Expand All @@ -176,7 +176,7 @@ ss::future<> offset_translator::sync_with_log(
"ntp {}: offset translation state shouldn't be empty",
_state->ntp());

auto log_offsets = log->offsets();
auto log_offsets = log.offsets();

// Trim the offset2delta map to log dirty_offset (discrepancy can
// happen if the offsets map was persisted, but the log wasn't flushed).
Expand All @@ -189,7 +189,7 @@ ss::future<> offset_translator::sync_with_log(
co_await _checkpoint_lock.with([this] { return do_checkpoint(); });
}

// read the log to insert the remaining entries into map
// Read the log to insert the remaining entries into map.
model::offset start_offset = model::next_offset(_highest_known_offset);

vlog(
Expand All @@ -201,7 +201,7 @@ ss::future<> offset_translator::sync_with_log(

auto reader_cfg = storage::log_reader_config(
start_offset, log_offsets.dirty_offset, ss::default_priority_class(), as);
auto reader = co_await log->make_reader(reader_cfg);
auto reader = co_await log.make_reader(reader_cfg);

struct log_consumer {
explicit log_consumer(offset_translator& self)
Expand Down
3 changes: 1 addition & 2 deletions src/v/storage/offset_translator.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ class offset_translator {

/// Searches for non-data batches up to the tip of the log. After this
/// method succeeds, offset translator is usable.
ss::future<>
sync_with_log(ss::shared_ptr<storage::log>, storage::opt_abort_source_t);
ss::future<> sync_with_log(storage::log&, storage::opt_abort_source_t);

/// Process the batch and add it to offset translation state if it is not
/// a data batch.
Expand Down
4 changes: 2 additions & 2 deletions src/v/storage/tests/offset_translator_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ struct fuzz_checker {
ss::future<> start() {
_tr.emplace(_make_offset_translator());
co_await _tr->start(raft::offset_translator::must_reset::yes);
co_await _tr->sync_with_log(_log, std::nullopt);
co_await _tr->sync_with_log(*_log, std::nullopt);
}

ss::future<> append() {
Expand Down Expand Up @@ -477,7 +477,7 @@ struct fuzz_checker {
co_await _tr->start(raft::offset_translator::must_reset::no);
co_await _tr->prefix_truncate_reset(
_snapshot_offset, _snapshot_delta);
co_await _tr->sync_with_log(_log, std::nullopt);
co_await _tr->sync_with_log(*_log, std::nullopt);
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/v/storage/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,12 @@ struct truncate_prefix_config {
truncate_prefix_config(
model::offset o,
ss::io_priority_class p,
std::optional<model::offset_delta> force_truncate_delta = std::nullopt)
std::optional<model::offset_delta> force_truncate_delta = std::nullopt,
bool rebuild_translation_state = false)
: start_offset(o)
, prio(p)
, force_truncate_delta(force_truncate_delta) {}
, force_truncate_delta(force_truncate_delta)
, rebuild_translation_state(rebuild_translation_state) {}
model::offset start_offset;
ss::io_priority_class prio;

Expand All @@ -297,6 +299,8 @@ struct truncate_prefix_config {
// an error.
std::optional<model::offset_delta> force_truncate_delta;

bool rebuild_translation_state;

friend std::ostream&
operator<<(std::ostream&, const truncate_prefix_config&);
};
Expand Down

0 comments on commit 32af1fd

Please sign in to comment.