diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java
index acca57669a767..9a45cd80c7f7d 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java
@@ -75,6 +75,13 @@ public class ShardRouting implements Writeable, ToXContentObject {
     private final long expectedShardSize;
     @Nullable
     private final ShardRouting targetRelocatingShard;
+
+    /*
+    Local flag denoting whether the shard copy is assigned to a remote store enabled node.
+    Not serialized; meant to be accessed from the data nodes only.
+    Always reads as `false` when accessed from cluster manager nodes.
+    Set on the `createShard` and `updateShard` flows of the IndicesClusterStateService state applier.
+    */
     private boolean assignedToRemoteStoreNode;
 
     /**
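The comment above documents a node-local flag: `assignedToRemoteStoreNode` is never written with the routing entry, so any copy materialized on the cluster manager observes the default `false`. A minimal stand-alone sketch of that intent (not OpenSearch code; the class, field, and serialization shape here are illustrative assumptions):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Simplified model of a routing entry whose remote-store flag is node-local state.
final class RoutingEntrySketch {
    private final String allocationId;            // serialized, cluster-wide
    private boolean assignedToRemoteStoreNode;    // transient, set only on data nodes

    RoutingEntrySketch(String allocationId) {
        this.allocationId = allocationId;
    }

    void setAssignedToRemoteStoreNode(boolean value) {
        this.assignedToRemoteStoreNode = value;
    }

    boolean isAssignedToRemoteStoreNode() {
        return assignedToRemoteStoreNode;
    }

    // The flag is deliberately omitted from the wire format, so a copy
    // deserialized elsewhere always observes the default value: false.
    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(allocationId);
    }

    public static void main(String[] args) throws IOException {
        RoutingEntrySketch routing = new RoutingEntrySketch("alloc-1");
        routing.setAssignedToRemoteStoreNode(true); // set by the state applier on a data node
        routing.writeTo(new DataOutputStream(new ByteArrayOutputStream()));
        System.out.println(routing.isAssignedToRemoteStoreNode()); // true locally only
    }
}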
diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
index 1690cb8b53494..f3d4903a6d496 100644
--- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -253,8 +253,6 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
 
     private volatile ReplicationCheckpoint latestReplicationCheckpoint;
 
-    private boolean ongoingEngineMigration;
-
     /**
      * Get all retention leases tracked on this shard.
      *
@@ -676,10 +674,6 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
         assert invariant();
     }
 
-    public void setOngoingEngineMigration(boolean ongoingEngineMigration) {
-        this.ongoingEngineMigration = ongoingEngineMigration;
-    }
-
     /**
     * The state of the lucene checkpoint
     *
@@ -1095,10 +1089,6 @@ private ReplicationGroup calculateReplicationGroup() {
             newVersion = replicationGroup.getVersion() + 1;
         }
 
-        assert indexSettings().isRemoteTranslogStoreEnabled() || ongoingEngineMigration == true
-            || checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated)
-            : "In absence of remote translog store and no-ongoing remote migration, all tracked shards must have replication mode as LOGICAL_REPLICATION";
-
         return new ReplicationGroup(
             routingTable,
             checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()),
@@ -1255,7 +1245,12 @@ private void createReplicationLagTimers() {
                     && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
                     && isPrimaryRelocation(allocationId) == false
                     && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
-                    && (ongoingEngineMigration && routingTable.getByAllocationId(allocationId).isAssignedToRemoteStoreNode() == true)) {
+                    /*
+                    Handle remote store migration cases. Replication lag timers are created
+                    only if either:
+                    - The index has segment replication enabled without remote store, or
+                    - The destination replica shard is hosted on a remote store enabled node (remote store enabled nodes have segrep enabled implicitly)
+                    */
+                    && (indexSettings.isSegRepLocalEnabled() == true || routingTable.getByAllocationId(allocationId).isAssignedToRemoteStoreNode() == true)) {
                     cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
                     logger.trace(
                         () -> new ParameterizedMessage(
@@ -1529,7 +1524,7 @@ private boolean isReplicated(String allocationId, String primaryAllocationId, St
         - If remote translog is enabled, then returns true if given allocation id matches the primary or it's relocation target allocation primary and primary target allocation id.
         - During an ongoing remote migration, the above-mentioned checks are considered when the shard is assigned to a remote store backed node
         */
-        if (indexSettings().isRemoteTranslogStoreEnabled() || (ongoingEngineMigration == true && assignedToRemoteStoreNode == true)) {
+        if (indexSettings().isRemoteTranslogStoreEnabled() || assignedToRemoteStoreNode == true) {
            return (allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId));
         }
         // For other case which is local translog, return true as the requests are replicated to all shards in the replication group.
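To make the reworked gating condition in `createReplicationLagTimers()` concrete, here is a stand-alone sketch of just the predicate. The two booleans stand in for `indexSettings.isSegRepLocalEnabled()` and `ShardRouting#isAssignedToRemoteStoreNode()`; the class and method names are illustrative, not OpenSearch APIs:

// Stand-alone sketch of the gating condition in createReplicationLagTimers().
final class LagTimerGate {
    static boolean shouldCreateLagTimer(boolean segRepLocalEnabled, boolean replicaOnRemoteStoreNode) {
        // Either plain segment replication is enabled for the index, or the
        // target replica already lives on a remote store node (which implies segrep).
        return segRepLocalEnabled || replicaOnRemoteStoreNode;
    }

    public static void main(String[] args) {
        System.out.println(shouldCreateLagTimer(true, false));  // segrep index, docrep node -> true
        System.out.println(shouldCreateLagTimer(false, true));  // migration: replica on remote node -> true
        System.out.println(shouldCreateLagTimer(false, false)); // docrep only -> false
    }
}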
diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
index b5fa2bd551e02..6c0e3a0450cd4 100644
--- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
+++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
@@ -51,6 +51,7 @@
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.core.tasks.TaskId;
 import org.opensearch.index.IndexNotFoundException;
+import org.opensearch.index.remote.RemoteStoreUtils;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardClosedException;
 import org.opensearch.indices.IndicesService;
@@ -85,6 +86,7 @@ protected Logger getLogger() {
         return LOGGER;
     }
 
+    private final ClusterService clusterService;
     @Inject
     public RetentionLeaseBackgroundSyncAction(
         final Settings settings,
@@ -108,6 +110,7 @@ public RetentionLeaseBackgroundSyncAction(
             Request::new,
             ThreadPool.Names.MANAGEMENT
         );
+        this.clusterService = clusterService;
     }
 
     @Override
@@ -203,7 +206,7 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) {
        data consistency on remote to docrep shard copy failover during the migration process.
         */
-        if (indexShard.ongoingEngineMigration()) {
+        if (RemoteStoreUtils.isMigrationDirectionSet(clusterService)) {
            return ReplicationMode.FULL_REPLICATION;
        }
        return super.getReplicationMode(indexShard);
diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java
index 4c449ed7338d5..9a31cce2a8fc8 100644
--- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java
+++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java
@@ -54,6 +54,7 @@
 import org.opensearch.core.tasks.TaskId;
 import org.opensearch.index.IndexNotFoundException;
 import org.opensearch.index.IndexingPressureService;
+import org.opensearch.index.remote.RemoteStoreUtils;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardClosedException;
 import org.opensearch.indices.IndicesService;
@@ -87,6 +88,8 @@ protected Logger getLogger() {
         return LOGGER;
     }
 
+    private final ClusterService clusterService;
+
     @Inject
     public RetentionLeaseSyncAction(
         final Settings settings,
@@ -117,6 +120,7 @@ public RetentionLeaseSyncAction(
             systemIndices,
             tracer
         );
+        this.clusterService = clusterService;
     }
 
     @Override
@@ -209,7 +213,7 @@ protected void dispatchedShardOperationOnReplica(Request request, IndexShard rep
     @Override
     public ReplicationMode getReplicationMode(IndexShard indexShard) {
         // Unblock PRRL publication during remote store migration
-        if (indexShard.ongoingEngineMigration()) {
+        if (RemoteStoreUtils.isMigrationDirectionSet(clusterService)) {
             return ReplicationMode.FULL_REPLICATION;
         }
         return super.getReplicationMode(indexShard);
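Both retention-lease actions now derive the migration signal from cluster state via `RemoteStoreUtils.isMigrationDirectionSet(clusterService)` rather than a per-shard flag. A toy model of the resulting `getReplicationMode` decision, with a plain boolean standing in for the cluster-settings lookup (everything here other than the `FULL_REPLICATION` constant name is an illustrative assumption):

// Toy model of the ReplicationMode decision both retention-lease actions now share.
final class ReplicationModeSketch {
    enum ReplicationMode { FULL_REPLICATION, NO_REPLICATION }

    static ReplicationMode getReplicationMode(boolean migrationDirectionSet, ReplicationMode defaultMode) {
        // During a remote store migration, retention leases must keep flowing to
        // every copy so a remote-to-docrep shard failover stays consistent.
        return migrationDirectionSet ? ReplicationMode.FULL_REPLICATION : defaultMode;
    }

    public static void main(String[] args) {
        System.out.println(getReplicationMode(true, ReplicationMode.NO_REPLICATION));  // FULL_REPLICATION
        System.out.println(getReplicationMode(false, ReplicationMode.NO_REPLICATION)); // NO_REPLICATION
    }
}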
diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java
index 1ed63796655df..675d60ec2b63d 100644
--- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java
@@ -43,14 +43,7 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
         if (didRefresh
             && shard.state() == IndexShardState.STARTED
             && shard.getReplicationTracker().isPrimaryMode()
-            && !shard.indexSettings.isSegRepWithRemoteEnabled()
-            /*
-            During remote store migration, the isSegRepWithRemoteEnabled criteria would return false
-            since we do not alter the remote store based index settings at that stage. Explicitly
-            blocking checkpoint publication from this refresh listener since it ends up interfering
-            with the RemoteStoreRefreshListener invocation
-            */
-            && !shard.ongoingEngineMigration()) {
+            && !shard.indexSettings.isSegRepWithRemoteEnabled()) {
             publisher.publish(shard, shard.getLatestReplicationCheckpoint());
         }
         return true;
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index ac389f0adfa9a..bea3494e3c702 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -637,7 +637,6 @@ public void updateShardState(
 
                 if (newRouting.primary()) {
                     replicationTracker.updateFromClusterManager(applyingClusterStateVersion, inSyncAllocationIds, routingTable);
-                    replicationTracker.setOngoingEngineMigration(ongoingEngineMigration());
                 }
 
                 if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
@@ -3524,7 +3523,6 @@ assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(allocati
                 + "]";
             synchronized (mutex) {
                 replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
-                replicationTracker.setOngoingEngineMigration(ongoingEngineMigration());
             }
             postActivatePrimaryMode();
         }
@@ -4601,7 +4599,8 @@ public final boolean isSearchIdleSupported() {
         if (isRemoteTranslogEnabled() || indexSettings.isRemoteNode()) {
             return false;
         }
-        if (ongoingEngineMigration()) {
+        // Explicitly disable search idle during remote store migration
+        if (RemoteStoreUtils.isMigrationDirectionSet(clusterService)) {
             return false;
         }
         return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0;
@@ -5209,10 +5208,6 @@ public AsyncIOProcessor getTranslogSyncProcessor() {
         return translogSyncProcessor;
     }
 
-    public boolean ongoingEngineMigration() {
-        return RemoteStoreUtils.isMigrationDirectionSet(clusterService) == true;
-    }
-
     static class RemoteMigrationShardState {
 
         // Set to true for any primary shard started on remote backed node relocating from docrep node
diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
index ce16226e97602..01fde0785ef99 100644
--- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
+++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
@@ -670,6 +670,11 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
         try {
             final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id());
             logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
+            DiscoveryNode targetNode = nodes.getLocalNode();
+            // Set the node-local assignedToRemoteStoreNode flag before shard creation
+            if (targetNode.isRemoteStoreNode()) {
+                shardRouting.setAssignedToRemoteStoreNode(true);
+            }
             indicesService.createShard(
                 shardRouting,
                 checkpointPublisher,
@@ -679,7 +684,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
                 failedShardHandler,
                 globalCheckpointSyncer,
                 retentionLeaseSyncer,
-                nodes.getLocalNode(),
+                targetNode,
                 sourceNode,
                 remoteStoreStatsTrackerFactory
             );
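The `createShard` change stamps the routing entry once from the local `DiscoveryNode` before handing it to `indicesService.createShard`. A condensed, self-contained model of that flow (the record and classes below are stand-ins for `DiscoveryNode` and `ShardRouting`, not OpenSearch APIs):

// Condensed model of the createShard() change: the state applier inspects the
// local node once and stamps the routing entry before shard creation.
final class CreateShardFlowSketch {
    record LocalNode(boolean remoteStoreNode) {}

    static final class Routing {
        boolean assignedToRemoteStoreNode;
    }

    static void createShard(LocalNode targetNode, Routing shardRouting) {
        // Mirror of the diff: the flag is set only when the hosting node is remote store enabled.
        if (targetNode.remoteStoreNode()) {
            shardRouting.assignedToRemoteStoreNode = true;
        }
        // ... indicesService.createShard(shardRouting, ..., targetNode, ...) would follow here
    }

    public static void main(String[] args) {
        Routing routing = new Routing();
        createShard(new LocalNode(true), routing);
        System.out.println(routing.assignedToRemoteStoreNode); // true
    }
}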
diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java
index f5cf18b20c206..0dd976fc2bc9b 100644
--- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java
+++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java
@@ -27,6 +27,7 @@
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.index.IndexNotFoundException;
+import org.opensearch.index.remote.RemoteStoreUtils;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardClosedException;
 import org.opensearch.index.shard.ShardNotInPrimaryModeException;
@@ -58,6 +59,7 @@ public class PublishCheckpointAction extends TransportReplicationAction<
     protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class);
 
     private final SegmentReplicationTargetService replicationService;
+    private final ClusterService clusterService;
 
     @Inject
     public PublishCheckpointAction(
@@ -84,6 +86,7 @@ public PublishCheckpointAction(
             ThreadPool.Names.REFRESH
         );
         this.replicationService = targetService;
+        this.clusterService = clusterService;
     }
 
     @Override
@@ -200,7 +203,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh
         ActionListener.completeWith(listener, () -> {
             logger.trace(() -> new ParameterizedMessage("Checkpoint {} received on replica {}", request, replica.shardId()));
             // Ignore replica operation if there is an ongoing remote store migration and the replica copy is assigned to a docrep enabled node
-            if (replica.ongoingEngineMigration() == true && replica.routingEntry().isAssignedToRemoteStoreNode() == false) {
+            if (RemoteStoreUtils.isMigrationDirectionSet(clusterService) == true && replica.routingEntry().isAssignedToRemoteStoreNode() == false) {
                 logger.trace("Received segrep checkpoint on a docrep shard copy during an ongoing remote migration. NoOp.");
                 return new ReplicaResult();
             }
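The replica-side guard added to `shardOperationOnReplica` acknowledges checkpoints as no-ops on docrep-hosted copies while a migration direction is set. A small sketch of just that decision, with booleans standing in for the cluster-setting lookup and the routing-entry flag from the diff (the class and method names are illustrative):

// Sketch of the replica-side guard: during a migration, checkpoints received by
// docrep-hosted copies are acknowledged as no-ops instead of triggering segment
// replication.
final class CheckpointGuardSketch {
    static boolean shouldProcessCheckpoint(boolean migrationDirectionSet, boolean replicaOnRemoteStoreNode) {
        if (migrationDirectionSet && !replicaOnRemoteStoreNode) {
            return false; // docrep copy during migration: acknowledge but do nothing
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(shouldProcessCheckpoint(true, false)); // false -> NoOp ReplicaResult
        System.out.println(shouldProcessCheckpoint(true, true));  // true  -> replicate segments
        System.out.println(shouldProcessCheckpoint(false, true)); // true
    }
}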