Skip to content

Commit

Permalink
Upload translog.ckp file data as metadata to translog.tlog file for s…
Browse files Browse the repository at this point in the history
…3 remote store

Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed Apr 15, 2024
1 parent 7345371 commit d046e97
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,15 @@ public interface BlobContainer {
* Creates a new {@link FetchBlobResult} for the given blob name.
*
* @param blobName
* The name of the blob to get an {@link InputStream} for.
* The name of the blob to get an {@link FetchBlobResult} for.
* @return The {@link FetchBlobResult} of the blob.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob can not be read.
*/
@ExperimentalApi
default FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet");
InputStream inputStream = readBlob(blobName);
return new FetchBlobResult(inputStream, null);
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.opensearch.common.annotation.ExperimentalApi;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

Expand All @@ -20,7 +22,7 @@
* @opensearch.experimental
*/
@ExperimentalApi
public class FetchBlobResult {
public class FetchBlobResult implements Closeable {

/**
* Downloaded blob InputStream
Expand All @@ -45,4 +47,10 @@ public FetchBlobResult(InputStream inputStream, Map<String, String> metadata) {
this.metadata = metadata;
}

@Override
public void close() throws IOException {
if (inputStream != null) {
inputStream.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -98,20 +99,47 @@ public void uploadBlobs(
if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) {
uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority);
} else {
logger.info("uploading file = {}", fileSnapshot.getName());
uploadBlob(fileSnapshot, listener, blobPath, writePriority);
}
});

}

private Map<String, String> prepareFileMetadata(TransferFileSnapshot fileSnapshot) throws IOException {
if (!(fileSnapshot instanceof FileSnapshot.TranslogFileSnapshot)) {
return null;
}

FileSnapshot.TranslogFileSnapshot tlogFileSnapshot = (FileSnapshot.TranslogFileSnapshot) fileSnapshot;
String ckpAsString = tlogFileSnapshot.provideCheckpointDataAsString();
Long checkpointChecksum = tlogFileSnapshot.getCheckpointChecksum();

assert checkpointChecksum != null : "checksum can not be null";

Map<String, String> metadata = new HashMap<>();
metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_DATA_KEY, ckpAsString);
metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_CHECKSUM_KEY, checkpointChecksum.toString());
return metadata;
}

private void uploadBlob(
TransferFileSnapshot fileSnapshot,
ActionListener<TransferFileSnapshot> listener,
BlobPath blobPath,
WritePriority writePriority
) {

if (fileSnapshot instanceof FileSnapshot.CheckpointFileSnapshot) {
logger.info("Skip uploading checkpoint file as this file = {} is stored as metadata of translog file", fileSnapshot.getName());
listener.onResponse(fileSnapshot);
return;
}

try {

Map<String, String> metadata = prepareFileMetadata(fileSnapshot);

ChannelFactory channelFactory = FileChannel::open;
long contentLength;
try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) {
Expand All @@ -130,7 +158,8 @@ private void uploadBlob(
writePriority,
(size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
Objects.requireNonNull(fileSnapshot.getChecksum()),
remoteIntegrityEnabled
remoteIntegrityEnabled,
metadata
);
ActionListener<Void> completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Base64;
import java.util.Objects;

/**
Expand Down Expand Up @@ -153,16 +155,50 @@ public boolean equals(Object o) {
public static final class TranslogFileSnapshot extends TransferFileSnapshot {

private final long generation;
private Path checkpointFilePath;
private Long checkpointChecksum;
public final static String CHECKPOINT_FILE_DATA_KEY = "ckp-data";
public final static String CHECKPOINT_FILE_CHECKSUM_KEY = "ckp-checksum";

public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum) throws IOException {
super(path, primaryTerm, checksum);
this.generation = generation;
}

public void setCheckpointFilePath(Path checkpointFilePath) {
this.checkpointFilePath = checkpointFilePath;
}

public void setCheckpointChecksum(Long checkpointChecksum) {
this.checkpointChecksum = checkpointChecksum;
}

public String provideCheckpointDataAsString() throws IOException {
return buildCheckpointDataAsBase64String(checkpointFilePath);
}

static String buildCheckpointDataAsBase64String(Path checkpointFilePath) throws IOException {
long fileSize = Files.size(checkpointFilePath);
assert fileSize < 1500 : "checkpoint file size is more than 1.5KB size, can't be stored as metadata";
byte[] fileBytes = Files.readAllBytes(checkpointFilePath);
return Base64.getEncoder().encodeToString(fileBytes);
}

public static byte[] convertBase64StringToCheckpointFileDataBytes(String base64CheckpointString) {
if (base64CheckpointString == null) {
return null;
}
return Base64.getDecoder().decode(base64CheckpointString);
}

public long getGeneration() {
return generation;
}

public Long getCheckpointChecksum() {
return checkpointChecksum;
}

@Override
public int hashCode() {
return Objects.hash(generation, super.hashCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public class TranslogCheckpointTransferSnapshot implements TransferSnapshot, Clo
}

private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) {
// set checkpoint file path and checkpoint file checksum for a translog file
translogFileSnapshot.setCheckpointFilePath(checkPointFileSnapshot.getPath());
translogFileSnapshot.setCheckpointChecksum(checkPointFileSnapshot.getChecksum());

translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot));
assert translogFileSnapshot.getGeneration() == checkPointFileSnapshot.getGeneration();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.io.stream.BytesStreamOutput;
Expand All @@ -27,6 +28,7 @@
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCheckedContainer;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -47,6 +49,9 @@

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_CHECKSUM_KEY;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_DATA_KEY;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot.convertBase64StringToCheckpointFileDataBytes;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
Expand Down Expand Up @@ -236,16 +241,60 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca
generation,
location
);
// Download Checkpoint file from remote to local FS
String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
downloadToFS(ckpFileName, location, primaryTerm);
// Download translog file from remote to local FS

// Download translog file with object metadata from remote to local FS
String translogFilename = Translog.getFilename(Long.parseLong(generation));
downloadToFS(translogFilename, location, primaryTerm);
downloadTranslogFileToFS(translogFilename, location, primaryTerm, generation);
return true;
}

private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException {
private void downloadTranslogFileToFS(String fileName, Path location, String primaryTerm, String generation) throws IOException {
Path filePath = location.resolve(fileName);
// Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download
if (Files.exists(filePath)) {
Files.delete(filePath);
}

boolean downloadStatus = false;
long bytesToRead = 0, downloadStartTime = System.nanoTime();
Map<String, String> metadata;

try (
FetchBlobResult fetchBlobResult = transferService.downloadBlobWithMetadata(remoteDataTransferPath.add(primaryTerm), fileName)
) {
InputStream inputStream = fetchBlobResult.getInputStream();
metadata = fetchBlobResult.getMetadata();

bytesToRead = inputStream.available();
Files.copy(inputStream, filePath);
downloadStatus = true;

logger.info("downloaded translog for fileName = {}, with metadata = {}", fileName, metadata);
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}

// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
fileTransferTracker.add(fileName, true);

try {
if (metadata == null || metadata.isEmpty()) {
logger.info("metadata is null. Download checkpoint file from remote store separately");
String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
downloadCheckpointFileToFS(ckpFileName, location, primaryTerm);
} else {
writeCheckpointFileFromMetadata(metadata, location, generation, fileName);
}
} catch (Exception e) {
throw new IOException("Failed to download translog file from remote", e);
}
}

private void downloadCheckpointFileToFS(String fileName, Path location, String primaryTerm) throws IOException {
Path filePath = location.resolve(fileName);
// Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download
Expand All @@ -271,6 +320,73 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
fileTransferTracker.add(fileName, true);
}

private void writeCheckpointFileFromMetadata(Map<String, String> metadata, Path location, String generation, String fileName)
throws IOException {

try {
String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
Path filePath = location.resolve(ckpFileName);

// Here, we always override the existing file if present.
if (Files.exists(filePath)) {
Files.delete(filePath);
}

String ckpDataBase64 = metadata.get(CHECKPOINT_FILE_DATA_KEY);
String checksumKeyValue = metadata.get(CHECKPOINT_FILE_CHECKSUM_KEY);
if (ckpDataBase64 == null) {
throw new IllegalStateException(
"Checkpoint file data (ckp-data) key is expected but not found in metadata for file: " + fileName
);
}
if (checksumKeyValue == null) {
throw new IllegalStateException(
"Checkpoint file checksum (ckp-checksum) key is expected but not found in metadata for file: " + fileName
);
}

byte[] ckpFileBytes = convertBase64StringToCheckpointFileDataBytes(ckpDataBase64);
Long remoteDataChecksum = Long.parseLong(checksumKeyValue);

TranslogCheckedContainer translogCheckedContainer = new TranslogCheckedContainer(ckpFileBytes);
Long currentDataChecksum = translogCheckedContainer.getChecksum();

if (currentDataChecksum.equals(remoteDataChecksum)) {
logger.debug(
"Checksum verification successful. currentDataChecksum={}, remoteDataChecksum={}",
currentDataChecksum,
remoteDataChecksum
);
} else {
logger.warn(
"Checksum verification failed. currentDataChecksum={}, remoteDataChecksum={}",
currentDataChecksum,
remoteDataChecksum
);
throw new RuntimeException(
"Checksum verification failed for file: "
+ fileName
+ ". currentDataChecksum="
+ currentDataChecksum
+ ", remoteChecksum="
+ remoteDataChecksum
);
}

Files.write(filePath, ckpFileBytes);

// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
fileTransferTracker.add(ckpFileName, true);
logger.info("Wrote checkpoint file for fileName: {}", fileName);
} catch (IOException e) {
logger.error("Error writing checkpoint file for file: {}", fileName, e);
throw e;
} catch (IllegalStateException e) {
logger.error("Error processing metadata for file: {}", fileName, e);
throw e;
}
}

public TranslogTransferMetadata readMetadata() throws IOException {
SetOnce<TranslogTransferMetadata> metadataSetOnce = new SetOnce<>();
SetOnce<IOException> exceptionSetOnce = new SetOnce<>();
Expand Down
Loading

0 comments on commit d046e97

Please sign in to comment.