Skip to content

Commit

Permalink
Merge pull request #23405 from Lazin/pr/archiver-revamp4
Browse files Browse the repository at this point in the history
archival: Update ntp_archiver (pt.4)
  • Loading branch information
Lazin authored Nov 30, 2024
2 parents 7c819ba + e4c1072 commit 8ebd54a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
9 changes: 5 additions & 4 deletions src/v/cluster/archival/async_data_uploader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@ class reader_ds : public ss::data_source_impl {
/// read buffer.
ss::future<ss::stop_iteration> operator()(model::record_batch batch) {
vlog(
_parent->_ctxlog.debug,
_parent->_ctxlog.trace,
"consume started, buffered {} bytes",
_parent->_buffer.size_bytes());
if (!_range.contains(batch.header())) {
vlog(
_parent->_ctxlog.debug,
_parent->_ctxlog.trace,
"skip batch {}-{}",
batch.base_offset(),
batch.last_offset());

co_return ss::stop_iteration::no;
}
vlog(
_parent->_ctxlog.debug,
_parent->_ctxlog.trace,
"consuming batch {}-{}",
batch.base_offset(),
batch.last_offset());
Expand All @@ -71,7 +71,7 @@ class reader_ds : public ss::data_source_impl {
bool max_bytes_reached = _parent->_buffer.size_bytes()
> _parent->_max_bytes;
vlog(
_parent->_ctxlog.debug,
_parent->_ctxlog.trace,
"max_bytes_reached: {}",
max_bytes_reached);
co_return max_bytes_reached ? ss::stop_iteration::yes
Expand Down Expand Up @@ -137,6 +137,7 @@ class reader_ds : public ss::data_source_impl {
co_return empty;
}
ss::future<> close() override {
vlog(_ctxlog.trace, "Close reader");
co_await std::move(_reader).release()->finally();
}

Expand Down
17 changes: 17 additions & 0 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2285,8 +2285,25 @@ disk_log_impl::offset_range_size(
auto delta = target.target_size - prev;
truncate_after = delta;
}
auto committed_offset = it->get()->offsets().get_committed_offset();
if (committed_offset == model::offset{}) {
co_return std::nullopt;
}
auto last_index_entry = it->get()->index().find_above_size_bytes(
truncate_after);
if (
last_index_entry.has_value()
&& last_index_entry->offset > committed_offset) {
// The index entry overshoots the committed offset
vlog(
stlog.debug,
"Index lookup overshoot committed offset {}, index entry "
"found: {}",
committed_offset,
last_index_entry->offset);
last_index_entry = it->get()->index().find_nearest(
committed_offset);
}
if (
last_index_entry.has_value()
&& model::prev_offset(last_index_entry->offset) > first) {
Expand Down

0 comments on commit 8ebd54a

Please sign in to comment.