Skip to content

Commit

Permalink
Merge pull request #24365 from nvartolomei/nv/manual-backport-24342-v…
Browse files Browse the repository at this point in the history
…24.2.x-442

[v24.2.x] archival: consistent log size probes across replicas (pull)
  • Loading branch information
nvartolomei authored Dec 2, 2024
2 parents ad3e211 + 11ea3f6 commit 2528c1f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 49 deletions.
24 changes: 4 additions & 20 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ ss::future<> ntp_archiver::upload_until_abort() {
co_return;
}
if (!_probe) {
_probe.emplace(_conf->ntp_metrics_disabled, _ntp);
_probe.emplace(
_conf->ntp_metrics_disabled, _ntp, _parent.archival_meta_stm());
}

while (!_as.abort_requested()) {
Expand Down Expand Up @@ -436,7 +437,8 @@ ss::future<> ntp_archiver::sync_manifest_until_abort() {
co_return;
}
if (!_probe) {
_probe.emplace(_conf->ntp_metrics_disabled, _ntp);
_probe.emplace(
_conf->ntp_metrics_disabled, _ntp, _parent.archival_meta_stm());
}

while (!_as.abort_requested()) {
Expand Down Expand Up @@ -769,8 +771,6 @@ ss::future<> ntp_archiver::upload_until_term_change() {
co_await maybe_flush_manifest_clean_offset();
}

update_probe();

// Drop _uploads_active lock: we are not considered active while
// sleeping for backoff at the end of the loop.
units.return_all();
Expand Down Expand Up @@ -896,22 +896,6 @@ ss::future<cloud_storage::download_result> ntp_archiver::sync_manifest() {
co_return cloud_storage::download_result::success;
}

void ntp_archiver::update_probe() {
const auto& man = manifest();

_probe->segments_in_manifest(man.size());

const auto first_addressable = man.first_addressable_segment();
const auto truncated_seg_count = first_addressable == man.end()
? 0
: first_addressable.index();

_probe->segments_to_delete(
truncated_seg_count + man.replaced_segments_count());

_probe->cloud_log_size(man.cloud_log_size());
}

bool ntp_archiver::can_update_archival_metadata() const {
return !_as.abort_requested() && !_gate.is_closed() && _parent.is_leader()
&& _parent.term() == _start_term;
Expand Down
2 changes: 0 additions & 2 deletions src/v/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -621,8 +621,6 @@ class ntp_archiver {
// the state of manifest uploads.
bool uploaded_data_past_flush_offset() const;

void update_probe();

/// Return true if archival metadata can be replicated.
/// This means that the replica is a leader, the term did not
/// change and the archiver is not stopping.
Expand Down
22 changes: 18 additions & 4 deletions src/v/archival/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "archival/probe.h"

#include "archival/archival_metadata_stm.h"
#include "config/configuration.h"
#include "metrics/prometheus_sanitize.h"

Expand All @@ -19,7 +20,10 @@
namespace archival {

ntp_level_probe::ntp_level_probe(
per_ntp_metrics_disabled disabled, const model::ntp& ntp) {
per_ntp_metrics_disabled disabled,
const model::ntp& ntp,
ss::shared_ptr<const cluster::archival_metadata_stm> stm)
: _stm(std::move(stm)) {
if (!disabled) {
setup_ntp_metrics(ntp);
}
Expand Down Expand Up @@ -118,21 +122,31 @@ void ntp_level_probe::setup_public_metrics(const model::ntp& ntp) {
.aggregate(aggregate_labels),
sm::make_gauge(
"segments",
[this] { return _segments_in_manifest; },
[this] { return _stm->manifest().size(); },
sm::description(
"Total number of accounted segments in the cloud for the topic"),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"segments_pending_deletion",
[this] { return _segments_to_delete; },
[this] {
const auto first_addressable
= _stm->manifest().first_addressable_segment();
const auto truncated_seg_count = first_addressable
== _stm->manifest().end()
? 0
: first_addressable.index();

return truncated_seg_count
+ _stm->manifest().replaced_segments_count();
},
sm::description("Total number of segments pending deletion from the "
"cloud for the topic"),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"cloud_log_size",
[this] { return _cloud_log_size; },
[this] { return _stm->manifest().cloud_log_size(); },
sm::description(
"Total size in bytes of the user-visible log for the topic"),
labels)
Expand Down
23 changes: 10 additions & 13 deletions src/v/archival/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

#include <cstdint>

namespace cluster {
class archival_metadata_stm;
}

namespace archival {

/// \brief Per-ntp archval service probe
Expand All @@ -30,7 +34,10 @@ namespace archival {
/// The unit of measure is offset delta.
class ntp_level_probe {
public:
ntp_level_probe(per_ntp_metrics_disabled disabled, const model::ntp& ntp);
ntp_level_probe(
per_ntp_metrics_disabled disabled,
const model::ntp& ntp,
ss::shared_ptr<const cluster::archival_metadata_stm> stm);
ntp_level_probe(const ntp_level_probe&) = delete;
ntp_level_probe& operator=(const ntp_level_probe&) = delete;
ntp_level_probe(ntp_level_probe&&) = delete;
Expand All @@ -56,12 +63,6 @@ class ntp_level_probe {
_segments_deleted += deleted_count;
};

void segments_in_manifest(int64_t count) { _segments_in_manifest = count; };

void segments_to_delete(int64_t count) { _segments_to_delete = count; };

void cloud_log_size(uint64_t size) { _cloud_log_size = size; }

void compacted_replaced_bytes(size_t bytes) {
_compacted_replaced_bytes = bytes;
}
Expand All @@ -77,17 +78,13 @@ class ntp_level_probe {
int64_t _pending = 0;
/// Number of segments deleted by garbage collection
int64_t _segments_deleted = 0;
/// Number of accounted segments in the cloud
int64_t _segments_in_manifest = 0;
/// Number of segments awaiting deletion
int64_t _segments_to_delete = 0;
/// size in bytes of the user-visible log
uint64_t _cloud_log_size = 0;
/// cloud bytes "removed" due to compaction operation
size_t _compacted_replaced_bytes = 0;

metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;

ss::shared_ptr<const cluster::archival_metadata_stm> _stm;
};

/// Metrics probe for upload housekeeping service
Expand Down
40 changes: 30 additions & 10 deletions tests/rptest/tests/usage_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import time
import random
import operator
from rptest.services.redpanda import RedpandaService
from rptest.services.redpanda import MetricsEndpoint, RedpandaService
from rptest.clients.rpk import RpkTool
from requests.exceptions import HTTPError
from rptest.services.cluster import cluster
Expand Down Expand Up @@ -498,6 +498,19 @@ def test_usage_manager_cloud_storage(self):

bucket_view = BucketView(self.redpanda)

def fetch_metric_usage():
total_usage = 0
for topic in self.topics:
usage = self.redpanda.metric_sum(
"redpanda_cloud_storage_cloud_log_size",
metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS,
topic=topic.name) / topic.replication_factor
self.logger.info(
f"Metric reported cloud storage usage for topic {topic.name}: {usage}"
)
total_usage += usage
return int(total_usage)

def check_usage():
# Check that the usage reporting system has reported correct values
manifest_usage = bucket_view.cloud_log_sizes_sum().total(
Expand All @@ -514,12 +527,19 @@ def check_usage():
self.logger.info(
f"Max reported usages via kafka/usage_manager: {max(reported_usages)}"
)
return manifest_usage in reported_usages

wait_until(
check_usage,
timeout_sec=30,
backoff_sec=1,
err_msg=
"Reported cloud storage usage (via usage endpoint) did not match the manifest inferred usage"
)
if manifest_usage not in reported_usages:
self.logger.info(
f"Reported usages: {reported_usages} does not contain {manifest_usage}"
)
return False

metric_usage = fetch_metric_usage()
self.logger.info(
f"Metric reported cloud storage usage: {metric_usage}")

return manifest_usage == metric_usage

wait_until(check_usage,
timeout_sec=30,
backoff_sec=1,
err_msg="Inconsistent cloud storage usage reported")

0 comments on commit 2528c1f

Please sign in to comment.