Skip to content

Commit

Permalink
More change
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Sep 17, 2024
1 parent 1a9da53 commit cc61ad4
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,61 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1));
}

public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws InterruptedException, ExecutionException, IOException {
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
createRepository(snapshotRepoName, FsRepository.TYPE, settings);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

int concurrentSnapshots = 5;

String snapshotName = "snapshot-concurrent-";
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
.setWaitForCompletion(false)
.get();

//restart existing master
final String clusterManagerNode = internalCluster().getClusterManagerName();
stopNode(clusterManagerNode);

// Validate that only one snapshot has been created
Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName);
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFuture);

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();
assertEquals(repositoryData.getSnapshotIds().size() , 0);
}

public void testCreateSnapshotV2WithRedIndex() throws Exception {
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public static Entry startClone(
Map.of(),
false,
false// initialising to false, will be updated in startCloning method of SnapshotsService while updating entry with
// clone jobs
// clone jobs
);
}

Expand Down Expand Up @@ -231,7 +231,8 @@ public static Entry startClone(
source,
Map.of(),
remoteStoreIndexShallowCopyV2,
remoteStoreIndexShallowCopyV2// initialising to false, will be updated in startCloning method of SnapshotsService while updating entry with
remoteStoreIndexShallowCopyV2// initialising to false, will be updated in startCloning method of SnapshotsService while updating
// entry with
// clone jobs
);
}
Expand Down Expand Up @@ -388,7 +389,8 @@ public Entry(
}
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
this.remoteStoreIndexShallowCopyV2 = remoteStoreIndexShallowCopyV2;
assert this.remoteStoreIndexShallowCopyV2 || assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
assert this.remoteStoreIndexShallowCopyV2
|| assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
}

private Entry(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2441,6 +2441,7 @@ public void finalizeSnapshot(
// number_of_shards) has increased.
Set<String> updatedIndexIds = writeNewIndexShardPaths(existingRepositoryData, updatedRepositoryData, snapshotId);
cleanupRedundantSnapshotShardPaths(updatedIndexIds);
logger.info("update repo data for {}", snapshotInfo.snapshotId());
writeIndexGen(
updatedRepositoryData,
repositoryStateId,
Expand Down Expand Up @@ -3224,6 +3225,7 @@ public ClusterState execute(ClusterState currentState) {
+ "] must be larger than latest known generation ["
+ latestKnownRepoGen.get()
+ "]";
logger.info("Setting it to {} {}", safeGeneration, newGen);
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
Expand Down Expand Up @@ -3344,6 +3346,7 @@ public ClusterState execute(ClusterState currentState) {
+ "]"
);
}
logger.info("Done Setting it to {} {}", newGen, newGen);
return updateRepositoryGenerationsIfNecessary(
stateFilter.apply(
ClusterState.builder(currentState)
Expand Down
Loading

0 comments on commit cc61ad4

Please sign in to comment.