Skip to content

Commit

Permalink
[Remote Store] Sync segments in refresh listener on refresh after commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ashking94 committed Oct 22, 2023
1 parent 14d4a63 commit e90762d
Showing 1 changed file with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
logger.debug("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
RemoteSegmentMetadata uploadedMetadata = this.remoteDirectory.init();
if (uploadedMetadata != null) {
segmentTracker.setLatestUploadedFiles(uploadedMetadata.getMetadata().keySet());
}
}
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Collection<String> localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true);
Expand All @@ -151,7 +154,7 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
@Override
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
boolean successful;
if (shouldSync(didRefresh)) {
if (shouldSyncOnPerformAfterRefresh(didRefresh)) {
successful = syncSegments();
} else {
successful = true;
Expand All @@ -167,6 +170,10 @@ private boolean shouldSync(boolean didRefresh) {
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty();
}

private boolean shouldSyncOnPerformAfterRefresh(boolean didRefresh) {
return shouldSync(didRefresh) || isRefreshAfterCommitSafe();
}

private boolean syncSegments() {
if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) {
logger.debug(
Expand Down Expand Up @@ -323,6 +330,15 @@ private boolean isRefreshAfterCommit() throws IOException {
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

private boolean isRefreshAfterCommitSafe() {
try {
return isRefreshAfterCommit();
} catch (Exception e) {
logger.info("Exception occurred in isRefreshAfterCommitSafe", e);
}
return false;
}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
throws IOException {
final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
Expand Down

0 comments on commit e90762d

Please sign in to comment.