From cc61ad4b28675b50252d759b54db124af0515a37 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Tue, 17 Sep 2024 15:20:02 +0530 Subject: [PATCH] More change Signed-off-by: Gaurav Bafna --- .../remotestore/RemoteRestoreSnapshotIT.java | 55 +++++ .../cluster/SnapshotsInProgress.java | 8 +- .../blobstore/BlobStoreRepository.java | 3 + .../snapshots/SnapshotsService.java | 202 ++++++++---------- 4 files changed, 157 insertions(+), 111 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 0acb578e2e7bf..44815dfed90b1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -997,6 +997,61 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1)); } + public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws InterruptedException, ExecutionException, IOException { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + Settings.Builder settings = Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + 
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true); + createRepository(snapshotRepoName, FsRepository.TYPE, settings); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + int concurrentSnapshots = 5; + + String snapshotName = "snapshot-concurrent-"; + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName) + .setWaitForCompletion(false) + .get(); + + // stop the elected cluster manager to force a cluster manager change + final String clusterManagerNode = internalCluster().getClusterManagerName(); + stopNode(clusterManagerNode); + + // Validate that no snapshot was recorded in the repository after the cluster manager change + Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + assertEquals(0, repositoryData.getSnapshotIds().size()); + } + public void testCreateSnapshotV2WithRedIndex() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); internalCluster().startDataOnlyNode(pinnedTimestampSettings()); diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index 
b9a254df6bf0b..a1a4eb3e2cecb 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -202,7 +202,7 @@ public static Entry startClone( Map.of(), false, false// initialising to false, will be updated in startCloning method of SnapshotsService while updating entry with - // clone jobs + // clone jobs ); } @@ -231,7 +231,8 @@ public static Entry startClone( source, Map.of(), remoteStoreIndexShallowCopyV2, - remoteStoreIndexShallowCopyV2// initialising to false, will be updated in startCloning method of SnapshotsService while updating entry with + remoteStoreIndexShallowCopyV2// initialising to false, will be updated in startCloning method of SnapshotsService while updating + // entry with // clone jobs ); } @@ -388,7 +389,8 @@ public Entry( } this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; this.remoteStoreIndexShallowCopyV2 = remoteStoreIndexShallowCopyV2; - assert this.remoteStoreIndexShallowCopyV2 || assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones); + assert this.remoteStoreIndexShallowCopyV2 + || assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones); } private Entry(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index b954560c1bc94..4a3294c75a976 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -2441,6 +2441,7 @@ public void finalizeSnapshot( // number_of_shards) has increased. 
Set updatedIndexIds = writeNewIndexShardPaths(existingRepositoryData, updatedRepositoryData, snapshotId); cleanupRedundantSnapshotShardPaths(updatedIndexIds); + logger.info("update repo data for {}", snapshotInfo.snapshotId()); writeIndexGen( updatedRepositoryData, repositoryStateId, @@ -3224,6 +3225,7 @@ public ClusterState execute(ClusterState currentState) { + "] must be larger than latest known generation [" + latestKnownRepoGen.get() + "]"; + logger.info("Setting it to {} {}", safeGeneration, newGen); return ClusterState.builder(currentState) .metadata( Metadata.builder(currentState.getMetadata()) @@ -3344,6 +3346,7 @@ public ClusterState execute(ClusterState currentState) { + "]" ); } + logger.info("Done Setting it to {} {}", newGen, newGen); return updateRepositoryGenerationsIfNecessary( stateFilter.apply( ClusterState.builder(currentState) diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 80a452b9a0cd2..d077888497ad1 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -554,7 +554,7 @@ public void onResponse(RepositoryData repositoryData) { @Override public void onFailure(Exception e) { - logger.error("Failed to upload files to snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName); + logger.error("Failed to upload files to snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName, e); listener.onFailure(e); } } @@ -569,18 +569,18 @@ public void onFailure(Exception e) { } - /** - * Initializes the snapshotting process for clients when Snapshot v2 is enabled. This method is responsible for taking - * a shallow snapshot and pinning the snapshot timestamp.The entire process is executed on the cluster manager node. 
- * - * Unlike traditional snapshot operations, this method performs a synchronous snapshot execution and doesn't - * upload any shard metadata to the snapshot repository. - * The pinned timestamp is later reconciled with remote store segment and translog metadata files during the restore - * operation. - * - * @param request snapshot request - * @param listener snapshot creation listener - */ + /** + * Initializes the snapshotting process for clients when Snapshot v2 is enabled. This method is responsible for taking + * a shallow snapshot and pinning the snapshot timestamp.The entire process is executed on the cluster manager node. + * + * Unlike traditional snapshot operations, this method performs a synchronous snapshot execution and doesn't + * upload any shard metadata to the snapshot repository. + * The pinned timestamp is later reconciled with remote store segment and translog metadata files during the restore + * operation. + * + * @param request snapshot request + * @param listener snapshot creation listener + */ public void createSnapshotV2(final CreateSnapshotRequest request, final ActionListener listener) { long pinnedTimestamp = System.currentTimeMillis(); final String repositoryName = request.repository(); @@ -611,86 +611,74 @@ public ClusterState execute(ClusterState currentState) { snapshot = new Snapshot(repositoryName, snapshotId); final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); - createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); + createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); - List indices = new ArrayList<>(currentState.metadata().indices().keySet()); + List indices = new ArrayList<>(currentState.metadata().indices().keySet()); - final List dataStreams = indexNameExpressionResolver.dataStreamNames( - currentState, - request.indicesOptions(), - request.indices() - ); + final List dataStreams = indexNameExpressionResolver.dataStreamNames( 
+ currentState, + request.indicesOptions(), + request.indices() + ); - logger.info("[{}][{}] creating snapshot-v2 for indices [{}]", repositoryName, snapshotName, indices); - - final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); - final List runningSnapshots = snapshots.entries(); - if (runningSnapshots.isEmpty() == false) { - // fail if ongoing snapshots in same repository. - for (SnapshotsInProgress.Entry entry : runningSnapshots) { - if (entry.repository() == repositoryName) { - throw new SnapshotException( - new Snapshot(repositoryName, snapshotId), - "Already a snapshot running in this repository"); - } - } - } - final List indexIds = repositoryData.resolveNewIndices( - indices, - getInFlightIndexIds(runningSnapshots, repositoryName), - IndexId.DEFAULT_SHARD_PATH_TYPE - ); - final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null); + logger.info("[{}][{}] creating snapshot-v2 for indices [{}]", repositoryName, snapshotName, indices); - if (repositoryData.getGenId() == RepositoryData.UNKNOWN_REPO_GEN) { - logger.debug("[{}] was aborted before starting", snapshot); - throw new SnapshotException(snapshot, "Aborted on initialization"); - } - final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( - SnapshotDeletionsInProgress.TYPE, - SnapshotDeletionsInProgress.EMPTY - ); -// Map shards = shards( -// snapshots, -// deletionsInProgress, -// currentState.metadata(), -// currentState.routingTable(), -// indexIds, -// repositoryData, -// repositoryName -// ); - - Map shards = new HashMap<>(); - - newEntry = SnapshotsInProgress.startedEntry( - new Snapshot(repositoryName, snapshotId), - request.includeGlobalState(), - request.partial(), - indexIds, - dataStreams, - threadPool.absoluteTimeInMillis(), - repositoryData.getGenId(), - shards, - userMeta, - version, - true, - true - ); - final List newEntries = new 
ArrayList<>(runningSnapshots); - newEntries.add(newEntry); - logger.info("Gen id is {}", repositoryData.getGenId()); - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(new ArrayList<>(newEntries))).build(); + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + final List runningSnapshots = snapshots.entries(); + if (tryEnterRepoLoop(repositoryName) == false) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + snapshotName, + "cannot start snapshot-v2 while a repository is in finalization state" + ); + } + final List indexIds = repositoryData.resolveNewIndices( + indices, + getInFlightIndexIds(runningSnapshots, repositoryName), + IndexId.DEFAULT_SHARD_PATH_TYPE + ); + final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null); + + if (repositoryData.getGenId() == RepositoryData.UNKNOWN_REPO_GEN) { + logger.debug("[{}] was aborted before starting", snapshot); + throw new SnapshotException(snapshot, "Aborted on initialization"); } + Map shards = new HashMap<>(); + + newEntry = SnapshotsInProgress.startedEntry( + new Snapshot(repositoryName, snapshotId), + request.includeGlobalState(), + request.partial(), + indexIds, + dataStreams, + threadPool.absoluteTimeInMillis(), + repositoryData.getGenId(), + shards, + userMeta, + version, + true, + true + ); + final List newEntries = new ArrayList<>(runningSnapshots); + newEntries.add(newEntry); + return ClusterState.builder(currentState) + .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(new ArrayList<>(newEntries))) + .build(); + } + @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot-v2", repositoryName, snapshotName), e); listener.onFailure(e); + if ((e instanceof ConcurrentSnapshotExecutionException) == false) { + leaveRepoLoop(repositoryName); 
+ } + } @Override public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { - logger.info("Processing it now {}", snapshotName); final ShardGenerations shardGenerations = buildShardsGenerationFromRepositoryData( newState.metadata(), newState.routingTable(), @@ -720,8 +708,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl final Version version = minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null); final StepListener pinnedTimestampListener = new StepListener<>(); pinnedTimestampListener.whenComplete(repoData -> { - logger.info("Snapshot is completed now {}", snapshotName); - listener.onResponse(snapshotInfo); }, listener::onFailure); + listener.onResponse(snapshotInfo); + }, listener::onFailure); repository.finalizeSnapshot( shardGenerations, repositoryData.getGenId(), @@ -741,26 +729,28 @@ public void onResponse(RepositoryData repositoryData) { listener.onFailure( new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager") ); + leaveRepoLoop(repositoryName); + return; } + leaveRepoLoop(repositoryName); updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener); } @Override public void onFailure(Exception e) { logger.error("Failed to upload files to snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName); + leaveRepoLoop(repositoryName); listener.onFailure(e); } } ); - - } }, "create_snapshot [" + snapshotName + ']', listener::onFailure); } - private void createSnapshotPreValidations( + private void createSnapshotPreValidations( ClusterState currentState, RepositoryData repositoryData, String repositoryName, @@ -955,8 +945,7 @@ public ClusterState execute(ClusterState currentState) { if (runningSnapshots.isEmpty() == false) { // fail if ongoing snapshots in same repository.
for (SnapshotsInProgress.Entry entry : runningSnapshots) { - if (entry.repository() == repositoryName) { - logger.info("Running snapshot are present"); + if (Objects.equals(entry.repository(), repositoryName)) { throw new ConcurrentSnapshotExecutionException( repositoryName, sourceSnapshotId.getName(), @@ -1678,7 +1667,8 @@ public void applyClusterState(ClusterChangedEvent event) { SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final boolean newClusterManager = event.previousState().nodes().isLocalNodeElectedClusterManager() == false; if (newClusterManager && snapshotsInProgress.entries().isEmpty() == false) { - //clean up snapshot v2 in progress or clone v2 present + logger.info("Cleaning it now"); + // clean up snapshot v2 in progress or clone v2 present stateWithoutSnapshotv2(event.state()); } processExternalChanges( @@ -2375,7 +2365,7 @@ private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sn return readyDeletions(result).v1(); } - private ClusterState stateWithoutSnapshotv2(ClusterState state) { + private ClusterState stateWithoutSnapshotv2(ClusterState state) { SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); ClusterState result = state; boolean changed = false; @@ -2393,33 +2383,29 @@ private ClusterState stateWithoutSnapshotv2(ClusterState state) { .build(); ClusterState finalResult = result; - clusterService.submitStateUpdateTask( - "update snapshot v2 after cluster manager switch", - new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("update snapshot v2 after cluster manager switch", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return finalResult; - } + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, finalResult.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)).build(); + } - @Override - public void onFailure(String source, 
Exception e) { - // execute never fails today, so we should never hit this. - logger.warn( - () -> new ParameterizedMessage( - "failed to remove in progress snapshot v2 state after cluster manager switch", - source - ), - e - ); - } + @Override + public void onFailure(String source, Exception e) { + // execute never fails today, so we should never hit this. + logger.warn( + () -> new ParameterizedMessage( + "[{}] failed to remove in progress snapshot v2 state after cluster manager switch", + source + ), + e + ); } - ); + }); } return result; } - /** * Removes record of running snapshot from cluster state and notifies the listener when this action is complete. This method is only * used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the