diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index 0c758060f..d8b7befa2 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -252,7 +252,12 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet throws IOException, StorageBackendException { final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG); try (final var sis = transformFinisher.toInputStream()) { - uploader.upload(sis, fileKey); + final var bytes = uploader.upload(sis, fileKey); + metrics.recordObjectUpload( + remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), + ObjectKey.Suffix.LOG, + bytes + ); } } @@ -260,9 +265,15 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta final InputStream index, final IndexType indexType) throws StorageBackendException, IOException { - final String key = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType)); + final var suffix = ObjectKey.Suffix.fromIndexType(indexType); + final String key = objectKey.key(remoteLogSegmentMetadata, suffix); try (index) { - uploader.upload(index, key); + final var bytes = uploader.upload(index, key); + metrics.recordObjectUpload( + remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), + suffix, + bytes + ); } } @@ -273,7 +284,12 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST); try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) { - uploader.upload(manifestContent, manifestFileKey); + final var bytes = uploader.upload(manifestContent, manifestFileKey); + metrics.recordObjectUpload( + remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), + ObjectKey.Suffix.MANIFEST, + bytes + ); } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java index f32b3447b..1a7d22fd9 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java @@ -30,9 +30,13 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.utils.Time; +import io.aiven.kafka.tieredstorage.ObjectKey; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.OBJECT_UPLOAD; +import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.OBJECT_UPLOAD_BYTES; import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY; import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY_BYTES; import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY_ERRORS; @@ -43,9 +47,15 @@ import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_DELETE_TIME; import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_FETCH; import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_FETCH_REQUESTED_BYTES; +import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.objectTypeTags; import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorName; +import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorNameByObjectType; import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorNameByTopic; +import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorNameByTopicAndObjectType; import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorNameByTopicPartition; +import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorNameByTopicPartitionAndObjectType; +import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.topicAndObjectTypeTags; +import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.topicPartitionAndObjectTypeTags; import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.topicPartitionTags; import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.topicTags; @@ -60,7 +70,9 @@ public Metrics(final Time time, final MetricConfig metricConfig) { final JmxReporter reporter = new JmxReporter(); metrics = new org.apache.kafka.common.metrics.Metrics( - metricConfig, List.of(reporter), time, + metricConfig, + List.of(reporter), + time, new KafkaMetricsContext("aiven.kafka.server.tieredstorage") ); @@ -288,6 +300,93 @@ private void recordSegmentFetchRequests(final TopicPartition topicPartition) { .record(); } + public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKey.Suffix suffix, + final long bytes) { + recordObjectUploadRequests(topicPartition, suffix); + recordObjectUploadBytes(topicPartition, suffix, bytes); + } + + private void recordObjectUploadBytes(final TopicPartition topicPartition, + final ObjectKey.Suffix suffix, + final long bytes) { + new SensorProvider(metrics, sensorName(OBJECT_UPLOAD_BYTES)) + .with(metricsRegistry.objectUploadBytesRate, new Rate()) + .with(metricsRegistry.objectUploadBytesTotal, new CumulativeSum()) + .get() + .record(bytes); + new SensorProvider(metrics, sensorNameByTopic(topicPartition, OBJECT_UPLOAD_BYTES), + () -> topicTags(topicPartition)) + .with(metricsRegistry.objectUploadBytesRateByTopic, new Rate()) + .with(metricsRegistry.objectUploadBytesTotalByTopic, new CumulativeSum()) + .get() + .record(bytes); + new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, OBJECT_UPLOAD_BYTES), + () -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG) + .with(metricsRegistry.objectUploadBytesRateByTopicPartition, new Rate()) + .with(metricsRegistry.objectUploadBytesTotalByTopicPartition, new CumulativeSum()) + .get() + .record(bytes); + // with suffix + new SensorProvider(metrics, sensorNameByObjectType(suffix, OBJECT_UPLOAD_BYTES), + () -> objectTypeTags(suffix), Sensor.RecordingLevel.DEBUG) + .with(metricsRegistry.objectUploadBytesRateByObjectType, new Rate()) + .with(metricsRegistry.objectUploadBytesTotalByObjectType, new CumulativeSum()) + .get() + .record(bytes); + new SensorProvider(metrics, sensorNameByTopicAndObjectType(topicPartition, suffix, OBJECT_UPLOAD_BYTES), + () -> topicAndObjectTypeTags(topicPartition, suffix), Sensor.RecordingLevel.DEBUG) + .with(metricsRegistry.objectUploadBytesRateByTopicAndObjectType, new Rate()) + .with(metricsRegistry.objectUploadBytesTotalByTopicAndObjectType, new CumulativeSum()) + .get() + .record(bytes); + new SensorProvider(metrics, + sensorNameByTopicPartitionAndObjectType(topicPartition, suffix, OBJECT_UPLOAD_BYTES), + () -> topicPartitionAndObjectTypeTags(topicPartition, suffix), Sensor.RecordingLevel.DEBUG) + .with(metricsRegistry.objectUploadBytesRateByTopicPartitionAndObjectType, new Rate()) + .with(metricsRegistry.objectUploadBytesTotalByTopicPartitionAndObjectType, new CumulativeSum()) + .get() + .record(bytes); + } + + private void recordObjectUploadRequests(final TopicPartition topicPartition, final ObjectKey.Suffix suffix) { + new SensorProvider(metrics, sensorName(OBJECT_UPLOAD)) + .with(metricsRegistry.objectUploadRequestsRate, new Rate()) + .with(metricsRegistry.objectUploadRequestsTotal, new CumulativeCount()) + .get() + .record(); + new SensorProvider(metrics, sensorNameByTopic(topicPartition, OBJECT_UPLOAD), + () -> topicTags(topicPartition)) + .with(metricsRegistry.objectUploadRequestsRateByTopic, new Rate()) + .with(metricsRegistry.objectUploadRequestsTotalByTopic, new CumulativeCount()) + .get() + .record(); + new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, OBJECT_UPLOAD), + () -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG) + .with(metricsRegistry.objectUploadRequestsRateByTopicPartition, new Rate()) + .with(metricsRegistry.objectUploadRequestsTotalByTopicPartition, new CumulativeCount()) + .get() + .record(); + // With suffix + new SensorProvider(metrics, sensorNameByObjectType(suffix, OBJECT_UPLOAD), + () -> objectTypeTags(suffix), Sensor.RecordingLevel.DEBUG) + .with(metricsRegistry.objectUploadRequestsRateByObjectType, new Rate()) + .with(metricsRegistry.objectUploadRequestsTotalByObjectType, new CumulativeCount()) + .get() + .record(); + new SensorProvider(metrics, sensorNameByTopicAndObjectType(topicPartition, suffix, OBJECT_UPLOAD), + () -> topicAndObjectTypeTags(topicPartition, suffix), Sensor.RecordingLevel.DEBUG) + .with(metricsRegistry.objectUploadRequestsRateByTopicAndObjectType, new Rate()) + .with(metricsRegistry.objectUploadRequestsTotalByTopicAndObjectType, new CumulativeCount()) + .get() + .record(); + new SensorProvider(metrics, sensorNameByTopicPartitionAndObjectType(topicPartition, suffix, OBJECT_UPLOAD), + () -> topicPartitionAndObjectTypeTags(topicPartition, suffix), Sensor.RecordingLevel.DEBUG) + .with(metricsRegistry.objectUploadRequestsRateByTopicPartitionAndObjectType, new Rate()) + .with(metricsRegistry.objectUploadRequestsTotalByTopicPartitionAndObjectType, new CumulativeCount()) + .get() + .record(); + } + public void close() { try { metrics.close(); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java index 5dd19860a..e0d053726 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java @@ -21,11 +21,20 @@ import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.TopicPartition; +import io.aiven.kafka.tieredstorage.ObjectKey; + public class MetricsRegistry { static final String METRIC_GROUP = "remote-storage-manager-metrics"; - static final String[] TOPIC_TAG_NAMES = {"topic"}; - static final String[] TOPIC_PARTITION_TAG_NAMES = {"topic", "partition"}; + static final String TAG_NAME_OBJECT_TYPE = "object-type"; + static final String[] OBJECT_TYPE_TAG_NAMES = {TAG_NAME_OBJECT_TYPE}; + static final String TAG_NAME_TOPIC = "topic"; + static final String[] TOPIC_TAG_NAMES = {TAG_NAME_TOPIC}; + static final String[] TOPIC_AND_OBJECT_TYPE_TAG_NAMES = {TAG_NAME_TOPIC, TAG_NAME_OBJECT_TYPE}; + static final String TAG_NAME_PARTITION = "partition"; + static final String[] TOPIC_PARTITION_TAG_NAMES = {TAG_NAME_TOPIC, TAG_NAME_PARTITION}; + static final String[] TOPIC_PARTITION_AND_OBJECT_TYPE_TAG_NAMES = + {TAG_NAME_TOPIC, TAG_NAME_PARTITION, TAG_NAME_OBJECT_TYPE}; // Segment copy metric names static final String SEGMENT_COPY = "segment-copy"; @@ -175,26 +184,123 @@ public class MetricsRegistry { final MetricNameTemplate segmentFetchRequestedBytesTotalByTopicPartition = new MetricNameTemplate(SEGMENT_FETCH_REQUESTED_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES); + // Object upload metrics + static final String OBJECT_UPLOAD = "object-upload"; + static final String OBJECT_UPLOAD_RATE = OBJECT_UPLOAD + "-rate"; + final MetricNameTemplate objectUploadRequestsRate = new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, ""); + final MetricNameTemplate objectUploadRequestsRateByObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "", OBJECT_TYPE_TAG_NAMES); + final MetricNameTemplate objectUploadRequestsRateByTopic = + new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "", TOPIC_TAG_NAMES); + final MetricNameTemplate objectUploadRequestsRateByTopicAndObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "", TOPIC_AND_OBJECT_TYPE_TAG_NAMES); + final MetricNameTemplate objectUploadRequestsRateByTopicPartition = + new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES); + final MetricNameTemplate objectUploadRequestsRateByTopicPartitionAndObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_RATE, METRIC_GROUP, "", TOPIC_PARTITION_AND_OBJECT_TYPE_TAG_NAMES); + static final String OBJECT_UPLOAD_TOTAL = OBJECT_UPLOAD + "-total"; + final MetricNameTemplate objectUploadRequestsTotal = new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, ""); + final MetricNameTemplate objectUploadRequestsTotalByObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "", OBJECT_TYPE_TAG_NAMES); + final MetricNameTemplate objectUploadRequestsTotalByTopic = + new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "", TOPIC_TAG_NAMES); + final MetricNameTemplate objectUploadRequestsTotalByTopicAndObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "", TOPIC_AND_OBJECT_TYPE_TAG_NAMES); + final MetricNameTemplate objectUploadRequestsTotalByTopicPartition = + new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES); + final MetricNameTemplate objectUploadRequestsTotalByTopicPartitionAndObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_AND_OBJECT_TYPE_TAG_NAMES); + static final String OBJECT_UPLOAD_BYTES = OBJECT_UPLOAD + "-bytes"; + static final String OBJECT_UPLOAD_BYTES_RATE = OBJECT_UPLOAD_BYTES + "-rate"; + final MetricNameTemplate objectUploadBytesRate = new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, ""); + final MetricNameTemplate objectUploadBytesRateByObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "", OBJECT_TYPE_TAG_NAMES); + final MetricNameTemplate objectUploadBytesRateByTopic = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "", TOPIC_TAG_NAMES); + final MetricNameTemplate objectUploadBytesRateByTopicAndObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "", TOPIC_AND_OBJECT_TYPE_TAG_NAMES); + final MetricNameTemplate objectUploadBytesRateByTopicPartition = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES); + final MetricNameTemplate objectUploadBytesRateByTopicPartitionAndObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_RATE, METRIC_GROUP, "", TOPIC_PARTITION_AND_OBJECT_TYPE_TAG_NAMES); + public static final String OBJECT_UPLOAD_BYTES_TOTAL = OBJECT_UPLOAD_BYTES + "-total"; + final MetricNameTemplate objectUploadBytesTotal = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, ""); + final MetricNameTemplate objectUploadBytesTotalByObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "", OBJECT_TYPE_TAG_NAMES); + final MetricNameTemplate objectUploadBytesTotalByTopic = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_TAG_NAMES); + final MetricNameTemplate objectUploadBytesTotalByTopicAndObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_AND_OBJECT_TYPE_TAG_NAMES); + final MetricNameTemplate objectUploadBytesTotalByTopicPartition = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES); + final MetricNameTemplate objectUploadBytesTotalByTopicPartitionAndObjectType = + new MetricNameTemplate(OBJECT_UPLOAD_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_AND_OBJECT_TYPE_TAG_NAMES); + public static String sensorName(final String name) { return name; } + public static String sensorNameByObjectType(final ObjectKey.Suffix suffix, final String name) { + return TAG_NAME_OBJECT_TYPE + "." + suffix.value + "." + name; + } + public static String sensorNameByTopic(final TopicPartition topicPartition, final String name) { - return "topic." + topicPartition.topic() + "." + name; + return TAG_NAME_TOPIC + "." + topicPartition.topic() + "." + name; + } + + public static String sensorNameByTopicAndObjectType(final TopicPartition topicPartition, + final ObjectKey.Suffix suffix, + final String name) { + return TAG_NAME_TOPIC + "." + topicPartition.topic() + "." + + TAG_NAME_OBJECT_TYPE + "." + suffix.value + + "." + name; } public static String sensorNameByTopicPartition(final TopicPartition topicPartition, final String name) { - return "topic." + topicPartition.topic() + ".partition." + topicPartition.partition() + "." + name; + return TAG_NAME_TOPIC + "." + topicPartition.topic() + + "." + TAG_NAME_PARTITION + "." + topicPartition.partition() + + "." + name; + } + + public static String sensorNameByTopicPartitionAndObjectType(final TopicPartition topicPartition, + final ObjectKey.Suffix suffix, + final String name) { + return TAG_NAME_TOPIC + "." + topicPartition.topic() + + "." + TAG_NAME_PARTITION + "." + topicPartition.partition() + + "." + TAG_NAME_OBJECT_TYPE + "." + suffix.value + + "." + name; } static Map topicTags(final TopicPartition topicPartition) { - return Map.of("topic", topicPartition.topic()); + return Map.of(TAG_NAME_TOPIC, topicPartition.topic()); + } + + static Map topicAndObjectTypeTags(final TopicPartition topicPartition, + final ObjectKey.Suffix suffix) { + return Map.of( + TAG_NAME_TOPIC, topicPartition.topic(), + TAG_NAME_OBJECT_TYPE, suffix.value + ); } static Map topicPartitionTags(final TopicPartition topicPartition) { return Map.of( - "topic", topicPartition.topic(), - "partition", String.valueOf(topicPartition.partition()) + TAG_NAME_TOPIC, topicPartition.topic(), + TAG_NAME_PARTITION, String.valueOf(topicPartition.partition()) ); } + + static Map topicPartitionAndObjectTypeTags(final TopicPartition topicPartition, + final ObjectKey.Suffix suffix) { + return Map.of( + TAG_NAME_TOPIC, topicPartition.topic(), + TAG_NAME_PARTITION, String.valueOf(topicPartition.partition()), + TAG_NAME_OBJECT_TYPE, suffix.value + ); + } + + static Map objectTypeTags(final ObjectKey.Suffix suffix) { + return Map.of(TAG_NAME_OBJECT_TYPE, suffix.value); + } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/SensorProvider.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/SensorProvider.java index a4e9f0d69..eb6538e58 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/SensorProvider.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/SensorProvider.java @@ -42,6 +42,12 @@ public SensorProvider(final Metrics metrics, this(metrics, name, Collections::emptyMap, Sensor.RecordingLevel.INFO); } + public SensorProvider(final Metrics metrics, + final String name, + final Sensor.RecordingLevel recordingLevel) { + this(metrics, name, Collections::emptyMap, recordingLevel); + } + public SensorProvider(final Metrics metrics, final String name, final Supplier> tagsSupplier) { diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/NoopStorageBackend.java b/core/src/test/java/io/aiven/kafka/tieredstorage/NoopStorageBackend.java index 5c35abd18..46a372e6a 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/NoopStorageBackend.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/NoopStorageBackend.java @@ -37,7 +37,8 @@ public void configure(final Map configs) { } @Override - public void upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final String key) throws StorageBackendException { + return 0; } @Override diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java index 53239b08f..4f00afbf8 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java @@ -101,9 +101,10 @@ void setup(@TempDir final Path tmpDir, final Path sourceFile = source.resolve("file"); Files.write(sourceFile, new byte[LOG_SEGMENT_BYTES]); + final var leaderEpoch = ByteBuffer.wrap(new byte[LOG_SEGMENT_BYTES]); logSegmentData = new LogSegmentData( sourceFile, sourceFile, sourceFile, Optional.empty(), sourceFile, - ByteBuffer.allocate(0) + leaderEpoch ); } @@ -111,13 +112,16 @@ void setup(@TempDir final Path tmpDir, @ValueSource(strings = {"", ",topic=topic", ",topic=topic,partition=0"}) void metricsShouldBeReported(final String tags) throws RemoteStorageException, JMException { rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData); + logSegmentData.leaderEpochIndex().flip(); // so leader epoch can be consumed again rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData); + logSegmentData.leaderEpochIndex().flip(); rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData); + logSegmentData.leaderEpochIndex().flip(); rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0); - final ObjectName metricName = ObjectName.getInstance( - "aiven.kafka.server.tieredstorage:type=remote-storage-manager-metrics" + tags); + final var objectName = "aiven.kafka.server.tieredstorage:type=remote-storage-manager-metrics" + tags; + final ObjectName metricName = ObjectName.getInstance(objectName); assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-copy-total")) .isEqualTo(3.0); assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-copy-rate")) @@ -143,6 +147,44 @@ void metricsShouldBeReported(final String tags) throws RemoteStorageException, J assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-fetch-requested-bytes-total")) .isEqualTo(10.0); + assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-total")) + .isEqualTo(18.0); + assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-rate")) + .isEqualTo(18.0 / METRIC_TIME_WINDOW_SEC); + + assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-bytes-total")) + .isEqualTo(657.0); + assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-bytes-rate")) + .isEqualTo(657.0 / METRIC_TIME_WINDOW_SEC); + + for (final var suffix : ObjectKey.Suffix.values()) { + final ObjectName storageMetricsName = ObjectName.getInstance(objectName + ",object-type=" + suffix.value); + switch (suffix) { + case TXN_INDEX: + break; + case MANIFEST: + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total")) + .isEqualTo(3.0); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total")) + .isEqualTo(3.0); + assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate")) + .isGreaterThan(0.0); + assertThat((double) MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total")) + .isGreaterThan(0.0); + break; + default: + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-rate")) + .isEqualTo(3.0 / METRIC_TIME_WINDOW_SEC); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-total")) + .isEqualTo(3.0); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-rate")) + .isEqualTo(30.0 / METRIC_TIME_WINDOW_SEC); + assertThat(MBEAN_SERVER.getAttribute(storageMetricsName, "object-upload-bytes-total")) + .isEqualTo(30.0); + break; + } + } + rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA); rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA); @@ -182,7 +224,7 @@ public InputStream fetch(final String key, final BytesRange range) { } @Override - public void upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final String key) throws StorageBackendException { throw new StorageBackendException("something wrong"); } diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java index 0b3e8f76f..a31366afe 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java @@ -21,7 +21,8 @@ public interface ObjectUploader { /** * @param inputStream content to upload. Not closed as part of the upload. - * @param key path to an object within a storage backend. + * @param key path to an object within a storage backend. + * @return number of bytes uploaded */ - void upload(InputStream inputStream, String key) throws StorageBackendException; + long upload(InputStream inputStream, String key) throws StorageBackendException; } diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java index 57dd906b3..45c5dd38a 100644 --- a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java +++ b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java @@ -35,7 +35,8 @@ public abstract class BaseStorageTest { void testUploadFetchDelete() throws IOException, StorageBackendException { final byte[] data = "some file".getBytes(); final InputStream file = new ByteArrayInputStream(data); - storage().upload(file, TOPIC_PARTITION_SEGMENT_KEY); + final long size = storage().upload(file, TOPIC_PARTITION_SEGMENT_KEY); + assertThat(size).isEqualTo(data.length); try (final InputStream fetch = storage().fetch(TOPIC_PARTITION_SEGMENT_KEY)) { final String r = new String(fetch.readAllBytes()); @@ -59,7 +60,8 @@ void testUploadFetchDelete() throws IOException, StorageBackendException { void testUploadANewFile() throws StorageBackendException, IOException { final String content = "content"; final ByteArrayInputStream in = new ByteArrayInputStream(content.getBytes()); - storage().upload(in, TOPIC_PARTITION_SEGMENT_KEY); + final long size = storage().upload(in, TOPIC_PARTITION_SEGMENT_KEY); + assertThat(size).isEqualTo(content.length()); assertThat(in).isEmpty(); in.close(); diff --git a/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java b/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java index b54f6285b..120dbb0b2 100644 --- a/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java +++ b/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java @@ -18,10 +18,10 @@ import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.Map; import io.aiven.kafka.tieredstorage.storage.BytesRange; @@ -47,13 +47,12 @@ public void configure(final Map configs) { } @Override - public void upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final String key) throws StorageBackendException { try { final Path path = fsRoot.resolve(key); Files.createDirectories(path.getParent()); - try (final OutputStream outputStream = Files.newOutputStream(path)) { - inputStream.transferTo(outputStream); - } + Files.copy(inputStream, path, StandardCopyOption.REPLACE_EXISTING); + return Files.size(path); } catch (final IOException e) { throw new StorageBackendException("Failed to upload " + key, e); } diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java index 49f682c63..3e8f5eb98 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java @@ -57,6 +57,7 @@ public class S3MultiPartOutputStream extends OutputStream { private final List partETags = new ArrayList<>(); private boolean closed; + private long processedBytes = 0L; public S3MultiPartOutputStream(final String bucketName, final String key, @@ -93,6 +94,7 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti final int offset = source.arrayOffset() + source.position(); // TODO: get rid of this array copying partBuffer.put(source.array(), offset, transferred); + processedBytes += transferred; source.position(source.position() + transferred); if (!partBuffer.hasRemaining()) { flushBuffer(0, partSize); @@ -168,4 +170,8 @@ private void uploadPart(final InputStream in, final int actualPartSize) { final UploadPartResult uploadResult = client.uploadPart(uploadPartRequest); partETags.add(uploadResult.getPartETag()); } + + public long processedBytes() { + return processedBytes; + } } diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java index bc8b9e100..c086ce203 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java @@ -46,9 +46,10 @@ public void configure(final Map configs) { } @Override - public void upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream in, final String key) throws StorageBackendException { try (final var out = s3OutputStream(key)) { - inputStream.transferTo(out); + in.transferTo(out); + return out.processedBytes(); } catch (final AmazonS3Exception | IOException e) { throw new StorageBackendException("Failed to upload " + key, e); }