Skip to content

Commit

Permalink
feat(metrics): add metrics for object upload
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Jul 3, 2023
1 parent 01e36d6 commit 4c8a116
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
Expand Down Expand Up @@ -198,7 +197,7 @@ public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMe
final InputStream transactionIndex = Files.newInputStream(logSegmentData.transactionIndex().get());
uploadIndexFile(remoteLogSegmentMetadata, transactionIndex, TRANSACTION);
}
final ByteBufferInputStream leaderEpoch = new ByteBufferInputStream(logSegmentData.leaderEpochIndex());
final InputStream leaderEpoch = new ByteArrayInputStream(logSegmentData.leaderEpochIndex().array());
uploadIndexFile(remoteLogSegmentMetadata, leaderEpoch, LEADER_EPOCH);
} catch (final StorageBackendException | IOException e) {
throw new RemoteStorageException(e);
Expand Down Expand Up @@ -231,17 +230,20 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
throws IOException, StorageBackendException {
final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
try (final var sis = transformFinisher.toInputStream()) {
uploader.upload(sis, fileKey);
final var bytes = uploader.upload(sis, fileKey);
metrics.recordObjectUpload(ObjectKey.Suffix.LOG, bytes);
}
}

/**
 * Uploads a single index file for the given remote log segment and records
 * upload metrics for it.
 *
 * <p>The object key suffix is derived from the index type so that the same
 * suffix is used both for building the storage key and for tagging the
 * recorded metric, keeping the two consistent.
 *
 * @param remoteLogSegmentMetadata metadata identifying the segment the index belongs to
 * @param index                    stream with the index content; closed by this method
 * @param indexType                type of the index being uploaded (offset, time, txn, …)
 * @throws StorageBackendException if the backend rejects or fails the upload
 * @throws IOException             if reading or closing the index stream fails
 */
private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                             final InputStream index,
                             final IndexType indexType)
    throws StorageBackendException, IOException {
    final var suffix = ObjectKey.Suffix.fromIndexType(indexType);
    final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
    // try-with-resources on the caller-supplied stream guarantees it is closed
    // even if the upload fails.
    try (index) {
        final var bytes = uploader.upload(index, key);
        metrics.recordObjectUpload(suffix, bytes);
    }
}

Expand All @@ -252,7 +254,8 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);

try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
uploader.upload(manifestContent, manifestFileKey);
final var bytes = uploader.upload(manifestContent, manifestFileKey);
metrics.recordObjectUpload(ObjectKey.Suffix.MANIFEST, bytes);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.aiven.kafka.tieredstorage.metrics;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
Expand All @@ -29,6 +31,8 @@
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;

import io.aiven.kafka.tieredstorage.ObjectKey;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,6 +53,7 @@ public class Metrics {
private final Sensor segmentFetchRequestedBytes;

final String rsmMetricGroup = "remote-storage-manager-metrics";
final String storageBackendMetricGroup = "storage-backend-metrics";

public Metrics(final Time time) {
final JmxReporter reporter = new JmxReporter();
Expand Down Expand Up @@ -93,6 +98,26 @@ public Metrics(final Time time) {
segmentFetchRequestedBytes.add(
metrics.metricName("segment-fetch-requested-bytes-total", rsmMetricGroup),
new CumulativeSum());

final var objectUploadRequestsRate =
new MetricNameTemplate("object-upload-rate", storageBackendMetricGroup, "", "object-type");
final var objectUploadRequestsTotal =
new MetricNameTemplate("object-upload-total", storageBackendMetricGroup, "", "object-type");
final var objectUploadBytesRate =
new MetricNameTemplate("object-upload-bytes-rate", storageBackendMetricGroup, "", "object-type");
final var objectUploadBytesTotal =
new MetricNameTemplate("object-upload-bytes-total", storageBackendMetricGroup, "", "object-type");

for (ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) {
final var objectUploadRequests = metrics.sensor("object." + suffix.value + ".object-upload");
final var tags = Map.of("object-type", suffix.value);
objectUploadRequests.add(metrics.metricInstance(objectUploadRequestsRate, tags), new Rate());
objectUploadRequests.add(metrics.metricInstance(objectUploadRequestsTotal, tags), new CumulativeCount());

final var objectUploadBytes = metrics.sensor("object." + suffix.value + ".object-upload-bytes");
objectUploadBytes.add(metrics.metricInstance(objectUploadBytesRate, tags), new Rate());
objectUploadBytes.add(metrics.metricInstance(objectUploadBytesTotal, tags), new CumulativeSum());
}
}

public void recordSegmentCopy(final int bytes) {
Expand All @@ -118,6 +143,13 @@ public void recordSegmentFetch(final int bytes) {
segmentFetchRequestedBytes.record(bytes);
}

/**
 * Records one upload of an object of the given type: bumps the per-type
 * request sensor and adds {@code bytes} to the per-type byte sensor.
 * The sensor names must match those registered in the constructor.
 *
 * @param suffix object type (log, index, manifest, …) used as the metric tag
 * @param bytes  number of bytes uploaded for this object
 */
public void recordObjectUpload(final ObjectKey.Suffix suffix, final long bytes) {
    final String sensorPrefix = "object." + suffix.value;
    metrics.getSensor(sensorPrefix + ".object-upload").record();
    metrics.getSensor(sensorPrefix + ".object-upload-bytes").record(bytes);
}

public void close() {
try {
metrics.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ static void setup(@TempDir final Path tmpDir,
final Path sourceFile = source.resolve("file");
Files.write(sourceFile, new byte[LOG_SEGMENT_BYTES]);

final var leaderEpoch = ByteBuffer.wrap(new byte[LOG_SEGMENT_BYTES]);
logSegmentData = new LogSegmentData(
sourceFile, sourceFile, sourceFile, Optional.empty(), sourceFile,
ByteBuffer.allocate(0)
leaderEpoch
);
}

Expand Down Expand Up @@ -131,6 +132,39 @@ void metricsShouldBeReported() throws RemoteStorageException, JMException {
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-fetch-requested-bytes-total"))
.isEqualTo(10.0);

for (final var suffix : ObjectKey.Suffix.values()) {
final ObjectName storageMetricsName = ObjectName.getInstance(
"aiven.kafka.server.tieredstorage:type=storage-backend-metrics,object-type=" + suffix.value);
if (suffix == ObjectKey.Suffix.TXN_INDEX) {
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total"))
.isEqualTo(0.0);
} else if (suffix == ObjectKey.Suffix.MANIFEST) {
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(3.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(3.0);
assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate"))
.isGreaterThan(0.0);
assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total"))
.isGreaterThan(0.0);
} else {
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-rate"))
.isEqualTo(3.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(3.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate"))
.isEqualTo(30.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total"))
.isEqualTo(30.0);
}
}

rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA);
rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA);

Expand Down

0 comments on commit 4c8a116

Please sign in to comment.