Skip to content

Commit

Permalink
feat(metrics): add metrics for object upload
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Jul 11, 2023
1 parent 762f56c commit ae541e9
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 12 deletions.
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="Metrics.java"/>
<suppress checks="CyclomaticComplexity" files="MetricCollector.java"/>
<suppress checks="JavaNCSS" files="Metrics.java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMe
final InputStream transactionIndex = Files.newInputStream(logSegmentData.transactionIndex().get());
uploadIndexFile(remoteLogSegmentMetadata, transactionIndex, TRANSACTION);
}
final ByteBufferInputStream leaderEpoch = new ByteBufferInputStream(logSegmentData.leaderEpochIndex());
final InputStream leaderEpoch = new ByteBufferInputStream(logSegmentData.leaderEpochIndex());
uploadIndexFile(remoteLogSegmentMetadata, leaderEpoch, LEADER_EPOCH);
} catch (final StorageBackendException | IOException e) {
metrics.recordSegmentCopyError();
Expand Down Expand Up @@ -238,17 +238,20 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
throws IOException, StorageBackendException {
final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
try (final var sis = transformFinisher.toInputStream()) {
uploader.upload(sis, fileKey);
final var bytes = uploader.upload(sis, fileKey);
metrics.recordObjectUpload(ObjectKey.Suffix.LOG, bytes);
}
}

/**
 * Uploads a single index file for the given remote log segment and records
 * upload metrics (request count and byte count) tagged with the object type.
 *
 * @param remoteLogSegmentMetadata metadata identifying the segment the index belongs to
 * @param index                    stream with the index content; always closed by this method
 * @param indexType                which index is being uploaded (offset, time, txn, ...)
 * @throws StorageBackendException if the storage backend rejects or fails the upload
 * @throws IOException             if reading or closing the index stream fails
 */
private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                             final InputStream index,
                             final IndexType indexType)
    throws StorageBackendException, IOException {
    // Resolve the suffix once: it is needed both for building the object key
    // and for tagging the per-object-type upload metrics.
    final var suffix = ObjectKey.Suffix.fromIndexType(indexType);
    final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
    // try-with-resources guarantees the index stream is closed even when the upload throws.
    try (index) {
        final var bytes = uploader.upload(index, key);
        metrics.recordObjectUpload(suffix, bytes);
    }
}

Expand All @@ -259,7 +262,8 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);

try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
uploader.upload(manifestContent, manifestFileKey);
final var bytes = uploader.upload(manifestContent, manifestFileKey);
metrics.recordObjectUpload(ObjectKey.Suffix.MANIFEST, bytes);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.aiven.kafka.tieredstorage.metrics;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
Expand All @@ -29,6 +31,8 @@
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;

import io.aiven.kafka.tieredstorage.ObjectKey;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,11 +54,16 @@ public class Metrics {
private final Sensor segmentFetchRequests;
private final Sensor segmentFetchRequestedBytes;

private final Sensor objectUploadRequests;
private final Sensor objectUploadBytes;

public Metrics(final Time time) {
final JmxReporter reporter = new JmxReporter();

metrics = new org.apache.kafka.common.metrics.Metrics(
new MetricConfig(), List.of(reporter), time,
new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), // TODO: fixed when rebased with #309
List.of(reporter),
time,
new KafkaMetricsContext("aiven.kafka.server.tieredstorage")
);
final String metricGroup = "remote-storage-manager-metrics";
Expand Down Expand Up @@ -102,6 +111,36 @@ public Metrics(final Time time) {
segmentFetchRequestedBytes.add(
metrics.metricName("segment-fetch-requested-bytes-total", metricGroup),
new CumulativeSum());

objectUploadRequests = metrics.sensor("object-upload");
objectUploadRequests.add(metrics.metricName("object-upload-rate", metricGroup), new Rate());
objectUploadRequests.add(metrics.metricName("object-upload-total", metricGroup), new CumulativeCount());

objectUploadBytes = metrics.sensor("object-upload-bytes");
objectUploadBytes.add(metrics.metricName("object-upload-bytes-rate", metricGroup), new Rate());
objectUploadBytes.add(metrics.metricName("object-upload-bytes-total", metricGroup), new CumulativeSum());

final var objectUploadRequestsRate =
new MetricNameTemplate("object-upload-rate", metricGroup, "", "object-type");
final var objectUploadRequestsTotal =
new MetricNameTemplate("object-upload-total", metricGroup, "", "object-type");
final var objectUploadBytesRate =
new MetricNameTemplate("object-upload-bytes-rate", metricGroup, "", "object-type");
final var objectUploadBytesTotal =
new MetricNameTemplate("object-upload-bytes-total", metricGroup, "", "object-type");

for (final ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) {
final var objectUploadRequests =
metrics.sensor("object." + suffix.value + ".object-upload", Sensor.RecordingLevel.DEBUG);
final var tags = Map.of("object-type", suffix.value);
objectUploadRequests.add(metrics.metricInstance(objectUploadRequestsRate, tags), new Rate());
objectUploadRequests.add(metrics.metricInstance(objectUploadRequestsTotal, tags), new CumulativeCount());

final var objectUploadBytes =
metrics.sensor("object." + suffix.value + ".object-upload-bytes", Sensor.RecordingLevel.DEBUG);
objectUploadBytes.add(metrics.metricInstance(objectUploadBytesRate, tags), new Rate());
objectUploadBytes.add(metrics.metricInstance(objectUploadBytesTotal, tags), new CumulativeSum());
}
}

public void recordSegmentCopy(final int bytes) {
Expand Down Expand Up @@ -135,6 +174,15 @@ public void recordSegmentFetch(final int bytes) {
segmentFetchRequestedBytes.record(bytes);
}

/**
 * Records one successful object upload of {@code bytes} bytes.
 * Updates both the aggregate upload sensors and the per-object-type sensors
 * (looked up by the naming scheme used when the sensors were registered).
 *
 * @param suffix the object type that was uploaded, used to select the tagged sensors
 * @param bytes  number of bytes uploaded
 */
public void recordObjectUpload(final ObjectKey.Suffix suffix, final long bytes) {
    // Per-object-type sensor names follow the "object.<type>.<metric>" scheme
    // used during registration in the constructor.
    final String sensorPrefix = "object." + suffix.value;
    objectUploadRequests.record();
    metrics.getSensor(sensorPrefix + ".object-upload").record();
    objectUploadBytes.record(bytes);
    metrics.getSensor(sensorPrefix + ".object-upload-bytes").record(bytes);
}

public void close() {
try {
metrics.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,26 @@ void setup(@TempDir final Path tmpDir,
final Path sourceFile = source.resolve("file");
Files.write(sourceFile, new byte[LOG_SEGMENT_BYTES]);

final var leaderEpoch = ByteBuffer.wrap(new byte[LOG_SEGMENT_BYTES]);
logSegmentData = new LogSegmentData(
sourceFile, sourceFile, sourceFile, Optional.empty(), sourceFile,
ByteBuffer.allocate(0)
leaderEpoch
);
}

@Test
void metricsShouldBeReported() throws RemoteStorageException, JMException {
rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData);
logSegmentData.leaderEpochIndex().flip(); // so leader epoch can be consumed again
rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData);
logSegmentData.leaderEpochIndex().flip();
rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData);
logSegmentData.leaderEpochIndex().flip();

rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0);

final ObjectName rsmMetricsName = ObjectName.getInstance(
"aiven.kafka.server.tieredstorage:type=remote-storage-manager-metrics");
final var metricObjectName = "aiven.kafka.server.tieredstorage:type=remote-storage-manager-metrics";
final ObjectName rsmMetricsName = ObjectName.getInstance(metricObjectName);
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-copy-rate"))
.isEqualTo(3.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-copy-total"))
Expand All @@ -140,6 +144,49 @@ void metricsShouldBeReported() throws RemoteStorageException, JMException {
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-fetch-requested-bytes-total"))
.isEqualTo(10.0);

assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "object-upload-total"))
.isEqualTo(18.0);
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "object-upload-rate"))
.isEqualTo(18.0 / METRIC_TIME_WINDOW_SEC);

assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "object-upload-bytes-total"))
.isEqualTo(657.0);
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "object-upload-bytes-rate"))
.isEqualTo(657.0 / METRIC_TIME_WINDOW_SEC);

for (final var suffix : ObjectKey.Suffix.values()) {
final ObjectName storageMetricsName =
ObjectName.getInstance(metricObjectName + ",object-type=" + suffix.value);
if (suffix == ObjectKey.Suffix.TXN_INDEX) {
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total"))
.isEqualTo(0.0);
} else if (suffix == ObjectKey.Suffix.MANIFEST) {
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(3.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(3.0);
assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate"))
.isGreaterThan(0.0);
assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total"))
.isGreaterThan(0.0);
} else {
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-rate"))
.isEqualTo(3.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total"))
.isEqualTo(3.0);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate"))
.isEqualTo(30.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total"))
.isEqualTo(30.0);
}
}

rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA);
rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA);

Expand Down Expand Up @@ -168,17 +215,17 @@ public void delete(final String key) throws StorageBackendException {
}

@Override
public InputStream fetch(final String key) throws StorageBackendException {
public InputStream fetch(final String key) {
return null;
}

@Override
public InputStream fetch(final String key, final BytesRange range) throws StorageBackendException {
public InputStream fetch(final String key, final BytesRange range) {
return null;
}

@Override
public void upload(final InputStream inputStream, final String key) throws StorageBackendException {
public long upload(final InputStream inputStream, final String key) throws StorageBackendException {
throw new StorageBackendException("something wrong");
}

Expand Down

0 comments on commit ae541e9

Please sign in to comment.