[v23.2.x] cloud: Add config options to trigger cache trim
When the cloud storage cache is completely full, puts to the cache
are blocked, which prevents readers from progressing. In some clusters
the trim operation can take a long time, up to hours. When this happens,
readers time out, generating errors.

This commit adds two configuration options that control when a trim
is triggered. By setting these below the cache's maximum size and
object count, cloud storage can begin a trim operation before the cache
is completely full, allowing readers to progress while the trim runs in
the background.
- cloud_storage_cache_trim_threshold_percent_size
- cloud_storage_cache_trim_threshold_percent_objects

When a put occurs while the cache is over either threshold, a trim is
triggered in the background.
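
For example (hypothetical numbers): with a cache limit of 1,000,000 objects
and cloud_storage_cache_trim_threshold_percent_objects set to 80.0, a
background trim starts once the cache holds more than 800,000 objects,
so puts can continue into the remaining headroom while the trim runs.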

(cherry picked from commit 027632c)
jcipar committed Jun 11, 2024
1 parent b0ed98c commit e71f512
Showing 6 changed files with 508 additions and 13 deletions.
256 changes: 244 additions & 12 deletions src/v/cloud_storage/cache_service.cc
@@ -32,6 +32,7 @@
#include <algorithm>
#include <exception>
#include <filesystem>
#include <optional>
#include <stdexcept>
#include <string_view>

@@ -134,10 +135,7 @@ void cache::update_max_bytes() {
_max_percent());

if (_current_cache_size > _max_bytes) {
ssx::spawn_with_gate(_gate, [this]() {
return ss::with_semaphore(
_cleanup_sm, 1, [this]() { return trim_throttled(); });
});
ssx::spawn_with_gate(_gate, [this]() { return trim_throttled(); });
}
}

@@ -263,7 +261,9 @@ std::optional<std::chrono::milliseconds> cache::get_trim_delay() const {
}
}

ss::future<> cache::trim_throttled() {
ss::future<> cache::trim_throttled_unlocked(
std::optional<uint64_t> size_limit_override,
std::optional<size_t> object_limit_override) {
// If we trimmed very recently then do not do it immediately:
// this reduces load and improves chance of currently promoted
// segments finishing their read work before we demote their
@@ -279,7 +279,15 @@ ss::future<> cache::trim_throttled() {
co_await ss::sleep_abortable(*trim_delay, _as);
}

co_await trim();
co_await trim(size_limit_override, object_limit_override);
}

ss::future<> cache::trim_throttled(
std::optional<uint64_t> size_limit_override,
std::optional<size_t> object_limit_override) {
auto units = co_await ss::get_units(_cleanup_sm, 1);
co_await trim_throttled_unlocked(
size_limit_override, object_limit_override);
}

ss::future<> cache::trim_manually(
@@ -1033,10 +1041,7 @@ ss::future<> cache::put(
// Trim proactively: if many fibers hit this concurrently,
// they'll contend for cleanup_sm and the losers will skip
// trim due to throttling.
{
auto units = co_await ss::get_units(_cleanup_sm, 1);
co_await trim_throttled();
}
co_await trim_throttled();

throw disk_full_error;
}
@@ -1213,9 +1218,167 @@ bool cache::may_exceed_limits(uint64_t bytes, size_t objects) {
&& !would_fit_in_cache;
}

ss::future<cache::trim_result>
cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) {
// During the normal trim we do a recursive directory walk to
// generate an exhaustive list of the files stored in the cache. If the
// cache holds a very large number of files this operation can take a long
// time. We have a limit on the number of objects the cache can hold, but
// it's often set to a relatively high value. Also, when we reach the object
// count limit the cache blocks all new 'put' operations because it doesn't
// allow any overallocation in this case.
//
// This creates a corner case in which every trim is caused by the object
// count limit being reached. In this case the trim blocks readers every
// time.
//
// The solution is to quickly delete objects without doing the full
// recursive directory walk, proactively unblocking the readers and
// allowing the object count to overshoot for a very brief period of time.
// To do this we need a list of candidates for deletion. This list is
// stored in the _last_trim_carryover field: the files with the oldest
// access times from the last directory walk. The carryover trim compares
// the access times in the carryover list with the actual access times from
// the access time tracker. Objects whose access times match weren't
// accessed since the last trim and can be deleted. This doesn't change the
// LRU behavior since _last_trim_carryover stores objects in LRU order.
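//
// For example (hypothetical names): if the last walk left the carryover
// list [a, b, c] ordered oldest-first, and only 'b' was accessed since
// then, the carryover trim deletes 'a' and 'c' and skips 'b', all without
// walking the directory tree again.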
trim_result result;
vlog(
cst_log.trace,
"trim carryover: list available {}",
_last_trim_carryover.has_value());

if (!_last_trim_carryover.has_value()) {
co_return result;
}
probe.carryover_trim();
auto it = _last_trim_carryover->begin();
for (; it < _last_trim_carryover->end(); it++) {
vlog(
cst_log.trace,
"carryover trim: check object {} ({})",
it->path,
it->size);
if (
result.deleted_size >= delete_bytes
&& result.deleted_count >= delete_objects) {
vlog(
cst_log.trace,
"carryover trim: stop, deleted {} / {}, requested to delete {} / "
"{}",
human::bytes(result.deleted_size),
result.deleted_count,
human::bytes(delete_bytes),
delete_objects);
break;
}
auto& file_stat = *it;
// Don't hit access time tracker file/tmp
if (
is_trim_exempt(file_stat.path)
|| std::string_view(file_stat.path).ends_with(tmp_extension)) {
continue;
}
// Both tx and index files are handled as part of the segment
// deletion.
if (
std::string_view(file_stat.path).ends_with(".tx")
|| std::string_view(file_stat.path).ends_with(".index")) {
continue;
}
// Check that access time didn't change
auto rel_path = _cache_dir
/ std::filesystem::relative(
std::filesystem::path(file_stat.path), _cache_dir);
auto estimate = _access_time_tracker.estimate_timestamp(
rel_path.native());
if (estimate != file_stat.access_time) {
vlog(
cst_log.trace,
"carryover file {} was accessed ({}) since the last trim ({}), "
"ignoring",
rel_path.native(),
estimate->time_since_epoch().count(),
file_stat.access_time.time_since_epoch().count());
// The file was accessed since we collected the stats
continue;
}
auto op_res = co_await this->remove_segment_full(file_stat);
result.deleted_count += op_res.deleted_count;
result.deleted_size += op_res.deleted_size;
}
vlog(
cst_log.debug,
"carryover trim reclaimed {} bytes from {} files",
result.deleted_size,
result.deleted_count);

if (it == _last_trim_carryover->end()) {
_last_trim_carryover = std::nullopt;
} else {
fragmented_vector<file_list_item> tmp;
size_t estimate = _last_trim_carryover->end() - it;
tmp.reserve(estimate);
std::copy(it, _last_trim_carryover->end(), std::back_inserter(tmp));
_last_trim_carryover = std::move(tmp);
}

co_return result;
}

void cache::maybe_background_trim() {
auto& trim_threshold_pct_objects
= config::shard_local_cfg()
.cloud_storage_cache_trim_threshold_percent_objects;
auto& trim_threshold_pct_size
= config::shard_local_cfg()
.cloud_storage_cache_trim_threshold_percent_size;
if (
!trim_threshold_pct_size.value().has_value()
&& !trim_threshold_pct_objects.value().has_value()) {
return;
}

uint64_t target_bytes = uint64_t(
_max_bytes * trim_threshold_pct_size.value().value_or(100.0) / 100.0);
uint32_t target_objects = uint32_t(
_max_objects() * trim_threshold_pct_objects.value().value_or(100.0)
/ 100.0);
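
// Hypothetical example: with _max_bytes = 100 GiB and
// cloud_storage_cache_trim_threshold_percent_size = 80.0, target_bytes
// is 80 GiB, so a put that pushes usage (current plus reserved bytes)
// past 80 GiB spawns a trim in the background while puts continue to
// make progress.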

bool bytes_over_limit = _current_cache_size + _reserved_cache_size
> target_bytes;
bool objects_over_limit = _current_cache_objects + _reserved_cache_objects
> target_objects;

if (bytes_over_limit || objects_over_limit) {
auto units = ss::try_get_units(_cleanup_sm, 1);
if (units.has_value()) {
vlog(cst_log.debug, "Spawning background trim");
ssx::spawn_with_gate(
_gate,
[this,
target_bytes,
target_objects,
u = std::move(units)]() mutable {
return trim_throttled_unlocked(target_bytes, target_objects)
.finally([u = std::move(u)] {});
});
} else {
vlog(
cst_log.debug, "Not spawning background trim: already started");
}
}
}

ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
vassert(ss::this_shard_id() == ss::shard_id{0}, "Only call on shard 0");

maybe_background_trim();

if (may_reserve_space(bytes, objects)) {
// Fast path: space was available.
_reserved_cache_size += bytes;
Expand All @@ -1241,6 +1404,76 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
_reservations_pending,
_reservations_pending_objects);

auto units = co_await ss::get_units(_cleanup_sm, 1);

// The situation may change across the scheduling point above: another
// fiber could have triggered a carryover trim that released some
// resources. Exit early in this case.
if (may_reserve_space(bytes, objects)) {
_reserved_cache_size += bytes;
_reserved_cache_objects += objects;
co_return;
}

// Do not increment _reservations_pending* before carryover trim is
// completed.
if (_last_trim_carryover.has_value()) {
// Slow path: try to run carryover trim if we have data
// from the previous trim.

auto short_term_hydrations_estimate
= config::shard_local_cfg().cloud_storage_max_connections()
* ss::smp::count;

// Here we're trying to estimate how much space we need to
// free to allow all TS resources to be used again to download
// data from S3. This is only a crude estimate.
auto trim_bytes = std::min(
config::shard_local_cfg().log_segment_size()
* short_term_hydrations_estimate / 3,
_max_bytes);
auto trim_objects = std::min(
short_term_hydrations_estimate * 3, _max_objects());
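
// For example (hypothetical values): with cloud_storage_max_connections
// = 20 and 8 shards, the estimate is 160 hydrations. With a 128 MiB
// log_segment_size the carryover trim is asked for
// min(128 MiB * 160 / 3, _max_bytes) ~= 6.7 GiB and
// min(160 * 3, _max_objects()) = 480 objects.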

vlog(
cst_log.debug,
"Carryover trim list has {} elements, trying to remove {} bytes "
"and {} objects",
_last_trim_carryover->size(),
human::bytes(trim_bytes),
trim_objects);

co_await trim_carryover(trim_bytes, trim_objects);
} else {
vlog(cst_log.debug, "Carryover trim list is empty");
}

if (may_reserve_space(bytes, objects)) {
_reserved_cache_size += bytes;
_reserved_cache_objects += objects;
// Carryover trim released enough space for this fiber to continue, but
// we still start a trim in the background to release more space and
// refresh the carryover list. Without this, subsequent 'reserve_space'
// calls would keep removing elements from the carryover list until it's
// empty, after which a blocking trim would be forced and readers would
// be blocked for its duration. To avoid this we need to run the trim in
// the background even though this fiber is unblocked. We want the number
// of full trims to match the number of carryover trims.
vlog(cst_log.debug, "Spawning background trim_throttled");
ssx::spawn_with_gate(_gate, [this, u = std::move(units)]() mutable {
return trim_throttled_unlocked(std::nullopt, std::nullopt)
.finally([u = std::move(u)] {});
});
co_return;
}

// Slowest path: register a pending need for bytes that will be used in
// clean_up_cache to make space available, and then proceed to
// cooperatively call clean_up_cache along with anyone else who is
// waiting.
try {
auto units = co_await ss::get_units(_cleanup_sm, 1);
while (!may_reserve_space(bytes, objects)) {
@@ -1263,7 +1496,7 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
// After taking lock, there still isn't space: means someone
// else didn't take it and free space for us already, so we will
// do the trim.
co_await trim_throttled();
co_await trim_throttled_unlocked();
did_trim = true;
}

@@ -1411,5 +1644,4 @@ ss::future<> cache::initialize(std::filesystem::path cache_dir) {
co_await ss::recursive_touch_directory(cache_dir.string());
}
}

} // namespace cloud_storage
14 changes: 13 additions & 1 deletion src/v/cloud_storage/cache_service.h
@@ -24,8 +24,11 @@
#include <seastar/core/gate.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/thread.hh>

#include <filesystem>
#include <iterator>
#include <optional>
#include <set>
#include <string_view>

@@ -216,7 +219,16 @@ class cache : public ss::peering_sharded_service<cache> {
std::optional<std::chrono::milliseconds> get_trim_delay() const;

/// Invoke trim, waiting if not enough time passed since the last trim
ss::future<> trim_throttled();
ss::future<> trim_throttled_unlocked(
std::optional<uint64_t> size_limit_override = std::nullopt,
std::optional<size_t> object_limit_override = std::nullopt);

// Take the cleanup semaphore before calling trim_throttled_unlocked
ss::future<> trim_throttled(
std::optional<uint64_t> size_limit_override = std::nullopt,
std::optional<size_t> object_limit_override = std::nullopt);

void maybe_background_trim();

/// Whether an object's path makes it impervious to pinning, like
/// the access time tracker.
