storage: adopt offset_translator #16892

Merged
merged 20 commits into from
Mar 22, 2024
Commits
533ab0b
model: add comment around offset translator types
andrwng Feb 29, 2024
3a42655
compaction_e2e_test: fix log pointer lifetime
andrwng Mar 9, 2024
28fb936
raft: remove unused header
andrwng Feb 29, 2024
db4fa62
raft: move offset_translator build to storage
andrwng Mar 21, 2024
2b8d459
storage: make offset_translator use storage classes
andrwng Mar 5, 2024
18d1dda
storage: add interface to tell if log is brand new
andrwng Mar 5, 2024
96a29df
storage: add offset translator option to force truncate
andrwng Mar 5, 2024
da33e19
storage: instantiate offset_translator in disk_log_impl
andrwng Mar 5, 2024
30c2565
storage: add translation interfaces to log
andrwng Mar 1, 2024
90d9b2d
storage: plumb translator batch types from log_manager
andrwng Mar 14, 2024
cb6ab63
storage: add translator type options to log builder
andrwng Mar 9, 2024
41483d4
storage: add start() method
andrwng Mar 21, 2024
9ece8c0
raft: refactor snapshot hydration
andrwng Mar 16, 2024
0432925
storage: migrate Raft ownership of offset translation
andrwng Mar 16, 2024
1050ae5
raft: refactor delta_for_truncation() away
andrwng Mar 21, 2024
410d2c6
cluster: use storage interface to access offset translator
andrwng Mar 14, 2024
0cccecf
produce_consume_test: fix unit test truncation
andrwng Mar 9, 2024
5a19fe3
storage: add assertion to supply group id
andrwng Mar 21, 2024
3347fe3
raft: rename load_from_metadata -> update_offset_from_snapshot
andrwng Mar 21, 2024
ba937fd
storage: rename delta() to offset_delta()
andrwng Mar 21, 2024
96 changes: 42 additions & 54 deletions src/v/raft/consensus.cc
@@ -38,6 +38,9 @@
#include "ssx/future-util.h"
#include "storage/api.h"
#include "storage/kvstore.h"
#include "storage/ntp_config.h"
#include "storage/snapshot.h"
#include "storage/types.h"

#include <seastar/core/condition-variable.hh>
#include <seastar/core/coroutine.hh>
@@ -111,11 +114,6 @@ consensus::consensus(
, _group(group)
, _jit(std::move(jit))
, _log(l)
, _offset_translator(
offset_translator_batch_types(_log->config().ntp()),
group,
_log->config().ntp(),
storage)
, _scheduling(scheduling_config)
, _disk_timeout(std::move(disk_timeout))
, _client_protocol(client)
@@ -1388,10 +1386,31 @@ ss::future<> consensus::do_start() {
"Configuration manager started: {}",
_configuration_manager);

co_await _offset_translator.start(
offset_translator::must_reset{initial_state});

co_await _snapshot_lock.with([this] { return hydrate_snapshot(); });
std::optional<storage::truncate_prefix_config> start_truncate_cfg;
auto snapshot_units = co_await _snapshot_lock.get_units();
auto metadata = co_await read_snapshot_metadata();
if (metadata.has_value()) {
load_from_metadata(metadata.value());
co_await _configuration_manager.add(
_last_snapshot_index, std::move(metadata->latest_configuration));
_probe->configuration_update();

auto delta = delta_for_truncation(metadata.value());
if (delta.has_value()) {
start_truncate_cfg = storage::truncate_prefix_config(
model::next_offset(_last_snapshot_index),
_scheduling.default_iopc,
delta);

_flushed_offset = std::max(
_last_snapshot_index, _flushed_offset);
co_await _configuration_manager.prefix_truncate(
_last_snapshot_index);
}
_snapshot_size = co_await _snapshot_mgr.get_snapshot_size();
}
co_await _log->start(start_truncate_cfg);
snapshot_units.return_all();
Member

Regarding the commit message from the previous commit:

An upcoming change will rewrite the initial snapshot hydration at
startup to use the new log::start() interface. To that end, this pulls
some logic from snapshot hydration into some reusable methods.

Is this the user of those reusable methods? I was a bit surprised that the logic here differs from what hydrate_snapshot had been doing. What is the core difference? Is it just that some of the rehydration has to happen before the log starts, because starting the log depends on state from the snapshot?

Contributor Author

That's right. We hydrate a bit more state and do a few more things, notably starting and syncing the offset translator state in this call to start().


vlog(
_ctxlog.debug,
@@ -1446,8 +1465,6 @@ ss::future<> consensus::do_start() {

update_follower_stats(_configuration_manager.get_latest());

co_await _offset_translator.sync_with_log(*_log, _as);

/**
* fix for incorrectly persisted configuration index. In
* previous version of redpanda due to the issue with
@@ -2026,16 +2043,9 @@ consensus::do_append_entries(append_entries_request&& r) {
truncate_at);
_probe->log_truncated();

// We are truncating the offset translator before truncating the log
// because if saving offset translator state fails, we will retry and
// eventually log and offset translator will become consistent. OTOH if
// log truncation were first and saving offset translator state failed,
// we wouldn't retry and log and offset translator could diverge.
return _offset_translator.truncate(truncate_at)
.then([this, truncate_at] {
return _log->truncate(storage::truncate_config(
truncate_at, _scheduling.default_iopc));
})
return _log
->truncate(
storage::truncate_config(truncate_at, _scheduling.default_iopc))
.then([this, truncate_at] {
_last_quorum_replicated_index_with_flush = std::min(
model::prev_offset(truncate_at),
@@ -2163,9 +2173,7 @@ ss::future<> consensus::hydrate_snapshot() {
_probe->configuration_update();
auto delta = delta_for_truncation(metadata.value());
if (delta.has_value()) {
co_await _offset_translator.prefix_truncate_reset(
_last_snapshot_index, *delta);
co_await truncate_to_latest_snapshot();
co_await truncate_to_latest_snapshot(model::offset_delta{*delta});
}
_snapshot_size = co_await _snapshot_mgr.get_snapshot_size();
}
@@ -2195,23 +2203,19 @@ consensus::delta_for_truncation(const snapshot_metadata& metadata) {
return model::offset_delta{delta};
}

ss::future<> consensus::truncate_to_latest_snapshot() {
ss::future<> consensus::truncate_to_latest_snapshot(
std::optional<model::offset_delta> force_truncate_delta) {
// we have to prefix truncate config manage at exactly last offset included
// in snapshot as this is the offset of configuration included in snapshot
// metadata.
//
// We truncate the log before truncating offset translator to wait for
// readers that started reading from the start of the log before we advanced
// _last_snapshot_index and thus can still need offset translation info.
return _log
->truncate_prefix(storage::truncate_prefix_config(
model::next_offset(_last_snapshot_index), _scheduling.default_iopc))
model::next_offset(_last_snapshot_index),
Member

this is invoked by hydrate_snapshot, but there the prefix truncate reset was being called with _last_snapshot_index rather than next_offset(_last_snapshot_index). is that a material difference?

Contributor Author

Note that the log impl's prefix truncate will call offset_translator_reset() with prev(truncate_at), which accounts for the difference, unless you noticed that and there's still a material difference
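
The off-by-one discussed in this thread can be sketched as follows. This is only an illustration: `offset`, `next_offset`, `prev_offset`, `raft_truncation_point` and `translator_reset_point` are hypothetical stand-ins written for this example, not the real `model::` helpers or storage methods.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for model::offset (assumption: a plain integer).
using offset = std::int64_t;

inline offset next_offset(offset o) { return o + 1; }
inline offset prev_offset(offset o) { return o - 1; }

// What Raft hands to the log in this sketch: the first offset to keep,
// i.e. the offset right after the last one covered by the snapshot.
inline offset raft_truncation_point(offset last_snapshot_index) {
    return next_offset(last_snapshot_index);
}

// What the log hands to the translator in this sketch: the offset just
// before the new start offset.
inline offset translator_reset_point(offset new_start_offset) {
    return prev_offset(new_start_offset);
}

// The two adjustments cancel out, so the translator is still reset at the
// last snapshot index, as it was before the change.
inline void check_round_trip(offset last_snapshot_index) {
    assert(
      translator_reset_point(raft_truncation_point(last_snapshot_index))
      == last_snapshot_index);
}
```

Under these assumptions, passing `next_offset(_last_snapshot_index)` to the log and `prev(truncate_at)` to the translator is equivalent to the old direct call with `_last_snapshot_index`.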

_scheduling.default_iopc,
force_truncate_delta))
.then([this] {
return _configuration_manager.prefix_truncate(_last_snapshot_index);
})
.then([this] {
return _offset_translator.prefix_truncate(_last_snapshot_index);
})
.then([this] {
// when log was prefix truncate flushed offset should be equal to at
// least last snapshot index
@@ -2431,9 +2435,7 @@ ss::future<> consensus::write_snapshot(write_snapshot_cfg cfg) {
_flushed_offset = std::max(last_included_index, _flushed_offset);

co_await _snapshot_lock.with([this, last_included_index]() mutable {
return ss::when_all_succeed(
_configuration_manager.prefix_truncate(last_included_index),
_offset_translator.prefix_truncate(last_included_index));
return _configuration_manager.prefix_truncate(last_included_index);
});
}

@@ -2465,8 +2467,7 @@ consensus::do_write_snapshot(model::offset last_included_index, iobuf&& data) {
.latest_configuration = *config,
.cluster_time = clock_type::time_point::min(),
.log_start_delta = offset_translator_delta(
_offset_translator.state()->delta(
model::next_offset(last_included_index))),
_log->delta(model::next_offset(last_included_index))()),
};

return details::persist_snapshot(
@@ -2683,29 +2684,24 @@ ss::future<storage::append_result> consensus::disk_append(

class consumer {
public:
consumer(offset_translator& translator, storage::log_appender appender)
: _translator(translator)
, _appender(std::move(appender)) {}
consumer(storage::log_appender appender)
: _appender(std::move(appender)) {}

ss::future<ss::stop_iteration> operator()(model::record_batch& batch) {
auto ret = co_await _appender(batch);
// passing batch to translator after appender so that correct batch
// offsets are filled.
_translator.process(batch);
co_return ret;
}

auto end_of_stream() { return _appender.end_of_stream(); }

private:
offset_translator& _translator;
storage::log_appender _appender;
};

return details::for_each_ref_extract_configuration(
_log->offsets().dirty_offset,
std::move(reader),
consumer(_offset_translator, _log->make_appender(cfg)),
consumer(_log->make_appender(cfg)),
cfg.timeout)
.then([this, should_update_last_quorum_idx](
std::tuple<ret_t, std::vector<offset_configuration>> t) {
@@ -2746,12 +2742,6 @@ ss::future<storage::append_result> consensus::disk_append(
return ret;
}

// Do checkpointing in the background to avoid latency spikes in
// the write path caused by KVStore flush debouncing.

ssx::spawn_with_gate(
_bg, [this] { return _offset_translator.maybe_checkpoint(); });

_configuration_manager
.maybe_store_highest_known_offset_in_background(
ret.last_offset, ret.byte_size, _bg);
@@ -3482,8 +3472,6 @@ ss::future<> consensus::remove_persistent_state() {
storage::kvstore::key_space::consensus, last_applied_key());
// configuration manager
co_await _configuration_manager.remove_persistent_state();
// offset translator
co_await _offset_translator.remove_persistent_state();
// snapshot
co_await _snapshot_mgr.remove_snapshot();
co_await _snapshot_mgr.remove_partial_snapshots();
9 changes: 5 additions & 4 deletions src/v/raft/consensus.h
@@ -34,9 +34,10 @@
#include "raft/replicate_batcher.h"
#include "raft/state_machine_manager.h"
#include "raft/timeout_jitter.h"
#include "raft/types.h"
#include "ssx/semaphore.h"
#include "storage/log.h"
#include "storage/offset_translator.h"
#include "storage/offset_translator_state.h"
#include "storage/snapshot.h"
#include "utils/mutex.h"

@@ -408,7 +409,7 @@ class consensus {

ss::lw_shared_ptr<const storage::offset_translator_state>
get_offset_translator_state() {
return _offset_translator.state();
return _log->get_offset_translator_state();
}

/**
@@ -557,7 +558,8 @@ class consensus {
*/
std::optional<model::offset_delta>
delta_for_truncation(const snapshot_metadata& metadata);
ss::future<> truncate_to_latest_snapshot();
ss::future<> truncate_to_latest_snapshot(
std::optional<model::offset_delta> force_truncate_delta = std::nullopt);
ss::future<install_snapshot_reply>
finish_snapshot(install_snapshot_request, install_snapshot_reply);

@@ -769,7 +771,6 @@ class consensus {
raft::group_id _group;
timeout_jitter _jit;
ss::shared_ptr<storage::log> _log;
offset_translator _offset_translator;
scheduling_config _scheduling;
config::binding<std::chrono::milliseconds> _disk_timeout;
consensus_client_protocol _client_protocol;
3 changes: 1 addition & 2 deletions src/v/raft/recovery_stm.cc
@@ -490,8 +490,7 @@ recovery_stm::take_on_demand_snapshot(model::offset last_included_offset) {
.last_included_term = *term,
.latest_configuration = std::move(*cfg),
.log_start_delta = offset_translator_delta(
_ptr->_offset_translator.state()->delta(
model::next_offset(last_included_offset))),
_ptr->log()->delta(model::next_offset(last_included_offset))),
};

co_await writer.write_metadata(reflection::to_iobuf(std::move(metadata)));
17 changes: 10 additions & 7 deletions src/v/storage/disk_log_appender.cc
@@ -110,7 +110,9 @@ disk_log_appender::operator()(model::record_batch& batch) {
_last_term, _idx, _config.io_priority);
co_await initialize();
}
co_return co_await append_batch_to_segment(batch);
auto stop = co_await append_batch_to_segment(batch);
_log.offset_translator().process(batch);
co_return stop;
} catch (...) {
release_lock();
vlog(
@@ -159,13 +161,14 @@ ss::future<append_result> disk_log_appender::end_of_stream() {
.last_offset = _last_offset,
.byte_size = _byte_size,
.last_term = _last_term};
if (_config.should_fsync == storage::log_append_config::fsync::no) {
return ss::make_ready_future<append_result>(retval);
}
return _log.flush().then([this, retval] {
if (_config.should_fsync == storage::log_append_config::fsync::yes) {
co_await _log.flush();
release_lock();
return retval;
});
}
// Do checkpointing in the background to avoid latency spikes in the write
// path caused by KVStore flush debouncing.
_log.bg_checkpoint_offset_translator();
co_return retval;
}

std::ostream& operator<<(std::ostream& o, const disk_log_appender& a) {
24 changes: 22 additions & 2 deletions src/v/storage/disk_log_impl.cc
@@ -167,6 +167,7 @@ ss::future<> disk_log_impl::remove() {
permanent_delete.emplace_back(
remove_segment_permanently(s, "disk_log_impl::remove()"));
}
co_await _offset_translator.remove_persistent_state();

co_await _readers_cache->stop()
.then([this, permanent_delete = std::move(permanent_delete)]() mutable {
@@ -1656,6 +1657,12 @@ size_t disk_log_impl::bytes_left_before_roll() const {
return max - fo;
}

void disk_log_impl::bg_checkpoint_offset_translator() {
ssx::spawn_with_gate(_compaction_housekeeping_gate, [this] {
return _offset_translator.maybe_checkpoint();
});
}

ss::future<> disk_log_impl::force_roll(ss::io_priority_class iopc) {
auto roll_lock_holder = co_await _segments_rolling_lock.get_units();
auto t = term();
@@ -2494,7 +2501,7 @@ disk_log_impl::remove_prefix_full_segments(truncate_prefix_config cfg) {

ss::future<> disk_log_impl::truncate_prefix(truncate_prefix_config cfg) {
vassert(!_closed, "truncate_prefix() on closed log - {}", *this);
return _failure_probes.truncate_prefix().then([this, cfg]() mutable {
co_await _failure_probes.truncate_prefix().then([this, cfg]() mutable {
// dispatch the actual truncation
return do_truncate_prefix(cfg)
.then([this] {
@@ -2509,6 +2516,11 @@ ss::future<> disk_log_impl::truncate_prefix(truncate_prefix_config cfg) {
})
.discard_result();
});
// We truncate the segments before truncating offset translator to wait for
// readers that started reading from the start of the log before we advanced
// the start offset and thus can still need offset translation info.
Member

Why doesn't the same logic about failure scenarios (described in the comment below in ::truncate()) apply here?

Contributor Author

I think the truncate() comment is related to how we sync the offset translator state with the log at startup. Basically, we'll read through the log starting right after the end of what we have in our offset translator state, and then use that to rebuild state. Truncating the offset translator state is required to allow us to detect this state, since I think it's much harder to detect if the offset translation state passes the end of the log.

For prefix truncation OTOH, prefix truncating the offset translator state last means that our translation state extends below the beginning of the log, which doesn't affect correctness
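
The ordering argument in this reply can be illustrated with a toy model. Everything here is an assumption made for the sketch: `range`, `recoverable`, `truncate_suffix` and `truncate_prefix` are hypothetical names, each component is reduced to an inclusive range of offsets, and `steps_done` simulates a crash partway through. The real code also adjusts the translator's prefix truncation point with `prev_offset`, which the sketch leaves out.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical toy model: the inclusive range of offsets a component covers.
struct range {
    std::int64_t first;
    std::int64_t last;
};

// Assumed recovery rule, following the reply above: at startup the translator
// can be rebuilt by reading the log from just past the end of its own state,
// so it must not extend past the end of the log. Translator state that
// reaches below the start of the log is treated as harmless.
inline bool recoverable(range log, range translator) {
    assert(log.first <= log.last + 1);
    return translator.last <= log.last;
}

// Suffix truncation (drop offsets >= at): translator first, then log.
// steps_done is the number of steps completed before a simulated crash.
inline void truncate_suffix(
  range& log, range& translator, std::int64_t at, int steps_done) {
    if (steps_done >= 1) {
        translator.last = std::min(translator.last, at - 1);
    }
    if (steps_done >= 2) {
        log.last = std::min(log.last, at - 1);
    }
}

// Prefix truncation (new start offset): log first, then translator.
inline void truncate_prefix(
  range& log, range& translator, std::int64_t new_start, int steps_done) {
    if (steps_done >= 1) {
        log.first = std::max(log.first, new_start);
    }
    if (steps_done >= 2) {
        translator.first = std::max(translator.first, new_start);
    }
}
```

In this model, a crash after the first step of either operation leaves a state that `recoverable` accepts. Truncating the log suffix before the translator would leave translator state past the end of the log, which the reply describes as much harder to detect.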

co_await _offset_translator.prefix_truncate(
model::prev_offset(cfg.start_offset), cfg.force_truncate_delta);
}

ss::future<> disk_log_impl::do_truncate_prefix(truncate_prefix_config cfg) {
@@ -2570,7 +2582,15 @@ ss::future<> disk_log_impl::do_truncate_prefix(truncate_prefix_config cfg) {

ss::future<> disk_log_impl::truncate(truncate_config cfg) {
vassert(!_closed, "truncate() on closed log - {}", *this);
return _failure_probes.truncate().then([this, cfg]() mutable {
// We are truncating the offset translator before truncating the log
// because if saving offset translator state fails (e.g. because of a
// crash), we can retry and eventually log and offset translator will
// become consistent. OTOH if log truncation were first and saving offset
// translator state failed, we wouldn't retry and log and offset translator
// could diverge.
co_await _offset_translator.truncate(cfg.base_offset);

co_await _failure_probes.truncate().then([this, cfg]() mutable {
// Before truncation, erase any claim about a particular segment being
// clean: this may refer to a segment we are about to delete, or it
// may refer to a segment that we will modify the indices+data for: on
7 changes: 7 additions & 0 deletions src/v/storage/disk_log_impl.h
@@ -111,6 +111,10 @@ class disk_log_impl final : public log {
size_t segment_count() const final { return _segs.size(); }
bool is_new_log() const final;
offset_stats offsets() const final;
ss::lw_shared_ptr<const storage::offset_translator_state>
get_offset_translator_state() const final {
return _offset_translator.state();
}
raft::offset_translator& offset_translator() { return _offset_translator; }
model::offset_delta delta(model::offset) const final;
model::offset from_log_offset(model::offset) const final;
@@ -127,6 +131,9 @@ class disk_log_impl final : public log {
ss::future<> maybe_roll_unlocked(
model::term_id, model::offset next_offset, ss::io_priority_class);

// Kicks off a background flush of offset translator state to the kvstore.
void bg_checkpoint_offset_translator();

ss::future<> force_roll(ss::io_priority_class) override;

probe& get_probe() override { return *_probe; }
6 changes: 6 additions & 0 deletions src/v/storage/log.h
@@ -100,6 +100,12 @@ class log {
virtual ss::future<std::optional<timequery_result>>
timequery(timequery_config) = 0;

// Prefer to use delta() or from/to_log_offset().
// TODO: remove direct access to the translator state and instead rely on
// the translation/delta interface.
virtual ss::lw_shared_ptr<const storage::offset_translator_state>
get_offset_translator_state() const = 0;

// Returns the offset delta for a given offset. This can be used for
// example to translate a Raft offset to a data offset.
virtual model::offset_delta delta(model::offset) const = 0;
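
As a rough illustration of the comment on `delta()`, the sketch below shows how a delta could turn a Raft (log) offset into a data offset. It is a toy, not the storage implementation: `toy_translator` is a hypothetical type, and it assumes the delta at a data offset equals the number of non-data offsets below it. The boundary behaviour of the real translator at non-data offsets is not shown in this diff and is not modelled here.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical toy translator (assumption for illustration only): it records
// which log offsets are occupied by non-data batches, such as Raft
// configuration batches.
struct toy_translator {
    std::vector<std::int64_t> non_data_offsets;

    // Count of non-data offsets strictly below log_offset.
    std::int64_t delta(std::int64_t log_offset) const {
        assert(log_offset >= 0);
        std::int64_t d = 0;
        for (auto o : non_data_offsets) {
            if (o < log_offset) {
                ++d;
            }
        }
        return d;
    }

    // Translate a log offset that holds data into a data offset by
    // subtracting the delta.
    std::int64_t from_log_offset(std::int64_t log_offset) const {
        return log_offset - delta(log_offset);
    }
};
```

For example, with non-data batches at log offsets 0 and 3, the data batches at log offsets 1, 2 and 4 map to data offsets 0, 1 and 2 in this model.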