Skip to content

Commit

Permalink
More changes
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Sep 18, 2024
1 parent cc61ad4 commit 16063da
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1030,28 +1030,39 @@ public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Interr
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

int concurrentSnapshots = 5;
Thread thread = new Thread(() -> {
try {
String snapshotName = "snapshot-earlier-master";
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
.setWaitForCompletion(true)
.get();
} catch (Exception e) {}
});

thread.start();

String snapshotName = "snapshot-concurrent-";
// stop existing master
final String clusterManagerNode = internalCluster().getClusterManagerName();
stopNode(clusterManagerNode);

// Validate that we have greater one snapshot has been created
String snapshotName = "new-snapshot";
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
.setWaitForCompletion(false)
.get();

//restart existing master
final String clusterManagerNode = internalCluster().getClusterManagerName();
stopNode(clusterManagerNode);

// Validate that only one snapshot has been created
Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName);
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFuture);

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();
assertEquals(repositoryData.getSnapshotIds().size() , 0);
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1));
thread.join();
}

public void testCreateSnapshotV2WithRedIndex() throws Exception {
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
Expand Down
128 changes: 83 additions & 45 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,12 @@ public void onResponse(RepositoryData repositoryData) {

@Override
public void onFailure(Exception e) {
logger.error("Failed to upload files to snapshot repo {} for snapshot-v2 {} due to {} ", repositoryName, snapshotName, e);
logger.error(
"Failed to upload files to snapshot repo {} for snapshot-v2 {} due to {} ",
repositoryName,
snapshotName,
e
);
listener.onFailure(e);
}
}
Expand Down Expand Up @@ -594,6 +599,8 @@ public void createSnapshotV2(final CreateSnapshotRequest request, final ActionLi

private Snapshot snapshot;

boolean enteredLoop;

@Override
public ClusterState execute(ClusterState currentState) {
// move to in progress
Expand Down Expand Up @@ -625,13 +632,7 @@ public ClusterState execute(ClusterState currentState) {

final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
if (tryEnterRepoLoop(repositoryName) == false) {
throw new ConcurrentSnapshotExecutionException(
repositoryName,
snapshotName,
"cannot start snapshot-v2 while a repository is in finalization state"
);
}

final List<IndexId> indexIds = repositoryData.resolveNewIndices(
indices,
getInFlightIndexIds(runningSnapshots, repositoryName),
Expand Down Expand Up @@ -662,6 +663,15 @@ public ClusterState execute(ClusterState currentState) {
);
final List<SnapshotsInProgress.Entry> newEntries = new ArrayList<>(runningSnapshots);
newEntries.add(newEntry);

enteredLoop = tryEnterRepoLoop(repositoryName);
if (enteredLoop == false) {
throw new ConcurrentSnapshotExecutionException(
repositoryName,
snapshotName,
"cannot start snapshot-v2 while a repository is in finalization state"
);
}
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(new ArrayList<>(newEntries)))
.build();
Expand All @@ -671,7 +681,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot-v2", repositoryName, snapshotName), e);
listener.onFailure(e);
if ((e instanceof ConcurrentSnapshotExecutionException) == false) {
if (enteredLoop) {
leaveRepoLoop(repositoryName);
}

Expand All @@ -685,7 +695,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
newEntry.indices(),
repositoryData
);

final List<String> dataStreams = indexNameExpressionResolver.dataStreamNames(
newState,
request.indicesOptions(),
Expand All @@ -705,11 +714,18 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
true,
pinnedTimestamp
);
// if (snapshotName.contains("snapshot-concurrent-")) {
// try {
// listener.onResponse(snapshotInfo);
// leaveRepoLoop(repositoryName);
// return;
// } catch (Exception e) {
// }
// }

final Version version = minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null);
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
pinnedTimestampListener.whenComplete(repoData -> {
listener.onResponse(snapshotInfo);
}, listener::onFailure);
pinnedTimestampListener.whenComplete(repoData -> { listener.onResponse(snapshotInfo); }, listener::onFailure);
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
Expand All @@ -721,7 +737,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
Expand All @@ -732,7 +748,7 @@ public void onResponse(RepositoryData repositoryData) {

return;
}
logger.info("Process it now");
endingSnapshots.remove(snapshot);
leaveRepoLoop(repositoryName);
updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener);
}
Expand Down Expand Up @@ -1667,9 +1683,10 @@ public void applyClusterState(ClusterChangedEvent event) {
SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final boolean newClusterManager = event.previousState().nodes().isLocalNodeElectedClusterManager() == false;
if (newClusterManager && snapshotsInProgress.entries().isEmpty() == false) {
logger.info("Cleaning it now");
// clean up snapshot v2 in progress or clone v2 present
stateWithoutSnapshotv2(event.state());
// clean up snapshot v2 in progress or clone v2 present.
// Snapshot v2 create and clone are sync operation . In case of cluster manager failures in midst , we won't
// send ack to caller and won't continue on new cluster manager . Caller will need to retry it.
stateWithoutSnapshotV2(event.state());
}
processExternalChanges(
newClusterManager || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()),
Expand Down Expand Up @@ -1782,7 +1799,14 @@ private void processExternalChanges(boolean changedNodes, boolean startShards) {
@Override
public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
// Removing shallow snapshots v2 as we we take care of these in stateWithoutSnapshotV2()
snapshots = SnapshotsInProgress.of(
snapshots.entries()
.stream()
.filter(snapshot -> snapshot.remoteStoreIndexShallowCopyV2() == false)
.collect(Collectors.toList())
);
DiscoveryNodes nodes = currentState.nodes();
boolean changed = false;
final EnumSet<State> statesToUpdate;
Expand Down Expand Up @@ -1839,7 +1863,7 @@ public ClusterState execute(ClusterState currentState) {
changed = true;
logger.debug("[{}] was found in dangling INIT or ABORTED state", snapshot);
} else {
if (snapshot.state().completed() || completed(snapshot.shards().values())) {
if ((snapshot.state().completed() || completed(snapshot.shards().values()))) {
finishedSnapshots.add(snapshot);
}
updatedSnapshotEntries.add(snapshot);
Expand Down Expand Up @@ -2365,9 +2389,8 @@ private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sn
return readyDeletions(result).v1();
}

private ClusterState stateWithoutSnapshotv2(ClusterState state) {
private void stateWithoutSnapshotV2(ClusterState state) {
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
ClusterState result = state;
boolean changed = false;
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
Expand All @@ -2378,32 +2401,44 @@ private ClusterState stateWithoutSnapshotv2(ClusterState state) {
}
}
if (changed) {
result = ClusterState.builder(state)
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries)))
.build();

ClusterState finalResult = result;
clusterService.submitStateUpdateTask("update snapshot v2 after cluster manager switch", new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return finalResult;
}
clusterService.submitStateUpdateTask(
"remove in progress snapshot v2 after cluster manager switch",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
boolean changed = false;
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.remoteStoreIndexShallowCopyV2()) {
changed = true;
} else {
entries.add(entry);
}
}
if (changed) {
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries)))
.build();
} else {
return currentState;
}
}

@Override
public void onFailure(String source, Exception e) {
// execute never fails today, so we should never hit this.
logger.warn(
() -> new ParameterizedMessage(
"failed to remove in progress snapshot v2 state after cluster manager switch",
source
),
e
);
@Override
public void onFailure(String source, Exception e) {
// execute never fails , so we should never hit this.
logger.warn(
() -> new ParameterizedMessage(
"failed to remove in progress snapshot v2 state after cluster manager switch",
source
),
e
);
}
}
});
);
}
return result;
}

/**
Expand Down Expand Up @@ -3556,6 +3591,9 @@ public boolean assertAllListenersResolved() {
+ " on ["
+ localNode
+ "]";
if (repositoryOperations.isEmpty() == false) {
logger.info("Not empty");
}
assert repositoryOperations.isEmpty() : "Found leaked snapshots to finalize " + repositoryOperations + " on [" + localNode + "]";
return true;
}
Expand Down

0 comments on commit 16063da

Please sign in to comment.