Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.2.x] cloud_storage: various non-functional changes #17531

Open
wants to merge 3 commits into
base: v23.2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 46 additions & 44 deletions src/v/cloud_storage/remote_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,15 @@ class partition_record_batch_reader_impl final
_partition_reader_as = config.abort_source;
auto sub = config.abort_source->get().subscribe([this]() noexcept {
vlog(_ctxlog.debug, "abort requested via config.abort_source");
if (_reader) {
_partition->evict_segment_reader(std::move(_reader));
if (_seg_reader) {
_partition->evict_segment_reader(std::move(_seg_reader));
}
});
if (sub) {
_as_sub = std::move(*sub);
} else {
vlog(_ctxlog.debug, "abort_source is triggered in c-tor");
_reader = {};
_seg_reader = {};
}
}

Expand All @@ -259,7 +259,7 @@ class partition_record_batch_reader_impl final
auto ntp = _partition->get_ntp();
vlog(_ctxlog.trace, "Destructing reader {}", ntp);
_partition->_ts_probe.reader_destroyed();
if (_reader) {
if (_seg_reader) {
// We must not destroy this reader: it is not safe to do so
// without calling stop() on it. The remote_partition is
// responsible for cleaning up readers, including calling
Expand Down Expand Up @@ -298,7 +298,7 @@ class partition_record_batch_reader_impl final
operator=(const partition_record_batch_reader_impl& o)
= delete;

bool is_end_of_stream() const override { return _reader == nullptr; }
bool is_end_of_stream() const override { return _seg_reader == nullptr; }

void throw_on_external_abort() {
_partition->_as.check();
Expand All @@ -319,11 +319,11 @@ class partition_record_batch_reader_impl final
"empty");
co_return storage_t{};
}
if (_reader->config().over_budget) {
if (_seg_reader->config().over_budget) {
vlog(
_ctxlog.debug,
"We're over-budget, stopping, config: {}",
_reader->config());
_seg_reader->config());
// We need to stop in such way that will keep the
// reader in the reusable state, so we could reuse
// it on next iteration
Expand All @@ -339,7 +339,7 @@ class partition_record_batch_reader_impl final
}

throw_on_external_abort();
auto reader_delta = _reader->current_delta();
auto reader_delta = _seg_reader->current_delta();
if (
!_ot_state->empty()
&& _ot_state->last_delta() > reader_delta) {
Expand Down Expand Up @@ -371,7 +371,7 @@ class partition_record_batch_reader_impl final
"of the current reader: {}, delta offset of the offset "
"translator: {}, first offset produced by this reader: "
"{}",
_reader->config(),
_seg_reader->config(),
reader_delta,
_ot_state->last_delta(),
_first_produced_offset);
Expand All @@ -386,10 +386,10 @@ class partition_record_batch_reader_impl final
_ctxlog.debug,
"Invoking 'read_some' on current log reader with config: "
"{}",
_reader->config());
_seg_reader->config());

try {
auto result = co_await _reader->read_some(
auto result = co_await _seg_reader->read_some(
deadline, *_ot_state);
throw_on_external_abort();

Expand Down Expand Up @@ -419,7 +419,7 @@ class partition_record_batch_reader_impl final
_ctxlog.warn,
"stuck reader: current rp offset: {}, max rp offset: {}",
ex.rp_offset,
_reader->max_rp_offset());
_seg_reader->max_rp_offset());

// If the reader is stuck because of a mismatch between
// segment data and manifest entry, set reader to EOF and
Expand All @@ -434,7 +434,7 @@ class partition_record_batch_reader_impl final
if (
model::next_offset(ex.rp_offset)
>= _next_segment_base_offset
&& !_reader->is_eof()) {
&& !_seg_reader->is_eof()) {
vlog(
_ctxlog.info,
"mismatch between current segment end and manifest "
Expand All @@ -443,10 +443,10 @@ class partition_record_batch_reader_impl final
"set EOF on reader and try to "
"reset",
ex.rp_offset,
_reader->max_rp_offset(),
_seg_reader->max_rp_offset(),
_next_segment_base_offset,
_reader->is_eof());
_reader->set_eof();
_seg_reader->is_eof());
_seg_reader->set_eof();
continue;
}
throw;
Expand All @@ -472,7 +472,7 @@ class partition_record_batch_reader_impl final
// we've thrown an exception. Regardless of which error, the reader may
// have been left in an indeterminate state. Re-set the pointer to it
// to ensure that it will not be reused.
if (_reader) {
if (_seg_reader) {
co_await set_end_of_stream();
}
if (unknown_exception_ptr) {
Expand All @@ -482,7 +482,7 @@ class partition_record_batch_reader_impl final
vlog(
_ctxlog.debug,
"EOS reached, reader available: {}, is end of stream: {}",
static_cast<bool>(_reader),
static_cast<bool>(_seg_reader),
is_end_of_stream());
co_return storage_t{};
}
Expand All @@ -494,8 +494,8 @@ class partition_record_batch_reader_impl final
private:
/// Return or evict currently referenced reader
void dispose_current_reader() {
if (_reader) {
_partition->return_segment_reader(std::move(_reader));
if (_seg_reader) {
_partition->return_segment_reader(std::move(_seg_reader));
}
}

Expand Down Expand Up @@ -615,7 +615,7 @@ class partition_record_batch_reader_impl final
std::move(segment_unit),
std::move(segment_reader_unit));
if (reader) {
_reader = std::move(reader);
_seg_reader = std::move(reader);
_next_segment_base_offset = next_offset;
return;
}
Expand All @@ -625,7 +625,7 @@ class partition_record_batch_reader_impl final
"segment not "
"found, config: {}",
config);
_reader = {};
_seg_reader = {};
_next_segment_base_offset = {};
}

Expand Down Expand Up @@ -663,17 +663,19 @@ class partition_record_batch_reader_impl final
/// attached.
ss::future<bool> maybe_reset_reader() {
vlog(_ctxlog.debug, "maybe_reset_reader called");
if (!_reader) {
if (!_seg_reader) {
co_return false;
}
if (_reader->config().start_offset > _reader->config().max_offset) {
if (
_seg_reader->config().start_offset
> _seg_reader->config().max_offset) {
vlog(
_ctxlog.debug,
"maybe_reset_stream called - stream already consumed, start "
"{}, "
"max {}",
_reader->config().start_offset,
_reader->config().max_offset);
_seg_reader->config().start_offset,
_seg_reader->config().max_offset);
// Entire range is consumed, detach from remote_partition and
// close the reader.
co_await set_end_of_stream();
Expand All @@ -683,29 +685,29 @@ class partition_record_batch_reader_impl final
_ctxlog.debug,
"maybe_reset_reader, config start_offset: {}, reader max_offset: "
"{}",
_reader->config().start_offset,
_reader->max_rp_offset());
_seg_reader->config().start_offset,
_seg_reader->max_rp_offset());

// The next offset should be below the next segment base offset if the
// reader has not finished. If the next offset to be read from has
// reached the next segment but the reader is not finished, then the
// state is inconsistent.
if (_reader->is_eof()) {
auto prev_max_offset = _reader->max_rp_offset();
auto config = _reader->config();
if (_seg_reader->is_eof()) {
auto prev_max_offset = _seg_reader->max_rp_offset();
auto config = _seg_reader->config();
vlog(
_ctxlog.debug,
"maybe_reset_stream condition triggered after offset: {}, "
"reader's current log offset: {}, config.start_offset: {}, "
"reader's max log offset: {}, is EOF: {}, next base_offset "
"estimate: {}",
prev_max_offset,
_reader->current_rp_offset(),
_seg_reader->current_rp_offset(),
config.start_offset,
_reader->max_rp_offset(),
_reader->is_eof(),
_seg_reader->max_rp_offset(),
_seg_reader->is_eof(),
_next_segment_base_offset);
_partition->evict_segment_reader(std::move(_reader));
_partition->evict_segment_reader(std::move(_seg_reader));
vlog(
_ctxlog.debug,
"initializing new segment reader {}, next offset {}, manifest "
Expand Down Expand Up @@ -784,32 +786,32 @@ class partition_record_batch_reader_impl final
std::move(segment_reader_unit),
_next_segment_base_offset);
_next_segment_base_offset = new_next_offset;
_reader = std::move(new_reader);
_seg_reader = std::move(new_reader);
}
if (maybe_manifest.has_value() && _reader != nullptr) {
if (maybe_manifest.has_value() && _seg_reader != nullptr) {
vassert(
prev_max_offset != _reader->max_rp_offset(),
prev_max_offset != _seg_reader->max_rp_offset(),
"Progress stall detected, ntp: {}, max offset of prev "
"reader: {}, max offset of the new reader {}",
_partition->get_ntp(),
prev_max_offset,
_reader->max_rp_offset());
_seg_reader->max_rp_offset());
}
}
vlog(
_ctxlog.debug,
"maybe_reset_reader completed, reader is present: {}, is end of "
"stream: {}",
static_cast<bool>(_reader),
static_cast<bool>(_seg_reader),
is_end_of_stream());
co_return static_cast<bool>(_reader);
co_return static_cast<bool>(_seg_reader);
}

/// Transition reader to the completed state. Stop tracking state in
/// the 'remote_partition'
ss::future<> set_end_of_stream() {
co_await _reader->stop();
_reader = {};
co_await _seg_reader->stop();
_seg_reader = {};
}

retry_chain_node _rtc;
Expand Down Expand Up @@ -838,7 +840,7 @@ class partition_record_batch_reader_impl final

ss::lw_shared_ptr<storage::offset_translator_state> _ot_state;
/// Reader state that was borrowed from the materialized_segment_state
std::unique_ptr<remote_segment_batch_reader> _reader;
std::unique_ptr<remote_segment_batch_reader> _seg_reader;
/// Cancellation subscription
ss::abort_source::subscription _as_sub;
/// Reference to the abort source of the partition reader
Expand Down
34 changes: 14 additions & 20 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
const model::ntp& ntp,
retry_chain_node& rtc)
: _config(conf)
, _parent(parent)
, _seg_reader(parent)
, _term(term)
, _rtc(&rtc)
, _ctxlog(cst_log, _rtc, ntp.path())
Expand All @@ -1156,18 +1156,11 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
/// \note this can only be applied to current record batch
kafka::offset rp_to_kafka(model::offset k) const noexcept {
vassert(
k() >= _parent._cur_delta(),
k() >= _seg_reader._cur_delta(),
"Redpanda offset {} is smaller than the delta {}",
k,
_parent._cur_delta);
return k - _parent._cur_delta;
}

/// Translate kafka offset to redpanda offset
///
/// \note this can only be applied to current record batch
model::offset kafka_to_rp(kafka::offset k) const noexcept {
return k + _parent._cur_delta;
_seg_reader._cur_delta);
return k - _seg_reader._cur_delta;
}

/// Point config.start_offset to the next record batch
Expand All @@ -1176,7 +1169,7 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
/// \note this can only be applied to current record batch
void
advance_config_offsets(const model::record_batch_header& header) noexcept {
_parent._cur_rp_offset = header.last_offset() + model::offset{1};
_seg_reader._cur_rp_offset = header.last_offset() + model::offset{1};

if (header.type == model::record_batch_type::raft_data) {
auto next = rp_to_kafka(header.last_offset()) + model::offset(1);
Expand All @@ -1193,7 +1186,7 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
"[{}] accept_batch_start {}, current delta: {}",
_config.client_address,
header,
_parent._cur_delta);
_seg_reader._cur_delta);

if (rp_to_kafka(header.base_offset) > _config.max_offset) {
vlog(
Expand Down Expand Up @@ -1290,21 +1283,22 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
_filtered_types.begin(), _filtered_types.end(), header.type)
> 0) {
vassert(
_parent._cur_ot_state,
_seg_reader._cur_ot_state,
"ntp {}: offset translator state for "
"remote_segment_batch_consumer not initialized",
_parent._seg->get_ntp());
_seg_reader._seg->get_ntp());

vlog(
_ctxlog.debug,
"added offset translation gap [{}-{}], current state: {}",
header.base_offset,
header.last_offset(),
_parent._cur_ot_state);
_seg_reader._cur_ot_state);

_parent._cur_ot_state->get().add_gap(
_seg_reader._cur_ot_state->get().add_gap(
header.base_offset, header.last_offset());
_parent._cur_delta += header.last_offset_delta + model::offset(1);
_seg_reader._cur_delta += header.last_offset_delta
+ model::offset(1);
}
}

Expand All @@ -1328,7 +1322,7 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
batch.header().header_crc = model::internal_header_only_crc(
batch.header());

size_t sz = _parent.produce(std::move(batch));
size_t sz = _seg_reader.produce(std::move(batch));

if (_config.over_budget) {
co_return stop_parser::yes;
Expand All @@ -1347,7 +1341,7 @@ class remote_segment_batch_consumer : public storage::batch_consumer {

private:
storage::log_reader_config& _config;
remote_segment_batch_reader& _parent;
remote_segment_batch_reader& _seg_reader;
model::record_batch_header _header;
iobuf _records;
model::term_id _term;
Expand Down
Loading