Skip to content

Commit

Permalink
k/replicated_partition: fixed querying end offset of an empty log
Browse files Browse the repository at this point in the history
When querying offset translator we are relying on the property of the
end offset or high watermark being exclusive. Changed the behavior of
log end offset to increment offset in Raft offset space and then
translate to achieve correct behavior when log is ended with a batch
subjected to offset translation.

Fixes: #16612

Signed-off-by: Michał Maślanka <[email protected]>
(cherry picked from commit fd04fd7)
  • Loading branch information
mmaslankaprv committed Apr 16, 2024
1 parent aa369d7 commit 076a78a
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/v/kafka/server/replicated_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ class replicated_partition final : public kafka::partition_proxy::impl {
/**
* By default we return a dirty_offset + 1
*/
return model::next_offset(
_translator->from_log_offset(_partition->dirty_offset()));
return _translator->from_log_offset(
model::next_offset(_partition->dirty_offset()));
}

model::offset leader_high_watermark() const {
Expand Down
3 changes: 2 additions & 1 deletion src/v/kafka/server/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ set(srcs
alter_config_test.cc
produce_consume_test.cc
group_metadata_serialization_test.cc
partition_reassignments_test.cc)
partition_reassignments_test.cc
replicated_partition_test.cc)

rp_test(
FIXTURE_TEST
Expand Down
78 changes: 78 additions & 0 deletions src/v/kafka/server/tests/replicated_partition_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "cluster/fwd.h"
#include "cluster/types.h"
#include "kafka/server/partition_proxy.h"
#include "kafka/server/replicated_partition.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "model/record_batch_reader.h"
#include "model/record_batch_types.h"
#include "model/timeout_clock.h"
#include "raft/replicate.h"
#include "redpanda/tests/fixture.h"
#include "storage/record_batch_builder.h"
#include "test_utils/async.h"

FIXTURE_TEST(test_replicated_partition_end_offset, redpanda_thread_fixture) {
wait_for_controller_leadership().get();

model::topic_namespace tp_ns(
model::kafka_namespace, model::topic("test-topic"));

add_topic(tp_ns).get();
model::ntp ntp(tp_ns.ns, tp_ns.tp, model::partition_id(0));
auto shard = app.shard_table.local().shard_for(ntp);

tests::cooperative_spin_wait_with_timeout(10s, [this, shard, &ntp] {
return app.partition_manager.invoke_on(
*shard, [&ntp](cluster::partition_manager& pm) {
auto p = pm.get(ntp);
return p->get_leader_id().has_value();
});
}).get();

app.partition_manager
.invoke_on(
*shard,
[&ntp](cluster::partition_manager& pm) {
auto p = pm.get(ntp);
kafka::replicated_partition rp(p);
auto p_info = rp.get_partition_info();
/**
* Since log is empty from Kafka client perspective (no data
* batches), the end offset which is exclusive must be equal to 0
*/
BOOST_REQUIRE_EQUAL(rp.log_end_offset(), model::offset{0});
BOOST_REQUIRE_EQUAL(rp.high_watermark(), model::offset{0});

storage::record_batch_builder builder(
model::record_batch_type::version_fence, model::offset(0));
builder.add_raw_kv(iobuf{}, iobuf{});
builder.add_raw_kv(iobuf{}, iobuf{});
builder.add_raw_kv(iobuf{}, iobuf{});

// replicate a batch that is subjected to offset translation
return p
->replicate(
model::make_memory_record_batch_reader(
{std::move(builder).build()}),
raft::replicate_options(raft::consistency_level::quorum_ack))
.then([p, rp](result<cluster::kafka_result> rr) {
BOOST_REQUIRE(rr.has_value());
BOOST_REQUIRE_GT(p->dirty_offset(), model::offset{0});

BOOST_REQUIRE_EQUAL(rp.log_end_offset(), model::offset{0});
BOOST_REQUIRE_EQUAL(rp.high_watermark(), model::offset{0});
});
})
.get();
}

0 comments on commit 076a78a

Please sign in to comment.