diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index cbee25b7a..932725d84 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -29,7 +29,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; -import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.storage.LogSegmentData; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; @@ -198,7 +197,7 @@ public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMe final InputStream transactionIndex = Files.newInputStream(logSegmentData.transactionIndex().get()); uploadIndexFile(remoteLogSegmentMetadata, transactionIndex, TRANSACTION); } - final ByteBufferInputStream leaderEpoch = new ByteBufferInputStream(logSegmentData.leaderEpochIndex()); + final InputStream leaderEpoch = new ByteArrayInputStream(logSegmentData.leaderEpochIndex().array()); uploadIndexFile(remoteLogSegmentMetadata, leaderEpoch, LEADER_EPOCH); } catch (final StorageBackendException | IOException e) { throw new RemoteStorageException(e); @@ -231,7 +230,8 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet throws IOException, StorageBackendException { final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG); try (final var sis = transformFinisher.toInputStream()) { - uploader.upload(sis, fileKey); + final var bytes = uploader.upload(sis, fileKey); + metrics.recordObjectUpload(ObjectKey.Suffix.LOG, bytes); } } @@ -239,9 +239,11 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta final InputStream index, final IndexType indexType) throws StorageBackendException, IOException { - final String key = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType)); + final var suffix = ObjectKey.Suffix.fromIndexType(indexType); + final String key = objectKey.key(remoteLogSegmentMetadata, suffix); try (index) { - uploader.upload(index, key); + final var bytes = uploader.upload(index, key); + metrics.recordObjectUpload(suffix, bytes); } } @@ -252,7 +254,8 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST); try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) { - uploader.upload(manifestContent, manifestFileKey); + final var bytes = uploader.upload(manifestContent, manifestFileKey); + metrics.recordObjectUpload(ObjectKey.Suffix.MANIFEST, bytes); } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java index 89ca48831..c0d064bd6 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java @@ -17,7 +17,9 @@ package io.aiven.kafka.tieredstorage.metrics; import java.util.List; +import java.util.Map; +import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.KafkaMetricsContext; import org.apache.kafka.common.metrics.MetricConfig; @@ -29,6 +31,8 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.utils.Time; +import io.aiven.kafka.tieredstorage.ObjectKey; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +53,7 @@ public class Metrics { private final Sensor segmentFetchRequestedBytes; final String rsmMetricGroup = "remote-storage-manager-metrics"; + final String storageBackendMetricGroup = "storage-backend-metrics"; public Metrics(final Time time) { final JmxReporter reporter = new JmxReporter(); @@ -93,6 +98,26 @@ public Metrics(final Time time) { segmentFetchRequestedBytes.add( metrics.metricName("segment-fetch-requested-bytes-total", rsmMetricGroup), new CumulativeSum()); + + final var objectUploadRequestsRate = + new MetricNameTemplate("object-upload-rate", storageBackendMetricGroup, "", "object-type"); + final var objectUploadRequestsTotal = + new MetricNameTemplate("object-upload-total", storageBackendMetricGroup, "", "object-type"); + final var objectUploadBytesRate = + new MetricNameTemplate("object-upload-bytes-rate", storageBackendMetricGroup, "", "object-type"); + final var objectUploadBytesTotal = + new MetricNameTemplate("object-upload-bytes-total", storageBackendMetricGroup, "", "object-type"); + + for (ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) { + final var objectUploadRequests = metrics.sensor("object." + suffix.value + ".object-upload"); + final var tags = Map.of("object-type", suffix.value); + objectUploadRequests.add(metrics.metricInstance(objectUploadRequestsRate, tags), new Rate()); + objectUploadRequests.add(metrics.metricInstance(objectUploadRequestsTotal, tags), new CumulativeCount()); + + final var objectUploadBytes = metrics.sensor("object." + suffix.value + ".object-upload-bytes"); + objectUploadBytes.add(metrics.metricInstance(objectUploadBytesRate, tags), new Rate()); + objectUploadBytes.add(metrics.metricInstance(objectUploadBytesTotal, tags), new CumulativeSum()); + } } public void recordSegmentCopy(final int bytes) { @@ -118,6 +143,13 @@ public void recordSegmentFetch(final int bytes) { segmentFetchRequestedBytes.record(bytes); } + public void recordObjectUpload(final ObjectKey.Suffix suffix, final long bytes) { + final var sensorRequests = metrics.getSensor("object." + suffix.value + ".object-upload"); + sensorRequests.record(); + final var sensorBytes = metrics.getSensor("object." + suffix.value + ".object-upload-bytes"); + sensorBytes.record(bytes); + } + public void close() { try { metrics.close(); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java index 22ef2cc9f..01a1fa164 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java @@ -90,9 +90,10 @@ static void setup(@TempDir final Path tmpDir, final Path sourceFile = source.resolve("file"); Files.write(sourceFile, new byte[LOG_SEGMENT_BYTES]); + final var leaderEpoch = ByteBuffer.wrap(new byte[LOG_SEGMENT_BYTES]); logSegmentData = new LogSegmentData( sourceFile, sourceFile, sourceFile, Optional.empty(), sourceFile, - ByteBuffer.allocate(0) + leaderEpoch ); } @@ -131,6 +132,39 @@ void metricsShouldBeReported() throws RemoteStorageException, JMException { assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-fetch-requested-bytes-total")) .isEqualTo(10.0); + for (final var suffix : ObjectKey.Suffix.values()) { + final ObjectName storageMetricsName = ObjectName.getInstance( + "aiven.kafka.server.tieredstorage:type=storage-backend-metrics,object-type=" + suffix.value); + if (suffix == ObjectKey.Suffix.TXN_INDEX) { + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total")) + .isEqualTo(0.0); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total")) + .isEqualTo(0.0); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate")) + .isEqualTo(0.0); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total")) + .isEqualTo(0.0); + } else if (suffix == ObjectKey.Suffix.MANIFEST) { + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total")) + .isEqualTo(3.0); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total")) + .isEqualTo(3.0); + assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate")) + .isGreaterThan(0.0); + assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total")) + .isGreaterThan(0.0); + } else { + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-rate")) + .isEqualTo(3.0 / METRIC_TIME_WINDOW_SEC); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total")) + .isEqualTo(3.0); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate")) + .isEqualTo(30.0 / METRIC_TIME_WINDOW_SEC); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total")) + .isEqualTo(30.0); + } + } + rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA); rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA);