Skip to content

Commit

Permalink
Merge pull request #14771 from Lazin/backport/14637
Browse files Browse the repository at this point in the history
[v23.2.x] cloud_storage: Use abort source when waiting for semaphore units
  • Loading branch information
piyushredpanda authored Nov 6, 2023
2 parents 8e9fe51 + b48d3b9 commit 76a7486
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 19 deletions.
26 changes: 20 additions & 6 deletions src/v/cloud_storage/materialized_resources.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,19 @@ void materialized_resources::register_segment(materialized_segment_state& s) {
_materialized.push_back(s);
}

namespace {

ss::future<ssx::semaphore_units> get_units_abortable(
adjustable_semaphore& sem, ssize_t units, storage::opt_abort_source_t as) {
return as.has_value() ? sem.get_units(units, as.value())
: sem.get_units(units);
}

} // namespace

ss::future<segment_reader_units>
materialized_resources::get_segment_reader_units() {
materialized_resources::get_segment_reader_units(
storage::opt_abort_source_t as) {
if (_segment_reader_units.available_units() <= 0) {
// Update metrics counter if we are trying to acquire units while
// saturated
Expand All @@ -161,29 +172,32 @@ materialized_resources::get_segment_reader_units() {
trim_segment_readers(max_segment_readers() / 2);
}

auto semaphore_units = co_await _segment_reader_units.get_units(1);
auto semaphore_units = co_await get_units_abortable(
_segment_reader_units, 1, as);
co_return segment_reader_units{std::move(semaphore_units)};
}

ss::future<ssx::semaphore_units>
materialized_resources::get_partition_reader_units(size_t n) {
materialized_resources::get_partition_reader_units(
size_t n, storage::opt_abort_source_t as) {
if (_partition_reader_units.available_units() <= 0) {
// Update metrics counter if we are trying to acquire units while
// saturated
_partition_readers_delayed += 1;
}
return _partition_reader_units.get_units(n);
return get_units_abortable(_partition_reader_units, n, as);
}

ss::future<segment_units> materialized_resources::get_segment_units() {
ss::future<segment_units>
materialized_resources::get_segment_units(storage::opt_abort_source_t as) {
if (_segment_units.available_units() <= 0) {
// Update metrics counter if we are trying to acquire units while
// saturated
_segments_delayed += 1;

trim_segments(max_segments() / 2);
}
auto semaphore_units = co_await _segment_units.get_units(1);
auto semaphore_units = co_await get_units_abortable(_segment_units, 1, as);
co_return segment_units{std::move(semaphore_units)};
}

Expand Down
8 changes: 5 additions & 3 deletions src/v/cloud_storage/materialized_resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ class materialized_resources {

void register_segment(materialized_segment_state& s);

ss::future<segment_reader_units> get_segment_reader_units();
ss::future<segment_reader_units>
get_segment_reader_units(storage::opt_abort_source_t as);

ss::future<ssx::semaphore_units> get_partition_reader_units(size_t);
ss::future<ssx::semaphore_units>
get_partition_reader_units(size_t, storage::opt_abort_source_t as);

ss::future<segment_units> get_segment_units();
ss::future<segment_units> get_segment_units(storage::opt_abort_source_t as);

materialized_manifest_cache& get_materialized_manifest_cache();

Expand Down
24 changes: 14 additions & 10 deletions src/v/cloud_storage/remote_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,10 +484,11 @@ class partition_record_batch_reader_impl final
}

ss::future<> init_cursor(storage::log_reader_config config) {
auto segment_unit
= co_await _partition->materialized().get_segment_units();
auto segment_unit = co_await _partition->materialized()
.get_segment_units(config.abort_source);
auto segment_reader_unit
= co_await _partition->materialized().get_segment_reader_units();
= co_await _partition->materialized().get_segment_reader_units(
config.abort_source);

async_view_search_query_t query;
if (config.first_timestamp.has_value()) {
Expand Down Expand Up @@ -662,11 +663,11 @@ class partition_record_batch_reader_impl final
_next_segment_base_offset,
_view_cursor->get_status());

auto segment_unit
= co_await _partition->materialized().get_segment_units();
auto segment_unit = co_await _partition->materialized()
.get_segment_units(config.abort_source);
auto segment_reader_unit
= co_await _partition->materialized().get_segment_reader_units();

= co_await _partition->materialized().get_segment_reader_units(
config.abort_source);
auto maybe_manifest = _view_cursor->manifest();
if (
maybe_manifest.has_value()
Expand Down Expand Up @@ -934,7 +935,8 @@ remote_partition::aborted_transactions(offset_range offsets) {
// in a failure to materialise. This should be transient however.
// One solution for this is to grab all the required segment units
// up front at the start of the function.
auto segment_unit = co_await materialized().get_segment_units();
auto segment_unit = co_await materialized().get_segment_units(
std::nullopt);
auto path = stm_manifest.generate_segment_path(*it);
auto m = get_or_materialize_segment(
path, *it, std::move(segment_unit));
Expand Down Expand Up @@ -982,7 +984,8 @@ remote_partition::aborted_transactions(offset_range offsets) {
});

for (const auto& [meta, path] : meta_to_materialize) {
auto segment_unit = co_await materialized().get_segment_units();
auto segment_unit = co_await materialized().get_segment_units(
std::nullopt);
auto m = get_or_materialize_segment(
path, meta, std::move(segment_unit));
auto tx = co_await m->second->segment->aborted_transactions(
Expand Down Expand Up @@ -1083,7 +1086,8 @@ ss::future<storage::translating_reader> remote_partition::make_reader(
config,
_segments.size());

auto units = co_await _api.materialized().get_partition_reader_units(1);
auto units = co_await _api.materialized().get_partition_reader_units(
1, config.abort_source);
auto ot_state = ss::make_lw_shared<storage::offset_translator_state>(
get_ntp());
auto impl = std::make_unique<partition_record_batch_reader_impl>(
Expand Down
9 changes: 9 additions & 0 deletions src/v/utils/adjustable_semaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ class adjustable_semaphore {
return ss::get_units(_sem, units);
}

/**
* Blocking get units: will block until units are available or until abort
* source is triggered.
*/
ss::future<ssx::semaphore_units>
get_units(size_t units, ss::abort_source& as) {
return ss::get_units(_sem, units, as);
}

size_t current() const noexcept { return _sem.current(); }
ssize_t available_units() const noexcept { return _sem.available_units(); }

Expand Down

0 comments on commit 76a7486

Please sign in to comment.