Skip to content

Commit

Permalink
feat(metrics): add metrics for object upload
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Jul 11, 2023
1 parent 0616504 commit 569a1e1
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMe
final InputStream transactionIndex = Files.newInputStream(logSegmentData.transactionIndex().get());
uploadIndexFile(remoteLogSegmentMetadata, transactionIndex, TRANSACTION);
}
final ByteBufferInputStream leaderEpoch = new ByteBufferInputStream(logSegmentData.leaderEpochIndex());
final InputStream leaderEpoch = new ByteBufferInputStream(logSegmentData.leaderEpochIndex());
uploadIndexFile(remoteLogSegmentMetadata, leaderEpoch, LEADER_EPOCH);
} catch (final StorageBackendException | IOException e) {
metrics.recordSegmentCopyError(remoteLogSegmentMetadata.remoteLogSegmentId()
Expand Down Expand Up @@ -252,17 +252,28 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
throws IOException, StorageBackendException {
final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
try (final var sis = transformFinisher.toInputStream()) {
uploader.upload(sis, fileKey);
final var bytes = uploader.upload(sis, fileKey);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
ObjectKey.Suffix.LOG,
bytes
);
}
}

private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final InputStream index,
final IndexType indexType)
throws StorageBackendException, IOException {
final String key = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType));
final var suffix = ObjectKey.Suffix.fromIndexType(indexType);
final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
try (index) {
uploader.upload(index, key);
final var bytes = uploader.upload(index, key);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
suffix,
bytes
);
}
}

Expand All @@ -273,7 +284,12 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);

try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
uploader.upload(manifestContent, manifestFileKey);
final var bytes = uploader.upload(manifestContent, manifestFileKey);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
ObjectKey.Suffix.MANIFEST,
bytes
);
}
}

Expand Down
101 changes: 100 additions & 1 deletion core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,13 @@
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;

import io.aiven.kafka.tieredstorage.ObjectKey;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.OBJECT_UPLOAD;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.OBJECT_UPLOAD_BYTES;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY_BYTES;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY_ERRORS;
Expand All @@ -43,9 +47,15 @@
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_DELETE_TIME;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_FETCH;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_FETCH_REQUESTED_BYTES;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.objectTypeTags;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorName;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorNameByObjectType;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorNameByTopic;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorNameByTopicAndObjectType;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorNameByTopicPartition;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorNameByTopicPartitionAndObjectType;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.topicAndObjectTypeTags;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.topicPartitionAndObjectTypeTags;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.topicPartitionTags;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.topicTags;

Expand All @@ -60,7 +70,9 @@ public Metrics(final Time time, final MetricConfig metricConfig) {
final JmxReporter reporter = new JmxReporter();

metrics = new org.apache.kafka.common.metrics.Metrics(
metricConfig, List.of(reporter), time,
metricConfig,
List.of(reporter),
time,
new KafkaMetricsContext("aiven.kafka.server.tieredstorage")
);

Expand Down Expand Up @@ -288,6 +300,93 @@ private void recordSegmentFetchRequests(final TopicPartition topicPartition) {
.record();
}

public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKey.Suffix suffix,
final long bytes) {
recordObjectUploadRequests(topicPartition, suffix);
recordObjectUploadBytes(topicPartition, suffix, bytes);
}

private void recordObjectUploadBytes(final TopicPartition topicPartition,
final ObjectKey.Suffix suffix,
final long bytes) {
new SensorProvider(metrics, sensorName(OBJECT_UPLOAD_BYTES))
.with(metricsRegistry.objectUploadBytesRate, new Rate())
.with(metricsRegistry.objectUploadBytesTotal, new CumulativeSum())
.get()
.record(bytes);
new SensorProvider(metrics, sensorNameByTopic(topicPartition, OBJECT_UPLOAD_BYTES),
() -> topicTags(topicPartition))
.with(metricsRegistry.objectUploadBytesRateByTopic, new Rate())
.with(metricsRegistry.objectUploadBytesTotalByTopic, new CumulativeSum())
.get()
.record(bytes);
new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, OBJECT_UPLOAD_BYTES),
() -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.objectUploadBytesRateByTopicPartition, new Rate())
.with(metricsRegistry.objectUploadBytesTotalByTopicPartition, new CumulativeSum())
.get()
.record(bytes);
// with suffix
new SensorProvider(metrics, sensorNameByObjectType(suffix, OBJECT_UPLOAD_BYTES),
() -> objectTypeTags(suffix), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.objectUploadBytesRateByObjectType, new Rate())
.with(metricsRegistry.objectUploadBytesTotalByObjectType, new CumulativeSum())
.get()
.record(bytes);
new SensorProvider(metrics, sensorNameByTopicAndObjectType(topicPartition, suffix, OBJECT_UPLOAD_BYTES),
() -> topicAndObjectTypeTags(topicPartition, suffix), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.objectUploadBytesRateByTopicAndObjectType, new Rate())
.with(metricsRegistry.objectUploadBytesTotalByTopicAndObjectType, new CumulativeSum())
.get()
.record(bytes);
new SensorProvider(metrics,
sensorNameByTopicPartitionAndObjectType(topicPartition, suffix, OBJECT_UPLOAD_BYTES),
() -> topicPartitionAndObjectTypeTags(topicPartition, suffix), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.objectUploadBytesRateByTopicPartitionAndObjectType, new Rate())
.with(metricsRegistry.objectUploadBytesTotalByTopicPartitionAndObjectType, new CumulativeSum())
.get()
.record(bytes);
}

private void recordObjectUploadRequests(final TopicPartition topicPartition, final ObjectKey.Suffix suffix) {
new SensorProvider(metrics, sensorName(OBJECT_UPLOAD))
.with(metricsRegistry.objectUploadRequestsRate, new Rate())
.with(metricsRegistry.objectUploadRequestsTotal, new CumulativeCount())
.get()
.record();
new SensorProvider(metrics, sensorNameByTopic(topicPartition, OBJECT_UPLOAD),
() -> topicTags(topicPartition))
.with(metricsRegistry.objectUploadRequestsRateByTopic, new Rate())
.with(metricsRegistry.objectUploadRequestsTotalByTopic, new CumulativeCount())
.get()
.record();
new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, OBJECT_UPLOAD),
() -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.objectUploadRequestsRateByTopicPartition, new Rate())
.with(metricsRegistry.objectUploadRequestsTotalByTopicPartition, new CumulativeCount())
.get()
.record();
// With suffix
new SensorProvider(metrics, sensorNameByObjectType(suffix, OBJECT_UPLOAD),
() -> objectTypeTags(suffix), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.objectUploadRequestsRateByObjectType, new Rate())
.with(metricsRegistry.objectUploadRequestsTotalByObjectType, new CumulativeCount())
.get()
.record();
new SensorProvider(metrics, sensorNameByTopicAndObjectType(topicPartition, suffix, OBJECT_UPLOAD),
() -> topicAndObjectTypeTags(topicPartition, suffix), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.objectUploadRequestsRateByTopicAndObjectType, new Rate())
.with(metricsRegistry.objectUploadRequestsTotalByTopicAndObjectType, new CumulativeCount())
.get()
.record();
new SensorProvider(metrics, sensorNameByTopicPartitionAndObjectType(topicPartition, suffix, OBJECT_UPLOAD),
() -> topicPartitionAndObjectTypeTags(topicPartition, suffix), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.objectUploadRequestsRateByTopicPartitionAndObjectType, new Rate())
.with(metricsRegistry.objectUploadRequestsTotalByTopicPartitionAndObjectType, new CumulativeCount())
.get()
.record();
}

public void close() {
try {
metrics.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.aiven.kafka.tieredstorage.metrics;

import io.aiven.kafka.tieredstorage.ObjectKey;
import java.util.Map;

import org.apache.kafka.common.MetricNameTemplate;
Expand All @@ -24,8 +25,15 @@
public class MetricsRegistry {

static final String METRIC_GROUP = "remote-storage-manager-metrics";
static final String[] TOPIC_TAG_NAMES = {"topic"};
static final String[] TOPIC_PARTITION_TAG_NAMES = {"topic", "partition"};
static final String TAG_NAME_OBJECT_TYPE = "object-type";
static final String[] OBJECT_TYPE_TAG_NAMES = {TAG_NAME_OBJECT_TYPE};
static final String TAG_NAME_TOPIC = "topic";
static final String[] TOPIC_TAG_NAMES = {TAG_NAME_TOPIC};
static final String[] TOPIC_AND_OBJECT_TYPE_TAG_NAMES = {TAG_NAME_TOPIC, TAG_NAME_OBJECT_TYPE};
static final String TAG_NAME_PARTITION = "partition";
static final String[] TOPIC_PARTITION_TAG_NAMES = {TAG_NAME_TOPIC, TAG_NAME_PARTITION};
static final String[] TOPIC_PARTITION_AND_OBJECT_TYPE_TAG_NAMES =
{TAG_NAME_TOPIC, TAG_NAME_PARTITION, TAG_NAME_OBJECT_TYPE};

// Segment copy metric names
static final String SEGMENT_COPY = "segment-copy";
Expand Down Expand Up @@ -175,26 +183,123 @@ public class MetricsRegistry {
final MetricNameTemplate segmentFetchRequestedBytesTotalByTopicPartition =
new MetricNameTemplate(SEGMENT_FETCH_REQUESTED_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);

// Object upload metrics
static final String OBJECT_UPLOAD = "object-upload";
static final String OBJECT_UPLOAD_RATE = OBJECT_UPLOAD + "-rate";
final MetricNameTemplate objectUploadRequestsRate = new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "");
final MetricNameTemplate objectUploadRequestsRateByObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "", OBJECT_TYPE_TAG_NAMES);
final MetricNameTemplate objectUploadRequestsRateByTopic =
new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate objectUploadRequestsRateByTopicAndObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "", TOPIC_AND_OBJECT_TYPE_TAG_NAMES);
final MetricNameTemplate objectUploadRequestsRateByTopicPartition =
new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
final MetricNameTemplate objectUploadRequestsRateByTopicPartitionAndObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "", TOPIC_PARTITION_AND_OBJECT_TYPE_TAG_NAMES);
static final String OBJECT_UPLOAD_TOTAL = OBJECT_UPLOAD + "-total";
final MetricNameTemplate objectUploadRequestsTotal = new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "");
final MetricNameTemplate objectUploadRequestsTotalByObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "", OBJECT_TYPE_TAG_NAMES);
final MetricNameTemplate objectUploadRequestsTotalByTopic =
new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate objectUploadRequestsTotalByTopicAndObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "", TOPIC_AND_OBJECT_TYPE_TAG_NAMES);
final MetricNameTemplate objectUploadRequestsTotalByTopicPartition =
new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
final MetricNameTemplate objectUploadRequestsTotalByTopicPartitionAndObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_AND_OBJECT_TYPE_TAG_NAMES);
static final String OBJECT_UPLOAD_BYTES = OBJECT_UPLOAD + "-bytes";
static final String OBJECT_UPLOAD_BYTES_RATE = OBJECT_UPLOAD_BYTES + "-rate";
final MetricNameTemplate objectUploadBytesRate = new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "");
final MetricNameTemplate objectUploadBytesRateByObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "", OBJECT_TYPE_TAG_NAMES);
final MetricNameTemplate objectUploadBytesRateByTopic =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate objectUploadBytesRateByTopicAndObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "", TOPIC_AND_OBJECT_TYPE_TAG_NAMES);
final MetricNameTemplate objectUploadBytesRateByTopicPartition =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
final MetricNameTemplate objectUploadBytesRateByTopicPartitionAndObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "", TOPIC_PARTITION_AND_OBJECT_TYPE_TAG_NAMES);
public static final String OBJECT_UPLOAD_BYTES_TOTAL = OBJECT_UPLOAD_BYTES + "-total";
final MetricNameTemplate objectUploadBytesTotal =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "");
final MetricNameTemplate objectUploadBytesTotalByObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "", OBJECT_TYPE_TAG_NAMES);
final MetricNameTemplate objectUploadBytesTotalByTopic =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate objectUploadBytesTotalByTopicAndObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_AND_OBJECT_TYPE_TAG_NAMES);
final MetricNameTemplate objectUploadBytesTotalByTopicPartition =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
final MetricNameTemplate objectUploadBytesTotalByTopicPartitionAndObjectType =
new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_AND_OBJECT_TYPE_TAG_NAMES);

public static String sensorName(final String name) {
return name;
}

public static String sensorNameByObjectType(final ObjectKey.Suffix suffix, final String name) {
return TAG_NAME_OBJECT_TYPE + "." + suffix.value + "." + name;
}

public static String sensorNameByTopic(final TopicPartition topicPartition, final String name) {
return "topic." + topicPartition.topic() + "." + name;
return TAG_NAME_TOPIC + "." + topicPartition.topic() + "." + name;
}

public static String sensorNameByTopicAndObjectType(final TopicPartition topicPartition,
final ObjectKey.Suffix suffix,
final String name) {
return TAG_NAME_TOPIC + "." + topicPartition.topic() + "."
+ TAG_NAME_OBJECT_TYPE + "." + suffix.value
+ "." + name;
}

public static String sensorNameByTopicPartition(final TopicPartition topicPartition, final String name) {
return "topic." + topicPartition.topic() + ".partition." + topicPartition.partition() + "." + name;
return TAG_NAME_TOPIC + "." + topicPartition.topic()
+ "." + TAG_NAME_PARTITION + "." + topicPartition.partition()
+ "." + name;
}

public static String sensorNameByTopicPartitionAndObjectType(final TopicPartition topicPartition,
final ObjectKey.Suffix suffix,
final String name) {
return TAG_NAME_TOPIC + "." + topicPartition.topic()
+ "." + TAG_NAME_PARTITION + "." + topicPartition.partition()
+ "." + TAG_NAME_OBJECT_TYPE + "." + suffix.value
+ "." + name;
}

static Map<String, String> topicTags(final TopicPartition topicPartition) {
return Map.of("topic", topicPartition.topic());
return Map.of(TAG_NAME_TOPIC, topicPartition.topic());
}

static Map<String, String> topicAndObjectTypeTags(final TopicPartition topicPartition,
final ObjectKey.Suffix suffix) {
return Map.of(
TAG_NAME_TOPIC, topicPartition.topic(),
TAG_NAME_OBJECT_TYPE, suffix.value
);
}

static Map<String, String> topicPartitionTags(final TopicPartition topicPartition) {
return Map.of(
"topic", topicPartition.topic(),
"partition", String.valueOf(topicPartition.partition())
TAG_NAME_TOPIC, topicPartition.topic(),
TAG_NAME_PARTITION, String.valueOf(topicPartition.partition())
);
}

static Map<String, String> topicPartitionAndObjectTypeTags(final TopicPartition topicPartition,
final ObjectKey.Suffix suffix) {
return Map.of(
TAG_NAME_TOPIC, topicPartition.topic(),
TAG_NAME_PARTITION, String.valueOf(topicPartition.partition()),
TAG_NAME_OBJECT_TYPE, suffix.value
);
}

static Map<String, String> objectTypeTags(final ObjectKey.Suffix suffix) {
return Map.of(TAG_NAME_OBJECT_TYPE, suffix.value);
}
}
Loading

0 comments on commit 569a1e1

Please sign in to comment.