From 1852b7e4fe62711ac86107ee68823878fdf56bb4 Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Thu, 6 Jun 2024 15:02:00 -0700 Subject: [PATCH 1/5] [Caching] Move cache removal notifications outside lru lock (#14017) --------- Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 1 + .../org/opensearch/common/cache/Cache.java | 53 ++++++++++++++++--- .../common/cache/RemovalListener.java | 5 ++ 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 30a7c9d8e9ab0..fb465153512bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650)) - Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481)) - Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636)) +- Move cache removal notifications outside lru lock ([#14017](https://github.com/opensearch-project/OpenSearch/pull/14017)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java index 6d346de25cadf..caae81e4387b4 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -36,9 +36,11 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.util.concurrent.ReleasableLock; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -396,7 +398,12 @@ private V 
get(K key, long now, Consumer> onExpiration) { if (entry == null) { return null; } else { - promote(entry, now); + List> removalNotifications = promote(entry, now).v2(); + if (!removalNotifications.isEmpty()) { + for (RemovalNotification removalNotification : removalNotifications) { + removalListener.onRemoval(removalNotification); + } + } return entry.value; } } @@ -446,8 +453,14 @@ private V compute(K key, CacheLoader loader) throws ExecutionException { BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { if (ok != null) { + List> removalNotifications = new ArrayList<>(); try (ReleasableLock ignored = lruLock.acquire()) { - promote(ok, now); + removalNotifications = promote(ok, now).v2(); + } + if (!removalNotifications.isEmpty()) { + for (RemovalNotification removalNotification : removalNotifications) { + removalListener.onRemoval(removalNotification); + } } return ok.value; } else { @@ -512,16 +525,22 @@ private void put(K key, V value, long now) { CacheSegment segment = getCacheSegment(key); Tuple, Entry> tuple = segment.put(key, value, now); boolean replaced = false; + List> removalNotifications = new ArrayList<>(); try (ReleasableLock ignored = lruLock.acquire()) { if (tuple.v2() != null && tuple.v2().state == State.EXISTING) { if (unlink(tuple.v2())) { replaced = true; } } - promote(tuple.v1(), now); + removalNotifications = promote(tuple.v1(), now).v2(); } if (replaced) { - removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED)); + removalNotifications.add(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED)); + } + if (!removalNotifications.isEmpty()) { + for (RemovalNotification removalNotification : removalNotifications) { + removalListener.onRemoval(removalNotification); + } } } @@ -767,8 +786,17 @@ public long getEvictions() { } } - private boolean promote(Entry entry, long now) { + /** + * Promotes the desired entry to the head of the lru list and tries to 
see if it needs to evict any entries in + * case the cache size is exceeding or the entry got expired. + * @param entry Entry to be promoted + * @param now the current time + * @return Returns a tuple. v1 signifies whether an entry got promoted, v2 signifies the list of removal + * notifications that the callers needs to handle. + */ + private Tuple>> promote(Entry entry, long now) { boolean promoted = true; + List> removalNotifications = new ArrayList<>(); try (ReleasableLock ignored = lruLock.acquire()) { switch (entry.state) { case DELETED: @@ -782,10 +810,21 @@ private boolean promote(Entry entry, long now) { break; } if (promoted) { - evict(now); + while (tail != null && shouldPrune(tail, now)) { + Entry entryToBeRemoved = tail; + CacheSegment segment = getCacheSegment(entryToBeRemoved.key); + if (segment != null) { + segment.remove(entryToBeRemoved.key, entryToBeRemoved.value, f -> {}); + } + if (unlink(entryToBeRemoved)) { + removalNotifications.add( + new RemovalNotification<>(entryToBeRemoved.key, entryToBeRemoved.value, RemovalReason.EVICTED) + ); + } + } } } - return promoted; + return new Tuple<>(promoted, removalNotifications); } private void evict(long now) { diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalListener.java b/server/src/main/java/org/opensearch/common/cache/RemovalListener.java index 68e1cdf6139e2..eaaaec2bb07e0 100644 --- a/server/src/main/java/org/opensearch/common/cache/RemovalListener.java +++ b/server/src/main/java/org/opensearch/common/cache/RemovalListener.java @@ -42,5 +42,10 @@ @ExperimentalApi @FunctionalInterface public interface RemovalListener { + + /** + * This may be called from multiple threads at once. So implementation needs to be thread safe. + * @param notification removal notification for desired entry. 
+ */ void onRemoval(RemovalNotification notification); } From c987d6b7d9a3d603c09bddfe82c8995d81a7e759 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 6 Jun 2024 16:11:15 -0700 Subject: [PATCH 2/5] Revert "Update checkpoint from remote nodes replicas (#13888)" (#14056) This reverts commit c89a17cecaf8348d936cd42d3000c1a1fa7cf120. Signed-off-by: Chenyang Ji --- .../MigrationBaseTestCase.java | 3 +- .../RemotePrimaryRelocationIT.java | 40 +++++++--- .../RemoteReplicaRecoveryIT.java | 79 +++++++++++++------ .../opensearch/index/shard/IndexShard.java | 1 + .../SegmentReplicationTargetService.java | 2 +- .../replication/common/ReplicationTarget.java | 4 +- 6 files changed, 88 insertions(+), 41 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index b65f6f056aae6..0493bcf800c97 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -186,11 +186,10 @@ private Thread getIndexingThread() { indexSingleDoc(indexName); long currentDocCount = indexedDocs.incrementAndGet(); if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) { + logger.info("--> [iteration {}] flushing index", currentDocCount); if (rarely()) { - logger.info("--> [iteration {}] flushing index", currentDocCount); client().admin().indices().prepareFlush(indexName).get(); } else { - logger.info("--> [iteration {}] refreshing index", currentDocCount); client().admin().indices().prepareRefresh(indexName).get(); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index cea653c0ead4b..293691ace2edd 100644 --- 
a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -8,11 +8,14 @@ package org.opensearch.remotemigration; +import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.client.Client; import org.opensearch.client.Requests; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; @@ -63,8 +66,8 @@ public void testRemotePrimaryRelocation() throws Exception { AtomicInteger numAutoGenDocs = new AtomicInteger(); final AtomicBoolean finished = new AtomicBoolean(false); - AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); - asyncIndexingService.startIndexing(); + Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); + refresh("test"); // add remote node in mixed mode cluster @@ -138,19 +141,17 @@ public void testRemotePrimaryRelocation() throws Exception { logger.info("--> relocation from remote to remote complete"); finished.set(true); - asyncIndexingService.stopIndexing(); + indexingThread.join(); refresh("test"); - OpenSearchAssertions.assertHitCount( - client().prepareSearch("test").setTrackTotalHits(true).get(), - asyncIndexingService.getIndexedDocs() - ); + OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); OpenSearchAssertions.assertHitCount( client().prepareSearch("test") 
.setTrackTotalHits(true)// extra paranoia ;) .setQuery(QueryBuilders.termQuery("auto", true)) .get(), - asyncIndexingService.getIndexedDocs() + numAutoGenDocs.get() ); + } public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { @@ -164,8 +165,9 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); ensureGreen("test"); - AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); - asyncIndexingService.startIndexing(); + AtomicInteger numAutoGenDocs = new AtomicInteger(); + final AtomicBoolean finished = new AtomicBoolean(false); + Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); refresh("test"); @@ -207,11 +209,27 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { assertEquals(actionGet.getRelocatingShards(), 0); assertEquals(docRepNode, primaryNodeName("test")); - asyncIndexingService.stopIndexing(); + finished.set(true); + indexingThread.join(); client().admin() .cluster() .prepareUpdateSettings() .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)) .get(); } + + private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { + Thread indexingThread = new Thread(() -> { + while (finished.get() == false && numAutoGenDocs.get() < 10_000) { + IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); + assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); + DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); + client().prepareIndex("test").setSource("auto", true).get(); + numAutoGenDocs.incrementAndGet(); + } + }); + indexingThread.start(); + return indexingThread; + } } 
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java index 7270341202990..196ecb991bbc0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java @@ -8,27 +8,32 @@ package org.opensearch.remotemigration; +import com.carrotsearch.randomizedtesting.generators.RandomNumbers; + +import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; 
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) + public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase { protected int maximumNumberOfShards() { @@ -58,8 +63,10 @@ public void testReplicaRecovery() throws Exception { client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); String replicaNode = internalCluster().startNode(); ensureGreen("test"); - AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); - asyncIndexingService.startIndexing(); + + AtomicInteger numAutoGenDocs = new AtomicInteger(); + final AtomicBoolean finished = new AtomicBoolean(false); + Thread indexingThread = getThread(finished, numAutoGenDocs); refresh("test"); @@ -71,10 +78,12 @@ public void testReplicaRecovery() throws Exception { updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - internalCluster().startNode(); + String remoteNode2 = internalCluster().startNode(); internalCluster().validateClusterFormed(); // identify the primary + + Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); logger.info("--> relocating primary from {} to {} ", primaryNode, remoteNode); client().admin() .cluster() @@ -93,6 +102,7 @@ public void testReplicaRecovery() throws Exception { assertEquals(0, clusterHealthResponse.getRelocatingShards()); logger.info("--> relocation of primary from docrep to remote complete"); + Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); logger.info("--> getting up the new replicas now to doc rep node as well as remote node "); // Increase replica count to 3 @@ -119,33 +129,52 @@ public void testReplicaRecovery() throws Exception { logger.info("--> replica is up now on another docrep now as well as remote node"); 
assertEquals(0, clusterHealthResponse.getRelocatingShards()); - asyncIndexingService.stopIndexing(); - refresh("test"); - // segrep lag should be zero - assertBusy(() -> { - SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() - .indices() - .prepareSegmentReplicationStats("test") - .setDetailed(true) - .execute() - .actionGet(); - SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get("test").get(0); - assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1); - perGroupStats.getReplicaStats().stream().forEach(e -> assertEquals(e.getCurrentReplicationLagMillis(), 0)); - }, 20, TimeUnit.SECONDS); + Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); - OpenSearchAssertions.assertHitCount( - client().prepareSearch("test").setTrackTotalHits(true).get(), - asyncIndexingService.getIndexedDocs() - ); + // Stop replicas on docrep now. + // ToDo : Remove once we have dual replication enabled + client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest("test").settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.exclude._name", primaryNode + "," + replicaNode) + .build() + ) + ) + .get(); + + finished.set(true); + indexingThread.join(); + refresh("test"); + OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); OpenSearchAssertions.assertHitCount( client().prepareSearch("test") .setTrackTotalHits(true)// extra paranoia ;) .setQuery(QueryBuilders.termQuery("auto", true)) + // .setPreference("_prefer_nodes:" + (remoteNode+ "," + remoteNode2)) .get(), - asyncIndexingService.getIndexedDocs() + numAutoGenDocs.get() ); } + + private Thread getThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { + Thread indexingThread = new Thread(() -> { + while (finished.get() == false && numAutoGenDocs.get() < 100) { + 
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); + assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); + DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); + client().prepareIndex("test").setSource("auto", true).get(); + numAutoGenDocs.incrementAndGet(); + logger.info("Indexed {} docs here", numAutoGenDocs.get()); + } + }); + indexingThread.start(); + return indexingThread; + } + } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 49cb710c915fc..3517579856d43 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -523,6 +523,7 @@ public boolean shouldSeedRemoteStore() { public Function isShardOnRemoteEnabledNode = nodeId -> { DiscoveryNode node = discoveryNodes.get(nodeId); if (node != null) { + logger.trace("Node {} has remote_enabled as {}", nodeId, node.isRemoteStoreNode()); return node.isRemoteStoreNode(); } return false; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index f6ed113019897..fbd7ab7cea346 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -384,7 +384,7 @@ private void logReplicationFailure(SegmentReplicationState state, ReplicationFai protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) { // Update replication checkpoint on source via transport call only supported for remote store integration. 
For node- // node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call - if (replicaShard.indexSettings().isAssignedOnRemoteNode() == false) { + if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) { return; } ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard(); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 76401eaabbf39..aac59df4f6573 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -91,7 +91,7 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn // make sure the store is not released until we are done. this.cancellableThreads = new CancellableThreads(); store.incRef(); - if (indexShard.indexSettings().isAssignedOnRemoteNode()) { + if (indexShard.indexSettings().isRemoteStoreEnabled()) { indexShard.remoteStore().incRef(); } } @@ -284,7 +284,7 @@ protected void closeInternal() { try { store.decRef(); } finally { - if (indexShard.indexSettings().isAssignedOnRemoteNode()) { + if (indexShard.indexSettings().isRemoteStoreEnabled()) { indexShard.remoteStore().decRef(); } } From 9d3cf43e7f06d2011c931cf829439e9fba97f18d Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 6 Jun 2024 19:16:50 -0700 Subject: [PATCH 3/5] Add X-Opaque-Id to search request metadata for query insights (#13374) --------- Signed-off-by: Chenyang Ji --- CHANGELOG.md | 1 + .../core/listener/QueryInsightsListener.java | 10 ++++++ .../insights/rules/model/Attribute.java | 6 +++- .../listener/QueryInsightsListenerTests.java | 31 ++++++++++++++++++- .../action/search/SearchRequestContext.java | 4 +++ .../SearchRequestOperationsListener.java | 6 ++-- 6 files changed, 53 
insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fb465153512bf..db0e26375cbfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304)) - [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027)) - [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982)) +- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374)) ### Dependencies - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559)) diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 9ec8673147c38..cad2fe374f1b6 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -21,6 +21,7 @@ import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.tasks.Task; import java.util.Collections; import java.util.HashMap; @@ -138,6 +139,15 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo 
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); attributes.put(Attribute.INDICES, request.indices()); attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); + + Map labels = new HashMap<>(); + // Retrieve user provided label if exists + String userProvidedLabel = context.getTask().getHeader(Task.X_OPAQUE_ID); + if (userProvidedLabel != null) { + labels.put(Task.X_OPAQUE_ID, userProvidedLabel); + } + attributes.put(Attribute.LABELS, labels); + // construct SearchQueryRecord from attributes and measurements SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); queryInsightsService.addRecord(record); } catch (Exception e) { diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index c1d17edf9ff14..7ee4883c54023 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -43,7 +43,11 @@ public enum Attribute { /** * The node id for this request */ - NODE_ID; + NODE_ID, + /** + * Custom search request labels + */ + LABELS; /** * Read an Attribute from a StreamInput diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index 328ed0cd2ed15..b794a2e4b8608 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -11,28 +11,39 @@ import org.opensearch.action.search.SearchPhaseContext; import 
org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchTask; import org.opensearch.action.search.SearchType; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.core.service.TopQueriesService; +import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.opensearch.search.aggregations.support.ValueType; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.tasks.Task; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; import org.junit.Before; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; +import org.mockito.ArgumentCaptor; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -48,6 +59,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final SearchRequest searchRequest = mock(SearchRequest.class); private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); private final TopQueriesService topQueriesService = 
mock(TopQueriesService.class); + private final ThreadPool threadPool = mock(ThreadPool.class); private ClusterService clusterService; @Before @@ -61,8 +73,13 @@ public void setup() { clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); + + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + threadContext.setHeaders(new Tuple<>(Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"), new HashMap<>())); + when(threadPool.getThreadContext()).thenReturn(threadContext); } + @SuppressWarnings("unchecked") public void testOnRequestEnd() throws InterruptedException { Long timestamp = System.currentTimeMillis() - 100L; SearchType searchType = SearchType.QUERY_THEN_FETCH; @@ -70,6 +87,7 @@ public void testOnRequestEnd() throws InterruptedException { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); searchSourceBuilder.size(0); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")); String[] indices = new String[] { "index-1", "index-2" }; @@ -89,10 +107,19 @@ public void testOnRequestEnd() throws InterruptedException { when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); when(searchPhaseContext.getRequest()).thenReturn(searchRequest); when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + when(searchPhaseContext.getTask()).thenReturn(task); + ArgumentCaptor captor = ArgumentCaptor.forClass(SearchQueryRecord.class); queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext); - verify(queryInsightsService, times(1)).addRecord(any()); + verify(queryInsightsService, 
times(1)).addRecord(captor.capture()); + SearchQueryRecord generatedRecord = captor.getValue(); + assertEquals(timestamp.longValue(), generatedRecord.getTimestamp()); + assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS)); + assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE)); + assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE)); + Map labels = (Map) generatedRecord.getAttributes().get(Attribute.LABELS); + assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID)); } public void testConcurrentOnRequestEnd() throws InterruptedException { @@ -102,6 +129,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); searchSourceBuilder.size(0); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")); String[] indices = new String[] { "index-1", "index-2" }; @@ -121,6 +149,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); when(searchPhaseContext.getRequest()).thenReturn(searchRequest); when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + when(searchPhaseContext.getTask()).thenReturn(task); int numRequests = 50; Thread[] threads = new Thread[numRequests]; diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index b8bbde65ca6bc..5b133ba0554f4 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -107,6 
+107,10 @@ String formattedShardStats() { ); } } + + public SearchRequest getRequest() { + return searchRequest; + } } enum ShardStatsFieldNames { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 53efade174502..b944572cef122 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -41,11 +41,11 @@ protected SearchRequestOperationsListener(final boolean enabled) { this.enabled = enabled; } - protected abstract void onPhaseStart(SearchPhaseContext context); + protected void onPhaseStart(SearchPhaseContext context) {}; - protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}; - protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause); + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}; protected void onRequestStart(SearchRequestContext searchRequestContext) {} From 5bad14cf06b4fc1c6f0d5ff03a72f06bb38d1026 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Fri, 7 Jun 2024 10:36:04 +0530 Subject: [PATCH 4/5] Make recovery action retry timeout configurable (#14022) Signed-off-by: Gaurav Bafna --- CHANGELOG.md | 1 + .../opensearch/common/settings/ClusterSettings.java | 1 + .../opensearch/indices/recovery/RecoverySettings.java | 8 ++++++++ .../recovery/RecoverySettingsDynamicUpdateTests.java | 11 +++++++++++ 4 files changed, 21 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index db0e26375cbfb..0a334945d69ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/), - Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776)) - Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590)) - Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304)) +- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/pull/14022)) - [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027)) - [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982)) - [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 95b4462d4f62e..09f32884e0ae1 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -302,6 +302,7 @@ public void apply(Settings value, Settings current, Settings previous) { RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING, + RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING, diff --git
a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 8f9da6babdd99..576c42d629732 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -239,6 +239,10 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { ); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout); clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout); + clusterSettings.addSettingsUpdateConsumer( + INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING, + this::setInternalActionRetryTimeout + ); } @@ -313,6 +317,10 @@ public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout this.internalRemoteUploadTimeout = internalRemoteUploadTimeout; } + public void setInternalActionRetryTimeout(TimeValue internalActionRetryTimeout) { + this.internalActionRetryTimeout = internalActionRetryTimeout; + } + private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) { this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec; if (recoveryMaxBytesPerSec.getBytes() <= 0) { diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index 2793d446d66c8..ccba745f9b126 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -118,4 +118,15 @@ public void testInternalLongActionTimeout() { ); assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout()); } + + public void testInternalActionRetryTimeout() 
{ + long duration = between(1, 1000); + TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS); + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING.getKey(), duration, timeUnit) + .build() + ); + assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionRetryTimeout()); + } } From b06d0b93b7565c577e6e141967defac3dd888696 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Fri, 7 Jun 2024 16:32:51 +0530 Subject: [PATCH 5/5] Update checkpoint from remote nodes replicas (#13888) (#14062) Signed-off-by: Gaurav Bafna --- .../MigrationBaseTestCase.java | 3 +- .../RemoteMigrationIndexMetadataUpdateIT.java | 3 + .../RemotePrimaryRelocationIT.java | 40 +++------- .../RemoteReplicaRecoveryIT.java | 80 ++++++------------- .../opensearch/index/shard/IndexShard.java | 1 - .../SegmentReplicationTargetService.java | 2 +- .../replication/common/ReplicationTarget.java | 4 +- 7 files changed, 45 insertions(+), 88 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 0493bcf800c97..b65f6f056aae6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -186,10 +186,11 @@ private Thread getIndexingThread() { indexSingleDoc(indexName); long currentDocCount = indexedDocs.incrementAndGet(); if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) { - logger.info("--> [iteration {}] flushing index", currentDocCount); if (rarely()) { + logger.info("--> [iteration {}] flushing index", currentDocCount); client().admin().indices().prepareFlush(indexName).get(); } else { + 
logger.info("--> [iteration {}] refreshing index", currentDocCount); client().admin().indices().prepareRefresh(indexName).get(); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java index c72b6851c1125..793adef0594fc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java @@ -273,6 +273,7 @@ initalMetadataVersion < internalCluster().client() * After shard relocation completes, shuts down the docrep nodes and asserts remote * index settings are applied even when the index is in YELLOW state */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13737") public void testIndexSettingsUpdatedEvenForMisconfiguredReplicas() throws Exception { internalCluster().startClusterManagerOnlyNode(); @@ -329,6 +330,7 @@ public void testIndexSettingsUpdatedEvenForMisconfiguredReplicas() throws Except * After shard relocation completes, restarts the docrep node holding extra replica shard copy * and asserts remote index settings are applied as soon as the docrep replica copy is unassigned */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13871") public void testIndexSettingsUpdatedWhenDocrepNodeIsRestarted() throws Exception { internalCluster().startClusterManagerOnlyNode(); @@ -469,6 +471,7 @@ public void testRemotePathMetadataAddedWithFirstPrimaryMovingToRemote() throws E * exclude docrep nodes, assert that remote index path file exists * when shards start relocating to the remote nodes. 
*/ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13939") public void testRemoteIndexPathFileExistsAfterMigration() throws Exception { String docrepClusterManager = internalCluster().startClusterManagerOnlyNode(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index 293691ace2edd..cea653c0ead4b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -8,14 +8,11 @@ package org.opensearch.remotemigration; -import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.index.IndexResponse; import org.opensearch.client.Client; import org.opensearch.client.Requests; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; @@ -66,8 +63,8 @@ public void testRemotePrimaryRelocation() throws Exception { AtomicInteger numAutoGenDocs = new AtomicInteger(); final AtomicBoolean finished = new AtomicBoolean(false); - Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); - + AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); + asyncIndexingService.startIndexing(); refresh("test"); // add remote node in mixed mode cluster @@ -141,17 +138,19 @@ public void testRemotePrimaryRelocation() throws Exception { 
logger.info("--> relocation from remote to remote complete"); finished.set(true); - indexingThread.join(); + asyncIndexingService.stopIndexing(); refresh("test"); - OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); + OpenSearchAssertions.assertHitCount( + client().prepareSearch("test").setTrackTotalHits(true).get(), + asyncIndexingService.getIndexedDocs() + ); OpenSearchAssertions.assertHitCount( client().prepareSearch("test") .setTrackTotalHits(true)// extra paranoia ;) .setQuery(QueryBuilders.termQuery("auto", true)) .get(), - numAutoGenDocs.get() + asyncIndexingService.getIndexedDocs() ); - } public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { @@ -165,9 +164,8 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); ensureGreen("test"); - AtomicInteger numAutoGenDocs = new AtomicInteger(); - final AtomicBoolean finished = new AtomicBoolean(false); - Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); + AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); + asyncIndexingService.startIndexing(); refresh("test"); @@ -209,27 +207,11 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { assertEquals(actionGet.getRelocatingShards(), 0); assertEquals(docRepNode, primaryNodeName("test")); - finished.set(true); - indexingThread.join(); + asyncIndexingService.stopIndexing(); client().admin() .cluster() .prepareUpdateSettings() .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)) .get(); } - - private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { - Thread indexingThread = new Thread(() -> { - while (finished.get() == false && numAutoGenDocs.get() < 10_000) { - 
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); - assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); - DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - client().prepareIndex("test").setSource("auto", true).get(); - numAutoGenDocs.incrementAndGet(); - } - }); - indexingThread.start(); - return indexingThread; - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java index 196ecb991bbc0..aae726fe2a6bc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java @@ -8,32 +8,27 @@ package org.opensearch.remotemigration; -import com.carrotsearch.randomizedtesting.generators.RandomNumbers; - -import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.index.IndexResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.test.OpenSearchIntegTestCase; import 
org.opensearch.test.hamcrest.OpenSearchAssertions; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) - public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase { protected int maximumNumberOfShards() { @@ -52,6 +47,7 @@ protected int minimumNumberOfReplicas() { Brings up new replica copies on remote and docrep nodes, when primary is on a remote node Live indexing is happening meanwhile */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13473") public void testReplicaRecovery() throws Exception { internalCluster().setBootstrapClusterManagerNodeIndex(0); String primaryNode = internalCluster().startNode(); @@ -63,10 +59,8 @@ public void testReplicaRecovery() throws Exception { client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); String replicaNode = internalCluster().startNode(); ensureGreen("test"); - - AtomicInteger numAutoGenDocs = new AtomicInteger(); - final AtomicBoolean finished = new AtomicBoolean(false); - Thread indexingThread = getThread(finished, numAutoGenDocs); + AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); + asyncIndexingService.startIndexing(); refresh("test"); @@ -78,12 +72,10 @@ public void testReplicaRecovery() throws Exception { updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")); 
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - String remoteNode2 = internalCluster().startNode(); + internalCluster().startNode(); internalCluster().validateClusterFormed(); // identify the primary - - Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); logger.info("--> relocating primary from {} to {} ", primaryNode, remoteNode); client().admin() .cluster() @@ -102,7 +94,6 @@ public void testReplicaRecovery() throws Exception { assertEquals(0, clusterHealthResponse.getRelocatingShards()); logger.info("--> relocation of primary from docrep to remote complete"); - Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); logger.info("--> getting up the new replicas now to doc rep node as well as remote node "); // Increase replica count to 3 @@ -129,52 +120,33 @@ public void testReplicaRecovery() throws Exception { logger.info("--> replica is up now on another docrep now as well as remote node"); assertEquals(0, clusterHealthResponse.getRelocatingShards()); + asyncIndexingService.stopIndexing(); + refresh("test"); - Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); + // segrep lag should be zero + assertBusy(() -> { + SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats("test") + .setDetailed(true) + .execute() + .actionGet(); + SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get("test").get(0); + assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1); + perGroupStats.getReplicaStats().stream().forEach(e -> assertEquals(e.getCurrentReplicationLagMillis(), 0)); + }, 20, TimeUnit.SECONDS); - // Stop replicas on docrep now. 
- // ToDo : Remove once we have dual replication enabled - client().admin() - .indices() - .updateSettings( - new UpdateSettingsRequest("test").settings( - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .put("index.routing.allocation.exclude._name", primaryNode + "," + replicaNode) - .build() - ) - ) - .get(); - - finished.set(true); - indexingThread.join(); - refresh("test"); - OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); + OpenSearchAssertions.assertHitCount( + client().prepareSearch("test").setTrackTotalHits(true).get(), + asyncIndexingService.getIndexedDocs() + ); OpenSearchAssertions.assertHitCount( client().prepareSearch("test") .setTrackTotalHits(true)// extra paranoia ;) .setQuery(QueryBuilders.termQuery("auto", true)) - // .setPreference("_prefer_nodes:" + (remoteNode+ "," + remoteNode2)) .get(), - numAutoGenDocs.get() + asyncIndexingService.getIndexedDocs() ); } - - private Thread getThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { - Thread indexingThread = new Thread(() -> { - while (finished.get() == false && numAutoGenDocs.get() < 100) { - IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); - assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); - DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - client().prepareIndex("test").setSource("auto", true).get(); - numAutoGenDocs.incrementAndGet(); - logger.info("Indexed {} docs here", numAutoGenDocs.get()); - } - }); - indexingThread.start(); - return indexingThread; - } - } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 3517579856d43..49cb710c915fc 100644 --- 
a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -523,7 +523,6 @@ public boolean shouldSeedRemoteStore() { public Function isShardOnRemoteEnabledNode = nodeId -> { DiscoveryNode node = discoveryNodes.get(nodeId); if (node != null) { - logger.trace("Node {} has remote_enabled as {}", nodeId, node.isRemoteStoreNode()); return node.isRemoteStoreNode(); } return false; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index fbd7ab7cea346..f6ed113019897 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -384,7 +384,7 @@ private void logReplicationFailure(SegmentReplicationState state, ReplicationFai protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) { // Update replication checkpoint on source via transport call only supported for remote store integration. 
For node- // node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call - if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) { + if (replicaShard.indexSettings().isAssignedOnRemoteNode() == false) { return; } ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard(); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index aac59df4f6573..76401eaabbf39 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -91,7 +91,7 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn // make sure the store is not released until we are done. this.cancellableThreads = new CancellableThreads(); store.incRef(); - if (indexShard.indexSettings().isRemoteStoreEnabled()) { + if (indexShard.indexSettings().isAssignedOnRemoteNode()) { indexShard.remoteStore().incRef(); } } @@ -284,7 +284,7 @@ protected void closeInternal() { try { store.decRef(); } finally { - if (indexShard.indexSettings().isRemoteStoreEnabled()) { + if (indexShard.indexSettings().isAssignedOnRemoteNode()) { indexShard.remoteStore().decRef(); } }