From 9ffc23ac63a89722d3c99e913cc5601d85321a8a Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Mon, 28 Aug 2023 20:44:37 +0530 Subject: [PATCH 1/3] adding relcocation supplier to txlog --- .../RemoteIndexPrimaryRelocationIT.java | 3 +-- .../opensearch/index/engine/EngineConfig.java | 16 +++++++------- .../index/engine/EngineConfigFactory.java | 6 +++--- .../index/engine/InternalEngine.java | 2 +- .../index/engine/NRTReplicationEngine.java | 2 +- .../opensearch/index/engine/NoOpEngine.java | 2 +- .../index/engine/ReadOnlyEngine.java | 2 +- .../opensearch/index/shard/IndexShard.java | 21 ++++++++++++++++++- .../translog/InternalTranslogFactory.java | 17 ++++++++------- .../translog/InternalTranslogManager.java | 10 ++++----- ...emoteBlobStoreInternalTranslogFactory.java | 6 +++--- .../index/translog/RemoteFsTranslog.java | 11 ++++++++-- .../index/translog/TranslogFactory.java | 5 +++-- .../translog/WriteOnlyTranslogManager.java | 6 +++--- .../engine/EngineConfigFactoryTests.java | 5 +++-- .../InternalTranslogManagerTests.java | 17 ++++++++------- .../index/translog/RemoteFSTranslogTests.java | 19 +++++++++-------- 17 files changed, 90 insertions(+), 60 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java index e4dcd637ac448..8792e959751e8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java @@ -63,8 +63,7 @@ protected Settings featureFlagSettings() { .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") .build(); } - - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9191") + public void testPrimaryRelocationWhileIndexing() throws Exception { super.testPrimaryRelocationWhileIndexing(); } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index 3351931a6b068..34371cb6a1985 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -53,6 +53,7 @@ import org.opensearch.index.codec.CodecSettings; import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.TranslogConfig; @@ -65,7 +66,6 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -105,7 +105,7 @@ public final class EngineConfig { private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; private final boolean isReadOnlyReplica; - private final BooleanSupplier primaryModeSupplier; + private final IndexShard.IndexShardConfig indexShardConfig; private final Comparator leafSorter; /** @@ -266,7 +266,7 @@ private EngineConfig(Builder builder) { this.primaryTermSupplier = builder.primaryTermSupplier; this.tombstoneDocSupplier = builder.tombstoneDocSupplier; this.isReadOnlyReplica = builder.isReadOnlyReplica; - this.primaryModeSupplier = builder.primaryModeSupplier; + this.indexShardConfig = builder.indexShardConfig; this.translogFactory = builder.translogFactory; this.leafSorter = builder.leafSorter; } @@ -477,8 +477,8 @@ public boolean isReadOnlyReplica() { * Returns the underlying primaryModeSupplier. * @return the primary mode supplier. */ - public BooleanSupplier getPrimaryModeSupplier() { - return primaryModeSupplier; + public IndexShard.IndexShardConfig getIndexShardConfig() { + return indexShardConfig; } /** @@ -555,7 +555,7 @@ public static class Builder { private TombstoneDocSupplier tombstoneDocSupplier; private TranslogDeletionPolicyFactory translogDeletionPolicyFactory; private boolean isReadOnlyReplica; - private BooleanSupplier primaryModeSupplier; + private IndexShard.IndexShardConfig indexShardConfig; private TranslogFactory translogFactory = new InternalTranslogFactory(); Comparator leafSorter; @@ -679,8 +679,8 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) { return this; } - public Builder primaryModeSupplier(BooleanSupplier primaryModeSupplier) { - this.primaryModeSupplier = primaryModeSupplier; + public Builder indexShardConfig(IndexShard.IndexShardConfig indexShardConfig) { + this.indexShardConfig = indexShardConfig; return this; } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index 38eea92b6c757..4b3220a420efd 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -27,6 +27,7 @@ import org.opensearch.index.codec.CodecServiceFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; @@ -40,7 +41,6 @@ import java.util.Comparator; import java.util.List; import java.util.Optional; -import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -152,7 +152,7 @@ public EngineConfig newEngineConfig( LongSupplier primaryTermSupplier, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier, boolean isReadOnlyReplica, - BooleanSupplier primaryModeSupplier, + IndexShard.IndexShardConfig indexShardConfig, TranslogFactory translogFactory, Comparator leafSorter ) { @@ -185,7 +185,7 @@ public EngineConfig newEngineConfig( .primaryTermSupplier(primaryTermSupplier) .tombstoneDocSupplier(tombstoneDocSupplier) .readOnlyReplica(isReadOnlyReplica) - .primaryModeSupplier(primaryModeSupplier) + .indexShardConfig(indexShardConfig) .translogFactory(translogFactory) .leafSorter(leafSorter) .build(); diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index e8e5042cfe944..4e289def2adb6 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -292,7 +292,7 @@ public void onFailure(String reason, Exception ex) { new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId), this::ensureOpen, engineConfig.getTranslogFactory(), - engineConfig.getPrimaryModeSupplier() + engineConfig.getIndexShardConfig() ); this.translogManager = translogManagerRef; this.softDeletesPolicy = newSoftDeletesPolicy(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 48556cc6b9709..758d034fcb909 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -118,7 +118,7 @@ public void onAfterTranslogSync() { }, this, engineConfig.getTranslogFactory(), - engineConfig.getPrimaryModeSupplier() + engineConfig.getIndexShardConfig() ); this.translogManager = translogManagerRef; } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index ca31d5518df47..1417c50e6ab63 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -203,7 +203,7 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {}, - engineConfig.getPrimaryModeSupplier() + engineConfig.getIndexShardConfig() ) ) { translog.trimUnreferencedReaders(); diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 80cef214a08cd..3fdfea9e2e925 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -278,7 +278,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), seqNo -> {}, - config.getPrimaryModeSupplier() + config.getIndexShardConfig() ) ) { return translog.stats(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 1d0184de9d93c..05871899bcd82 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -216,6 +216,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -337,6 +338,24 @@ Runnable getGlobalCheckpointSyncer() { private final List internalRefreshListener = new ArrayList<>(); + static public class IndexShardConfig { + BooleanSupplier primaryModeSupplier; + BooleanSupplier relocatingSupplier; + + public IndexShardConfig(BooleanSupplier primaryModeSupplier, BooleanSupplier relocatingSupplier) { + this.primaryModeSupplier = primaryModeSupplier; + this.relocatingSupplier = relocatingSupplier; + } + + public BooleanSupplier getPrimaryModeSupplier() { + return primaryModeSupplier; + } + + public BooleanSupplier getRelocatingSupplier() { + return relocatingSupplier; + } + } + public IndexShard( final ShardRouting shardRouting, final IndexSettings indexSettings, @@ -3733,7 +3752,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro () -> getOperationPrimaryTerm(), tombstoneDocSupplier(), isReadOnlyReplica, - replicationTracker::isPrimaryMode, + new IndexShardConfig(replicationTracker::isPrimaryMode, () -> shardRouting.relocating()), translogFactorySupplier.apply(indexSettings, shardRouting), isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for // timeseries diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java index d7be1250c0b5b..d432bfa2e0fb8 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java @@ -8,8 +8,9 @@ package org.opensearch.index.translog; +import org.opensearch.index.shard.IndexShard; + import java.io.IOException; -import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -22,13 +23,13 @@ public class InternalTranslogFactory implements TranslogFactory { @Override public Translog newTranslog( - TranslogConfig translogConfig, - String translogUUID, - TranslogDeletionPolicy translogDeletionPolicy, - LongSupplier globalCheckpointSupplier, - LongSupplier primaryTermSupplier, - LongConsumer persistedSequenceNumberConsumer, - BooleanSupplier primaryModeSupplier + TranslogConfig translogConfig, + String translogUUID, + TranslogDeletionPolicy translogDeletionPolicy, + LongSupplier globalCheckpointSupplier, + LongSupplier primaryTermSupplier, + LongConsumer persistedSequenceNumberConsumer, + IndexShard.IndexShardConfig indexShardConfig ) throws IOException { return new LocalTranslog( diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 6b51cb17dcc41..c62270372f310 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -17,12 +17,12 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.LifecycleAware; import org.opensearch.index.seqno.LocalCheckpointTracker; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.listener.TranslogEventListener; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -57,7 +57,7 @@ public InternalTranslogManager( TranslogEventListener translogEventListener, LifecycleAware engineLifeCycleAware, TranslogFactory translogFactory, - BooleanSupplier primaryModeSupplier + IndexShard.IndexShardConfig indexShardConfig ) throws IOException { this.shardId = shardId; this.readLock = readLock; @@ -70,7 +70,7 @@ public InternalTranslogManager( if (tracker != null) { tracker.markSeqNoAsPersisted(seqNo); } - }, translogUUID, translogFactory, primaryModeSupplier); + }, translogUUID, translogFactory, indexShardConfig); assert translog.getGeneration() != null; this.translog = translog; assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; @@ -357,7 +357,7 @@ protected Translog openTranslog( LongConsumer persistedSequenceNumberConsumer, String translogUUID, TranslogFactory translogFactory, - BooleanSupplier primaryModeSupplier + IndexShard.IndexShardConfig indexShardConfig ) throws IOException { return translogFactory.newTranslog( translogConfig, @@ -366,7 +366,7 @@ protected Translog openTranslog( globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer, - primaryModeSupplier + indexShardConfig ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index 339e16db6f360..34d5e178d73af 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.opensearch.index.shard.IndexShard; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryMissingException; @@ -15,7 +16,6 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -54,7 +54,7 @@ public Translog newTranslog( LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier, LongConsumer persistedSequenceNumberConsumer, - BooleanSupplier primaryModeSupplier + IndexShard.IndexShardConfig indexShardConfig ) throws IOException { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; @@ -68,7 +68,7 @@ public Translog newTranslog( persistedSequenceNumberConsumer, blobStoreRepository, threadPool, - primaryModeSupplier + indexShardConfig ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index b23374a2cce3b..f087050e75008 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -17,6 +17,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.util.FileSystemUtils; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.index.translog.transfer.FileTransferTracker; import org.opensearch.index.translog.transfer.TransferSnapshot; @@ -55,6 +56,7 @@ public class RemoteFsTranslog extends Translog { private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; private final BooleanSupplier primaryModeSupplier; + private final BooleanSupplier relocatingSupplier; private volatile long maxRemoteTranslogGenerationUploaded; private volatile long minSeqNoToKeep; @@ -80,12 +82,13 @@ public RemoteFsTranslog( LongConsumer persistedSequenceNumberConsumer, BlobStoreRepository blobStoreRepository, ThreadPool threadPool, - BooleanSupplier primaryModeSupplier + IndexShard.IndexShardConfig indexShardConfig ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); logger = Loggers.getLogger(getClass(), shardId); this.blobStoreRepository = blobStoreRepository; - this.primaryModeSupplier = primaryModeSupplier; + this.primaryModeSupplier = indexShardConfig.getPrimaryModeSupplier(); + this.relocatingSupplier = indexShardConfig.getRelocatingSupplier(); fileTransferTracker = new FileTransferTracker(shardId); this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker); try { @@ -385,6 +388,10 @@ public void trimUnreferencedReaders() throws IOException { return; } + if(relocatingSupplier.getAsBoolean() == true) { + return; + } + // cleans up remote translog files not referenced in latest uploaded metadata. // This enables us to restore translog from the metadata in case of failover or relocation. Set generationsToDelete = new HashSet<>(); diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java index ab8e2b7752e66..488222f0211e2 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java @@ -8,8 +8,9 @@ package org.opensearch.index.translog; +import org.opensearch.index.shard.IndexShard; + import java.io.IOException; -import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -29,6 +30,6 @@ Translog newTranslog( final LongSupplier globalCheckpointSupplier, final LongSupplier primaryTermSupplier, final LongConsumer persistedSequenceNumberConsumer, - final BooleanSupplier primaryModeSupplier + final IndexShard.IndexShardConfig indexShardConfig ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java index adeeb213b2913..ee85caae2232b 100644 --- a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -12,10 +12,10 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.LifecycleAware; import org.opensearch.index.seqno.LocalCheckpointTracker; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.listener.TranslogEventListener; import java.io.IOException; -import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -38,7 +38,7 @@ public WriteOnlyTranslogManager( TranslogEventListener translogEventListener, LifecycleAware engineLifecycleAware, TranslogFactory translogFactory, - BooleanSupplier primaryModeSupplier + IndexShard.IndexShardConfig indexShardConfig ) throws IOException { super( translogConfig, @@ -52,7 +52,7 @@ public WriteOnlyTranslogManager( translogEventListener, engineLifecycleAware, translogFactory, - primaryModeSupplier + indexShardConfig ); } diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java index bf9a86cff8b76..2e9d1bd3ee723 100644 --- a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java @@ -16,6 +16,7 @@ import org.opensearch.index.codec.CodecService; import org.opensearch.index.codec.CodecServiceFactory; import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; @@ -68,7 +69,7 @@ public void testCreateEngineConfigFromFactory() { null, null, false, - () -> Boolean.TRUE, + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE), new InternalTranslogFactory(), null ); @@ -148,7 +149,7 @@ public void testCreateCodecServiceFromFactory() { null, null, false, - () -> Boolean.TRUE, + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE), new InternalTranslogFactory(), null ); diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java index 2de36574064cb..c87b09961465f 100644 --- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -14,6 +14,7 @@ import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.listener.TranslogEventListener; import java.io.IOException; @@ -23,9 +24,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; -import static org.hamcrest.Matchers.equalTo; public class InternalTranslogManagerTests extends TranslogManagerTestCase { @@ -49,7 +50,7 @@ public void testRecoveryFromTranslog() throws IOException { TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, new InternalTranslogFactory(), - () -> Boolean.TRUE + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -89,7 +90,7 @@ public void onBeginTranslogRecovery() { }, () -> {}, new InternalTranslogFactory(), - () -> Boolean.TRUE + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -128,7 +129,7 @@ public void testTranslogRollsGeneration() throws IOException { TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, new InternalTranslogFactory(), - () -> Boolean.TRUE + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -158,7 +159,7 @@ public void testTranslogRollsGeneration() throws IOException { TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, new InternalTranslogFactory(), - () -> Boolean.TRUE + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -193,7 +194,7 @@ public void testTrimOperationsFromTranslog() throws IOException { TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, new InternalTranslogFactory(), - () -> Boolean.TRUE + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -225,7 +226,7 @@ public void testTrimOperationsFromTranslog() throws IOException { TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, new InternalTranslogFactory(), - () -> Boolean.TRUE + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -274,7 +275,7 @@ public void onAfterTranslogSync() { }, () -> {}, new InternalTranslogFactory(), - () -> Boolean.TRUE + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) ); translogManagerAtomicReference.set(translogManager); Engine.Index index = indexForDoc(doc); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index e13c0be93c6fe..ad7accb6869d4 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -15,6 +15,8 @@ import org.apache.lucene.store.DataOutput; import org.apache.lucene.tests.mockfile.FilterFileChannel; import org.apache.lucene.tests.util.LuceneTestCase; +import org.junit.After; +import org.junit.Before; import org.opensearch.OpenSearchException; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; @@ -43,6 +45,7 @@ import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.LocalCheckpointTrackerTests; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.common.ReplicationType; @@ -53,8 +56,6 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.junit.After; -import org.junit.Before; import java.io.Closeable; import java.io.EOFException; @@ -89,14 +90,14 @@ import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; -import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; -import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG; -import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; -import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG; +import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; +import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; @LuceneTestCase.SuppressFileSystems("ExtrasFS") @@ -172,7 +173,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin getPersistedSeqNoConsumer(), repository, threadPool, - primaryMode::get + new IndexShard.IndexShardConfig(primaryMode::get, () -> Boolean.FALSE) ); } @@ -1223,7 +1224,7 @@ public int write(ByteBuffer src) throws IOException { persistedSeqNos::add, repository, threadPool, - () -> Boolean.TRUE + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) ) { @Override ChannelFactory getChannelFactory() { @@ -1329,7 +1330,7 @@ public void force(boolean metaData) throws IOException { persistedSeqNos::add, repository, threadPool, - () -> Boolean.TRUE + new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) ) { @Override ChannelFactory getChannelFactory() { From 091e4ca06e8dc827dd59b5647c72c8af1f1e91d8 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Tue, 29 Aug 2023 10:02:40 +0530 Subject: [PATCH 2/3] renaming to index shard config supplier Signed-off-by: Gaurav Bafna --- .../RemoteIndexPrimaryRelocationIT.java | 2 +- .../opensearch/index/engine/EngineConfig.java | 14 +++++++------- .../index/engine/EngineConfigFactory.java | 4 ++-- .../index/engine/InternalEngine.java | 2 +- .../index/engine/NRTReplicationEngine.java | 2 +- .../opensearch/index/engine/NoOpEngine.java | 2 +- .../index/engine/ReadOnlyEngine.java | 2 +- .../org/opensearch/index/shard/IndexShard.java | 11 ++++++++--- .../translog/InternalTranslogFactory.java | 14 +++++++------- .../translog/InternalTranslogManager.java | 8 ++++---- ...RemoteBlobStoreInternalTranslogFactory.java | 4 ++-- .../index/translog/RemoteFsTranslog.java | 8 ++++---- .../index/translog/TranslogFactory.java | 2 +- .../translog/WriteOnlyTranslogManager.java | 4 ++-- .../index/engine/EngineConfigFactoryTests.java | 4 ++-- .../translog/InternalTranslogManagerTests.java | 16 ++++++++-------- .../index/translog/RemoteFSTranslogTests.java | 18 +++++++++--------- 17 files changed, 61 insertions(+), 56 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java index 8792e959751e8..97c44280f4429 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java @@ -63,7 +63,7 @@ protected Settings featureFlagSettings() { .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") .build(); } - + public void testPrimaryRelocationWhileIndexing() throws Exception { super.testPrimaryRelocationWhileIndexing(); } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index 34371cb6a1985..1cd90bc2c620b 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -105,7 +105,7 @@ public final class EngineConfig { private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; private final boolean isReadOnlyReplica; - private final IndexShard.IndexShardConfig indexShardConfig; + private final IndexShard.IndexShardConfigSupplier indexShardConfigSupplier; private final Comparator leafSorter; /** @@ -266,7 +266,7 @@ private EngineConfig(Builder builder) { this.primaryTermSupplier = builder.primaryTermSupplier; this.tombstoneDocSupplier = builder.tombstoneDocSupplier; this.isReadOnlyReplica = builder.isReadOnlyReplica; - this.indexShardConfig = builder.indexShardConfig; + this.indexShardConfigSupplier = builder.indexShardConfigSupplier; this.translogFactory = builder.translogFactory; this.leafSorter = builder.leafSorter; } @@ -477,8 +477,8 @@ public boolean isReadOnlyReplica() { * Returns the underlying primaryModeSupplier. * @return the primary mode supplier. */ - public IndexShard.IndexShardConfig getIndexShardConfig() { - return indexShardConfig; + public IndexShard.IndexShardConfigSupplier getIndexShardConfigSupplier() { + return indexShardConfigSupplier; } /** @@ -555,7 +555,7 @@ public static class Builder { private TombstoneDocSupplier tombstoneDocSupplier; private TranslogDeletionPolicyFactory translogDeletionPolicyFactory; private boolean isReadOnlyReplica; - private IndexShard.IndexShardConfig indexShardConfig; + private IndexShard.IndexShardConfigSupplier indexShardConfigSupplier; private TranslogFactory translogFactory = new InternalTranslogFactory(); Comparator leafSorter; @@ -679,8 +679,8 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) { return this; } - public Builder indexShardConfig(IndexShard.IndexShardConfig indexShardConfig) { - this.indexShardConfig = indexShardConfig; + public Builder indexShardConfig(IndexShard.IndexShardConfigSupplier indexShardConfigSupplier) { + this.indexShardConfigSupplier = indexShardConfigSupplier; return this; } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index 4b3220a420efd..462942ae1450d 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -152,7 +152,7 @@ public EngineConfig newEngineConfig( LongSupplier primaryTermSupplier, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier, boolean isReadOnlyReplica, - IndexShard.IndexShardConfig indexShardConfig, + IndexShard.IndexShardConfigSupplier indexShardConfigSupplier, TranslogFactory translogFactory, Comparator leafSorter ) { @@ -185,7 +185,7 @@ public EngineConfig newEngineConfig( .primaryTermSupplier(primaryTermSupplier) .tombstoneDocSupplier(tombstoneDocSupplier) .readOnlyReplica(isReadOnlyReplica) - .indexShardConfig(indexShardConfig) + .indexShardConfig(indexShardConfigSupplier) .translogFactory(translogFactory) .leafSorter(leafSorter) .build(); diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 4e289def2adb6..ff9a8877f3959 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -292,7 +292,7 @@ public void onFailure(String reason, Exception ex) { new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId), this::ensureOpen, engineConfig.getTranslogFactory(), - engineConfig.getIndexShardConfig() + engineConfig.getIndexShardConfigSupplier() ); this.translogManager = translogManagerRef; this.softDeletesPolicy = newSoftDeletesPolicy(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 758d034fcb909..20aaf09381795 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -118,7 +118,7 @@ public void onAfterTranslogSync() { }, this, engineConfig.getTranslogFactory(), - engineConfig.getIndexShardConfig() + engineConfig.getIndexShardConfigSupplier() ); this.translogManager = translogManagerRef; } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index 1417c50e6ab63..b49d4660ae8ea 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -203,7 +203,7 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {}, - engineConfig.getIndexShardConfig() + engineConfig.getIndexShardConfigSupplier() ) ) { translog.trimUnreferencedReaders(); diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 3fdfea9e2e925..168306def7b56 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -278,7 +278,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), seqNo -> {}, - config.getIndexShardConfig() + config.getIndexShardConfigSupplier() ) ) { return translog.stats(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 05871899bcd82..df243fda4178d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -338,11 +338,16 @@ Runnable getGlobalCheckpointSyncer() { private final List internalRefreshListener = new ArrayList<>(); - static public class IndexShardConfig { + /** + * An OpenSearch index shard config supplier + * + * @opensearch.internal + */ + static public class IndexShardConfigSupplier { BooleanSupplier primaryModeSupplier; BooleanSupplier relocatingSupplier; - public IndexShardConfig(BooleanSupplier primaryModeSupplier, BooleanSupplier relocatingSupplier) { + public IndexShardConfigSupplier(BooleanSupplier primaryModeSupplier, BooleanSupplier relocatingSupplier) { this.primaryModeSupplier = primaryModeSupplier; this.relocatingSupplier = relocatingSupplier; } @@ -3752,7 +3757,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro () -> getOperationPrimaryTerm(), tombstoneDocSupplier(), isReadOnlyReplica, - new IndexShardConfig(replicationTracker::isPrimaryMode, () -> shardRouting.relocating()), + new IndexShardConfigSupplier(replicationTracker::isPrimaryMode, () -> shardRouting.relocating()), translogFactorySupplier.apply(indexSettings, shardRouting), isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for // timeseries diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java index d432bfa2e0fb8..84cb2e247c211 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java @@ -23,13 +23,13 @@ public class InternalTranslogFactory implements TranslogFactory { @Override public Translog newTranslog( - TranslogConfig translogConfig, - String translogUUID, - TranslogDeletionPolicy translogDeletionPolicy, - LongSupplier globalCheckpointSupplier, - LongSupplier primaryTermSupplier, - LongConsumer persistedSequenceNumberConsumer, - IndexShard.IndexShardConfig indexShardConfig + TranslogConfig translogConfig, + String translogUUID, + TranslogDeletionPolicy translogDeletionPolicy, + LongSupplier globalCheckpointSupplier, + LongSupplier primaryTermSupplier, + LongConsumer persistedSequenceNumberConsumer, + IndexShard.IndexShardConfigSupplier indexShardConfigSupplier ) throws IOException { return new LocalTranslog( diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index c62270372f310..5661ed5f47f8f 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -57,7 +57,7 @@ public InternalTranslogManager( TranslogEventListener translogEventListener, LifecycleAware engineLifeCycleAware, TranslogFactory translogFactory, - IndexShard.IndexShardConfig indexShardConfig + IndexShard.IndexShardConfigSupplier indexShardConfigSupplier ) throws IOException { this.shardId = shardId; this.readLock = readLock; @@ -70,7 +70,7 @@ public InternalTranslogManager( if (tracker != null) { tracker.markSeqNoAsPersisted(seqNo); } - }, translogUUID, translogFactory, indexShardConfig); + }, translogUUID, translogFactory, indexShardConfigSupplier); assert translog.getGeneration() != null; this.translog = translog; assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; @@ -357,7 +357,7 @@ protected Translog openTranslog( LongConsumer persistedSequenceNumberConsumer, String translogUUID, TranslogFactory translogFactory, - IndexShard.IndexShardConfig indexShardConfig + IndexShard.IndexShardConfigSupplier indexShardConfigSupplier ) throws IOException { return translogFactory.newTranslog( translogConfig, @@ -366,7 +366,7 @@ protected Translog openTranslog( globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer, - indexShardConfig + indexShardConfigSupplier ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index 34d5e178d73af..fefd0e2118223 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -54,7 +54,7 @@ public Translog newTranslog( LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier, LongConsumer persistedSequenceNumberConsumer, - IndexShard.IndexShardConfig indexShardConfig + IndexShard.IndexShardConfigSupplier indexShardConfigSupplier ) throws IOException { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; @@ -68,7 +68,7 @@ public Translog newTranslog( persistedSequenceNumberConsumer, blobStoreRepository, threadPool, - indexShardConfig + indexShardConfigSupplier ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index f087050e75008..214a794e639da 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -82,13 +82,13 @@ public RemoteFsTranslog( LongConsumer persistedSequenceNumberConsumer, BlobStoreRepository blobStoreRepository, ThreadPool threadPool, - IndexShard.IndexShardConfig indexShardConfig + IndexShard.IndexShardConfigSupplier indexShardConfigSupplier ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); logger = Loggers.getLogger(getClass(), shardId); this.blobStoreRepository = blobStoreRepository; - this.primaryModeSupplier = indexShardConfig.getPrimaryModeSupplier(); - this.relocatingSupplier = indexShardConfig.getRelocatingSupplier(); + this.primaryModeSupplier = indexShardConfigSupplier.getPrimaryModeSupplier(); + this.relocatingSupplier = indexShardConfigSupplier.getRelocatingSupplier(); fileTransferTracker = new FileTransferTracker(shardId); this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker); try { @@ -388,7 +388,7 @@ public void trimUnreferencedReaders() throws IOException { return; } - if(relocatingSupplier.getAsBoolean() == true) { + if (relocatingSupplier.getAsBoolean() == true) { return; } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java index 488222f0211e2..24a3c1851cb52 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java @@ -30,6 +30,6 @@ Translog newTranslog( final LongSupplier globalCheckpointSupplier, final LongSupplier primaryTermSupplier, final LongConsumer persistedSequenceNumberConsumer, - final IndexShard.IndexShardConfig indexShardConfig + final IndexShard.IndexShardConfigSupplier indexShardConfigSupplier ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java index ee85caae2232b..1244d2d1635be 100644 --- a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -38,7 +38,7 @@ public WriteOnlyTranslogManager( TranslogEventListener translogEventListener, LifecycleAware engineLifecycleAware, TranslogFactory translogFactory, - IndexShard.IndexShardConfig indexShardConfig + IndexShard.IndexShardConfigSupplier indexShardConfigSupplier ) throws IOException { super( translogConfig, @@ -52,7 +52,7 @@ public WriteOnlyTranslogManager( translogEventListener, engineLifecycleAware, translogFactory, - indexShardConfig + indexShardConfigSupplier ); } diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java index 2e9d1bd3ee723..ee6be8bbe9052 100644 --- a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java @@ -69,7 +69,7 @@ public void testCreateEngineConfigFromFactory() { null, null, false, - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE), + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE), new InternalTranslogFactory(), null ); @@ -149,7 +149,7 @@ public void testCreateCodecServiceFromFactory() { null, null, false, - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE), + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE), new InternalTranslogFactory(), null ); diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java index c87b09961465f..efb0ddccfa9f0 100644 --- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -24,9 +24,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; +import static org.hamcrest.Matchers.equalTo; public class InternalTranslogManagerTests extends TranslogManagerTestCase { @@ -50,7 +50,7 @@ public void testRecoveryFromTranslog() throws IOException { TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, new InternalTranslogFactory(), - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE) ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -90,7 +90,7 @@ public void onBeginTranslogRecovery() { }, () -> {}, new InternalTranslogFactory(), - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE) ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -129,7 +129,7 @@ public void testTranslogRollsGeneration() throws IOException { TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, new InternalTranslogFactory(), - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE) ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -159,7 +159,7 @@ public void testTranslogRollsGeneration() throws IOException { TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, new InternalTranslogFactory(), - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE) ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -194,7 +194,7 @@ public void testTrimOperationsFromTranslog() throws IOException { TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, new InternalTranslogFactory(), - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE) ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -226,7 +226,7 @@ public void testTrimOperationsFromTranslog() throws IOException { TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, new InternalTranslogFactory(), - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE) ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -275,7 +275,7 @@ public void onAfterTranslogSync() { }, () -> {}, new InternalTranslogFactory(), - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE) ); translogManagerAtomicReference.set(translogManager); Engine.Index index = indexForDoc(doc); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index ad7accb6869d4..94b8a378e1686 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -15,8 +15,6 @@ import org.apache.lucene.store.DataOutput; import org.apache.lucene.tests.mockfile.FilterFileChannel; import org.apache.lucene.tests.util.LuceneTestCase; -import org.junit.After; -import org.junit.Before; import org.opensearch.OpenSearchException; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; @@ -56,6 +54,8 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.io.Closeable; import java.io.EOFException; @@ -90,14 +90,14 @@ import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG; import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; @LuceneTestCase.SuppressFileSystems("ExtrasFS") @@ -173,7 +173,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin getPersistedSeqNoConsumer(), repository, threadPool, - new IndexShard.IndexShardConfig(primaryMode::get, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(primaryMode::get, () -> Boolean.FALSE) ); } @@ -1224,7 +1224,7 @@ public int write(ByteBuffer src) throws IOException { persistedSeqNos::add, repository, threadPool, - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE) ) { @Override ChannelFactory getChannelFactory() { @@ -1330,7 +1330,7 @@ public void force(boolean metaData) throws IOException { persistedSeqNos::add, repository, threadPool, - new IndexShard.IndexShardConfig(() -> Boolean.TRUE, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(() -> Boolean.TRUE, () -> Boolean.FALSE) ) { @Override ChannelFactory getChannelFactory() { From 00305d8455d85a72d8e506da0bb8bc112e391eea Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Tue, 29 Aug 2023 18:34:36 +0530 Subject: [PATCH 3/3] PR Comments Signed-off-by: Gaurav Bafna --- .../RemoteIndexPrimaryRelocationIT.java | 4 -- .../index/translog/RemoteFsTranslog.java | 9 +-- .../index/translog/RemoteFSTranslogTests.java | 69 ++++++++++++++++++- 3 files changed, 73 insertions(+), 9 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java index 97c44280f4429..a9482c8c19187 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java @@ -63,8 +63,4 @@ protected Settings featureFlagSettings() { .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") .build(); } - - public void testPrimaryRelocationWhileIndexing() throws Exception { - super.testPrimaryRelocationWhileIndexing(); - } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 214a794e639da..1b6f7bc4ba01b 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -381,17 +381,18 @@ public void trimUnreferencedReaders() throws IOException { // clean up local translog files and updates readers super.trimUnreferencedReaders(); + if (relocatingSupplier.getAsBoolean() == true) { + return; + } + // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. // Remote generations involves 2 async operations - 1) Delete translog generation files 2) Delete metadata files // We try to acquire 2 permits and if we can not, we return from here itself. + // Make sure we release permits if return prematurely after acquiring permits if (remoteGenerationDeletionPermits.tryAcquire(REMOTE_DELETION_PERMITS) == false) { return; } - if (relocatingSupplier.getAsBoolean() == true) { - return; - } - // cleans up remote translog files not referenced in latest uploaded metadata. // This enables us to restore translog from the metadata in case of failover or relocation. Set generationsToDelete = new HashSet<>(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 94b8a378e1686..bc4bb33770959 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -86,6 +86,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; @@ -111,6 +112,8 @@ public class RemoteFSTranslogTests extends OpenSearchTestCase { // A default primary term is used by translog instances created in this test. private final AtomicLong primaryTerm = new AtomicLong(); private final AtomicBoolean primaryMode = new AtomicBoolean(true); + + private BooleanSupplier relocatingMode = () -> false; private final AtomicReference persistedSeqNoConsumer = new AtomicReference<>(); private ThreadPool threadPool; private final static String METADATA_DIR = "metadata"; @@ -157,6 +160,31 @@ private RemoteFsTranslog create(Path path) throws IOException { return create(path, createRepository(), translogUUID); } + private RemoteFsTranslog create(Path path, ThreadPool threadPool) throws IOException { + final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); + return create(path, createRepository(), translogUUID, threadPool); + } + + private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID, ThreadPool threadPool) + throws IOException { + this.repository = repository; + globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); + blobStoreTransferService = new BlobStoreTransferService(repository.blobStore(), threadPool); + return new RemoteFsTranslog( + translogConfig, + translogUUID, + deletionPolicy, + () -> globalCheckpoint.get(), + primaryTerm::get, + getPersistedSeqNoConsumer(), + repository, + threadPool, + new IndexShard.IndexShardConfigSupplier(primaryMode::get, relocatingMode) + ); + } + private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { this.repository = repository; globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -173,7 +201,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin getPersistedSeqNoConsumer(), repository, threadPool, - new IndexShard.IndexShardConfigSupplier(primaryMode::get, () -> Boolean.FALSE) + new IndexShard.IndexShardConfigSupplier(primaryMode::get, relocatingMode) ); } @@ -372,6 +400,45 @@ public void testReadLocationDownload() throws IOException { } } + public void testDeletionWithRelocation() throws IOException { + relocatingMode = () -> true; + + // Creating RemoteFsTranslog with the same location + translogDir = createTempDir(); + RemoteFsTranslog translog = create(translogDir, this.threadPool); + + ArrayList ops = new ArrayList<>(); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); + + assertEquals(6, translog.allUploaded().size()); + + translog.setMinSeqNoToKeep(1); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 2, primaryTerm.get(), new byte[] { 1 })); + + // cleans up local + translog.trimUnreferencedReaders(); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 3, primaryTerm.get(), new byte[] { 1 })); + // cleans up remote files only if relocating mode is false + translog.trimUnreferencedReaders(); + + assertEquals( + 10, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + + try { + translog.close(); + } catch (Exception e) { + // Ignoring this exception for now. Once the download flow populates FileTracker, + // we can remove this try-catch block + } + + // resetting to false for other tests + relocatingMode = () -> false; + } + public void testSnapshotWithNewTranslog() throws IOException { List toClose = new ArrayList<>(); try {