diff --git a/src/v/archival/archival_policy.cc b/src/v/archival/archival_policy.cc index d23dfcb0c84e4..44d72991375d0 100644 --- a/src/v/archival/archival_policy.cc +++ b/src/v/archival/archival_policy.cc @@ -17,7 +17,6 @@ #include "storage/disk_log_impl.h" #include "storage/fs_utils.h" #include "storage/offset_to_filepos.h" -#include "storage/offset_translator_state.h" #include "storage/parser.h" #include "storage/segment.h" #include "storage/segment_set.h" @@ -168,8 +167,7 @@ bool archival_policy::upload_deadline_reached() { archival_policy::lookup_result archival_policy::find_segment( model::offset start_offset, model::offset adjusted_lso, - ss::shared_ptr log, - const storage::offset_translator_state& ot_state) { + ss::shared_ptr log) { vlog( archival_log.debug, "Upload policy for {} invoked, start offset: {}", @@ -228,9 +226,8 @@ archival_policy::lookup_result archival_policy::find_segment( } if (!closed) { - auto kafka_start_offset = ot_state.from_log_offset(start_offset); - auto kafka_lso = ot_state.from_log_offset( - model::next_offset(adjusted_lso)); + auto kafka_start_offset = log->from_log_offset(start_offset); + auto kafka_lso = log->from_log_offset(model::next_offset(adjusted_lso)); if (kafka_start_offset >= kafka_lso) { // If timeboxed uploads are enabled and there is no producer // activity, we can get into a nasty loop where we upload a segment, @@ -434,13 +431,12 @@ ss::future archival_policy::get_next_candidate( model::offset begin_inclusive, model::offset end_exclusive, ss::shared_ptr log, - const storage::offset_translator_state& ot_state, ss::lowres_clock::duration segment_lock_duration) { // NOTE: end_exclusive (which is initialized with LSO) points to the first // unstable recordbatch we need to look at the previous batch if needed. auto adjusted_lso = end_exclusive - model::offset(1); auto [segment, ntp_conf, forced] = find_segment( - begin_inclusive, adjusted_lso, std::move(log), ot_state); + begin_inclusive, adjusted_lso, std::move(log)); if (segment.get() == nullptr) { co_return candidate_creation_error::no_segment_for_begin_offset; } diff --git a/src/v/archival/archival_policy.h b/src/v/archival/archival_policy.h index e5fec41b5385a..0468d36526d7d 100644 --- a/src/v/archival/archival_policy.h +++ b/src/v/archival/archival_policy.h @@ -99,7 +99,6 @@ class archival_policy { model::offset begin_inclusive, model::offset end_exclusive, ss::shared_ptr, - const storage::offset_translator_state&, ss::lowres_clock::duration segment_lock_duration); ss::future get_next_compacted_segment( @@ -127,8 +126,7 @@ class archival_policy { lookup_result find_segment( model::offset last_offset, model::offset adjusted_lso, - ss::shared_ptr, - const storage::offset_translator_state&); + ss::shared_ptr); model::ntp _ntp; std::optional _upload_limit; diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index 3900062500c27..c43e3db4208bb 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -1403,7 +1403,7 @@ ntp_archiver::make_segment_index( std::string_view index_path, ss::input_stream stream) { auto base_kafka_offset = model::offset_cast( - _parent.get_offset_translator_state()->from_log_offset(base_rp_offset)); + _parent.log()->from_log_offset(base_rp_offset)); cloud_storage::offset_index ix{ base_rp_offset, @@ -1469,9 +1469,10 @@ ntp_archiver::do_schedule_single_upload( auto first_source = upload.sources.front(); auto offset = upload.final_offset; auto base = upload.starting_offset; - auto ot_state = _parent.get_offset_translator_state(); - auto delta = base - model::offset_cast(ot_state->from_log_offset(base)); - auto delta_offset_next = ot_state->next_offset_delta(upload.final_offset); + auto log = _parent.log(); + auto delta = base - model::offset_cast(log->from_log_offset(base)); + auto delta_offset_next = log->offset_delta( + model::next_offset(upload.final_offset)); // The upload is successful only if the segment, and tx_range are // uploaded. @@ -1558,7 +1559,6 @@ ntp_archiver::schedule_single_upload(const upload_context& upload_ctx) { start_upload_offset, last_stable_offset, log, - *_parent.get_offset_translator_state(), _conf->segment_upload_timeout); break; case segment_upload_kind::compacted: @@ -2925,9 +2925,10 @@ ss::future ntp_archiver::do_upload_local( auto offset = upload.final_offset; auto base = upload.starting_offset; - auto ot_state = _parent.get_offset_translator_state(); - auto delta = base - model::offset_cast(ot_state->from_log_offset(base)); - auto delta_offset_next = ot_state->next_offset_delta(upload.final_offset); + auto log = _parent.log(); + auto delta = base - model::offset_cast(log->from_log_offset(base)); + auto delta_offset_next = log->offset_delta( + model::next_offset(upload.final_offset)); auto archiver_term = _start_term; // Upload segments and tx-manifest in parallel diff --git a/src/v/archival/tests/ntp_archiver_test.cc b/src/v/archival/tests/ntp_archiver_test.cc index 90b4d6b2aa3d0..9855a46bd8768 100644 --- a/src/v/archival/tests/ntp_archiver_test.cc +++ b/src/v/archival/tests/ntp_archiver_test.cc @@ -24,9 +24,9 @@ #include "config/property.h" #include "model/fundamental.h" #include "model/metadata.h" +#include "model/record_batch_types.h" #include "net/types.h" #include "net/unresolved_address.h" -#include "raft/offset_translator.h" #include "ssx/sformat.h" #include "storage/disk_log_impl.h" #include "storage/parser.h" @@ -826,17 +826,14 @@ FIXTURE_TEST(test_archiver_policy, archiver_fixture) { auto partition = app.partition_manager.local().get(manifest_ntp); BOOST_REQUIRE(partition); - const storage::offset_translator_state& tr - = *partition->get_offset_translator_state(); // Starting offset is lower than offset1 - auto upload1 - = require_upload_candidate( - policy - .get_next_candidate( - model::offset(0), lso, log, tr, segment_read_lock_timeout) - .get()) - .candidate; + auto upload1 = require_upload_candidate( + policy + .get_next_candidate( + model::offset(0), lso, log, segment_read_lock_timeout) + .get()) + .candidate; log_upload_candidate(upload1); BOOST_REQUIRE(!upload1.sources.empty()); BOOST_REQUIRE(upload1.starting_offset == offset1); @@ -848,7 +845,7 @@ FIXTURE_TEST(test_archiver_policy, archiver_fixture) { auto upload2 = require_upload_candidate( policy .get_next_candidate( - start_offset, lso, log, tr, segment_read_lock_timeout) + start_offset, lso, log, segment_read_lock_timeout) .get()) .candidate; log_upload_candidate(upload2); @@ -863,7 +860,7 @@ FIXTURE_TEST(test_archiver_policy, archiver_fixture) { auto upload3 = require_upload_candidate( policy .get_next_candidate( - start_offset, lso, log, tr, segment_read_lock_timeout) + start_offset, lso, log, segment_read_lock_timeout) .get()) .candidate; log_upload_candidate(upload3); @@ -877,14 +874,13 @@ FIXTURE_TEST(test_archiver_policy, archiver_fixture) { + model::offset(1); require_candidate_creation_error( policy - .get_next_candidate( - start_offset, lso, log, tr, segment_read_lock_timeout) + .get_next_candidate(start_offset, lso, log, segment_read_lock_timeout) .get(), candidate_creation_error::no_segment_for_begin_offset); require_candidate_creation_error( policy .get_next_candidate( - lso + model::offset(1), lso, log, tr, segment_read_lock_timeout) + lso + model::offset(1), lso, log, segment_read_lock_timeout) .get(), candidate_creation_error::no_segment_for_begin_offset); } @@ -926,16 +922,13 @@ FIXTURE_TEST( auto partition = app.partition_manager.local().get(manifest_ntp); BOOST_REQUIRE(partition); - auto candidate = require_upload_candidate( - archival::archival_policy{manifest_ntp} - .get_next_candidate( - model::offset(0), - lso, - log, - *partition->get_offset_translator_state(), - segment_read_lock_timeout) - .get()) - .candidate; + auto candidate + = require_upload_candidate( + archival::archival_policy{manifest_ntp} + .get_next_candidate( + model::offset(0), lso, log, segment_read_lock_timeout) + .get()) + .candidate; // The search is expected to find the next segment after the compacted // segment, skipping the compacted one. @@ -947,21 +940,18 @@ FIXTURE_TEST( // NOLINTNEXTLINE SEASTAR_THREAD_TEST_CASE(test_archival_policy_timeboxed_uploads) { - storage::disk_log_builder b; + storage::disk_log_builder b( + storage::log_builder_config(), + model::offset_translator_batch_types(), + raft::group_id{0}); b | storage::start(manifest_ntp); archival::archival_policy policy(manifest_ntp, segment_time_limit{0s}); auto log = b.get_log(); - raft::offset_translator tr( - {model::record_batch_type::raft_configuration, - model::record_batch_type::archival_metadata}, - raft::group_id{0}, - manifest_ntp, - b.storage()); - tr.start(raft::offset_translator::must_reset::yes).get(); - const auto& tr_state = *tr.state(); + // Must initialize translator state. + log->start(std::nullopt).get(); // first offset that is not yet uploaded auto start_offset = model::offset{0}; @@ -974,7 +964,6 @@ SEASTAR_THREAD_TEST_CASE(test_archival_policy_timeboxed_uploads) { start_offset, last_stable_offset, log, - tr_state, segment_read_lock_timeout) .get()) .candidate; @@ -996,7 +985,6 @@ SEASTAR_THREAD_TEST_CASE(test_archival_policy_timeboxed_uploads) { storage::maybe_compress_batches::no, model::record_batch_type::archival_metadata); BOOST_REQUIRE_EQUAL(log->offsets().dirty_offset, model::offset{13}); - tr.sync_with_log(log, std::nullopt).get(); // should upload [0-13] { @@ -1010,7 +998,6 @@ SEASTAR_THREAD_TEST_CASE(test_archival_policy_timeboxed_uploads) { // data[14-14] b | storage::add_random_batch(model::offset{14}, 1); BOOST_REQUIRE_EQUAL(log->offsets().dirty_offset, model::offset{14}); - tr.sync_with_log(log, std::nullopt).get(); // should upload [14-14] { @@ -1029,7 +1016,6 @@ SEASTAR_THREAD_TEST_CASE(test_archival_policy_timeboxed_uploads) { storage::maybe_compress_batches::no, model::record_batch_type::archival_metadata); BOOST_REQUIRE_EQUAL(log->offsets().dirty_offset, model::offset{16}); - tr.sync_with_log(log, std::nullopt).get(); // should skip uploading because there are no data batches to upload { @@ -1039,7 +1025,6 @@ SEASTAR_THREAD_TEST_CASE(test_archival_policy_timeboxed_uploads) { start_offset, log->offsets().dirty_offset + model::offset{1}, log, - tr_state, segment_read_lock_timeout) .get(), candidate_creation_error::no_segment_for_begin_offset); @@ -1048,7 +1033,6 @@ SEASTAR_THREAD_TEST_CASE(test_archival_policy_timeboxed_uploads) { // data[17-17] b | storage::add_random_batch(model::offset{17}, 1); BOOST_REQUIRE_EQUAL(log->offsets().dirty_offset, model::offset{17}); - tr.sync_with_log(log, std::nullopt).get(); // should upload [15-17] { @@ -1067,7 +1051,6 @@ SEASTAR_THREAD_TEST_CASE(test_archival_policy_timeboxed_uploads) { storage::maybe_compress_batches::no, model::record_batch_type::archival_metadata); BOOST_REQUIRE_EQUAL(log->offsets().dirty_offset, model::offset{18}); - tr.sync_with_log(log, std::nullopt).get(); // should skip uploading because there are no data batches to upload { @@ -1077,7 +1060,6 @@ SEASTAR_THREAD_TEST_CASE(test_archival_policy_timeboxed_uploads) { start_offset, log->offsets().dirty_offset + model::offset{1}, log, - tr_state, segment_read_lock_timeout) .get(), candidate_creation_error::no_segment_for_begin_offset); @@ -1560,8 +1542,6 @@ FIXTURE_TEST(test_upload_segments_with_overlap, archiver_fixture) { auto partition = app.partition_manager.local().get(manifest_ntp); BOOST_REQUIRE(partition); - const storage::offset_translator_state& tr - = *partition->get_offset_translator_state(); model::offset start_offset{0}; model::offset lso{9999}; @@ -1569,7 +1549,7 @@ FIXTURE_TEST(test_upload_segments_with_overlap, archiver_fixture) { auto upload1 = require_upload_candidate( policy .get_next_candidate( - start_offset, lso, log, tr, segment_read_lock_timeout) + start_offset, lso, log, segment_read_lock_timeout) .get()) .candidate; log_upload_candidate(upload1); @@ -1581,7 +1561,7 @@ FIXTURE_TEST(test_upload_segments_with_overlap, archiver_fixture) { auto upload2 = require_upload_candidate( policy .get_next_candidate( - start_offset, lso, log, tr, segment_read_lock_timeout) + start_offset, lso, log, segment_read_lock_timeout) .get()) .candidate; log_upload_candidate(upload2); @@ -1596,7 +1576,7 @@ FIXTURE_TEST(test_upload_segments_with_overlap, archiver_fixture) { auto upload3 = require_upload_candidate( policy .get_next_candidate( - start_offset, lso, log, tr, segment_read_lock_timeout) + start_offset, lso, log, segment_read_lock_timeout) .get()) .candidate; log_upload_candidate(upload3); @@ -1610,8 +1590,7 @@ FIXTURE_TEST(test_upload_segments_with_overlap, archiver_fixture) { + model::offset(1); require_candidate_creation_error( policy - .get_next_candidate( - start_offset, lso, log, tr, segment_read_lock_timeout) + .get_next_candidate(start_offset, lso, log, segment_read_lock_timeout) .get(), candidate_creation_error::no_segment_for_begin_offset); } diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc index b66f207578bee..881daec1c4230 100644 --- a/src/v/cluster/cluster_utils.cc +++ b/src/v/cluster/cluster_utils.cc @@ -20,7 +20,7 @@ #include "raft/errc.h" #include "rpc/backoff_policy.h" #include "rpc/types.h" -#include "storage/kvstore.h" +#include "storage/disk_log_impl.h" #include @@ -279,8 +279,9 @@ partition_raft_state get_partition_raft_state(consensus_ptr ptr) { } raft_state.node = ptr->self().id(); raft_state.term = ptr->term(); + auto& disk_log = dynamic_cast(*ptr->log()); raft_state.offset_translator_state = fmt::format( - "{}", *(ptr->get_offset_translator_state())); + "{}", *(disk_log.offset_translator().state())); raft_state.group_configuration = fmt::format("{}", ptr->config()); raft_state.confirmed_term = ptr->confirmed_term(); raft_state.flushed_offset = ptr->flushed_offset(); diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc index f3e245aa984b6..4b063291adfd4 100644 --- a/src/v/cluster/controller_backend.cc +++ b/src/v/cluster/controller_backend.cc @@ -34,6 +34,7 @@ #include "raft/types.h" #include "ssx/event.h" #include "ssx/future-util.h" +#include "storage/offset_translator.h" #include #include diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 2603d3acb4f57..e6a8fef45add6 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -166,8 +166,7 @@ partition_cloud_storage_status partition::get_cloud_storage_status() const { if (o == model::offset{}) { return std::nullopt; } - return model::offset_cast( - get_offset_translator_state()->from_log_offset(o)); + return model::offset_cast(log()->from_log_offset(o)); }; auto time_point_to_delta = [](ss::lowres_clock::time_point tp) @@ -321,9 +320,8 @@ ss::future> partition::replicate( if (!res) { co_return ret_t(res.error()); } - co_return ret_t( - kafka_result{kafka::offset(get_offset_translator_state()->from_log_offset( - res.value().last_offset)())}); + co_return ret_t(kafka_result{ + kafka::offset(log()->from_log_offset(res.value().last_offset)())}); } ss::shared_ptr partition::rm_stm() { @@ -373,8 +371,7 @@ kafka_stages partition::replicate_in_stages( return ret_t(r.error()); } auto old_offset = r.value().last_offset; - auto new_offset = kafka::offset( - get_offset_translator_state()->from_log_offset(old_offset)()); + auto new_offset = kafka::offset(log()->from_log_offset(old_offset)()); return ret_t(kafka_result{new_offset}); }); return kafka_stages( @@ -583,8 +580,7 @@ partition::local_timequery(storage::timequery_config cfg) { cfg.time, cfg.max_offset); - cfg.max_offset = _raft->get_offset_translator_state()->to_log_offset( - cfg.max_offset); + cfg.max_offset = _raft->log()->to_log_offset(cfg.max_offset); auto result = co_await _raft->timequery(cfg); @@ -662,8 +658,7 @@ partition::local_timequery(storage::timequery_config cfg) { cfg.time, cfg.max_offset, result->offset); - result->offset = _raft->get_offset_translator_state()->from_log_offset( - result->offset); + result->offset = _raft->log()->from_log_offset(result->offset); } co_return result; diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 32b387d2ee96f..7cb292a97658f 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -108,13 +108,11 @@ class partition : public ss::enable_lw_shared_from_this { } // The eviction STM only keeps track of DeleteRecords truncations // as Raft offsets. Translate if possible. - auto offset_translator_state = get_offset_translator_state(); if ( offset_res.value() != model::offset{} && _raft->start_offset() < offset_res.value()) { - auto start_kafka_offset - = offset_translator_state->from_log_offset( - offset_res.value()); + auto start_kafka_offset = log()->from_log_offset( + offset_res.value()); co_return start_kafka_offset; } // If a start override is no longer in the offset translator state, @@ -283,7 +281,7 @@ class partition : public ss::enable_lw_shared_from_this { ss::lw_shared_ptr get_offset_translator_state() const { - return _raft->get_offset_translator_state(); + return _raft->log()->get_offset_translator_state(); } ss::shared_ptr rm_stm(); @@ -378,9 +376,7 @@ class partition : public ss::enable_lw_shared_from_this { if (_log_eviction_stm && !is_read_replica_mode_enabled()) { auto o = _log_eviction_stm->start_offset_override(); if (o != model::offset{} && _raft->start_offset() < o) { - auto offset_translator_state = get_offset_translator_state(); - auto start_kafka_offset - = offset_translator_state->from_log_offset(o); + auto start_kafka_offset = log()->from_log_offset(o); return start_kafka_offset; } // If a start override is no longer in the offset translator state, diff --git a/src/v/cluster/partition_manager.cc b/src/v/cluster/partition_manager.cc index 16f9319c12517..9c84e6e14bac5 100644 --- a/src/v/cluster/partition_manager.cc +++ b/src/v/cluster/partition_manager.cc @@ -27,12 +27,10 @@ #include "raft/consensus.h" #include "raft/consensus_utils.h" #include "raft/group_configuration.h" -#include "raft/offset_translator.h" #include "raft/rpc_client_protocol.h" #include "raft/types.h" #include "resource_mgmt/io_priority.h" #include "ssx/async-clear.h" -#include "storage/offset_translator_state.h" #include "storage/segment_utils.h" #include "storage/snapshot.h" #include "utils/retry_chain_node.h" @@ -214,7 +212,10 @@ ss::future partition_manager::manage( ntp_cfg, manifest, max_offset); } } - auto log = co_await _storage.log_mgr().manage(std::move(ntp_cfg)); + auto translator_batch_types = raft::offset_translator_batch_types( + ntp_cfg.ntp()); + auto log = co_await _storage.log_mgr().manage( + std::move(ntp_cfg), group, std::move(translator_batch_types)); vlog( clusterlog.debug, "Log created manage completed, ntp: {}, rev: {}, {} " diff --git a/src/v/cluster/partition_probe.cc b/src/v/cluster/partition_probe.cc index 0bfe60d0bbe24..793f4be881866 100644 --- a/src/v/cluster/partition_probe.cc +++ b/src/v/cluster/partition_probe.cc @@ -217,10 +217,9 @@ void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) { return model::offset(0); } auto log_offset = _partition.high_watermark(); - auto translator = _partition.get_offset_translator_state(); try { - return translator->from_log_offset(log_offset); + return _partition.log()->from_log_offset(log_offset); } catch (std::runtime_error& e) { // Offset translation will throw if nothing was committed // to the partition or if the offset is outside the diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 307a1f310b023..e032f5a1e92c7 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1800,16 +1800,14 @@ void rm_stm::apply_data( kafka::offset rm_stm::from_log_offset(model::offset log_offset) const { if (log_offset > model::offset{-1}) { - return kafka::offset( - _raft->get_offset_translator_state()->from_log_offset(log_offset)); + return kafka::offset(_raft->log()->from_log_offset(log_offset)); } return kafka::offset(log_offset); } model::offset rm_stm::to_log_offset(kafka::offset k_offset) const { if (k_offset > kafka::offset{-1}) { - return _raft->get_offset_translator_state()->to_log_offset( - model::offset(k_offset())); + return _raft->log()->to_log_offset(model::offset(k_offset())); } return model::offset(k_offset); diff --git a/src/v/kafka/server/tests/produce_consume_test.cc b/src/v/kafka/server/tests/produce_consume_test.cc index 20ff918551e9d..de084ae36eee5 100644 --- a/src/v/kafka/server/tests/produce_consume_test.cc +++ b/src/v/kafka/server/tests/produce_consume_test.cc @@ -18,6 +18,7 @@ #include "kafka/server/tests/delete_records_utils.h" #include "kafka/server/tests/produce_consume_utils.h" #include "model/fundamental.h" +#include "model/timeout_clock.h" #include "random/generators.h" #include "redpanda/tests/fixture.h" #include "storage/record_batch_builder.h" @@ -973,9 +974,14 @@ FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) { *shard, [ntp](cluster::partition_manager& mgr) { auto partition = mgr.get(ntp); - storage::truncate_prefix_config cfg( - model::offset(1), ss::default_priority_class()); - partition->log()->truncate_prefix(cfg).get(); + auto local_kafka_start_offset = partition->log()->from_log_offset( + model::offset(1)); + partition + ->prefix_truncate( + model::offset(1), + model::offset_cast(local_kafka_start_offset), + model::no_timeout) + .get(); }) .get(); @@ -1016,8 +1022,7 @@ FIXTURE_TEST(test_offset_for_leader_epoch, prod_consume_fixture) { [ntp](cluster::partition_manager& mgr) { auto partition = mgr.get(ntp); auto start_offset = partition->log()->offsets().start_offset; - return partition->get_offset_translator_state() - ->from_log_offset(start_offset); + return partition->log()->from_log_offset(start_offset); }) .get(); BOOST_REQUIRE_EQUAL(earliest_kafka_offset, partition_resp.end_offset); diff --git a/src/v/model/record_batch_types.h b/src/v/model/record_batch_types.h index fda7cec314ab3..804b10bb8ba9f 100644 --- a/src/v/model/record_batch_types.h +++ b/src/v/model/record_batch_types.h @@ -54,6 +54,12 @@ enum class record_batch_type : int8_t { std::ostream& operator<<(std::ostream& o, record_batch_type bt); +// The set of batch types that may appear in a data partition that aren't +// assigned a new translated offset. When translated, such batches are given an +// offset matching the next batch of type outside this set. +// +// Put simply, batches of these types do not increment the offset that would be +// returned upon translating offsets for Kafka fetches. inline std::vector offset_translator_batch_types() { return { model::record_batch_type::raft_configuration, diff --git a/src/v/raft/CMakeLists.txt b/src/v/raft/CMakeLists.txt index 7f4fa065ab9da..ac05695166886 100644 --- a/src/v/raft/CMakeLists.txt +++ b/src/v/raft/CMakeLists.txt @@ -30,7 +30,6 @@ v_cc_library( group_configuration.cc append_entries_buffer.cc follower_queue.cc - offset_translator.cc recovery_memory_quota.cc coordinated_recovery_throttle.cc heartbeats.cc diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 9866d8d3b04a6..510f7452b58b0 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -38,6 +38,9 @@ #include "ssx/future-util.h" #include "storage/api.h" #include "storage/kvstore.h" +#include "storage/ntp_config.h" +#include "storage/snapshot.h" +#include "storage/types.h" #include #include @@ -111,11 +114,6 @@ consensus::consensus( , _group(group) , _jit(std::move(jit)) , _log(l) - , _offset_translator( - offset_translator_batch_types(_log->config().ntp()), - group, - _log->config().ntp(), - storage) , _scheduling(scheduling_config) , _disk_timeout(std::move(disk_timeout)) , _client_protocol(client) @@ -1373,7 +1371,7 @@ ss::future<> consensus::do_start() { auto u = co_await _op_lock.get_units(); read_voted_for(); - bool initial_state = is_initial_state(); + bool initial_state = _log->is_new_log(); vlog( _ctxlog.info, @@ -1388,10 +1386,26 @@ ss::future<> consensus::do_start() { "Configuration manager started: {}", _configuration_manager); - co_await _offset_translator.start( - offset_translator::must_reset{initial_state}); - - co_await _snapshot_lock.with([this] { return hydrate_snapshot(); }); + std::optional start_truncate_cfg; + auto snapshot_units = co_await _snapshot_lock.get_units(); + auto metadata = co_await read_snapshot_metadata(); + if (metadata.has_value()) { + update_offset_from_snapshot(metadata.value()); + co_await _configuration_manager.add( + _last_snapshot_index, std::move(metadata->latest_configuration)); + _probe->configuration_update(); + + start_truncate_cfg = truncation_cfg_for_snapshot(metadata.value()); + if (start_truncate_cfg.has_value()) { + _flushed_offset = std::max( + _last_snapshot_index, _flushed_offset); + co_await _configuration_manager.prefix_truncate( + _last_snapshot_index); + } + _snapshot_size = co_await _snapshot_mgr.get_snapshot_size(); + } + co_await _log->start(start_truncate_cfg); + snapshot_units.return_all(); vlog( _ctxlog.debug, @@ -1446,8 +1460,6 @@ ss::future<> consensus::do_start() { update_follower_stats(_configuration_manager.get_latest()); - co_await _offset_translator.sync_with_log(_log, _as); - /** * fix for incorrectly persisted configuration index. In * previous version of redpanda due to the issue with @@ -2026,16 +2038,9 @@ consensus::do_append_entries(append_entries_request&& r) { truncate_at); _probe->log_truncated(); - // We are truncating the offset translator before truncating the log - // because if saving offset translator state fails, we will retry and - // eventually log and offset translator will become consistent. OTOH if - // log truncation were first and saving offset translator state failed, - // we wouldn't retry and log and offset translator could diverge. - return _offset_translator.truncate(truncate_at) - .then([this, truncate_at] { - return _log->truncate(storage::truncate_config( - truncate_at, _scheduling.default_iopc)); - }) + return _log + ->truncate( + storage::truncate_config(truncate_at, _scheduling.default_iopc)) .then([this, truncate_at] { _last_quorum_replicated_index_with_flush = std::min( model::prev_offset(truncate_at), @@ -2153,37 +2158,58 @@ consensus::install_snapshot(install_snapshot_request&& r) { ss::future<> consensus::hydrate_snapshot() { // Read snapshot, reset state machine using snapshot contents (and load // snapshot’s cluster configuration) (§7.8) - return _snapshot_mgr.open_snapshot().then( - [this](std::optional reader) { - // no snapshot do nothing - if (!reader) { - return ss::now(); - } - return ss::do_with( - std::move(*reader), [this](storage::snapshot_reader& reader) { - return do_hydrate_snapshot(reader).finally( - [&reader] { return reader.close(); }); - }); - }); + auto metadata = co_await read_snapshot_metadata(); + if (!metadata.has_value()) { + co_return; + } + update_offset_from_snapshot(metadata.value()); + co_await _configuration_manager.add( + _last_snapshot_index, std::move(metadata->latest_configuration)); + _probe->configuration_update(); + auto truncate_cfg = truncation_cfg_for_snapshot(metadata.value()); + if (truncate_cfg.has_value()) { + co_await truncate_to_latest_snapshot(truncate_cfg.value()); + } + _snapshot_size = co_await _snapshot_mgr.get_snapshot_size(); } -ss::future<> consensus::truncate_to_latest_snapshot() { +std::optional +consensus::truncation_cfg_for_snapshot(const snapshot_metadata& metadata) { + if ( + _keep_snapshotted_log + && _log->offsets().dirty_offset >= _last_snapshot_index) { + // skip prefix truncating if we want to preserve the log (e.g. for the + // controller partition), but only if there is no gap between old end + // offset and new start offset, otherwise we must still advance the log + // start offset by prefix-truncating. + return std::nullopt; + } + auto delta = metadata.log_start_delta; + if (delta < offset_translator_delta(0)) { + delta = offset_translator_delta( + _configuration_manager.offset_delta(_last_snapshot_index)); + vlog( + _ctxlog.warn, + "received snapshot without delta field in metadata, " + "falling back to delta obtained from configuration " + "manager: {}", + delta); + } + return storage::truncate_prefix_config( + model::next_offset(_last_snapshot_index), + _scheduling.default_iopc, + model::offset_delta(delta)); +} + +ss::future<> +consensus::truncate_to_latest_snapshot(storage::truncate_prefix_config cfg) { // we have to prefix truncate config manage at exactly last offset included // in snapshot as this is the offset of configuration included in snapshot // metadata. - // - // We truncate the log before truncating offset translator to wait for - // readers that started reading from the start of the log before we advanced - // _last_snapshot_index and thus can still need offset translation info. - return _log - ->truncate_prefix(storage::truncate_prefix_config( - model::next_offset(_last_snapshot_index), _scheduling.default_iopc)) + return _log->truncate_prefix(cfg) .then([this] { return _configuration_manager.prefix_truncate(_last_snapshot_index); }) - .then([this] { - return _offset_translator.prefix_truncate(_last_snapshot_index); - }) .then([this] { // when log was prefix truncate flushed offset should be equal to at // least last snapshot index @@ -2191,71 +2217,54 @@ ss::future<> consensus::truncate_to_latest_snapshot() { }); } -ss::future<> consensus::do_hydrate_snapshot(storage::snapshot_reader& reader) { - return reader.read_metadata() - .then([this](iobuf buf) { - auto parser = iobuf_parser(std::move(buf)); - auto metadata = reflection::adl{}.from(parser); - vassert( - metadata.last_included_index >= _last_snapshot_index, - "Tried to load stale snapshot. Loaded snapshot last " - "index {}, current snapshot last index {}", - metadata.last_included_index, - _last_snapshot_index); +ss::future> +consensus::read_snapshot_metadata() { + std::optional metadata; + auto snapshot_reader = co_await _snapshot_mgr.open_snapshot(); + if (!snapshot_reader.has_value()) { + co_return std::nullopt; + } + std::exception_ptr eptr; + try { + auto buf = co_await snapshot_reader->read_metadata(); + auto parser = iobuf_parser(std::move(buf)); + metadata = reflection::adl{}.from(parser); + } catch (...) { + eptr = std::current_exception(); + } + co_await snapshot_reader->close(); + if (eptr) { + rethrow_exception(eptr); + } + co_return metadata; +} - vlog( - _ctxlog.info, - "hydrating snapshot with last included index: {}", - metadata.last_included_index); - - _last_snapshot_index = metadata.last_included_index; - _last_snapshot_term = metadata.last_included_term; - - // TODO: add applying snapshot content to state machine - auto prev_commit_index = _commit_index; - _commit_index = std::max(_last_snapshot_index, _commit_index); - maybe_update_last_visible_index(_commit_index); - if (prev_commit_index != _commit_index) { - _commit_index_updated.broadcast(); - _event_manager.notify_commit_index(); - } +void consensus::update_offset_from_snapshot( + const raft::snapshot_metadata& metadata) { + vassert( + metadata.last_included_index >= _last_snapshot_index, + "Tried to load stale snapshot. Loaded snapshot last " + "index {}, current snapshot last index {}", + metadata.last_included_index, + _last_snapshot_index); + vlog( + _ctxlog.info, + "hydrating snapshot with last included index: {}", + metadata.last_included_index); - update_follower_stats(metadata.latest_configuration); - return _configuration_manager - .add(_last_snapshot_index, std::move(metadata.latest_configuration)) - .then([this, delta = metadata.log_start_delta]() mutable { - _probe->configuration_update(); + _last_snapshot_index = metadata.last_included_index; + _last_snapshot_term = metadata.last_included_term; - if (delta < offset_translator_delta(0)) { - delta = offset_translator_delta( - _configuration_manager.offset_delta( - _last_snapshot_index)); - vlog( - _ctxlog.warn, - "received snapshot without delta field in metadata, " - "falling back to delta obtained from configuration " - "manager: {}", - delta); - } - return _offset_translator.prefix_truncate_reset( - _last_snapshot_index, delta); - }) - .then([this] { - if ( - _keep_snapshotted_log - && _log->offsets().dirty_offset >= _last_snapshot_index) { - // skip prefix truncating if we want to preserve the log - // (e.g. for the controller partition), but only if there is - // no gap between old end offset and new start offset, - // otherwise we must still advance the log start offset by - // prefix-truncating. - return ss::now(); - } - return truncate_to_latest_snapshot(); - }); - }) - .then([this] { return _snapshot_mgr.get_snapshot_size(); }) - .then([this](uint64_t size) { _snapshot_size = size; }); + // TODO: add applying snapshot content to state machine + auto prev_commit_index = _commit_index; + _commit_index = std::max(_last_snapshot_index, _commit_index); + maybe_update_last_visible_index(_commit_index); + if (prev_commit_index != _commit_index) { + _commit_index_updated.broadcast(); + _event_manager.notify_commit_index(); + } + + update_follower_stats(metadata.latest_configuration); } ss::future @@ -2421,9 +2430,7 @@ ss::future<> consensus::write_snapshot(write_snapshot_cfg cfg) { _flushed_offset = std::max(last_included_index, _flushed_offset); co_await _snapshot_lock.with([this, last_included_index]() mutable { - return ss::when_all_succeed( - _configuration_manager.prefix_truncate(last_included_index), - _offset_translator.prefix_truncate(last_included_index)); + return _configuration_manager.prefix_truncate(last_included_index); }); } @@ -2455,8 +2462,7 @@ consensus::do_write_snapshot(model::offset last_included_index, iobuf&& data) { .latest_configuration = *config, .cluster_time = clock_type::time_point::min(), .log_start_delta = offset_translator_delta( - _offset_translator.state()->delta( - model::next_offset(last_included_index))), + _log->offset_delta(model::next_offset(last_included_index))()), }; return details::persist_snapshot( @@ -2673,29 +2679,24 @@ ss::future consensus::disk_append( class consumer { public: - consumer(offset_translator& translator, storage::log_appender appender) - : _translator(translator) - , _appender(std::move(appender)) {} + consumer(storage::log_appender appender) + : _appender(std::move(appender)) {} ss::future operator()(model::record_batch& batch) { auto ret = co_await _appender(batch); - // passing batch to translator after appender so that correct batch - // offsets are filled. - _translator.process(batch); co_return ret; } auto end_of_stream() { return _appender.end_of_stream(); } private: - offset_translator& _translator; storage::log_appender _appender; }; return details::for_each_ref_extract_configuration( _log->offsets().dirty_offset, std::move(reader), - consumer(_offset_translator, _log->make_appender(cfg)), + consumer(_log->make_appender(cfg)), cfg.timeout) .then([this, should_update_last_quorum_idx]( std::tuple> t) { @@ -2736,12 +2737,6 @@ ss::future consensus::disk_append( return ret; } - // Do checkpointing in the background to avoid latency spikes in - // the write path caused by KVStore flush debouncing. - - ssx::spawn_with_gate( - _bg, [this] { return _offset_translator.maybe_checkpoint(); }); - _configuration_manager .maybe_store_highest_known_offset_in_background( ret.last_offset, ret.byte_size, _bg); @@ -3472,8 +3467,6 @@ ss::future<> consensus::remove_persistent_state() { storage::kvstore::key_space::consensus, last_applied_key()); // configuration manager co_await _configuration_manager.remove_persistent_state(); - // offset translator - co_await _offset_translator.remove_persistent_state(); // snapshot co_await _snapshot_mgr.remove_snapshot(); co_await _snapshot_mgr.remove_partial_snapshots(); diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index a096a2a58eea0..ea594f8b4f4c3 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -28,17 +28,17 @@ #include "raft/group_configuration.h" #include "raft/heartbeats.h" #include "raft/logger.h" -#include "raft/offset_translator.h" #include "raft/probe.h" #include "raft/recovery_memory_quota.h" #include "raft/recovery_scheduler.h" #include "raft/replicate_batcher.h" #include "raft/state_machine_manager.h" #include "raft/timeout_jitter.h" +#include "raft/types.h" #include "ssx/semaphore.h" -#include "storage/fwd.h" #include "storage/log.h" #include "storage/snapshot.h" +#include "storage/types.h" #include "utils/mutex.h" #include @@ -407,11 +407,6 @@ class consensus { ss::shared_ptr log() { return _log; } - ss::lw_shared_ptr - get_offset_translator_state() { - return _offset_translator.state(); - } - /** * In our raft implementation heartbeats are sent outside of the consensus * lock. In order to prevent reordering and do not flood followers with @@ -549,12 +544,16 @@ class consensus { * Hydrate the consensus state with the data from the snapshot */ ss::future<> hydrate_snapshot(); - ss::future<> do_hydrate_snapshot(storage::snapshot_reader&); + + void update_offset_from_snapshot(const snapshot_metadata&); + ss::future> read_snapshot_metadata(); /** * Truncates the log up the last offset stored in the snapshot */ - ss::future<> truncate_to_latest_snapshot(); + std::optional + truncation_cfg_for_snapshot(const snapshot_metadata&); + ss::future<> truncate_to_latest_snapshot(storage::truncate_prefix_config); ss::future finish_snapshot(install_snapshot_request, install_snapshot_reply); @@ -657,19 +656,6 @@ class consensus { voter_priority next_target_priority(); voter_priority get_node_priority(vnode) const; - /** - * Return true if there is no state backing this consensus group i.e. there - * is no snapshot and log is empty - */ - bool is_initial_state() const { - static constexpr model::offset not_initialized{}; - auto lstats = _log->offsets(); - return _log->segment_count() == 0 - && lstats.dirty_offset == not_initialized - && lstats.start_offset == not_initialized - && _last_snapshot_index == not_initialized; - } - template result validate_reply_target_node( std::string_view request, @@ -779,7 +765,6 @@ class consensus { raft::group_id _group; timeout_jitter _jit; ss::shared_ptr _log; - offset_translator _offset_translator; scheduling_config _scheduling; config::binding _disk_timeout; consensus_client_protocol _client_protocol; diff --git a/src/v/raft/consensus_utils.cc b/src/v/raft/consensus_utils.cc index 1d85025d81e99..92b032faeb441 100644 --- a/src/v/raft/consensus_utils.cc +++ b/src/v/raft/consensus_utils.cc @@ -20,7 +20,6 @@ #include "model/timestamp.h" #include "raft/group_configuration.h" #include "raft/logger.h" -#include "raft/offset_translator.h" #include "raft/types.h" #include "random/generators.h" #include "reflection/adl.h" @@ -31,6 +30,7 @@ #include "storage/fs_utils.h" #include "storage/kvstore.h" #include "storage/ntp_config.h" +#include "storage/offset_translator.h" #include "storage/offset_translator_state.h" #include "storage/record_batch_builder.h" #include "storage/segment_appender_utils.h" diff --git a/src/v/raft/recovery_stm.cc b/src/v/raft/recovery_stm.cc index a8a251f038daf..d60b9eb3d8cf0 100644 --- a/src/v/raft/recovery_stm.cc +++ b/src/v/raft/recovery_stm.cc @@ -490,8 +490,7 @@ recovery_stm::take_on_demand_snapshot(model::offset last_included_offset) { .last_included_term = *term, .latest_configuration = std::move(*cfg), .log_start_delta = offset_translator_delta( - _ptr->_offset_translator.state()->delta( - model::next_offset(last_included_offset))), + _ptr->log()->offset_delta(model::next_offset(last_included_offset))), }; co_await writer.write_metadata(reflection::to_iobuf(std::move(metadata))); diff --git a/src/v/raft/tests/CMakeLists.txt b/src/v/raft/tests/CMakeLists.txt index b24ab95eb7570..ca8a5a8975354 100644 --- a/src/v/raft/tests/CMakeLists.txt +++ b/src/v/raft/tests/CMakeLists.txt @@ -66,15 +66,6 @@ rp_test( ARGS "-- -c 8" ) -rp_test( - UNIT_TEST - BINARY_NAME test_offset_translator - SOURCES offset_translator_tests.cc - LIBRARIES v::seastar_testing_main v::raft v::storage_test_utils - LABELS kafka - ARGS "-- -c 8" -) - rp_test( BENCHMARK_TEST BINARY_NAME heartbeat_bench diff --git a/src/v/raft/tests/raft_group_fixture.h b/src/v/raft/tests/raft_group_fixture.h index 5041f6faca567..b0cbf1858f601 100644 --- a/src/v/raft/tests/raft_group_fixture.h +++ b/src/v/raft/tests/raft_group_fixture.h @@ -881,10 +881,9 @@ inline void validate_offset_translation(raft_group& gr) { auto end = gr.get_members().begin()->second.consensus->last_visible_index(); for (auto o = start; o < end; o++) { - reference[o] = gr.get_members() - .begin() - ->second.consensus->get_offset_translator_state() - ->from_log_offset(o); + reference[o] + = gr.get_members().begin()->second.consensus->log()->from_log_offset( + o); } for (auto it = std::next(gr.get_members().begin()); @@ -898,9 +897,7 @@ inline void validate_offset_translation(raft_group& gr) { if (!reference.contains(o)) { continue; } - auto translated = it->second.consensus - ->get_offset_translator_state() - ->from_log_offset(o); + auto translated = it->second.consensus->log()->from_log_offset(o); tstlog.info( "translation for offset {}, validating {} == {}\n", o, diff --git a/src/v/raft/tests/raft_reconfiguration_test.cc b/src/v/raft/tests/raft_reconfiguration_test.cc index 93ad8cb8dc75e..797cd89178250 100644 --- a/src/v/raft/tests/raft_reconfiguration_test.cc +++ b/src/v/raft/tests/raft_reconfiguration_test.cc @@ -145,7 +145,7 @@ void assert_offset_translator_state_is_consistent( std::vector deltas; for (model::offset o : boost::irange(start_offset, dirty_offset)) { - deltas.push_back(first_raft->get_offset_translator_state()->delta(o)); + deltas.push_back(first_raft->log()->offset_delta(o)); } for (auto it = std::next(nodes.begin()); it != nodes.end(); ++it) { @@ -153,8 +153,7 @@ void assert_offset_translator_state_is_consistent( for (model::offset o : boost::irange(start_offset, dirty_offset)) { ASSERT_EQ( - it->second->raft()->get_offset_translator_state()->delta(o), - deltas[idx++]); + it->second->raft()->log()->offset_delta(o), deltas[idx++]); } } } diff --git a/src/v/redpanda/admin/debug.cc b/src/v/redpanda/admin/debug.cc index de0c66f6f9c6b..67157ef332270 100644 --- a/src/v/redpanda/admin/debug.cc +++ b/src/v/redpanda/admin/debug.cc @@ -594,16 +594,16 @@ admin_server::get_local_offsets_translated_handler( "partition with ntp {} could not be found on the node", ntp))); } - const auto ots = partition->get_offset_translator_state(); + const auto& log = partition->log(); std::vector result; for (const auto offset : input) { try { ss::httpd::debug_json::translated_offset to; if (translate_to == to_kafka) { - to.kafka_offset = ots->from_log_offset(offset); + to.kafka_offset = log->from_log_offset(offset); to.rp_offset = offset; } else { - to.rp_offset = ots->to_log_offset(offset); + to.rp_offset = log->to_log_offset(offset); to.kafka_offset = offset; } result.push_back(std::move(to)); diff --git a/src/v/redpanda/admin/partition.cc b/src/v/redpanda/admin/partition.cc index aab84a3f8fc94..afdc9e7dad9eb 100644 --- a/src/v/redpanda/admin/partition.cc +++ b/src/v/redpanda/admin/partition.cc @@ -80,7 +80,7 @@ admin_server::get_transactions_inner_handler( } ss::httpd::partition_json::transactions ans; - auto offset_translator = partition->get_offset_translator_state(); + auto log = partition->log(); for (auto& [id, tx_info] : transactions.value()) { ss::httpd::partition_json::producer_identity pid; @@ -91,8 +91,7 @@ admin_server::get_transactions_inner_handler( new_tx.producer_id = pid; new_tx.status = ss::sstring(tx_info.get_status()); - new_tx.lso_bound = offset_translator->from_log_offset( - tx_info.lso_bound); + new_tx.lso_bound = log->from_log_offset(tx_info.lso_bound); auto staleness = tx_info.get_staleness(); // -1 is returned for expired transaction, because how diff --git a/src/v/storage/CMakeLists.txt b/src/v/storage/CMakeLists.txt index 0fd3c66a050b7..748cc0dcbe358 100644 --- a/src/v/storage/CMakeLists.txt +++ b/src/v/storage/CMakeLists.txt @@ -35,6 +35,7 @@ v_cc_library( backlog_controller.cc compaction_controller.cc offset_to_filepos.cc + offset_translator.cc fs_utils.cc file_sanitizer_types.cc api.cc diff --git a/src/v/storage/disk_log_appender.cc b/src/v/storage/disk_log_appender.cc index 414b3d4b7dc8c..98f133c3dd625 100644 --- a/src/v/storage/disk_log_appender.cc +++ b/src/v/storage/disk_log_appender.cc @@ -110,7 +110,9 @@ disk_log_appender::operator()(model::record_batch& batch) { _last_term, _idx, _config.io_priority); co_await initialize(); } - co_return co_await append_batch_to_segment(batch); + auto stop = co_await append_batch_to_segment(batch); + _log.offset_translator().process(batch); + co_return stop; } catch (...) { release_lock(); vlog( @@ -159,13 +161,14 @@ ss::future disk_log_appender::end_of_stream() { .last_offset = _last_offset, .byte_size = _byte_size, .last_term = _last_term}; - if (_config.should_fsync == storage::log_append_config::fsync::no) { - return ss::make_ready_future(retval); - } - return _log.flush().then([this, retval] { + if (_config.should_fsync == storage::log_append_config::fsync::yes) { + co_await _log.flush(); release_lock(); - return retval; - }); + } + // Do checkpointing in the background to avoid latency spikes in the write + // path caused by KVStore flush debouncing. + _log.bg_checkpoint_offset_translator(); + co_return retval; } std::ostream& operator<<(std::ostream& o, const disk_log_appender& a) { diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index f22d654f648bc..afa85f7a4bf71 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -15,6 +15,7 @@ #include "model/adl_serde.h" #include "model/fundamental.h" #include "model/namespace.h" +#include "model/record_batch_types.h" #include "model/timeout_clock.h" #include "model/timestamp.h" #include "reflection/adl.h" @@ -105,10 +106,12 @@ bool deletion_exempt(const model::ntp& ntp) { disk_log_impl::disk_log_impl( ntp_config cfg, + raft::group_id group, log_manager& manager, segment_set segs, kvstore& kvstore, - ss::sharded& feature_table) + ss::sharded& feature_table, + std::vector translator_batch_types) : log(std::move(cfg)) , _manager(manager) , _segment_size_jitter( @@ -118,6 +121,12 @@ disk_log_impl::disk_log_impl( , _feature_table(feature_table) , _start_offset(read_start_offset()) , _lock_mngr(_segs) + , _offset_translator( + std::move(translator_batch_types), + group, + config().ntp(), + _kvstore, + resources()) , _probe(std::make_unique()) , _max_segment_size(compute_max_segment_size()) , _readers_cache(std::make_unique( @@ -158,6 +167,7 @@ ss::future<> disk_log_impl::remove() { permanent_delete.emplace_back( remove_segment_permanently(s, "disk_log_impl::remove()")); } + co_await _offset_translator.remove_persistent_state(); co_await _readers_cache->stop() .then([this, permanent_delete = std::move(permanent_delete)]() mutable { @@ -181,6 +191,21 @@ ss::future<> disk_log_impl::remove() { .finally([this] { _probe->clear_metrics(); }); } +ss::future<> +disk_log_impl::start(std::optional truncate_cfg) { + auto is_new = is_new_log(); + co_await offset_translator().start( + raft::offset_translator::must_reset{is_new}); + if (truncate_cfg.has_value()) { + co_await truncate_prefix(truncate_cfg.value()); + } + // Reset or load the offset translator state, depending on whether this is + // a brand new log. + if (!is_new) { + co_await offset_translator().sync_with_log(*this, _compaction_as); + } +} + ss::future> disk_log_impl::close() { vassert(!_closed, "Invalid double closing of log - {}", *this); vlog(stlog.debug, "closing log {}", *this); @@ -1393,6 +1418,25 @@ model::term_id disk_log_impl::term() const { return _segs.back()->offsets().term; } +bool disk_log_impl::is_new_log() const { + static constexpr model::offset not_initialized{}; + const auto os = offsets(); + return segment_count() == 0 && os.dirty_offset == not_initialized + && os.start_offset == not_initialized; +} + +model::offset_delta disk_log_impl::offset_delta(model::offset o) const { + return model::offset_delta{_offset_translator.state()->delta(o)}; +} + +model::offset disk_log_impl::from_log_offset(model::offset log_offset) const { + return _offset_translator.state()->from_log_offset(log_offset); +} + +model::offset disk_log_impl::to_log_offset(model::offset data_offset) const { + return _offset_translator.state()->to_log_offset(data_offset); +} + offset_stats disk_log_impl::offsets() const { if (_segs.empty()) { offset_stats ret; @@ -1613,6 +1657,12 @@ size_t disk_log_impl::bytes_left_before_roll() const { return max - fo; } +void disk_log_impl::bg_checkpoint_offset_translator() { + ssx::spawn_with_gate(_compaction_housekeeping_gate, [this] { + return _offset_translator.maybe_checkpoint(); + }); +} + ss::future<> disk_log_impl::force_roll(ss::io_priority_class iopc) { auto roll_lock_holder = co_await _segments_rolling_lock.get_units(); auto t = term(); @@ -2451,7 +2501,7 @@ disk_log_impl::remove_prefix_full_segments(truncate_prefix_config cfg) { ss::future<> disk_log_impl::truncate_prefix(truncate_prefix_config cfg) { vassert(!_closed, "truncate_prefix() on closed log - {}", *this); - return _failure_probes.truncate_prefix().then([this, cfg]() mutable { + co_await _failure_probes.truncate_prefix().then([this, cfg]() mutable { // dispatch the actual truncation return do_truncate_prefix(cfg) .then([this] { @@ -2466,6 +2516,11 @@ ss::future<> disk_log_impl::truncate_prefix(truncate_prefix_config cfg) { }) .discard_result(); }); + // We truncate the segments before truncating offset translator to wait for + // readers that started reading from the start of the log before we advanced + // the start offset and thus can still need offset translation info. + co_await _offset_translator.prefix_truncate( + model::prev_offset(cfg.start_offset), cfg.force_truncate_delta); } ss::future<> disk_log_impl::do_truncate_prefix(truncate_prefix_config cfg) { @@ -2527,7 +2582,15 @@ ss::future<> disk_log_impl::do_truncate_prefix(truncate_prefix_config cfg) { ss::future<> disk_log_impl::truncate(truncate_config cfg) { vassert(!_closed, "truncate() on closed log - {}", *this); - return _failure_probes.truncate().then([this, cfg]() mutable { + // We are truncating the offset translator before truncating the log + // because if saving offset translator state fails (e.g. because of a + // crash), we can retry and eventually log and offset translator will + // become consistent. OTOH if log truncation were first and saving offset + // translator state failed, we wouldn't retry and log and offset translator + // could diverge. + co_await _offset_translator.truncate(cfg.base_offset); + + co_await _failure_probes.truncate().then([this, cfg]() mutable { // Before truncation, erase any claim about a particular segment being // clean: this may refer to a segment we are about to delete, or it // may refer to a segment that we will modify the indices+data for: on @@ -2927,12 +2990,21 @@ std::ostream& operator<<(std::ostream& o, const disk_log_impl& d) { ss::shared_ptr make_disk_backed_log( ntp_config cfg, + raft::group_id group, log_manager& manager, segment_set segs, kvstore& kvstore, - ss::sharded& feature_table) { - return ss::make_shared( - std::move(cfg), manager, std::move(segs), kvstore, feature_table); + ss::sharded& feature_table, + std::vector translator_batch_types) { + auto disk_log = ss::make_shared( + std::move(cfg), + group, + manager, + std::move(segs), + kvstore, + feature_table, + std::move(translator_batch_types)); + return disk_log; } /* diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index f492aa969c51c..f1baceca065c0 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -17,6 +17,7 @@ #include "storage/failure_probes.h" #include "storage/lock_manager.h" #include "storage/log.h" +#include "storage/offset_translator.h" #include "storage/probe.h" #include "storage/readers_cache.h" #include "storage/types.h" @@ -54,16 +55,19 @@ class disk_log_impl final : public log { disk_log_impl( ntp_config, + raft::group_id, log_manager&, segment_set, kvstore&, - ss::sharded& feature_table); + ss::sharded& feature_table, + std::vector translator_batch_types); ~disk_log_impl() override; disk_log_impl(disk_log_impl&&) noexcept = delete; disk_log_impl& operator=(disk_log_impl&&) noexcept = delete; disk_log_impl(const disk_log_impl&) = delete; disk_log_impl& operator=(const disk_log_impl&) = delete; + ss::future<> start(std::optional) final; ss::future> close() final; ss::future<> remove() final; ss::future<> flush() final; @@ -105,7 +109,16 @@ class disk_log_impl final : public log { ss::future> timequery(timequery_config cfg) final; size_t segment_count() const final { return _segs.size(); } + bool is_new_log() const final; offset_stats offsets() const final; + ss::lw_shared_ptr + get_offset_translator_state() const final { + return _offset_translator.state(); + } + raft::offset_translator& offset_translator() { return _offset_translator; } + model::offset_delta offset_delta(model::offset) const final; + model::offset from_log_offset(model::offset) const final; + model::offset to_log_offset(model::offset) const final; model::offset find_last_term_start_offset() const final; model::timestamp start_timestamp() const final; std::optional get_term(model::offset) const final; @@ -118,6 +131,9 @@ class disk_log_impl final : public log { ss::future<> maybe_roll_unlocked( model::term_id, model::offset next_offset, ss::io_priority_class); + // Kicks off a background flush of offset translator state to the kvstore. + void bg_checkpoint_offset_translator(); + ss::future<> force_roll(ss::io_priority_class) override; probe& get_probe() override { return *_probe; } @@ -317,6 +333,8 @@ class disk_log_impl final : public log { // method. mutex _start_offset_lock{"disk_log_impl::start_offset_lock"}; lock_manager _lock_mngr; + raft::offset_translator _offset_translator; + std::unique_ptr _probe; failure_probes _failure_probes; std::optional _eviction_monitor; diff --git a/src/v/storage/fwd.h b/src/v/storage/fwd.h index 9644967d6e401..41dc86bbe2b09 100644 --- a/src/v/storage/fwd.h +++ b/src/v/storage/fwd.h @@ -35,3 +35,8 @@ struct log_reader_config; struct timequery_config; } // namespace storage + +// TODO: a subsequent commit will move this into the storage namespace. +namespace raft { +class offset_translator; +} // namespace raft diff --git a/src/v/storage/log.h b/src/v/storage/log.h index fae4f9679afad..64f71a7e494c2 100644 --- a/src/v/storage/log.h +++ b/src/v/storage/log.h @@ -14,8 +14,10 @@ #include "features/feature_table.h" #include "model/fundamental.h" #include "model/record_batch_reader.h" +#include "model/record_batch_types.h" #include "model/timeout_clock.h" #include "model/timestamp.h" +#include "raft/fundamental.h" #include "storage/log_appender.h" #include "storage/ntp_config.h" #include "storage/segment_reader.h" @@ -43,6 +45,8 @@ class log { log& operator=(const log&) = delete; virtual ~log() noexcept = default; + virtual ss::future<> start(std::optional) = 0; + // it shouldn't block for a long time as it will block other logs // eviction virtual ss::future<> housekeeping(housekeeping_config) = 0; @@ -96,8 +100,28 @@ class log { virtual ss::future> timequery(timequery_config) = 0; + // Prefer to use offset_delta() or from/to_log_offset(). + // TODO: remove direct access to the translator state and instead rely on + // the translation/delta interface. + virtual ss::lw_shared_ptr + get_offset_translator_state() const = 0; + + // Returns the offset delta for a given offset. This can be used for + // example to translate a Raft offset to a data offset. + virtual model::offset_delta offset_delta(model::offset) const = 0; + + // Translate the given log offset into a data offset. + virtual model::offset from_log_offset(model::offset) const = 0; + + // Translate the given data offset into a log offset. + virtual model::offset to_log_offset(model::offset) const = 0; + const ntp_config& config() const { return _config; } + // Returns whether the log has never been appended to. + // NOTE: this is different than having no segments, which also may happen + // if we GC away all our segments. + virtual bool is_new_log() const = 0; virtual size_t segment_count() const = 0; virtual storage::offset_stats offsets() const = 0; // Returns counter which is incremented after every log suffix truncation @@ -210,10 +234,12 @@ class segment_set; class kvstore; ss::shared_ptr make_disk_backed_log( ntp_config, + raft::group_id, log_manager&, segment_set, kvstore&, - ss::sharded& feature_table); + ss::sharded& feature_table, + std::vector translator_batch_types); bool deletion_exempt(const model::ntp& ntp); diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index fa16f988d7a3e..bdb89b66ee31d 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -37,6 +37,7 @@ #include "storage/segment_set.h" #include "storage/segment_utils.h" #include "storage/storage_resources.h" +#include "storage/types.h" #include "utils/directory_walker.h" #include @@ -481,14 +482,25 @@ log_manager::create_cache(with_cache ntp_cache_enabled) { return batch_cache_index(_batch_cache); } -ss::future> log_manager::manage(ntp_config cfg) { +ss::future> log_manager::manage( + ntp_config cfg, + raft::group_id group, + std::vector translator_batch_types) { auto gate = _open_gate.hold(); + if (!translator_batch_types.empty()) { + // Sanity check to avoid multiple logs overwriting each others' + // translator state in the kvstore, which is keyed by group id. + vassert( + group != raft::group_id{}, + "When configured to translate offsets, must supply a valid group id"); + } auto units = co_await _resources.get_recovery_units(); - co_return co_await do_manage(std::move(cfg)); + co_return co_await do_manage( + std::move(cfg), group, std::move(translator_batch_types)); } -ss::future<> log_manager::recover_log_state(const ntp_config& cfg) { +ss::future<> log_manager::maybe_clear_kvstore(const ntp_config& cfg) { return ss::file_exists(cfg.work_directory()) .then([this, offset_key = internal::start_offset_key(cfg.ntp()), @@ -499,6 +511,7 @@ ss::future<> log_manager::recover_log_state(const ntp_config& cfg) { } // directory was deleted, make sure we do not have any state in KV // store. + // NOTE: this only removes state in the storage key space. return _kvstore.remove(kvstore::key_space::storage, offset_key) .then([this, segment_key] { return _kvstore.remove( @@ -507,7 +520,10 @@ ss::future<> log_manager::recover_log_state(const ntp_config& cfg) { }); } -ss::future> log_manager::do_manage(ntp_config cfg) { +ss::future> log_manager::do_manage( + ntp_config cfg, + raft::group_id group, + std::vector translator_batch_types) { if (_config.base_dir.empty()) { throw std::runtime_error( "log_manager:: cannot have empty config.base_dir"); @@ -525,7 +541,7 @@ ss::future> log_manager::do_manage(ntp_config cfg) { .segment_name; } - co_await recover_log_state(cfg); + co_await maybe_clear_kvstore(cfg); with_cache cache_enabled = cfg.cache_enabled(); auto ntp_sanitizer_cfg = _config.maybe_get_ntp_sanitizer_config(cfg.ntp()); @@ -543,7 +559,13 @@ ss::future> log_manager::do_manage(ntp_config cfg) { std::move(ntp_sanitizer_cfg)); auto l = storage::make_disk_backed_log( - std::move(cfg), *this, std::move(segments), _kvstore, _feature_table); + std::move(cfg), + group, + *this, + std::move(segments), + _kvstore, + _feature_table, + std::move(translator_batch_types)); auto [it, success] = _logs.emplace( l->config().ntp(), std::make_unique(l)); _logs_list.push_back(*it->second); diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index 6e79a20bd1e3d..8c485e138d4a1 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -172,7 +172,10 @@ class log_manager { storage_resources&, ss::sharded&) noexcept; - ss::future> manage(ntp_config); + ss::future> manage( + ntp_config, + raft::group_id = raft::group_id{}, + std::vector translator_batch_types = {}); ss::future<> shutdown(model::ntp); @@ -249,7 +252,10 @@ class log_manager { using compaction_list_type = intrusive_list; - ss::future> do_manage(ntp_config); + ss::future> do_manage( + ntp_config, + raft::group_id, + std::vector translator_batch_types); ss::future<> clean_close(ss::shared_ptr); /** @@ -265,7 +271,7 @@ class log_manager { std::optional create_cache(with_cache); ss::future<> dispatch_topic_dir_deletion(ss::sstring dir); - ss::future<> recover_log_state(const ntp_config&); + ss::future<> maybe_clear_kvstore(const ntp_config&); ss::future<> async_clear_logs(); ss::future<> housekeeping_scan(model::timestamp); diff --git a/src/v/raft/offset_translator.cc b/src/v/storage/offset_translator.cc similarity index 84% rename from src/v/raft/offset_translator.cc rename to src/v/storage/offset_translator.cc index b2bcd7ab8a4d2..ac7189ded094c 100644 --- a/src/v/raft/offset_translator.cc +++ b/src/v/storage/offset_translator.cc @@ -9,15 +9,16 @@ * by the Apache License, Version 2.0 */ -#include "raft/offset_translator.h" +#include "storage/offset_translator.h" #include "base/vlog.h" -#include "raft/consensus_utils.h" -#include "raft/logger.h" #include "storage/api.h" #include "storage/kvstore.h" +#include "storage/logger.h" +#include "storage/storage_resources.h" #include +#include namespace raft { @@ -27,12 +28,26 @@ offset_translator::offset_translator( std::vector filtered_types, raft::group_id group, model::ntp ntp, - storage::api& storage_api) + storage::kvstore& kvs, + storage::storage_resources& resources) : _filtered_types(std::move(filtered_types)) , _state(ss::make_lw_shared(std::move(ntp))) , _group(group) , _logger(logger, ssx::sformat("ntp: {}", _state->ntp())) - , _storage_api(storage_api) {} + , _kvs(kvs) + , _resources(resources) {} + +offset_translator::offset_translator( + std::vector filtered_types, + raft::group_id group, + model::ntp ntp, + storage::api& storage_api) + : offset_translator( + std::move(filtered_types), + group, + std::move(ntp), + storage_api.kvs(), + storage_api.resources()) {} void offset_translator::process(const model::record_batch& batch) { if (_filtered_types.empty()) { @@ -43,7 +58,7 @@ void offset_translator::process(const model::record_batch& batch) { // Update resource manager for the extra dirty bytes, it may hint us // to checkpoint early in response. - _checkpoint_hint |= _storage_api.resources().offset_translator_take_bytes( + _checkpoint_hint |= _resources.offset_translator_take_bytes( batch.size_bytes(), _bytes_processed_units); if ( @@ -110,9 +125,9 @@ ss::future<> offset_translator::start(must_reset reset) { co_await _checkpoint_lock.with([this] { return do_checkpoint(); }); } else { - auto map_buf = _storage_api.kvs().get( + auto map_buf = _kvs.get( storage::kvstore::key_space::offset_translator, offsets_map_key()); - auto highest_known_offset_buf = _storage_api.kvs().get( + auto highest_known_offset_buf = _kvs.get( storage::kvstore::key_space::offset_translator, highest_known_offset_key()); @@ -151,7 +166,7 @@ ss::future<> offset_translator::start(must_reset reset) { } ss::future<> offset_translator::sync_with_log( - ss::shared_ptr log, storage::opt_abort_source_t as) { + storage::log& log, storage::opt_abort_source_t as) { if (_filtered_types.empty()) { co_return; } @@ -161,7 +176,7 @@ ss::future<> offset_translator::sync_with_log( "ntp {}: offset translation state shouldn't be empty", _state->ntp()); - auto log_offsets = log->offsets(); + auto log_offsets = log.offsets(); // Trim the offset2delta map to log dirty_offset (discrepancy can // happen if the offsets map was persisted, but the log wasn't flushed). @@ -174,7 +189,7 @@ ss::future<> offset_translator::sync_with_log( co_await _checkpoint_lock.with([this] { return do_checkpoint(); }); } - // read the log to insert the remaining entries into map + // Read the log to insert the remaining entries into map. model::offset start_offset = model::next_offset(_highest_known_offset); vlog( @@ -186,7 +201,7 @@ ss::future<> offset_translator::sync_with_log( auto reader_cfg = storage::log_reader_config( start_offset, log_offsets.dirty_offset, ss::default_priority_class(), as); - auto reader = co_await log->make_reader(reader_cfg); + auto reader = co_await log.make_reader(reader_cfg); struct log_consumer { explicit log_consumer(offset_translator& self) @@ -239,64 +254,41 @@ ss::future<> offset_translator::truncate(model::offset offset) { co_await _checkpoint_lock.with([this] { return do_checkpoint(); }); } -ss::future<> offset_translator::prefix_truncate(model::offset offset) { +ss::future<> offset_translator::prefix_truncate( + model::offset offset, + std::optional force_truncate_delta) { if (_filtered_types.empty()) { co_return; } - if (offset > _highest_known_offset) { - throw std::runtime_error{_logger.format( - "trying to prefix truncate offset translator at offset {} which " - "is > highest_known_offset {}", - offset, - _highest_known_offset)}; - } - - if (!_state->prefix_truncate(offset)) { + if (unlikely(offset > _highest_known_offset)) { + if (!force_truncate_delta.has_value()) { + throw std::runtime_error{_logger.format( + "trying to prefix truncate offset translator at offset {} which " + "is > highest_known_offset {}", + offset, + _highest_known_offset)}; + } + // If the truncation is for past the end of the log (e.g. in the case + // of a stale replica being caught up from a snapshot), allow it. + *_state = storage::offset_translator_state( + _state->ntp(), offset, force_truncate_delta.value()); + _highest_known_offset = offset; + } else if (!_state->prefix_truncate(offset)) { co_return; } ++_map_version; - vlog( - _logger.debug, - "prefix_truncate at offset: {}, new state: {}", - offset, - _state); - - co_await _checkpoint_lock.with([this] { return do_checkpoint(); }); -} - -ss::future<> -offset_translator::prefix_truncate_reset(model::offset offset, int64_t delta) { - if (_filtered_types.empty()) { - co_return; - } - - if (offset <= _highest_known_offset) { - co_await prefix_truncate(offset); - co_return; - } - - vassert( - delta >= 0, - "not enough state to recover offset translator. Requested to reset " - "at offset {}. Translator highest_known_offset: {}, state: {}", + vlogl( + _logger, + force_truncate_delta.has_value() ? ss::log_level::info + : ss::log_level::debug, + "prefix_truncate at offset: {}, force_truncate_delta: {}, new state: {}", offset, - _highest_known_offset, + force_truncate_delta, _state); - *_state = storage::offset_translator_state(_state->ntp(), offset, delta); - ++_map_version; - - _highest_known_offset = offset; - - vlog( - _logger.info, - "prefix_truncate_reset at offset/delta: {}/{}", - offset, - delta); - co_await _checkpoint_lock.with([this] { return do_checkpoint(); }); } @@ -305,10 +297,10 @@ ss::future<> offset_translator::remove_persistent_state() { co_return; } - co_await _storage_api.kvs().remove( + co_await _kvs.remove( storage::kvstore::key_space::offset_translator, highest_known_offset_key()); - co_await _storage_api.kvs().remove( + co_await _kvs.remove( storage::kvstore::key_space::offset_translator, offsets_map_key()); } @@ -367,14 +359,14 @@ ss::future<> offset_translator::do_checkpoint() { // recreated by reading log from the highest known offset). if (map_buf) { - co_await _storage_api.kvs().put( + co_await _kvs.put( storage::kvstore::key_space::offset_translator, offsets_map_key(), std::move(*map_buf)); _map_version_at_checkpoint = map_version; } - co_await _storage_api.kvs().put( + co_await _kvs.put( storage::kvstore::key_space::offset_translator, highest_known_offset_key(), std::move(hko_buf)); @@ -395,7 +387,7 @@ ss::future<> offset_translator::move_persistent_state( }; using state_ptr = std::unique_ptr; vlog( - raftlog.debug, + storage::stlog.debug, "moving group {} offset translator state from {} to {}", group, source_shard, diff --git a/src/v/raft/offset_translator.h b/src/v/storage/offset_translator.h similarity index 87% rename from src/v/raft/offset_translator.h rename to src/v/storage/offset_translator.h index 890c2c3d23d80..12f522d420259 100644 --- a/src/v/raft/offset_translator.h +++ b/src/v/storage/offset_translator.h @@ -18,6 +18,7 @@ #include "storage/fwd.h" #include "storage/log.h" #include "storage/offset_translator_state.h" +#include "storage/storage_resources.h" #include "utils/mutex.h" #include "utils/prefix_logger.h" @@ -48,6 +49,13 @@ class offset_translator { model::ntp ntp, storage::api& storage_api); + offset_translator( + std::vector filtered_types, + raft::group_id group, + model::ntp ntp, + storage::kvstore& kvstore, + storage::storage_resources& resources); + offset_translator(const offset_translator&) = delete; offset_translator& operator=(const offset_translator&) = delete; @@ -65,8 +73,7 @@ class offset_translator { /// Searches for non-data batches up to the tip of the log. After this /// method succeeds, offset translator is usable. - ss::future<> - sync_with_log(ss::shared_ptr, storage::opt_abort_source_t); + ss::future<> sync_with_log(storage::log&, storage::opt_abort_source_t); /// Process the batch and add it to offset translation state if it is not /// a data batch. @@ -87,13 +94,12 @@ class offset_translator { /// Removes the offset translation state up to and including the offset. The /// offset delta for the next offsets is preserved. - ss::future<> prefix_truncate(model::offset); - - /// Removes the offset translation state up to and including the offset. The - /// offset delta for the next offsets is set to `delta`. If there is offset - /// translation state for the next offsets, it must be consistent with - /// `delta`. - ss::future<> prefix_truncate_reset(model::offset, int64_t delta); + // + /// The offset delta for the next offsets is set to `delta`. If there is + /// offset translation state for the next offsets, it must be consistent + /// with `delta`. + ss::future<> prefix_truncate( + model::offset, std::optional = std::nullopt); ss::future<> remove_persistent_state(); @@ -145,7 +151,8 @@ class offset_translator { size_t _bytes_processed_at_checkpoint = 0; size_t _map_version_at_checkpoint = 0; - storage::api& _storage_api; + storage::kvstore& _kvs; + storage::storage_resources& _resources; }; } // namespace raft diff --git a/src/v/storage/tests/CMakeLists.txt b/src/v/storage/tests/CMakeLists.txt index 048d5f4da70e6..585c893c6acb1 100644 --- a/src/v/storage/tests/CMakeLists.txt +++ b/src/v/storage/tests/CMakeLists.txt @@ -52,6 +52,15 @@ rp_test( ARGS "-- -c 1" ) +rp_test( + UNIT_TEST + BINARY_NAME test_offset_translator + SOURCES offset_translator_tests.cc + LIBRARIES v::seastar_testing_main v::storage_test_utils + LABELS kafka + ARGS "-- -c 8" +) + # Putting this first, last, or in the middle of the other single thread tests # results in: # runtime error: member access within null pointer of type diff --git a/src/v/storage/tests/compaction_e2e_test.cc b/src/v/storage/tests/compaction_e2e_test.cc index 412dc35d2fd59..8658520b268c9 100644 --- a/src/v/storage/tests/compaction_e2e_test.cc +++ b/src/v/storage/tests/compaction_e2e_test.cc @@ -90,7 +90,7 @@ class CompactionFixtureTest co_await wait_for_leader(ntp); partition = app.partition_manager.local().get(ntp).get(); - log = partition->log(); + log = partition->log().get(); } ss::future dir_summary() { @@ -146,7 +146,7 @@ class CompactionFixtureTest const model::topic topic_name{"compaction_e2e_test_topic"}; const model::ntp ntp{model::kafka_namespace, topic_name, 0}; cluster::partition* partition; - ss::shared_ptr log; + storage::log* log; scoped_config test_local_cfg; }; @@ -203,7 +203,10 @@ TEST_P(CompactionFixtureParamTest, TestDedupeOnePass) { // Consume again after restarting and ensure our assertions about // duplicates are still valid. restart(should_wipe::no); + wait_for_leader(ntp).get(); + partition = app.partition_manager.local().get(ntp).get(); + log = partition->log().get(); auto restart_summary = dir_summary().get(); tests::kafka_consume_transport second_consumer(make_kafka_client().get()); diff --git a/src/v/raft/tests/offset_translator_tests.cc b/src/v/storage/tests/offset_translator_tests.cc similarity index 98% rename from src/v/raft/tests/offset_translator_tests.cc rename to src/v/storage/tests/offset_translator_tests.cc index 00b9585aff2bb..52c281f05fc8e 100644 --- a/src/v/raft/tests/offset_translator_tests.cc +++ b/src/v/storage/tests/offset_translator_tests.cc @@ -9,12 +9,13 @@ #include "bytes/random.h" #include "model/fundamental.h" -#include "raft/offset_translator.h" +#include "raft/types.h" #include "random/generators.h" #include "storage/api.h" #include "storage/fwd.h" #include "storage/kvstore.h" #include "storage/log_manager.h" +#include "storage/offset_translator.h" #include "storage/record_batch_builder.h" #include "test_utils/fixture.h" @@ -323,7 +324,7 @@ struct fuzz_checker { ss::future<> start() { _tr.emplace(_make_offset_translator()); co_await _tr->start(raft::offset_translator::must_reset::yes); - co_await _tr->sync_with_log(_log, std::nullopt); + co_await _tr->sync_with_log(*_log, std::nullopt); } ss::future<> append() { @@ -474,9 +475,9 @@ struct fuzz_checker { _gate = ss::gate{}; co_await _tr->start(raft::offset_translator::must_reset::no); - co_await _tr->prefix_truncate_reset( - _snapshot_offset, _snapshot_delta); - co_await _tr->sync_with_log(_log, std::nullopt); + co_await _tr->prefix_truncate( + _snapshot_offset, model::offset_delta(_snapshot_delta)); + co_await _tr->sync_with_log(*_log, std::nullopt); } } diff --git a/src/v/storage/tests/utils/disk_log_builder.cc b/src/v/storage/tests/utils/disk_log_builder.cc index f0c14e527a937..f6580e6cac4dc 100644 --- a/src/v/storage/tests/utils/disk_log_builder.cc +++ b/src/v/storage/tests/utils/disk_log_builder.cc @@ -9,6 +9,7 @@ #include "storage/tests/utils/disk_log_builder.h" +#include "model/record_batch_types.h" #include "storage/disk_log_appender.h" #include "storage/types.h" @@ -21,8 +22,13 @@ using namespace std::chrono_literals; // NOLINT // util functions to be moved from storage_fixture // make_ntp, make_dir etc namespace storage { -disk_log_builder::disk_log_builder(storage::log_config config) +disk_log_builder::disk_log_builder( + storage::log_config config, + std::vector types, + raft::group_id group_id) : _log_config(std::move(config)) + , _translator_batch_types(std::move(types)) + , _group_id(group_id) , _storage( [this]() { return kvstore_config( @@ -100,7 +106,7 @@ ss::future<> disk_log_builder::start(storage::ntp_config cfg) { co_return co_await _storage.start().then( [this, cfg = std::move(cfg)]() mutable { return _storage.log_mgr() - .manage(std::move(cfg)) + .manage(std::move(cfg), _group_id, _translator_batch_types) .then([this](ss::shared_ptr log) { _log = log; }); }); } diff --git a/src/v/storage/tests/utils/disk_log_builder.h b/src/v/storage/tests/utils/disk_log_builder.h index 2c17e0c3a07e8..742e2b0e775e4 100644 --- a/src/v/storage/tests/utils/disk_log_builder.h +++ b/src/v/storage/tests/utils/disk_log_builder.h @@ -132,7 +132,9 @@ class disk_log_builder { using should_flush_after = ss::bool_class; // Constructors explicit disk_log_builder( - storage::log_config config = log_builder_config()); + storage::log_config config = log_builder_config(), + std::vector offset_translator_types = {}, + raft::group_id group_id = raft::group_id{}); ~disk_log_builder() noexcept = default; disk_log_builder(const disk_log_builder&) = delete; disk_log_builder& operator=(const disk_log_builder&) = delete; @@ -411,6 +413,8 @@ class disk_log_builder { ss::logger _test_logger{"disk-log-test-logger"}; ss::sharded _feature_table; storage::log_config _log_config; + std::vector _translator_batch_types; + raft::group_id _group_id; storage::api _storage; ss::shared_ptr _log; size_t _bytes_written{0}; diff --git a/src/v/storage/types.cc b/src/v/storage/types.cc index 507ff1c7c1c17..fc761d0331e72 100644 --- a/src/v/storage/types.cc +++ b/src/v/storage/types.cc @@ -157,7 +157,11 @@ std::ostream& operator<<(std::ostream& o, const truncate_config& cfg) { return o; } std::ostream& operator<<(std::ostream& o, const truncate_prefix_config& cfg) { - fmt::print(o, "{{start_offset:{}}}", cfg.start_offset); + fmt::print( + o, + "{{start_offset:{}, force_truncate_delta:{}}}", + cfg.start_offset, + cfg.force_truncate_delta); return o; } diff --git a/src/v/storage/types.h b/src/v/storage/types.h index c5a4c0b9d2747..7461b1331aea2 100644 --- a/src/v/storage/types.h +++ b/src/v/storage/types.h @@ -279,12 +279,24 @@ struct truncate_config { * starting offset is contained in the log is not removed. */ struct truncate_prefix_config { - truncate_prefix_config(model::offset o, ss::io_priority_class p) + truncate_prefix_config( + model::offset o, + ss::io_priority_class p, + std::optional force_truncate_delta = std::nullopt) : start_offset(o) - , prio(p) {} + , prio(p) + , force_truncate_delta(force_truncate_delta) {} model::offset start_offset; ss::io_priority_class prio; + // When supplied and `start_offset` is ahead of the log's end offset, + // indicates that truncation should proceed and this delta should be the + // delta at the start offset. + // + // When not supplied, truncation past the log's end offset will result in + // an error. + std::optional force_truncate_delta; + friend std::ostream& operator<<(std::ostream&, const truncate_prefix_config&); };