From e69972b0f45cd3a5d1d2fc0f5e4c2b3d72cb2a42 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Fri, 20 Sep 2024 11:59:03 +0530 Subject: [PATCH] more changes Signed-off-by: Gaurav Bafna --- .../remotestore/RemoteRestoreSnapshotIT.java | 2 +- .../snapshots/CloneSnapshotV2IT.java | 26 +- .../cluster/SnapshotsInProgress.java | 5 +- .../blobstore/BlobStoreRepository.java | 4 +- .../snapshots/SnapshotsService.java | 353 ++++++------------ 5 files changed, 148 insertions(+), 242 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 89084e5d47a23..ccc337b3f1791 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -1062,7 +1062,7 @@ public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Interr assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1)); thread.join(); } - + public void testCreateSnapshotV2WithRedIndex() throws Exception { internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); internalCluster().startDataOnlyNode(pinnedTimestampSettings()); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java index c6744ae62db60..689359e24c16e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotV2IT.java @@ -50,11 +50,12 @@ import org.opensearch.test.OpenSearchIntegTestCase; import java.nio.file.Path; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; -import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class CloneSnapshotV2IT extends AbstractSnapshotIntegTestCase { @@ -135,11 +136,24 @@ public void testCloneShallowCopyV2() throws Exception { awaitClusterManagerFinishRepoOperations(); // Validate that snapshot is present in repository data - PlainActionFuture repositoryDataPlainActionFutureClone = new PlainActionFuture<>(); - repository.getRepositoryData(repositoryDataPlainActionFutureClone); + waitUntil( + () -> { + PlainActionFuture repositoryDataPlainActionFutureClone = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFutureClone); + + RepositoryData repositoryData1 = null; + try { + repositoryData1 = repositoryDataPlainActionFutureClone.get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + logger.info("Testing it now "); + return (repositoryData1.getSnapshotIds().size() == 2); + }, 90, TimeUnit.SECONDS + ); - repositoryData = repositoryDataPlainActionFutureClone.get(); - assertEquals(repositoryData.getSnapshotIds().size(), 2); boolean foundCloneInRepoData = false; SnapshotId cloneSnapshotId = null; for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index a1a4eb3e2cecb..58366b93eafef 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -231,9 +231,8 @@ public static Entry startClone( source, Map.of(), remoteStoreIndexShallowCopyV2, - remoteStoreIndexShallowCopyV2// initialising to false, will be updated in startCloning method of SnapshotsService while updating - // entry with - // clone jobs + remoteStoreIndexShallowCopyV2// initialising to false, will be updated in startCloning method of SnapshotsService + // while updating entry with clone jobs ); } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 4a3294c75a976..d9a05b421987c 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -807,7 +807,7 @@ public void updateState(ClusterState state) { + "] to generation [" + metadata.generation() + "]"; - logger.debug("Updated repository generation from [{}] to [{}]", previousBest, metadata.generation()); + logger.info("Updated repository generation from [{}] to [{}]", previousBest, metadata.generation()); } } } @@ -2868,6 +2868,8 @@ public void getRepositoryData(ActionListener listener) { } final Tuple cached = latestKnownRepositoryData.get(); // Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with + // Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with + // Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with // the latest known repository generation if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) { try { diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index eb18e9e7dede9..84e0739e91c4c 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -452,128 +452,6 @@ public TimeValue timeout() { }, "create_snapshot [" + snapshotName + ']', listener::onFailure); } - public void createSnapshotV3(final CreateSnapshotRequest request, final ActionListener listener) { - long pinnedTimestamp = System.currentTimeMillis(); - final String repositoryName = request.repository(); - final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); - - Repository repository = repositoriesService.repository(repositoryName); - validate(repositoryName, snapshotName); - - final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot - - if (repository.isReadOnly()) { - listener.onFailure( - new RepositoryException(repository.getMetadata().name(), "cannot create snapshot-v2 in a readonly repository") - ); - return; - } - - final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); - ClusterState currentState = clusterService.state(); - final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); - try { - final StepListener repositoryDataListener = new StepListener<>(); - repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); - - repositoryDataListener.whenComplete(repositoryData -> { - createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); - - List indices = new ArrayList<>(currentState.metadata().indices().keySet()); - - final List dataStreams = indexNameExpressionResolver.dataStreamNames( - currentState, - request.indicesOptions(), - request.indices() - ); - - logger.trace("[{}][{}] creating snapshot-v2 for indices [{}]", repositoryName, snapshotName, indices); - - final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); - final List runningSnapshots = snapshots.entries(); - - final List indexIds = repositoryData.resolveNewIndices( - indices, - getInFlightIndexIds(runningSnapshots, repositoryName), - IndexId.DEFAULT_SHARD_PATH_TYPE - ); - final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null); - final ShardGenerations shardGenerations = buildShardsGenerationFromRepositoryData( - currentState.metadata(), - currentState.routingTable(), - indexIds, - repositoryData - ); - - if (repositoryData.getGenId() == RepositoryData.UNKNOWN_REPO_GEN) { - logger.debug("[{}] was aborted before starting", snapshot); - throw new SnapshotException(snapshot, "Aborted on initialization"); - } - final SnapshotInfo snapshotInfo = new SnapshotInfo( - snapshot.getSnapshotId(), - shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()), - dataStreams, - pinnedTimestamp, - null, - System.currentTimeMillis(), - shardGenerations.totalShards(), - Collections.emptyList(), - request.includeGlobalState(), - userMeta, - true, - pinnedTimestamp - ); - if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { - throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager"); - } - final StepListener pinnedTimestampListener = new StepListener<>(); - pinnedTimestampListener.whenComplete(repoData -> { listener.onResponse(snapshotInfo); }, listener::onFailure); - repository.finalizeSnapshot( - shardGenerations, - repositoryData.getGenId(), - metadataForSnapshot(currentState.metadata(), request.includeGlobalState(), false, dataStreams, indexIds), - snapshotInfo, - version, - state -> state, - Priority.IMMEDIATE, - new ActionListener() { - @Override - public void onResponse(RepositoryData repositoryData) { - if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { - failSnapshotCompletionListeners( - snapshot, - new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager") - ); - listener.onFailure( - new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager") - ); - return; - } - updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener); - } - - @Override - public void onFailure(Exception e) { - logger.error( - "Failed to upload files to snapshot repo {} for snapshot-v2 {} due to {} ", - repositoryName, - snapshotName, - e - ); - listener.onFailure(e); - } - } - ); - - }, listener::onFailure); - } catch (Exception e) { - assert false : new AssertionError(e); - logger.error("Snapshot-v2 {} creation failed with exception {}", snapshot.getSnapshotId().getName(), e); - listener.onFailure(e); - } - - } - /** * Initializes the snapshotting process for clients when Snapshot v2 is enabled. This method is responsible for taking * a shallow snapshot and pinning the snapshot timestamp.The entire process is executed on the cluster manager node. @@ -664,6 +542,7 @@ public ClusterState execute(ClusterState currentState) { final List newEntries = new ArrayList<>(runningSnapshots); newEntries.add(newEntry); + // Entering finalize loop here to prevent concurrent snapshots v2 snapshots enteredLoop = tryEnterRepoLoop(repositoryName); if (enteredLoop == false) { throw new ConcurrentSnapshotExecutionException( @@ -684,7 +563,6 @@ public void onFailure(String source, Exception e) { if (enteredLoop) { leaveRepoLoop(repositoryName); } - } @Override @@ -714,55 +592,51 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl true, pinnedTimestamp ); - // if (snapshotName.contains("snapshot-concurrent-")) { - // try { - // listener.onResponse(snapshotInfo); - // leaveRepoLoop(repositoryName); - // return; - // } catch (Exception e) { - // } - // } - final Version version = minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null); final StepListener pinnedTimestampListener = new StepListener<>(); - pinnedTimestampListener.whenComplete(repoData -> { listener.onResponse(snapshotInfo); }, listener::onFailure); - repository.finalizeSnapshot( - shardGenerations, - repositoryData.getGenId(), - metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()), - snapshotInfo, - version, - state -> stateWithoutSnapshot(state, snapshot), - Priority.IMMEDIATE, - new ActionListener() { - @Override - public void onResponse(RepositoryData repositoryData) { - if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) { - failSnapshotCompletionListeners( - snapshot, - new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager") - ); - listener.onFailure( - new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager") - ); + pinnedTimestampListener.whenComplete(repoData -> { + repository.finalizeSnapshot( + shardGenerations, + repositoryData.getGenId(), + metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()), + snapshotInfo, + version, + state -> stateWithoutSnapshot(state, snapshot), + Priority.IMMEDIATE, + new ActionListener() { + @Override + public void onResponse(RepositoryData repositoryData) { + if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) { + failSnapshotCompletionListeners( + snapshot, + new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager") + ); + listener.onFailure( + new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager") + ); + + return; + } + leaveRepoLoop(repositoryName); + listener.onResponse(snapshotInfo); + } - return; + @Override + public void onFailure(Exception e) { + logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName); + leaveRepoLoop(repositoryName); + listener.onFailure(e); + } } - endingSnapshots.remove(snapshot); - leaveRepoLoop(repositoryName); - updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener); - } - - @Override - public void onFailure(Exception e) { - logger.error("Failed to upload files to snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName); - leaveRepoLoop(repositoryName); - listener.onFailure(e); - } - } - ); + ); + }, + e -> { + logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName); + leaveRepoLoop(repositoryName); + listener.onFailure(e); + }); + updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener); } - }, "create_snapshot [" + snapshotName + ']', listener::onFailure); } @@ -953,22 +827,22 @@ public void cloneSnapshotV2( private SnapshotId sourceSnapshotId; private List indicesForSnapshot; + boolean enteredLoop; + @Override public ClusterState execute(ClusterState currentState) { createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final List runningSnapshots = snapshots.entries(); - if (runningSnapshots.isEmpty() == false) { - // fail if ongoing snapshots in same repository. - for (SnapshotsInProgress.Entry entry : runningSnapshots) { - if (Objects.equals(entry.repository(), repositoryName)) { - throw new ConcurrentSnapshotExecutionException( - repositoryName, - sourceSnapshotId.getName(), - "cannot clone from snapshot when there is ongoing snapshot" - ); - } - } + + // Entering finalize loop here to prevent concurrent snapshots v2 snapshots + enteredLoop = tryEnterRepoLoop(repositoryName); + if (enteredLoop == false) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + snapshotName, + "cannot start snapshot-v2 while a repository is in finalization state" + ); } sourceSnapshotId = repositoryData.getSnapshotIds() @@ -1013,6 +887,9 @@ public ClusterState execute(ClusterState currentState) { public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to clone snapshot-v2", repositoryName, snapshotName), e); listener.onFailure(e); + if (enteredLoop) { + leaveRepoLoop(repositoryName); + } } @Override @@ -1039,67 +916,81 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl true, snapshotInfo.getPinnedTimestamp() ); - if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { + if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) { throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2 clone, no longer cluster manager"); } final StepListener pinnedTimestampListener = new StepListener<>(); + pinnedTimestampListener.whenComplete(repoData -> { - logger.info("snapshot-v2 clone [{}] completed successfully", snapshot); - listener.onResponse(null); - }, listener::onFailure); - repository.finalizeSnapshot( - shardGenerations, - repositoryData.getGenId(), - metadataForSnapshot( - currentState.metadata(), - newEntry.includeGlobalState(), - false, - newEntry.dataStreams(), - newEntry.indices() - ), - cloneSnapshotInfo, - repositoryData.getVersion(sourceSnapshotId), - state -> stateWithoutSnapshot(state, snapshot), - Priority.IMMEDIATE, - new ActionListener() { - @Override - public void onResponse(RepositoryData repositoryData) { - if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { - failSnapshotCompletionListeners( - snapshot, - new SnapshotException(snapshot, "Aborting Snapshot-v2 clone, no longer cluster manager") - ); - listener.onFailure( - new SnapshotException( - repositoryName, - snapshotName, - "Aborting Snapshot-v2 clone, no longer cluster manager" - ) - ); - return; + repository.finalizeSnapshot( + shardGenerations, + repositoryData.getGenId(), + metadataForSnapshot( + currentState.metadata(), + newEntry.includeGlobalState(), + false, + newEntry.dataStreams(), + newEntry.indices() + ), + cloneSnapshotInfo, + repositoryData.getVersion(sourceSnapshotId), + state -> stateWithoutSnapshot(state, snapshot), + Priority.IMMEDIATE, + new ActionListener() { + @Override + public void onResponse(RepositoryData repositoryData) { + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { + failSnapshotCompletionListeners( + snapshot, + new SnapshotException(snapshot, "Aborting Snapshot-v2 clone, no longer cluster manager") + ); + listener.onFailure( + new SnapshotException( + repositoryName, + snapshotName, + "Aborting Snapshot-v2 clone, no longer cluster manager" + ) + ); + return; + } + logger.info("snapshot-v2 clone [{}] completed successfully", snapshot); + leaveRepoLoop(repositoryName); + listener.onResponse(null); } - cloneSnapshotPinnedTimestamp( - repositoryData, - sourceSnapshotId, - snapshot, - snapshotInfo.getPinnedTimestamp(), - pinnedTimestampListener - ); - } - @Override - public void onFailure(Exception e) { - logger.error( - "Failed to upload files to snapshot repo {} for clone snapshot-v2 {} ", - repositoryName, - snapshotName - ); - listener.onFailure(e); + @Override + public void onFailure(Exception e) { + logger.error( + "Failed to upload files to snapshot repo {} for clone snapshot-v2 {} ", + repositoryName, + snapshotName + ); + leaveRepoLoop(repositoryName); + listener.onFailure(e); + } } - } + ); + listener.onResponse(null); + }, + e -> { + logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName); + leaveRepoLoop(repositoryName); + listener.onFailure(e); + }); + + cloneSnapshotPinnedTimestamp( + repositoryData, + sourceSnapshotId, + snapshot, + snapshotInfo.getPinnedTimestamp(), + pinnedTimestampListener ); - - }, listener::onFailure); + }, + e -> { + logger.error("Failed to retrieve snapshot info for snapshot-v2 {} {} ", repositoryName, snapshotName); + leaveRepoLoop(repositoryName); + listener.onFailure(e); + }); } @Override