feat: add async cache for segment indexes
Indexes are currently cached on the broker side, but with a synchronous cache that cannot source a value when the loading thread is interrupted.
The new cache fetches indexes asynchronously, so an interrupted read does not abort the load, and retries have a good chance of finding the value already sourced.
jeqo committed Jan 10, 2024
1 parent 8b9fdee commit 9d260c2
Showing 10 changed files with 832 additions and 13 deletions.
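For context, a minimal sketch (not part of this commit; class, key, and timing are hypothetical) of the Caffeine AsyncCache behavior the commit message relies on: the load runs on the cache's executor and the pending future stays in the map, so a caller interrupted while waiting does not abort the load, and a retry joins the in-flight fetch instead of starting a new one.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class AsyncCacheSketch {
    static final AtomicInteger loads = new AtomicInteger();

    public static void main(final String[] args) throws Exception {
        final AsyncCache<String, byte[]> cache = Caffeine.newBuilder().buildAsync();
        // The first call installs a pending CompletableFuture and starts the load
        // asynchronously; the load is owned by the cache, not by the calling thread.
        final CompletableFuture<byte[]> first = cache.get("segment.indexes", k -> fetchFromRemote());
        // A concurrent or retried call finds the in-flight future and joins it
        // rather than triggering a second remote fetch.
        final CompletableFuture<byte[]> retry = cache.get("segment.indexes", k -> fetchFromRemote());
        retry.get();
        System.out.println("remote fetches: " + loads.get()); // prints 1
    }

    static byte[] fetchFromRemote() {
        loads.incrementAndGet();
        try {
            Thread.sleep(100); // stand-in for fetching an index from object storage
        } catch (final InterruptedException e) {
            throw new RuntimeException(e);
        }
        return new byte[16];
    }
}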
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
@@ -23,6 +23,7 @@
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
<suppress checks="ClassFanOutComplexity" files="ChunkCache.java"/>
<suppress checks="ClassFanOutComplexity" files="MemorySegmentIndexesCache"/>
<suppress checks="ClassFanOutComplexity" files="AzureBlobStorage.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="CaffeineStatsCounter.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
@@ -52,8 +52,11 @@
import io.aiven.kafka.tieredstorage.fetch.ChunkManagerFactory;
import io.aiven.kafka.tieredstorage.fetch.FetchChunkEnumeration;
import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException;
import io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache;
import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexesCache;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndex;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
@@ -122,6 +125,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private Set<SegmentCustomMetadataField> customMetadataFields;

private SegmentManifestProvider segmentManifestProvider;
private SegmentIndexesCache segmentIndexesCache;

public RemoteStorageManager() {
this(Time.SYSTEM);
@@ -168,6 +172,9 @@ public void configure(final Map<String, ?> configs) {
mapper,
executor);

segmentIndexesCache = new MemorySegmentIndexesCache();
segmentIndexesCache.configure(config.fetchIndexesCacheConfigs());

customMetadataSerde = new SegmentCustomMetadataSerde();
customMetadataFields = config.customMetadataKeysIncluded();
}
@@ -513,19 +520,11 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
if (segmentIndex == null) {
    throw new RemoteResourceNotFoundException("Index " + indexType + " not found on " + key);
}
-   final var in = fetcher.fetch(key, segmentIndex.range());
-
-   DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in);
-   final Optional<SegmentEncryptionMetadata> encryptionMetadata = segmentManifest.encryption();
-   if (encryptionMetadata.isPresent()) {
-       detransformEnum = new DecryptionChunkEnumeration(
-           detransformEnum,
-           encryptionMetadata.get().ivSize(),
-           encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get())
-       );
-   }
-   final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
-   return detransformFinisher.toInputStream();
+   return segmentIndexesCache.get(
+       key,
+       indexType,
+       () -> fetchIndexBytes(key, segmentIndex, segmentManifest)
+   );
} catch (final RemoteResourceNotFoundException e) {
    throw e;
} catch (final KeyNotFoundException e) {
@@ -535,6 +534,36 @@ public InputStream fetchIndex(final RemoteLogSegmentMet
}
}

private byte[] fetchIndexBytes(
final ObjectKey key,
final SegmentIndex segmentIndex,
final SegmentManifest segmentManifest
) {
final InputStream in;
try {
in = fetcher.fetch(key, segmentIndex.range());
} catch (final StorageBackendException e) {
throw new RuntimeException("Error fetching index from remote storage", e);
}

DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in);
final Optional<SegmentEncryptionMetadata> encryptionMetadata = segmentManifest.encryption();
if (encryptionMetadata.isPresent()) {
detransformEnum = new DecryptionChunkEnumeration(
detransformEnum,
encryptionMetadata.get().ivSize(),
encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk,
encryptionMetadata.get())
);
}
final var detransformFinisher = new DetransformFinisher(detransformEnum);
try (final var is = detransformFinisher.toInputStream()) {
return is.readAllBytes();
} catch (final IOException e) {
throw new RuntimeException("Error reading de-transformed index bytes", e);
}
}

private ObjectKey objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final ObjectKeyFactory.Suffix suffix) {
final ObjectKey segmentKey;
@@ -0,0 +1,85 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.config;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;

public class CacheConfig extends AbstractConfig {
private static final String CACHE_SIZE_CONFIG = "size";
private static final String CACHE_SIZE_DOC = "Cache size in bytes, where \"-1\" represents unbounded cache";
private static final String CACHE_RETENTION_CONFIG = "retention.ms";
private static final String CACHE_RETENTION_DOC = "Cache retention time ms, "
+ "where \"-1\" represents infinite retention";
private static final long DEFAULT_CACHE_RETENTION_MS = 600_000;

private static ConfigDef addCacheConfigs(final OptionalLong maybeDefaultSize) {
final ConfigDef configDef = new ConfigDef();
Object defaultValue = NO_DEFAULT_VALUE;
if (maybeDefaultSize.isPresent()) {
defaultValue = maybeDefaultSize.getAsLong();
}
configDef.define(
CACHE_SIZE_CONFIG,
ConfigDef.Type.LONG,
defaultValue,
ConfigDef.Range.between(-1L, Long.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
CACHE_SIZE_DOC
);
configDef.define(
CACHE_RETENTION_CONFIG,
ConfigDef.Type.LONG,
DEFAULT_CACHE_RETENTION_MS,
ConfigDef.Range.between(-1L, Long.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
CACHE_RETENTION_DOC
);
return configDef;
}

public CacheConfig(final Map<String, ?> props) {
super(addCacheConfigs(OptionalLong.empty()), props);
}

public CacheConfig(final Map<String, ?> props, final long defaultSize) {
super(addCacheConfigs(OptionalLong.of(defaultSize)), props);
}

public Optional<Long> cacheSize() {
final Long rawValue = getLong(CACHE_SIZE_CONFIG);
if (rawValue == -1) {
return Optional.empty();
}
return Optional.of(rawValue);
}

public Optional<Duration> cacheRetention() {
final Long rawValue = getLong(CACHE_RETENTION_CONFIG);
if (rawValue == -1) {
return Optional.empty();
}
return Optional.of(Duration.ofMillis(rawValue));
}
}
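For illustration, a small usage sketch of the class above (values hypothetical, not part of the commit), showing how "-1" disables a bound by resolving to an empty Optional:

import java.util.Map;

import io.aiven.kafka.tieredstorage.config.CacheConfig;

public class CacheConfigExample {
    public static void main(final String[] args) {
        // A bounded cache: 16 MiB with one-minute retention.
        final var bounded = new CacheConfig(Map.of("size", "16777216", "retention.ms", "60000"));
        System.out.println(bounded.cacheSize());      // Optional[16777216]
        System.out.println(bounded.cacheRetention()); // Optional[PT1M]

        // "-1" means unbounded size and infinite retention: both resolve to empty,
        // so a cache builder can skip maximumWeight and expireAfterAccess entirely.
        final var unbounded = new CacheConfig(Map.of("size", "-1", "retention.ms", "-1"));
        System.out.println(unbounded.cacheSize());      // Optional.empty
        System.out.println(unbounded.cacheRetention()); // Optional.empty
    }
}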
@@ -41,6 +41,7 @@

public class RemoteStorageManagerConfig extends AbstractConfig {
private static final String STORAGE_PREFIX = "storage.";
private static final String FETCH_INDEXES_CACHE_PREFIX = "fetch.indexes.cache.";

private static final String STORAGE_BACKEND_CLASS_CONFIG = STORAGE_PREFIX + "backend.class";
private static final String STORAGE_BACKEND_CLASS_DOC = "The storage backend implementation class";
@@ -393,4 +394,8 @@ public Set<SegmentCustomMetadataField> customMetadataKeysIncluded() {
.map(SegmentCustomMetadataField::valueOf)
.collect(Collectors.toSet());
}

public Map<String, ?> fetchIndexesCacheConfigs() {
return originalsWithPrefix(FETCH_INDEXES_CACHE_PREFIX);
}
}
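As a sketch of how this prefix behaves (property values hypothetical): originalsWithPrefix keeps only the keys starting with fetch.indexes.cache. and strips the prefix, so plugin configs such as

fetch.indexes.cache.size=10485760
fetch.indexes.cache.retention.ms=600000

arrive at the nested CacheConfig as plain size and retention.ms entries, which is exactly the map that MemorySegmentIndexesCache.configure(...) receives.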
@@ -0,0 +1,130 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.fetch.index;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;

import io.aiven.kafka.tieredstorage.config.CacheConfig;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemorySegmentIndexesCache implements SegmentIndexesCache {
private static final Logger log = LoggerFactory.getLogger(MemorySegmentIndexesCache.class);

private static final long GET_TIMEOUT_SEC = 10;
private static final long DEFAULT_MAX_SIZE_BYTES = 10 * 1024 * 1024;
private static final String METRIC_GROUP = "segment-indexes-cache";

private final Executor executor = new ForkJoinPool();
private final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP);

protected AsyncCache<SegmentIndexKey, byte[]> cache;

// for testing
RemovalListener<SegmentIndexKey, byte[]> removalListener() {
return (key, content, cause) -> log.debug("Deleted cached value for key {} from cache."
+ " The reason for the deletion is {}", key, cause);
}

private static Weigher<SegmentIndexKey, byte[]> weigher() {
return (key, value) -> value.length;
}

protected AsyncCache<SegmentIndexKey, byte[]> buildCache(final CacheConfig config) {
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
final var cache = cacheBuilder.evictionListener(removalListener())
.scheduler(Scheduler.systemScheduler())
.executor(executor)
.recordStats(() -> statsCounter)
.buildAsync();
statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
return cache;
}

@Override
public InputStream get(
final ObjectKey objectKey,
final IndexType indexType,
final Supplier<byte[]> indexSupplier
) throws StorageBackendException, IOException {
try {
return cache.asMap()
.compute(new SegmentIndexKey(objectKey, indexType), (key, val) -> {
if (val == null) {
statsCounter.recordMiss();
return CompletableFuture.supplyAsync(indexSupplier, executor);
} else {
statsCounter.recordHit();
return val;
}
})
.thenApplyAsync(ByteArrayInputStream::new, executor)
.get(GET_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (final ExecutionException e) {
// Unwrap previously wrapped exceptions if possible.
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
cause = cause.getCause();
}

// We don't really expect this case, but handle it nevertheless.
if (cause == null) {
throw new RuntimeException(e);
}

if (cause instanceof StorageBackendException) {
throw (StorageBackendException) cause;
}
if (cause instanceof IOException) {
throw (IOException) cause;
}

throw new RuntimeException(e);
} catch (final InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
}

@Override
public void configure(final Map<String, ?> configs) {
final var config = new CacheConfig(configs, DEFAULT_MAX_SIZE_BYTES);
this.cache = buildCache(config);
}
}
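A minimal usage sketch of the cache above (hypothetical object key and supplier; assumes ObjectKey is the plugin's single-method key interface, and error handling is elided):

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;

import io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

public class SegmentIndexesCacheExample {
    public static void main(final String[] args) throws StorageBackendException, IOException {
        final var cache = new MemorySegmentIndexesCache();
        // The same prefix-stripped map that fetchIndexesCacheConfigs() returns;
        // "size" falls back to the 10 MiB default.
        cache.configure(Map.of("retention.ms", "600000"));

        final ObjectKey key = () -> "prefix/topic-uuid/00000000000000000000.indexes"; // hypothetical
        // The supplier runs only on a miss; concurrent gets for the same
        // (key, index type) pair share one in-flight fetch.
        try (InputStream index = cache.get(key, IndexType.OFFSET, () -> new byte[]{1, 2, 3})) {
            System.out.println(index.readAllBytes().length); // 3
        }
    }
}

Note that the supplier tunnels checked exceptions through RuntimeException (as fetchIndexBytes does above), and get(...) unwraps StorageBackendException and IOException back out; that is what the cause-unwrapping in the ExecutionException catch block is for.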
@@ -0,0 +1,58 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.fetch.index;

import java.util.Objects;

import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

import io.aiven.kafka.tieredstorage.storage.ObjectKey;

public class SegmentIndexKey {
public final ObjectKey indexesKey;
public final RemoteStorageManager.IndexType indexType;

public SegmentIndexKey(final ObjectKey indexesKey, final RemoteStorageManager.IndexType indexType) {
this.indexesKey = indexesKey;
this.indexType = indexType;
}

@Override
public String toString() {
return "SegmentIndexKey{"
+ "indexesKey=" + indexesKey
+ ", indexType=" + indexType
+ '}';
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SegmentIndexKey that = (SegmentIndexKey) o;
return Objects.equals(indexesKey, that.indexesKey) && indexType == that.indexType;
}

@Override
public int hashCode() {
return Objects.hash(indexesKey, indexType);
}
}
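And a quick illustration of why the composite key matters (hypothetical key; assumes the same single-method ObjectKey interface): all index types of a segment live in one remote object, so cache entries must be distinguished per index type, not just per object:

import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;

import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexKey;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;

public class SegmentIndexKeyExample {
    public static void main(final String[] args) {
        final ObjectKey obj = () -> "prefix/topic-uuid/00000000000000000000.indexes"; // hypothetical
        final var offset = new SegmentIndexKey(obj, IndexType.OFFSET);
        final var txn = new SegmentIndexKey(obj, IndexType.TRANSACTION);
        System.out.println(offset.equals(txn));                                        // false: cached separately
        System.out.println(offset.equals(new SegmentIndexKey(obj, IndexType.OFFSET))); // true: same entry
    }
}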