From c5d7217522f7237e4080f7298fd8528a4340f3fb Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Wed, 8 May 2024 09:15:36 -0700 Subject: [PATCH] Fix IndicesRequestCache clean up logic (#13597) Signed-off-by: Sagar Upadhyaya Co-authored-by: Sagar Upadhyaya --- .../indices/IndicesRequestCache.java | 28 ++-- .../IRCKeyWriteableSerializerTests.java | 6 +- .../indices/IndicesRequestCacheTests.java | 121 ++++++++++++++++++ 3 files changed, 144 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 1b1820836b66f..44af83bb839c1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -53,6 +53,7 @@ import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder; import org.opensearch.common.cache.store.config.CacheConfig; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; @@ -410,7 +411,8 @@ static class Key implements Accountable, Writeable { this.shardId = in.readOptionalWriteable(ShardId::new); this.readerCacheKeyId = in.readOptionalString(); this.value = in.readBytesReference(); - this.indexShardHashCode = in.readInt(); + this.indexShardHashCode = in.readInt(); // We are serializing/de-serializing this as we need to store the + // key as part of tiered/disk cache. The key is not passed between nodes at this point. } @Override @@ -450,7 +452,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(shardId); out.writeOptionalString(readerCacheKeyId); out.writeBytesReference(value); - out.writeInt(indexShardHashCode); + out.writeInt(indexShardHashCode); // We are serializing/de-serializing this as we need to store the + // key as part of tiered/disk cache. The key is not passed between nodes at this point. } } @@ -713,15 +716,16 @@ private synchronized void cleanCache(double stalenessThreshold) { // Contains CleanupKey objects with open shard but invalidated readerCacheKeyId. final Set cleanupKeysFromOutdatedReaders = new HashSet<>(); // Contains CleanupKey objects of a closed shard. - final Set cleanupKeysFromClosedShards = new HashSet<>(); + final Set> cleanupKeysFromClosedShards = new HashSet<>(); for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { // null indicates full cleanup, as does a closed shard - ShardId shardId = ((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId(); - cleanupKeysFromClosedShards.add(shardId); + IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + // Add both shardId and indexShardHashCode to uniquely identify an indexShard. + cleanupKeysFromClosedShards.add(new Tuple<>(indexShard.shardId(), indexShard.hashCode())); } else { cleanupKeysFromOutdatedReaders.add(cleanupKey); } @@ -735,14 +739,22 @@ private synchronized void cleanCache(double stalenessThreshold) { for (Iterator> iterator = cache.keys().iterator(); iterator.hasNext();) { ICacheKey key = iterator.next(); - if (cleanupKeysFromClosedShards.contains(key.key.shardId)) { + Key delegatingKey = key.key; + if (cleanupKeysFromClosedShards.contains(new Tuple<>(delegatingKey.shardId, delegatingKey.indexShardHashCode))) { // Since the shard is closed, the cache should drop stats for this shard. dimensionListsToDrop.add(key.dimensions); iterator.remove(); } else { - CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.key.shardId).orElse(null), key.key.readerCacheKeyId); - if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) { + CacheEntity cacheEntity = cacheEntityLookup.apply(delegatingKey.shardId).orElse(null); + if (cacheEntity == null) { + // If cache entity is null, it means that index or shard got deleted/closed meanwhile. + // So we will delete this key. iterator.remove(); + } else { + CleanupKey cleanupKey = new CleanupKey(cacheEntity, delegatingKey.readerCacheKeyId); + if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) { + iterator.remove(); + } } } } diff --git a/server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java b/server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java index a5014675ce0ed..fb5c0a3f9c8f7 100644 --- a/server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java +++ b/server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java @@ -30,7 +30,7 @@ public void testSerializer() throws Exception { Random rand = Randomness.get(); for (int valueLength : valueLengths) { for (int i = 0; i < NUM_KEYS; i++) { - IndicesRequestCache.Key key = getRandomIRCKey(valueLength, rand, indexShard.shardId()); + IndicesRequestCache.Key key = getRandomIRCKey(valueLength, rand, indexShard.shardId(), System.identityHashCode(indexShard)); byte[] serialized = ser.serialize(key); assertTrue(ser.equals(key, serialized)); IndicesRequestCache.Key deserialized = ser.deserialize(serialized); @@ -39,13 +39,13 @@ public void testSerializer() throws Exception { } } - private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, ShardId shard) { + private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, ShardId shard, int indexShardHashCode) { byte[] value = new byte[valueLength]; for (int i = 0; i < valueLength; i++) { value[i] = (byte) (random.nextInt(126 - 32) + 32); } BytesReference keyValue = new BytesArray(value); - return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), shard.hashCode()); // same UUID + return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), indexShardHashCode); // same UUID // source as used in real key } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index bc99c895cb782..dcddd9f3d1318 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -44,7 +44,14 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; +import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingHelper; +import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.cache.ICacheKey; import org.opensearch.common.cache.RemovalNotification; @@ -55,12 +62,14 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentHelper; @@ -69,9 +78,12 @@ import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; +import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.shard.ShardNotFoundException; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.node.Node; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchSingleNodeTestCase; @@ -95,6 +107,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static org.opensearch.indices.IndicesRequestCache.INDEX_DIMENSION_NAME; import static org.opensearch.indices.IndicesRequestCache.INDICES_CACHE_QUERY_SIZE; import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING; @@ -1298,6 +1312,113 @@ public void testGetOrComputeConcurrentlyWithMultipleIndices() throws Exception { executorService.shutdownNow(); } + public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Exception { + threadPool = getThreadPool(); + String indexName = "test1"; + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + // Create a shard + IndexService indexService = createIndex( + indexName, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + Index idx = resolveIndex(indexName); + ShardRouting shardRouting = indicesService.indexService(idx).getShard(0).routingEntry(); + IndexShard indexShard = indexService.getShard(0); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + writer.addDocument(newDoc(0, "foo")); + writer.addDocument(newDoc(1, "hack")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId()); + Loader loader = new Loader(reader, 0); + + // Set clean interval to a high value as we will do it manually here. + IndicesRequestCache cache = getIndicesRequestCache( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000)) + .build() + ); + IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + + // Cache some values for indexShard + BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes()); + + // Verify response and stats. + assertEquals("foo", value.streamInput().readString()); + RequestCacheStats stats = indexShard.requestCache().stats(); + assertEquals("foo", value.streamInput().readString()); + assertEquals(1, cache.count()); + assertEquals(1, stats.getMissCount()); + assertTrue(stats.getMemorySizeInBytes() > 0); + + // Remove the shard making its cache entries stale + IOUtils.close(reader, writer, dir); + indexService.removeShard(0, "force"); + + // We again try to create a shard with same ShardId + ShardRouting newRouting = shardRouting; + String nodeId = newRouting.currentNodeId(); + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "boom"); + newRouting = newRouting.moveToUnassigned(unassignedInfo) + .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE); + newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); + final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + indexShard = indexService.createShard( + newRouting, + s -> {}, + RetentionLeaseSyncer.EMPTY, + SegmentReplicationCheckpointPublisher.EMPTY, + null, + null, + localNode, + null, + DiscoveryNodes.builder().add(localNode).build() + ); + + // Verify that the new shard requestStats entries are empty. + stats = indexShard.requestCache().stats(); + assertEquals("foo", value.streamInput().readString()); + assertEquals(1, cache.count()); // Still contains the old indexShard stale entry + assertEquals(0, stats.getMissCount()); + assertTrue(stats.getMemorySizeInBytes() == 0); + IndexShardTestCase.updateRoutingEntry(indexShard, newRouting); + + // Now we cache again with new IndexShard(same shardId as older one). + dir = newDirectory(); + writer = new IndexWriter(dir, newIndexWriterConfig()); + writer.addDocument(newDoc(0, "foo")); + writer.addDocument(newDoc(1, "hack")); + reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId()); + loader = new Loader(reader, 0); + cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); + termQuery = new TermQueryBuilder("id", "bar"); + termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes()); + + // Assert response and stats. We verify that cache now has 2 entries, one for older/removed shard and other + // for the current shard. + assertEquals("foo", value.streamInput().readString()); + stats = indexShard.requestCache().stats(); + assertEquals("foo", value.streamInput().readString()); + assertEquals(2, cache.count()); // One entry for older shard and other for the current shard. + assertEquals(1, stats.getMissCount()); + assertTrue(stats.getMemorySizeInBytes() > 0); + + // Trigger clean up of cache. + cache.cacheCleanupManager.cleanCache(); + // Verify that cache still has entries for current shard and only removed older shards entries. + assertEquals(1, cache.count()); + + // Now make current indexShard entries stale as well. + reader.close(); + // Trigger clean up of cache and verify that cache has no entries now. + cache.cacheCleanupManager.cleanCache(); + assertEquals(0, cache.count()); + + IOUtils.close(reader, writer, dir, cache); + } + public static String generateString(int length) { String characters = "abcdefghijklmnopqrstuvwxyz"; StringBuilder sb = new StringBuilder(length);