From 9d260c2ecdf83558c8e85d279bb938f96d4f25d4 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 22 Dec 2023 14:57:42 -0500 Subject: [PATCH] feat: add async cache for segment indexes Indexes are currently cached on broker-side but with a synchronous cache that is not able to source value when interrupted. This new cache will fetch index asynchronously, and retries would have chances to find the value sourced already. --- checkstyle/suppressions.xml | 1 + .../tieredstorage/RemoteStorageManager.java | 55 +++- .../tieredstorage/config/CacheConfig.java | 85 +++++ .../config/RemoteStorageManagerConfig.java | 5 + .../index/MemorySegmentIndexesCache.java | 130 ++++++++ .../fetch/index/SegmentIndexKey.java | 58 ++++ .../fetch/index/SegmentIndexesCache.java | 35 +++ .../tieredstorage/config/CacheConfigTest.java | 123 ++++++++ .../fetch/SegmentIndexKeyTest.java | 59 ++++ .../index/MemorySegmentIndexesCacheTest.java | 294 ++++++++++++++++++ 10 files changed, 832 insertions(+), 13 deletions(-) create mode 100644 core/src/main/java/io/aiven/kafka/tieredstorage/config/CacheConfig.java create mode 100644 core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCache.java create mode 100644 core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/SegmentIndexKey.java create mode 100644 core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/SegmentIndexesCache.java create mode 100644 core/src/test/java/io/aiven/kafka/tieredstorage/config/CacheConfigTest.java create mode 100644 core/src/test/java/io/aiven/kafka/tieredstorage/fetch/SegmentIndexKeyTest.java create mode 100644 core/src/test/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCacheTest.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index e4415b5e5..663bc3d99 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -23,6 +23,7 @@ + diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index 4b0d52c5a..c3c8b7926 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -52,8 +52,11 @@ import io.aiven.kafka.tieredstorage.fetch.ChunkManagerFactory; import io.aiven.kafka.tieredstorage.fetch.FetchChunkEnumeration; import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException; +import io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache; +import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexesCache; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1; +import io.aiven.kafka.tieredstorage.manifest.SegmentIndex; import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1; import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; @@ -122,6 +125,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote. private Set customMetadataFields; private SegmentManifestProvider segmentManifestProvider; + private SegmentIndexesCache segmentIndexesCache; public RemoteStorageManager() { this(Time.SYSTEM); @@ -168,6 +172,9 @@ public void configure(final Map configs) { mapper, executor); + segmentIndexesCache = new MemorySegmentIndexesCache(); + segmentIndexesCache.configure(config.fetchIndexesCacheConfigs()); + customMetadataSerde = new SegmentCustomMetadataSerde(); customMetadataFields = config.customMetadataKeysIncluded(); } @@ -513,19 +520,11 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet if (segmentIndex == null) { throw new RemoteResourceNotFoundException("Index " + indexType + " not found on " + key); } - final var in = fetcher.fetch(key, segmentIndex.range()); - - DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in); - final Optional encryptionMetadata = segmentManifest.encryption(); - if (encryptionMetadata.isPresent()) { - detransformEnum = new DecryptionChunkEnumeration( - detransformEnum, - encryptionMetadata.get().ivSize(), - encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get()) - ); - } - final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum); - return detransformFinisher.toInputStream(); + return segmentIndexesCache.get( + key, + indexType, + () -> fetchIndexBytes(key, segmentIndex, segmentManifest) + ); } catch (final RemoteResourceNotFoundException e) { throw e; } catch (final KeyNotFoundException e) { @@ -535,6 +534,36 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet } } + private byte[] fetchIndexBytes( + final ObjectKey key, + final SegmentIndex segmentIndex, + final SegmentManifest segmentManifest + ) { + final InputStream in; + try { + in = fetcher.fetch(key, segmentIndex.range()); + } catch (final StorageBackendException e) { + throw new RuntimeException("Error fetching index from remote storage", e); + } + + DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in); + final Optional encryptionMetadata = segmentManifest.encryption(); + if (encryptionMetadata.isPresent()) { + detransformEnum = new DecryptionChunkEnumeration( + detransformEnum, + encryptionMetadata.get().ivSize(), + encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, + encryptionMetadata.get()) + ); + } + final var detransformFinisher = new DetransformFinisher(detransformEnum); + try (final var is = detransformFinisher.toInputStream()) { + return is.readAllBytes(); + } catch (final IOException e) { + throw new RuntimeException("Error reading de-transformed index bytes", e); + } + } + private ObjectKey objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final ObjectKeyFactory.Suffix suffix) { final ObjectKey segmentKey; diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/CacheConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/CacheConfig.java new file mode 100644 index 000000000..d082474b3 --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/CacheConfig.java @@ -0,0 +1,85 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.config; + +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; + +public class CacheConfig extends AbstractConfig { + private static final String CACHE_SIZE_CONFIG = "size"; + private static final String CACHE_SIZE_DOC = "Cache size in bytes, where \"-1\" represents unbounded cache"; + private static final String CACHE_RETENTION_CONFIG = "retention.ms"; + private static final String CACHE_RETENTION_DOC = "Cache retention time ms, " + + "where \"-1\" represents infinite retention"; + private static final long DEFAULT_CACHE_RETENTION_MS = 600_000; + + private static ConfigDef addCacheConfigs(final OptionalLong maybeDefaultSize) { + final ConfigDef configDef = new ConfigDef(); + Object defaultValue = NO_DEFAULT_VALUE; + if (maybeDefaultSize.isPresent()) { + defaultValue = maybeDefaultSize.getAsLong(); + } + configDef.define( + CACHE_SIZE_CONFIG, + ConfigDef.Type.LONG, + defaultValue, + ConfigDef.Range.between(-1L, Long.MAX_VALUE), + ConfigDef.Importance.MEDIUM, + CACHE_SIZE_DOC + ); + configDef.define( + CACHE_RETENTION_CONFIG, + ConfigDef.Type.LONG, + DEFAULT_CACHE_RETENTION_MS, + ConfigDef.Range.between(-1L, Long.MAX_VALUE), + ConfigDef.Importance.MEDIUM, + CACHE_RETENTION_DOC + ); + return configDef; + } + + public CacheConfig(final Map props) { + super(addCacheConfigs(OptionalLong.empty()), props); + } + + public CacheConfig(final Map props, final long defaultSize) { + super(addCacheConfigs(OptionalLong.of(defaultSize)), props); + } + + public Optional cacheSize() { + final Long rawValue = getLong(CACHE_SIZE_CONFIG); + if (rawValue == -1) { + return Optional.empty(); + } + return Optional.of(rawValue); + } + + public Optional cacheRetention() { + final Long rawValue = getLong(CACHE_RETENTION_CONFIG); + if (rawValue == -1) { + return Optional.empty(); + } + return Optional.of(Duration.ofMillis(rawValue)); + } +} diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java index cf295ba72..f8eda53ca 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java @@ -41,6 +41,7 @@ public class RemoteStorageManagerConfig extends AbstractConfig { private static final String STORAGE_PREFIX = "storage."; + private static final String FETCH_INDEXES_CACHE_PREFIX = "fetch.indexes.cache."; private static final String STORAGE_BACKEND_CLASS_CONFIG = STORAGE_PREFIX + "backend.class"; private static final String STORAGE_BACKEND_CLASS_DOC = "The storage backend implementation class"; @@ -393,4 +394,8 @@ public Set customMetadataKeysIncluded() { .map(SegmentCustomMetadataField::valueOf) .collect(Collectors.toSet()); } + + public Map fetchIndexesCacheConfigs() { + return originalsWithPrefix(FETCH_INDEXES_CACHE_PREFIX); + } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCache.java new file mode 100644 index 000000000..8214587e2 --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCache.java @@ -0,0 +1,130 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.fetch.index; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; + +import io.aiven.kafka.tieredstorage.config.CacheConfig; +import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; + +import com.github.benmanes.caffeine.cache.AsyncCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.github.benmanes.caffeine.cache.Weigher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MemorySegmentIndexesCache implements SegmentIndexesCache { + private static final Logger log = LoggerFactory.getLogger(MemorySegmentIndexesCache.class); + + private static final long GET_TIMEOUT_SEC = 10; + private static final long DEFAULT_MAX_SIZE_BYTES = 10 * 1024 * 1024; + private static final String METRIC_GROUP = "segment-indexes-cache"; + + private final Executor executor = new ForkJoinPool(); + private final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP); + + protected AsyncCache cache; + + // for testing + RemovalListener removalListener() { + return (key, content, cause) -> log.debug("Deleted cached value for key {} from cache." + + " The reason of the deletion is {}", key, cause); + } + + private static Weigher weigher() { + return (key, value) -> value.length; + } + + protected AsyncCache buildCache(final CacheConfig config) { + final Caffeine cacheBuilder = Caffeine.newBuilder(); + config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher())); + config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess); + final var cache = cacheBuilder.evictionListener(removalListener()) + .scheduler(Scheduler.systemScheduler()) + .executor(executor) + .recordStats(() -> statsCounter) + .buildAsync(); + statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize); + return cache; + } + + @Override + public InputStream get( + final ObjectKey objectKey, + final IndexType indexType, + final Supplier indexSupplier + ) throws StorageBackendException, IOException { + try { + return cache.asMap() + .compute(new SegmentIndexKey(objectKey, indexType), (key, val) -> { + if (val == null) { + statsCounter.recordMiss(); + return CompletableFuture.supplyAsync(indexSupplier, executor); + } else { + statsCounter.recordHit(); + return val; + } + }) + .thenApplyAsync(ByteArrayInputStream::new, executor) + .get(GET_TIMEOUT_SEC, TimeUnit.SECONDS); + } catch (final ExecutionException e) { + // Unwrap previously wrapped exceptions if possible. + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + cause = cause.getCause(); + } + + // We don't really expect this case, but handle it nevertheless. + if (cause == null) { + throw new RuntimeException(e); + } + + if (cause instanceof StorageBackendException) { + throw (StorageBackendException) cause; + } + if (cause instanceof IOException) { + throw (IOException) cause; + } + + throw new RuntimeException(e); + } catch (final InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + @Override + public void configure(final Map configs) { + final var config = new CacheConfig(configs, DEFAULT_MAX_SIZE_BYTES); + this.cache = buildCache(config); + } +} diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/SegmentIndexKey.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/SegmentIndexKey.java new file mode 100644 index 000000000..4c3a97264 --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/SegmentIndexKey.java @@ -0,0 +1,58 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.fetch.index; + +import java.util.Objects; + +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; + +import io.aiven.kafka.tieredstorage.storage.ObjectKey; + +public class SegmentIndexKey { + public final ObjectKey indexesKey; + public final RemoteStorageManager.IndexType indexType; + + public SegmentIndexKey(final ObjectKey indexesKey, final RemoteStorageManager.IndexType indexType) { + this.indexesKey = indexesKey; + this.indexType = indexType; + } + + @Override + public String toString() { + return "SegmentIndexKey{" + + "indexesKey=" + indexesKey + + ", indexType=" + indexType + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final SegmentIndexKey that = (SegmentIndexKey) o; + return Objects.equals(indexesKey, that.indexesKey) && indexType == that.indexType; + } + + @Override + public int hashCode() { + return Objects.hash(indexesKey, indexType); + } +} diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/SegmentIndexesCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/SegmentIndexesCache.java new file mode 100644 index 000000000..59ae7c1a8 --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/index/SegmentIndexesCache.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.fetch.index; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Supplier; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; + +import io.aiven.kafka.tieredstorage.storage.ObjectKey; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; + +public interface SegmentIndexesCache extends Configurable { + InputStream get( + final ObjectKey key, + IndexType indexType, + final Supplier indexSupplier + ) throws StorageBackendException, IOException; +} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/config/CacheConfigTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/config/CacheConfigTest.java new file mode 100644 index 000000000..4791bcd5d --- /dev/null +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/config/CacheConfigTest.java @@ -0,0 +1,123 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.config; + +import java.time.Duration; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigException; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class CacheConfigTest { + + @Test + void cacheUnboundedSize() { + final CacheConfig config = new CacheConfig( + Map.of("size", "-1") + ); + + assertThat(config.cacheSize()).isNotPresent(); + assertThat(config.cacheRetention()).hasValue(Duration.ofMinutes(10)); + } + + @Test + void cacheUnboundedWithDefaultSize() { + final CacheConfig config = new CacheConfig( + Map.of(), + -1 + ); + + assertThat(config.cacheSize()).isNotPresent(); + assertThat(config.cacheRetention()).hasValue(Duration.ofMinutes(10)); + } + + @Test + void cacheSizeBounded() { + final CacheConfig config = new CacheConfig( + Map.of("size", "1024") + ); + assertThat(config.cacheSize()).hasValue(1024L); + } + + @Test + void cacheSizeBoundedWithDefaultSize() { + final CacheConfig config = new CacheConfig( + Map.of(), + 1024 + ); + assertThat(config.cacheSize()).hasValue(1024L); + } + + @Test + void invalidCacheSize() { + assertThatThrownBy(() -> new CacheConfig( + Map.of("size", "-2") + )).isInstanceOf(ConfigException.class) + .hasMessage("Invalid value -2 for configuration size: Value must be at least -1"); + + assertThatThrownBy(() -> new CacheConfig( + Map.of(), + -2 + )).isInstanceOf(ConfigException.class) + .hasMessage("Invalid value -2 for configuration size: Value must be at least -1"); + } + + @Test + void cacheSizeUnspecified() { + assertThatThrownBy(() -> new CacheConfig( + Map.of() + )).isInstanceOf(ConfigException.class) + .hasMessage("Missing required configuration \"size\" which has no default value."); + } + + @Test + void cacheRetentionForever() { + final CacheConfig config = new CacheConfig( + Map.of( + "retention.ms", "-1", + "size", "-1" + ) + ); + assertThat(config.cacheRetention()).isNotPresent(); + } + + @Test + void cacheRetentionLimited() { + final CacheConfig config = new CacheConfig( + Map.of( + "retention.ms", "60000", + "size", "-1" + ) + ); + assertThat(config.cacheRetention()).hasValue(Duration.ofMillis(60000)); + } + + @Test + void invalidRetention() { + assertThatThrownBy(() -> new CacheConfig( + Map.of( + "retention.ms", "-2", + "size", "-1" + ) + )).isInstanceOf(ConfigException.class) + .hasMessage("Invalid value -2 for configuration retention.ms: Value must be at least -1"); + } +} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/SegmentIndexKeyTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/SegmentIndexKeyTest.java new file mode 100644 index 000000000..aa90e6a05 --- /dev/null +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/SegmentIndexKeyTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.fetch; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; + +import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexKey; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class SegmentIndexKeyTest { + static final ObjectKey OBJECT_KEY_1 = () -> "topic/" + Uuid.randomUuid(); + static final ObjectKey OBJECT_KEY_2 = () -> "topic/" + Uuid.randomUuid(); + + @Test + void identical() { + final var ck1 = new SegmentIndexKey(OBJECT_KEY_1, IndexType.OFFSET); + final var ck2 = new SegmentIndexKey(OBJECT_KEY_1, IndexType.OFFSET); + assertThat(ck1).isEqualTo(ck2); + assertThat(ck2).isEqualTo(ck1); + assertThat(ck1).hasSameHashCodeAs(ck2); + } + + @Test + void differentObjectKey() { + final var ck1 = new SegmentIndexKey(OBJECT_KEY_1, IndexType.OFFSET); + final var ck2 = new SegmentIndexKey(OBJECT_KEY_2, IndexType.OFFSET); + assertThat(ck1).isNotEqualTo(ck2); + assertThat(ck2).isNotEqualTo(ck1); + assertThat(ck1).doesNotHaveSameHashCodeAs(ck2); + } + + @Test + void differentIndexTypes() { + final var ck1 = new SegmentIndexKey(OBJECT_KEY_1, IndexType.OFFSET); + final var ck2 = new SegmentIndexKey(OBJECT_KEY_1, IndexType.TIMESTAMP); + assertThat(ck1).isNotEqualTo(ck2); + assertThat(ck2).isNotEqualTo(ck1); + assertThat(ck1).doesNotHaveSameHashCodeAs(ck2); + } +} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCacheTest.java new file mode 100644 index 000000000..83d4ee748 --- /dev/null +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/index/MemorySegmentIndexesCacheTest.java @@ -0,0 +1,294 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.fetch.index; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; + +import io.aiven.kafka.tieredstorage.ObjectKeyFactory; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; + +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mockingDetails; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class MemorySegmentIndexesCacheTest { + + static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = + new RemoteLogSegmentMetadata( + new RemoteLogSegmentId( + new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic", 0)), + Uuid.randomUuid()), + 1, 100, -1, -1, 1L, + 10, Collections.singletonMap(1, 100L)); + private static final byte[] OFFSET_INDEX = "0123456789".getBytes(); + private static final byte[] TIME_INDEX = "1011121314".getBytes(); + + private MemorySegmentIndexesCache cache; + @Mock + private Supplier offsetIndexSupplier; + @Mock + private Supplier timeIndexSupplier; + private final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false); + + @BeforeEach + void setUp() { + cache = spy(new MemorySegmentIndexesCache()); + } + + @SuppressWarnings("unchecked") + @AfterEach + void tearDown() { + reset(offsetIndexSupplier); + reset(timeIndexSupplier); + } + + @Nested + class CacheTests { + @Mock + RemovalListener removalListener; + + @BeforeEach + void setUp() { + doAnswer(invocation -> removalListener).when(cache).removalListener(); + when(offsetIndexSupplier.get()).thenAnswer(invocation -> OFFSET_INDEX); + when(timeIndexSupplier.get()).thenAnswer(invocation -> TIME_INDEX); + } + + @SuppressWarnings("unchecked") + @AfterEach + void tearDown() { + reset(removalListener); + } + + @Test + void noEviction() throws IOException, StorageBackendException { + cache.configure(Map.of( + "size", "-1", + "retention.ms", "-1" + )); + assertThat(cache.cache.asMap()).isEmpty(); + + final ObjectKey key = objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.INDEXES); + final InputStream offsetIndex = cache.get( + key, + IndexType.OFFSET, + offsetIndexSupplier + ); + assertThat(offsetIndex).hasBinaryContent(OFFSET_INDEX); + assertThat(cache.cache.asMap()).hasSize(1); + final InputStream cachedOffsetIndex = cache.get( + key, + IndexType.OFFSET, + offsetIndexSupplier + ); + assertThat(cachedOffsetIndex).hasBinaryContent(OFFSET_INDEX); + verifyNoMoreInteractions(offsetIndexSupplier); + assertThat(cache.cache.asMap()).hasSize(1); + + final InputStream timeIndex = cache.get( + key, + IndexType.TIMESTAMP, + timeIndexSupplier + ); + assertThat(timeIndex).hasBinaryContent(TIME_INDEX); + assertThat(cache.cache.asMap()).hasSize(2); + final InputStream cachedTimeIndex = cache.get( + key, + IndexType.TIMESTAMP, + timeIndexSupplier + ); + assertThat(cachedTimeIndex).hasBinaryContent(TIME_INDEX); + verifyNoMoreInteractions(timeIndexSupplier); + assertThat(cache.cache.asMap()).hasSize(2); + + verifyNoMoreInteractions(removalListener); + } + + @Test + void timeBasedEviction() throws IOException, StorageBackendException, InterruptedException { + cache.configure(Map.of( + "size", "-1", + "retention.ms", "100" + )); + assertThat(cache.cache.asMap()).isEmpty(); + + final ObjectKey key = objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.INDEXES); + final InputStream offsetIndex = cache.get( + key, + IndexType.OFFSET, + offsetIndexSupplier + ); + assertThat(offsetIndex).hasBinaryContent(OFFSET_INDEX); + assertThat(cache.cache.asMap()).hasSize(1); + final InputStream cachedOffsetIndex = cache.get( + key, + IndexType.OFFSET, + offsetIndexSupplier + ); + assertThat(cachedOffsetIndex).hasBinaryContent(OFFSET_INDEX); + verifyNoMoreInteractions(offsetIndexSupplier); + assertThat(cache.cache.asMap()).hasSize(1); + + // Wait enough for the cache entry to be candidate to expire + Thread.sleep(100); + + final InputStream timeIndex = cache.get( + key, + IndexType.TIMESTAMP, + timeIndexSupplier + ); + assertThat(timeIndex).hasBinaryContent(TIME_INDEX); + assertThat(cache.cache.asMap()).isNotEmpty(); + final InputStream cachedTimeIndex = cache.get( + key, + IndexType.TIMESTAMP, + timeIndexSupplier + ); + assertThat(cachedTimeIndex).hasBinaryContent(TIME_INDEX); + verifyNoMoreInteractions(timeIndexSupplier); + assertThat(cache.cache.asMap()).isNotEmpty(); + + await().atMost(Duration.ofMillis(5000)).pollInterval(Duration.ofMillis(100)) + .until(() -> !mockingDetails(removalListener).getInvocations().isEmpty()); + + assertThat(cache.cache.asMap()).hasSize(1); + verify(removalListener) + .onRemoval( + argThat(argument -> argument.indexType == IndexType.OFFSET), + any(), + eq(RemovalCause.EXPIRED)); + } + + @Test + void sizeBasedEviction() throws IOException, StorageBackendException { + cache.configure(Map.of( + "size", "18", + "retention.ms", "-1" + )); + assertThat(cache.cache.asMap()).isEmpty(); + + final ObjectKey key = objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.INDEXES); + final InputStream offsetIndex = cache.get( + key, + IndexType.OFFSET, + offsetIndexSupplier + ); + assertThat(offsetIndex).hasBinaryContent(OFFSET_INDEX); + assertThat(cache.cache.asMap()).hasSize(1); + assertThat(cache.get( + key, + IndexType.OFFSET, + offsetIndexSupplier + )).hasBinaryContent(OFFSET_INDEX); + + // Fetching chunk 0 multiple times from the cache to guarantee that during the next fetch of not-yet-cached + // chunk 0 will not be evicted as least frequently accessed. + for (int i = 0; i < 50; i++) { + assertThat(cache.get( + key, + IndexType.OFFSET, + offsetIndexSupplier + )).hasBinaryContent(OFFSET_INDEX); + } + verifyNoMoreInteractions(offsetIndexSupplier); + assertThat(cache.cache.asMap()).hasSize(1); + + final InputStream timeIndex = cache.get( + key, + IndexType.TIMESTAMP, + timeIndexSupplier + ); + assertThat(timeIndex).hasBinaryContent(TIME_INDEX); + assertThat(cache.cache.asMap()).isNotEmpty(); + + await() + .atMost(Duration.ofSeconds(30)) // increase to reduce chance of flakiness + .pollDelay(Duration.ofSeconds(2)) + .pollInterval(Duration.ofMillis(10)) + .until(() -> !mockingDetails(removalListener).getInvocations().isEmpty()); + + assertThat(cache.cache.asMap()).hasSize(1); + verify(removalListener).onRemoval(any(SegmentIndexKey.class), any(), eq(RemovalCause.SIZE)); + } + } + + private static final String TEST_EXCEPTION_MESSAGE = "test_message"; + + static Stream failedFetching() { + return Stream.of( + Arguments.of(new StorageBackendException(TEST_EXCEPTION_MESSAGE), StorageBackendException.class), + Arguments.of(new IOException(TEST_EXCEPTION_MESSAGE), IOException.class) + ); + } + + @ParameterizedTest + @MethodSource + void failedFetching(final Throwable exception, final Class expectedExceptionClass) { + when(offsetIndexSupplier.get()) + .thenThrow(new RuntimeException(exception)); + + final Map configs = Map.of( + "retention.ms", "-1", + "size", "-1" + ); + cache.configure(configs); + + final ObjectKey key = objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.INDEXES); + assertThatThrownBy(() -> cache + .get(key, IndexType.OFFSET, offsetIndexSupplier)) + .isInstanceOf(expectedExceptionClass) + .hasMessage(TEST_EXCEPTION_MESSAGE); + } +}