Skip to content

Commit

Permalink
Merge branch 'main' into publication-setting-change
Browse files Browse the repository at this point in the history
  • Loading branch information
shiv0408 committed Sep 5, 2024
2 parents eca4681 + f33c786 commit eb38647
Show file tree
Hide file tree
Showing 16 changed files with 506 additions and 32 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Reader Writer Separation] Add experimental search replica shard type to achieve reader writer separation ([#15237](https://github.com/opensearch-project/OpenSearch/pull/15237))
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

private Boolean useRemoteStore;

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (useRemoteStore) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}
return super.nodeSettings(nodeOrdinal);
}

@After
public void teardown() {
if (useRemoteStore) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, primary, replica);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public void testDeleteShallowCopyV2MultipleSnapshots() throws Exception {

}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15692")
public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -602,28 +603,28 @@ public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws

// across a single snapshot
assertBusy(() -> {
TooManyShardsInSnapshotsStatusException exception = expectThrows(
TooManyShardsInSnapshotsStatusException.class,
CircuitBreakingException exception = expectThrows(
CircuitBreakingException.class,
() -> client().admin().cluster().prepareSnapshotStatus(repositoryName).setSnapshots(snapshot1).execute().actionGet()
);
assertEquals(exception.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")
);
}, 1, TimeUnit.MINUTES);

// across multiple snapshots
assertBusy(() -> {
TooManyShardsInSnapshotsStatusException exception = expectThrows(
TooManyShardsInSnapshotsStatusException.class,
CircuitBreakingException exception = expectThrows(
CircuitBreakingException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot1, snapshot2)
.execute()
.actionGet()
);
assertEquals(exception.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")
);
Expand Down Expand Up @@ -741,8 +742,8 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

TooManyShardsInSnapshotsStatusException ex = expectThrows(
TooManyShardsInSnapshotsStatusException.class,
CircuitBreakingException ex = expectThrows(
CircuitBreakingException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
Expand All @@ -751,7 +752,7 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
.execute()
.actionGet()
);
assertEquals(ex.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE);
assertEquals(ex.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(ex.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request"));

logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.core.index.shard.ShardId;
Expand All @@ -66,7 +68,6 @@
import org.opensearch.snapshots.SnapshotShardsService;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.snapshots.TooManyShardsInSnapshotsStatusException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -458,13 +459,17 @@ private Map<SnapshotId, SnapshotInfo> snapshotsInfo(
snapshotsInfoMap.put(snapshotId, snapshotInfo);
}
if (totalShardsAcrossSnapshots > maximumAllowedShardCount && request.indices().length == 0) {
String message = "Total shard count ["
String message = "["
+ repositoryName
+ ":"
+ String.join(", ", request.snapshots())
+ "]"
+ " Total shard count ["
+ totalShardsAcrossSnapshots
+ "] is more than the maximum allowed value of shard count ["
+ maximumAllowedShardCount
+ "] for snapshot status request";

throw new TooManyShardsInSnapshotsStatusException(repositoryName, message, request.snapshots());
throw new CircuitBreakingException(message, CircuitBreaker.Durability.PERMANENT);
}
return unmodifiableMap(snapshotsInfoMap);
}
Expand Down Expand Up @@ -520,15 +525,19 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(
}

if (totalShardsAcrossIndices > maximumAllowedShardCount && requestedIndexNames.isEmpty() == false && isV2Snapshot == false) {
String message = "Total shard count ["
String message = "["
+ repositoryName
+ ":"
+ String.join(", ", request.snapshots())
+ "]"
+ " Total shard count ["
+ totalShardsAcrossIndices
+ "] across the requested indices ["
+ requestedIndexNames.stream().collect(Collectors.joining(", "))
+ "] is more than the maximum allowed value of shard count ["
+ maximumAllowedShardCount
+ "] for snapshot status request";

throw new TooManyShardsInSnapshotsStatusException(repositoryName, message, snapshotName);
throw new CircuitBreakingException(message, CircuitBreaker.Durability.PERMANENT);
}

final Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
Expand Down
54 changes: 53 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
Expand Down Expand Up @@ -629,6 +630,56 @@ public IndexService newIndexService(
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
return newIndexService(
indexCreationContext,
environment,
xContentRegistry,
shardStoreDeleter,
circuitBreakerService,
bigArrays,
threadPool,
scriptService,
clusterService,
client,
indicesQueryCache,
mapperRegistry,
indicesFieldDataCache,
namedWriteableRegistry,
idFieldDataEnabled,
valuesSourceRegistry,
remoteDirectoryFactory,
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
recoverySettings,
remoteStoreSettings,
(s) -> {}
);
}

public IndexService newIndexService(
IndexService.IndexCreationContext indexCreationContext,
NodeEnvironment environment,
NamedXContentRegistry xContentRegistry,
IndexService.ShardStoreDeleter shardStoreDeleter,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
ScriptService scriptService,
ClusterService clusterService,
Client client,
IndicesQueryCache indicesQueryCache,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -689,7 +740,8 @@ public IndexService newIndexService(
recoverySettings,
remoteStoreSettings,
fileCache,
compositeIndexSettings
compositeIndexSettings,
replicator
);
success = true;
return indexService;
Expand Down
Loading

0 comments on commit eb38647

Please sign in to comment.