Skip to content

Commit

Permalink
Create a clone of local segements size map used for Remote Segment St…
Browse files Browse the repository at this point in the history
…ats until sync to remote completes (#11896) (#12143)

(cherry picked from commit 57cc0dd)

Signed-off-by: bansvaru <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent a70ddcc commit 4a3c324
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,17 @@ public Map<String, Long> getLatestLocalFileNameLengthMap() {
}

/**
* Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files.
* Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map.
* The method is given a function as an argument which is used for determining the file size (length in bytes).
* This method is also provided the collection of segment files which are the latest refresh local segment files.
* This method also removes the stale segment files from the map that are not part of the input segment files.
*
* @param segmentFiles list of local refreshed segment files
* @param fileSizeFunction function is used to determine the file size in bytes
*
* @return updated map of local segment files and filesize
*/
public void updateLatestLocalFileNameLengthMap(
public Map<String, Long> updateLatestLocalFileNameLengthMap(
Collection<String> segmentFiles,
CheckedFunction<String, Long, IOException> fileSizeFunction
) {
Expand All @@ -332,6 +337,7 @@ public void updateLatestLocalFileNameLengthMap(
// Remove keys from the fileSizeMap that do not exist in the latest segment files
latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false);
computeBytesLag();
return Collections.unmodifiableMap(latestLocalFileNameLengthMap);
}

public void addToLatestUploadedFiles(String file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ private boolean syncSegments() {
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

// Create a map of file name to size and update the refresh segment tracker
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
Map<String, Long> localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh).entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
CountDownLatch latch = new CountDownLatch(1);
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
Expand All @@ -242,6 +244,7 @@ public void onResponse(Void unused) {
refreshClockTimeMs,
refreshSeqNo,
lastRefreshedCheckpoint,
localSegmentsSizeMap,
checkpoint
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
Expand All @@ -262,7 +265,7 @@ public void onFailure(Exception e) {
}, latch);

// Start the segments files upload
uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener);
latch.await();
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
Expand Down Expand Up @@ -306,10 +309,11 @@ private void onSuccessfulSegmentsSync(
long refreshClockTimeMs,
long refreshSeqNo,
long lastRefreshedCheckpoint,
Map<String, Long> localFileSizeMap,
ReplicationCheckpoint checkpoint
) {
// Update latest uploaded segment files name in segment tracker
segmentTracker.setLatestUploadedFiles(segmentTracker.getLatestLocalFileNameLengthMap().keySet());
segmentTracker.setLatestUploadedFiles(localFileSizeMap.keySet());
// Update the remote refresh time and refresh seq no
updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo);
// Reset the backoffDelayIterator for the future failures
Expand Down Expand Up @@ -383,7 +387,11 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
}
}

private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, ActionListener<Void> listener) {
private void uploadNewSegments(
Collection<String> localSegmentsPostRefresh,
Map<String, Long> localSegmentsSizeMap,
ActionListener<Void> listener
) {
Collection<String> filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList());
if (filteredFiles.size() == 0) {
logger.debug("No new segments to upload in uploadNewSegments");
Expand All @@ -397,7 +405,7 @@ private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, Acti

for (String src : filteredFiles) {
// Initializing listener here to ensure that the stats increment operations are thread-safe
UploadListener statsListener = createUploadListener();
UploadListener statsListener = createUploadListener(localSegmentsSizeMap);
ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
statsListener.onSuccess(src);
batchUploadListener.onResponse(resp);
Expand Down Expand Up @@ -456,9 +464,11 @@ private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshClo
* Updates map of file name to size of the input segment files in the segment tracker. Uses {@code storeDirectory.fileLength(file)} to get the size.
*
* @param segmentFiles list of segment files that are part of the most recent local refresh.
*
* @return updated map of local segment files and filesize
*/
private void updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
private Map<String, Long> updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
return segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
}

private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) {
Expand Down Expand Up @@ -533,30 +543,32 @@ private boolean isLocalOrSnapshotRecovery() {

/**
* Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
*
* @param fileSizeMap updated map of current snapshot of local segments to their sizes
*/
private UploadListener createUploadListener() {
private UploadListener createUploadListener(Map<String, Long> fileSizeMap) {
return new UploadListener() {
private long uploadStartTime = 0;

@Override
public void beforeUpload(String file) {
// Start tracking the upload bytes started
segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesStarted(fileSizeMap.get(file));
uploadStartTime = System.currentTimeMillis();
}

@Override
public void onSuccess(String file) {
// Track upload success
segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesSucceeded(fileSizeMap.get(file));
segmentTracker.addToLatestUploadedFiles(file);
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}

@Override
public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesFailed(fileSizeMap.get(file));
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}
};
Expand Down

0 comments on commit 4a3c324

Please sign in to comment.