Skip to content

Commit

Permalink
Use metadata from source snapshot while cloning snapshot V2
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Oct 16, 2024
1 parent 35c366d commit e77aefe
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -174,6 +175,118 @@ public void testCloneShallowCopyV2() throws Exception {
assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards()));
}

public void testCloneShallowCopyV2DeletedIndex() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));

String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-clone-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Client client = client();

assertAcked(
client.admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true)
)
);

createIndex(indexName1, getRemoteStoreBackedIndexSettings());
createIndex(indexName2, getRemoteStoreBackedIndexSettings());

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexRandomDocs(indexName1, numDocsInIndex1);
indexRandomDocs(indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.get();
SnapshotInfo sourceSnapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(sourceSnapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(sourceSnapshotInfo.successfulShards(), greaterThan(0));
assertThat(sourceSnapshotInfo.successfulShards(), equalTo(sourceSnapshotInfo.totalShards()));
assertThat(sourceSnapshotInfo.snapshotId().getName(), equalTo(snapshotName1));

// Validate that the snapshot was created
final BlobStoreRepository repository = (BlobStoreRepository) internalCluster().getCurrentClusterManagerNodeInstance(
RepositoriesService.class
).repository(snapshotRepoName);
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFuture);

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();

assertTrue(repositoryData.getSnapshotIds().contains(sourceSnapshotInfo.snapshotId()));

createIndex(indexName3, getRemoteStoreBackedIndexSettings());
indexRandomDocs(indexName3, 10);
ensureGreen(indexName3);

assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName1)).get());
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName2)).get());

AcknowledgedResponse response = client().admin()
.cluster()
.prepareCloneSnapshot(snapshotRepoName, snapshotName1, "test_clone_snapshot1")
.setIndices("*")
.get();
assertTrue(response.isAcknowledged());
awaitClusterManagerFinishRepoOperations();

AtomicReference<SnapshotId> cloneSnapshotId = new AtomicReference<>();
// Validate that snapshot is present in repository data
waitUntil(() -> {
PlainActionFuture<RepositoryData> repositoryDataPlainActionFutureClone = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFutureClone);

RepositoryData repositoryData1;
try {
repositoryData1 = repositoryDataPlainActionFutureClone.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
for (SnapshotId snapshotId : repositoryData1.getSnapshotIds()) {
if (snapshotId.getName().equals("test_clone_snapshot1")) {
cloneSnapshotId.set(snapshotId);
return true;
}
}
return false;
}, 90, TimeUnit.SECONDS);

final SnapshotId cloneSnapshotIdFinal = cloneSnapshotId.get();
SnapshotInfo cloneSnapshotInfo = PlainActionFuture.get(
f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> repository.getSnapshotInfo(cloneSnapshotIdFinal)))
);

assertThat(cloneSnapshotInfo.getPinnedTimestamp(), equalTo(sourceSnapshotInfo.getPinnedTimestamp()));
for (String index : sourceSnapshotInfo.indices()) {
assertTrue(cloneSnapshotInfo.indices().contains(index));

}
assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards()));
}

public void testCloneShallowCopyAfterDisablingV2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Expand Down
29 changes: 19 additions & 10 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,6 @@ public void cloneSnapshotV2(
) {

long startTime = System.currentTimeMillis();
ClusterState currentState = clusterService.state();
String snapshotName = snapshot.getSnapshotId().getName();
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) {
private SnapshotsInProgress.Entry newEntry;
Expand Down Expand Up @@ -984,17 +983,27 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2 clone, no longer cluster manager");
}
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
pinnedTimestampListener.whenComplete(repoData -> {
final StepListener<Metadata> metadataListener = new StepListener<>();
pinnedTimestampListener.whenComplete(
rData -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
final Metadata.Builder metaBuilder = Metadata.builder(repository.getSnapshotGlobalMetadata(newEntry.source()));
for (IndexId index : newEntry.indices()) {
metaBuilder.put(repository.getSnapshotIndexMetaData(repositoryData, newEntry.source(), index), false);
}
return metaBuilder.build();
})),
e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName);
stateWithoutSnapshotV2(newState);
leaveRepoLoop(repositoryName);
listener.onFailure(e);
}
);
metadataListener.whenComplete(meta -> {
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(
currentState.metadata(),
newEntry.includeGlobalState(),
false,
newEntry.dataStreams(),
newEntry.indices()
),
metadataForSnapshot(meta, newEntry.includeGlobalState(), false, newEntry.dataStreams(), newEntry.indices()),
cloneSnapshotInfo,
repositoryData.getVersion(sourceSnapshotId),
state -> stateWithoutSnapshot(state, snapshot),
Expand Down Expand Up @@ -1038,7 +1047,7 @@ public void onFailure(Exception e) {
}
);
}, e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName);
logger.error("Failed to retrieve metadata for snapshot-v2 {} {} ", repositoryName, snapshotName);
stateWithoutSnapshotV2(newState);
leaveRepoLoop(repositoryName);
listener.onFailure(e);
Expand Down

0 comments on commit e77aefe

Please sign in to comment.