Skip to content

Commit

Permalink
Merge pull request #24436 from nvartolomei/nv/CORE-8349
Browse files Browse the repository at this point in the history
cluster: reject writes only when data disk is degraded
  • Loading branch information
nvartolomei authored Dec 7, 2024
2 parents 61b3215 + b6b9a60 commit 252f5be
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 22 deletions.
14 changes: 7 additions & 7 deletions src/v/cluster/health_monitor_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ health_monitor_backend::get_cluster_health(
}

ss::future<storage::disk_space_alert>
health_monitor_backend::get_cluster_disk_health(
health_monitor_backend::get_cluster_data_disk_health(
force_refresh refresh, model::timeout_clock::time_point deadline) {
auto ec = co_await maybe_refresh_cluster_health(refresh, deadline);
if (ec) {
Expand All @@ -308,7 +308,7 @@ health_monitor_backend::get_cluster_disk_health(
// operate, I guess.
co_return storage::disk_space_alert::ok;
}
co_return _reports_disk_health;
co_return _reports_data_disk_health;
}

ss::future<std::error_code>
Expand Down Expand Up @@ -444,8 +444,8 @@ ss::future<std::error_code> health_monitor_backend::collect_cluster_health() {

auto old_reports = std::exchange(_reports, {});

// update nodes reports and cache cluster-level disk health
storage::disk_space_alert cluster_disk_health
// update nodes reports and cache cluster-level data disk health
storage::disk_space_alert cluster_data_disk_health
= storage::disk_space_alert::ok;
for (auto& r : reports) {
if (r) {
Expand All @@ -471,14 +471,14 @@ ss::future<std::error_code> health_monitor_backend::collect_cluster_health() {
for (auto& cb : _node_callbacks) {
cb.second(r.value(), old_report);
}
cluster_disk_health = storage::max_severity(
r.value().local_state.get_disk_alert(), cluster_disk_health);
cluster_data_disk_health = storage::max_severity(
r.value().local_state.data_disk.alert, cluster_data_disk_health);

_reports.emplace(
id, ss::make_lw_shared<node_health_report>(std::move(r.value())));
}
}
_reports_disk_health = cluster_disk_health;
_reports_data_disk_health = cluster_data_disk_health;

if (config::shard_local_cfg().enable_usage()) {
vlog(clusterlog.info, "collecting cloud health statistics");
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/health_monitor_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class health_monitor_backend {
ss::future<result<cluster_health_report>> get_cluster_health(
cluster_report_filter, force_refresh, model::timeout_clock::time_point);

ss::future<storage::disk_space_alert> get_cluster_disk_health(
ss::future<storage::disk_space_alert> get_cluster_data_disk_health(
force_refresh refresh, model::timeout_clock::time_point deadline);

ss::future<result<node_health_report>> collect_current_node_health();
Expand Down Expand Up @@ -182,7 +182,7 @@ class health_monitor_backend {

status_cache_t _status;
report_cache_t _reports;
storage::disk_space_alert _reports_disk_health
storage::disk_space_alert _reports_data_disk_health
= storage::disk_space_alert::ok;
std::optional<size_t> _bytes_in_cloud_storage;

Expand Down
20 changes: 11 additions & 9 deletions src/v/cluster/health_monitor_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ health_monitor_frontend::get_cluster_health(
});
}

storage::disk_space_alert health_monitor_frontend::get_cluster_disk_health() {
return _cluster_disk_health;
storage::disk_space_alert
health_monitor_frontend::get_cluster_data_disk_health() {
return _cluster_data_disk_health;
}

/**
Expand Down Expand Up @@ -100,23 +101,24 @@ health_monitor_frontend::get_cluster_health_overview(

ss::future<> health_monitor_frontend::update_other_shards(
const storage::disk_space_alert dsa) {
co_await container().invoke_on_others(
[dsa](health_monitor_frontend& fe) { fe._cluster_disk_health = dsa; });
co_await container().invoke_on_others([dsa](health_monitor_frontend& fe) {
fe._cluster_data_disk_health = dsa;
});
}

ss::future<> health_monitor_frontend::update_frontend_and_backend_cache() {
auto deadline = model::time_from_now(default_timeout);
auto disk_health = co_await dispatch_to_backend(
[deadline](health_monitor_backend& be) {
return be.get_cluster_disk_health(force_refresh::no, deadline);
return be.get_cluster_data_disk_health(force_refresh::no, deadline);
});
if (disk_health != _cluster_disk_health) {
if (disk_health != _cluster_data_disk_health) {
vlog(
clusterlog.debug,
"Update disk health cache {} -> {}",
_cluster_disk_health,
"Update data disk health cache {} -> {}",
_cluster_data_disk_health,
disk_health);
_cluster_disk_health = disk_health;
_cluster_data_disk_health = disk_health;
co_await update_other_shards(disk_health);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/health_monitor_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class health_monitor_frontend
ss::future<result<cluster_health_report>> get_cluster_health(
cluster_report_filter, force_refresh, model::timeout_clock::time_point);

storage::disk_space_alert get_cluster_disk_health();
storage::disk_space_alert get_cluster_data_disk_health();

// Collects or return cached version of current node health report.
ss::future<result<node_health_report_ptr>> get_current_node_health();
Expand Down Expand Up @@ -101,7 +101,7 @@ class health_monitor_frontend
config::binding<std::chrono::milliseconds> _alive_timeout;

// Currently the worst / max of all nodes' disk space state
storage::disk_space_alert _cluster_disk_health{
storage::disk_space_alert _cluster_data_disk_health{
storage::disk_space_alert::ok};
ss::timer<ss::lowres_clock> _refresh_timer;
ss::gate _refresh_gate;
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ std::vector<model::node_id> metadata_cache::node_ids() const {
}

bool metadata_cache::should_reject_writes() const {
return _health_monitor.local().get_cluster_disk_health()
return _health_monitor.local().get_cluster_data_disk_health()
== storage::disk_space_alert::degraded;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/rptest/tests/full_disk_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def check_health_monitor_frontend(disk_space_change: str):
# Looking for a log statement about a change in disk space.
# This is a check for the health monitor frontend because
# that structure logs disk space alerts.
pattern = f"Update disk health cache {disk_space_change}"
pattern = f"Update data disk health cache {disk_space_change}"
wait_until(
lambda: self.redpanda.search_log_any(pattern),
timeout_sec=5,
Expand Down

0 comments on commit 252f5be

Please sign in to comment.