Skip to content

Commit

Permalink
storage: adopt offset_translator in namespace
Browse files Browse the repository at this point in the history
Now that the build pieces have landed for storage:: taking ownership of
the offset_translator, this updates its namespace to follow suit.

This is follow-up to redpanda-data#16892
  • Loading branch information
andrwng committed Mar 26, 2024
1 parent e64151c commit e66fbd7
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,7 @@ ss::future<std::error_code> controller_backend::transfer_partition(
// TODO: copy, not move
co_await raft::details::move_persistent_state(
group, ss::this_shard_id(), destination, _storage);
co_await raft::offset_translator::move_persistent_state(
co_await storage::offset_translator::move_persistent_state(
group, ss::this_shard_id(), destination, _storage);
co_await raft::move_persistent_stm_state(
ntp, ss::this_shard_id(), destination, _storage);
Expand Down
4 changes: 2 additions & 2 deletions src/v/raft/consensus_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ ss::future<> create_offset_translator_state_for_pre_existing_partition(
max_rp_offset);
co_await api.kvs().put(
storage::kvstore::key_space::offset_translator,
raft::offset_translator::kvstore_offsetmap_key(group),
storage::offset_translator::kvstore_offsetmap_key(group),
ot_state->serialize_map());
vlog(
raftlog.debug,
Expand All @@ -455,7 +455,7 @@ ss::future<> create_offset_translator_state_for_pre_existing_partition(
max_rp_offset);
co_await api.kvs().put(
storage::kvstore::key_space::offset_translator,
raft::offset_translator::kvstore_highest_known_offset_key(group),
storage::offset_translator::kvstore_highest_known_offset_key(group),
reflection::to_iobuf(max_rp_offset));
}

Expand Down
2 changes: 1 addition & 1 deletion src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ ss::future<>
disk_log_impl::start(std::optional<truncate_prefix_config> truncate_cfg) {
auto is_new = is_new_log();
co_await offset_translator().start(
raft::offset_translator::must_reset{is_new});
storage::offset_translator::must_reset{is_new});
if (truncate_cfg.has_value()) {
co_await truncate_prefix(truncate_cfg.value());
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class disk_log_impl final : public log {
get_offset_translator_state() const final {
return _offset_translator.state();
}
raft::offset_translator& offset_translator() { return _offset_translator; }
storage::offset_translator& offset_translator() { return _offset_translator; }
model::offset_delta offset_delta(model::offset) const final;
model::offset from_log_offset(model::offset) const final;
model::offset to_log_offset(model::offset) const final;
Expand Down Expand Up @@ -333,7 +333,7 @@ class disk_log_impl final : public log {
// method.
mutex _start_offset_lock{"disk_log_impl::start_offset_lock"};
lock_manager _lock_mngr;
raft::offset_translator _offset_translator;
storage::offset_translator _offset_translator;

std::unique_ptr<storage::probe> _probe;
failure_probes _failure_probes;
Expand Down
6 changes: 1 addition & 5 deletions src/v/storage/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class ntp_config;
class log;
class log_manager;
class probe;
class offset_translator;
class offset_translator_state;
class readers_cache;
class segment;
Expand All @@ -35,8 +36,3 @@ struct log_reader_config;
struct timequery_config;

} // namespace storage

// TODO: a subsequent commit will move this into the storage namespace.
namespace raft {
class offset_translator;
} // namespace raft
4 changes: 2 additions & 2 deletions src/v/storage/offset_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/util/log.hh>

namespace raft {
namespace storage {

static ss::logger logger{"offset_translator"};

Expand Down Expand Up @@ -446,4 +446,4 @@ ss::future<> offset_translator::move_persistent_state(
});
}

} // namespace raft
} // namespace storage
4 changes: 2 additions & 2 deletions src/v/storage/offset_translator.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

#include <absl/container/btree_map.h>

namespace raft {
namespace storage {

/// See also comments for storage::offset_translator_state.
///
Expand Down Expand Up @@ -155,4 +155,4 @@ class offset_translator {
storage::storage_resources& _resources;
};

} // namespace raft
} // namespace storage
28 changes: 14 additions & 14 deletions src/v/storage/tests/offset_translator_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ struct base_fixture {
storage::make_sanitized_file_config());
}

raft::offset_translator make_offset_translator() {
return raft::offset_translator{
storage::offset_translator make_offset_translator() {
return storage::offset_translator{
{model::record_batch_type::raft_configuration,
model::record_batch_type::checkpoint},
raft::group_id(0),
Expand All @@ -100,7 +100,7 @@ struct base_fixture {
};

void validate_translation(
raft::offset_translator& tr,
storage::offset_translator& tr,
model::offset log_offset,
model::offset kafka_offset) {
BOOST_REQUIRE_EQUAL(tr.state()->from_log_offset(log_offset), kafka_offset);
Expand All @@ -110,15 +110,15 @@ void validate_translation(
struct offset_translator_fixture : base_fixture {
offset_translator_fixture()
: tr(make_offset_translator()) {
tr.start(raft::offset_translator::must_reset::yes).get();
tr.start(storage::offset_translator::must_reset::yes).get();
}

void validate_offset_translation(
model::offset log_offset, model::offset kafka_offset) {
validate_translation(tr, log_offset, kafka_offset);
}

raft::offset_translator tr;
storage::offset_translator tr;
};

FIXTURE_TEST(test_translating_to_kafka_offsets, offset_translator_fixture) {
Expand Down Expand Up @@ -317,13 +317,13 @@ collect_base_offsets(ss::shared_ptr<storage::log> log) {
struct fuzz_checker {
fuzz_checker(
ss::shared_ptr<storage::log> log,
std::function<raft::offset_translator()>&& make_offset_translator)
std::function<storage::offset_translator()>&& make_offset_translator)
: _make_offset_translator(std::move(make_offset_translator))
, _log(std::move(log)) {}

ss::future<> start() {
_tr.emplace(_make_offset_translator());
co_await _tr->start(raft::offset_translator::must_reset::yes);
co_await _tr->start(storage::offset_translator::must_reset::yes);
co_await _tr->sync_with_log(*_log, std::nullopt);
}

Expand Down Expand Up @@ -474,7 +474,7 @@ struct fuzz_checker {
_tr.emplace(_make_offset_translator());
_gate = ss::gate{};

co_await _tr->start(raft::offset_translator::must_reset::no);
co_await _tr->start(storage::offset_translator::must_reset::no);
co_await _tr->prefix_truncate(
_snapshot_offset, model::offset_delta(_snapshot_delta));
co_await _tr->sync_with_log(*_log, std::nullopt);
Expand Down Expand Up @@ -522,9 +522,9 @@ struct fuzz_checker {

static const std::vector<model::record_batch_type> all_batch_types;

std::function<raft::offset_translator()> _make_offset_translator;
std::function<storage::offset_translator()> _make_offset_translator;

std::optional<raft::offset_translator> _tr;
std::optional<storage::offset_translator> _tr;
ss::gate _gate;

ss::shared_ptr<storage::log> _log;
Expand Down Expand Up @@ -615,7 +615,7 @@ FIXTURE_TEST(test_moving_persistent_state, base_fixture) {
// data batch @ 9 -> kafka 3
};
auto local_ot = make_offset_translator();
local_ot.start(raft::offset_translator::must_reset::yes).get();
local_ot.start(storage::offset_translator::must_reset::yes).get();
for (auto o : batch_offsets) {
local_ot.process(
create_batch(model::record_batch_type::raft_configuration, o));
Expand All @@ -641,22 +641,22 @@ FIXTURE_TEST(test_moving_persistent_state, base_fixture) {
// use last available shard
auto target_shard = ss::smp::count - 1;
// move state to target shard
raft::offset_translator::move_persistent_state(
storage::offset_translator::move_persistent_state(
raft::group_id(0), ss::this_shard_id(), target_shard, _api)
.get();

// validate translation on target shard
ss::smp::submit_to(
target_shard,
[&api = _api, ntp = test_ntp]() -> ss::future<> {
auto remote_ot = raft::offset_translator{
auto remote_ot = storage::offset_translator{
{model::record_batch_type::raft_configuration,
model::record_batch_type::checkpoint},
raft::group_id(0),
ntp,
api.local()};
return ss::do_with(std::move(remote_ot), [](auto& remote_ot) {
return remote_ot.start(raft::offset_translator::must_reset::no)
return remote_ot.start(storage::offset_translator::must_reset::no)
.then([&remote_ot] {
validate_translation(
remote_ot, model::offset(0), model::offset(0));
Expand Down

0 comments on commit e66fbd7

Please sign in to comment.