metadata upload (all flows) and download (not async multipart flow) tested

Signed-off-by: Sandeep Kumawat <[email protected]>
Sandeep Kumawat committed Apr 8, 2024
1 parent ca99c35 commit 3332318
Showing 6 changed files with 113 additions and 15 deletions.
@@ -203,8 +203,10 @@ public void writeBlobWithMetadata(
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
logger.info("using executeSingleUpload()..to upload file = {}", blobName);
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
} else {
logger.info("using executeMultipartUpload()..to upload file = {}", blobName);
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
}
return null;
@@ -231,6 +233,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
);
InputStreamContainer inputStream = streamContext.provideStream(0);
try {
logger.info("Using multipart upload method..");
executeMultipartUpload(
blobStore,
uploadRequest.getKey(),
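For context on the hunk above: S3 user-defined metadata is attached to the object at write time, so the same metadata map is passed to both upload paths. A minimal sketch of the single-part case with the AWS SDK v2 (generic SDK usage for illustration; the bucket, key, and variable names are hypothetical, not this repository's code):

import java.nio.file.Path;
import java.util.Map;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

// S3 stores user metadata as x-amz-meta-* headers alongside the object body.
static void putWithMetadata(S3Client s3Client, Path translogPath, String base64CheckpointData) {
    PutObjectRequest request = PutObjectRequest.builder()
        .bucket("my-bucket")                                // hypothetical bucket
        .key("translog-42.tlog")                            // hypothetical key
        .metadata(Map.of("ckpfile", base64CheckpointData))  // same shape as the metadata map above
        .build();
    s3Client.putObject(request, RequestBody.fromFile(translogPath));
}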
@@ -106,10 +106,12 @@ public CompletableFuture<Void> uploadObject(
CompletableFuture<Void> returnFuture = new CompletableFuture<>();
try {
if (streamContext.getNumberOfParts() == 1) {
log.info("using uploadInOneChunk()..to upload file = {}", uploadRequest.getKey());
log.debug(() -> "Starting the upload as a single upload part request");
uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture, statsMetricPublisher);
} else {
log.debug(() -> "Starting the upload as multipart upload request");
log.info("using uploadInParts()..to upload file = {}", uploadRequest.getKey());
uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher);
}
} catch (Throwable throwable) {
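A detail worth noting for the multipart branch above: S3 accepts user metadata only when the multipart upload is initiated, not on individual UploadPart calls. A hedged sketch with the AWS SDK v2 async client (generic SDK usage with hypothetical names, not the repository's exact wiring):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;

// Metadata rides on the CreateMultipartUpload request; the parts carry only data.
static CompletableFuture<CreateMultipartUploadResponse> initiateWithMetadata(S3AsyncClient s3AsyncClient, String base64CheckpointData) {
    CreateMultipartUploadRequest initiate = CreateMultipartUploadRequest.builder()
        .bucket("my-bucket")                                // hypothetical bucket
        .key("translog-42.tlog")                            // hypothetical key
        .metadata(Map.of("ckpfile", base64CheckpointData))
        .build();
    return s3AsyncClient.createMultipartUpload(initiate);
}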
@@ -32,10 +32,7 @@
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC;

@@ -81,8 +78,9 @@ public void uploadBlob(
public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath, WritePriority writePriority)
throws IOException {
BlobPath blobPath = (BlobPath) remoteTransferPath;
Map<String, String> metadata = prepareFileMetadata(fileSnapshot);
try (InputStream inputStream = fileSnapshot.inputStream()) {
blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true);
blobStore.blobContainer(blobPath).writeBlobAtomicWithMetadata(fileSnapshot.getName(), inputStream, metadata, fileSnapshot.getContentLength(), true);
}
}

@@ -98,18 +96,38 @@ public void uploadBlobs(
if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) {
uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority);
} else {
uploadBlob(fileSnapshot, listener, blobPath, writePriority);
if (!(fileSnapshot instanceof FileSnapshot.CheckpointFileSnapshot)) {
    logger.info("Uploading file = {}", fileSnapshot.getName());
try {
uploadBlob(fileSnapshot, listener, blobPath, writePriority);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
});

}

private Map<String, String> prepareFileMetadata(TransferFileSnapshot fileSnapshot) throws IOException {
Map<String, String> metadata = new HashMap<>();
if (fileSnapshot instanceof FileSnapshot.TranslogFileSnapshot) {
FileSnapshot.TranslogFileSnapshot tlogFileSnapshot = (FileSnapshot.TranslogFileSnapshot) fileSnapshot;
String ckpAsString = tlogFileSnapshot.provideCheckpointDataAsString();
metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_OBJECT_METADATA_KEY, ckpAsString);
return metadata;
}
return null;
}

private void uploadBlob(
TransferFileSnapshot fileSnapshot,
ActionListener<TransferFileSnapshot> listener,
BlobPath blobPath,
WritePriority writePriority
) {
) throws IOException {

Map<String, String> metadata = prepareFileMetadata(fileSnapshot);

try {
ChannelFactory channelFactory = FileChannel::open;
@@ -130,7 +148,8 @@ private void uploadBlob(
writePriority,
(size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
Objects.requireNonNull(fileSnapshot.getChecksum()),
remoteIntegrityEnabled
remoteIntegrityEnabled,
metadata
);
ActionListener<Void> completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
@@ -149,6 +168,7 @@ private void uploadBlob(
((AsyncMultiStreamBlobContainer) blobStore.blobContainer(blobPath)).asyncBlobUpload(writeContext, completionListener);

} catch (Exception e) {
logger.info("Exception while uploading file = {} with metadata", fileSnapshot.getName());
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e);
listener.onFailure(new FileTransferException(fileSnapshot, e));
} finally {
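To make the new prepareFileMetadata contract above concrete: for a translog snapshot it returns a single-entry map keyed by CHECKPOINT_OBJECT_METADATA_KEY ("ckpfile"), and null for anything else. A small illustrative sketch of the map it builds (hypothetical usage; assumes a TranslogFileSnapshot constructed with a checkpoint path, as in the builder change further down):

// Hypothetical illustration of the map shape prepareFileMetadata builds.
Map<String, String> metadata = new HashMap<>();
metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_OBJECT_METADATA_KEY,  // "ckpfile"
    translogFileSnapshot.provideCheckpointDataAsString());                      // Base64 of the raw .ckp bytes
// This map is handed to writeBlobAtomicWithMetadata / asyncBlobUpload, so the
// checkpoint travels with the translog object instead of as a separate blob.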
@@ -17,11 +17,13 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Base64;
import java.util.Objects;

/**
@@ -153,12 +155,43 @@ public boolean equals(Object o) {
public static final class TranslogFileSnapshot extends TransferFileSnapshot {

private final long generation;
private Path checkpointFilePath;
private String checkpointDataAsString;
public static final String CHECKPOINT_OBJECT_METADATA_KEY = "ckpfile";

public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum) throws IOException {
super(path, primaryTerm, checksum);
this.generation = generation;
}

public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum, Path checkpointPath) throws IOException {
super(path, primaryTerm, checksum);
this.generation = generation;
this.checkpointFilePath = checkpointPath;
}

public String provideCheckpointDataAsString() throws IOException {
this.checkpointDataAsString = buildCheckpointDataAsBase64String(checkpointFilePath);
return checkpointDataAsString;
}

static String buildCheckpointDataAsBase64String(Path checkpointFilePath) throws IOException {
try (FileChannel fileChannel = FileChannel.open(checkpointFilePath, StandardOpenOption.READ)) {
assert fileChannel.size() < 1500 : "checkpoint file larger than 1.5KB cannot be stored as object metadata";
ByteBuffer buffer = ByteBuffer.allocate((int) fileChannel.size());
fileChannel.read(buffer);
buffer.flip();
return Base64.getEncoder().encodeToString(buffer.array());
}
}

public static byte[] convertBase64StringToCheckpointFileDataBytes(String base64CheckpointString) {
if (base64CheckpointString == null) {
return null;
}
return Base64.getDecoder().decode(base64CheckpointString);
}

public long getGeneration() {
return generation;
}
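The 1500-byte assert above leaves headroom for Base64 inflation: n raw bytes encode to 4*ceil(n/3) characters, so 1500 bytes become 2000 characters, which stays under S3's 2KB limit on user-defined metadata. A self-contained round-trip sketch of the encode/decode pair (a temp file stands in for a real checkpoint file):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Base64;

// Round trip: checkpoint bytes -> Base64 string (stored as metadata) -> bytes (file rebuilt on download).
static void checkpointRoundTrip() throws IOException {
    Path ckp = Files.createTempFile("translog-1", ".ckp");  // stand-in for a real checkpoint file
    Files.write(ckp, new byte[] { 1, 2, 3, 4 });
    String encoded = Base64.getEncoder().encodeToString(Files.readAllBytes(ckp));
    byte[] decoded = Base64.getDecoder().decode(encoded);
    assert Arrays.equals(decoded, Files.readAllBytes(ckp));
}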
@@ -158,7 +158,7 @@ public TranslogCheckpointTransferSnapshot build() throws IOException {
Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration));
generations.add(readerGeneration);
translogTransferSnapshot.add(
new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath, reader.getTranslogChecksum()),
new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath, reader.getTranslogChecksum(), checkpointPath),
new CheckpointFileSnapshot(
readerPrimaryTerm,
checkpointGeneration,
@@ -14,6 +14,7 @@
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobDownloadResponse;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
@@ -47,6 +48,8 @@

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot.CHECKPOINT_OBJECT_METADATA_KEY;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot.convertBase64StringToCheckpointFileDataBytes;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
@@ -111,7 +114,9 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans

try {
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
// Skip checkpoint files; their data is uploaded as object metadata on the translog files instead
//toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));

if (toUpload.isEmpty()) {
logger.trace("Nothing to upload for transfer");
return true;
@@ -237,15 +242,15 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca
location
);
// Checkpoint file download is skipped; it is rebuilt from translog object metadata (see applyMetadataToCkpFile)
String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
downloadToFS(ckpFileName, location, primaryTerm);
//String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
//downloadToFS(ckpFileName, location, primaryTerm);
// Download translog file from remote to local FS
String translogFilename = Translog.getFilename(Long.parseLong(generation));
downloadToFS(translogFilename, location, primaryTerm);
downloadToFS(translogFilename, location, primaryTerm, generation);
return true;
}

private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException {
private void downloadToFS(String fileName, Path location, String primaryTerm, String generation) throws IOException {
Path filePath = location.resolve(fileName);
// Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download
@@ -255,8 +260,16 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th

boolean downloadStatus = false;
long bytesToRead = 0, downloadStartTime = System.nanoTime();
try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) {
BlobDownloadResponse downloaded = transferService.downloadBlobWithMetadata(remoteDataTransferPath.add(primaryTerm), fileName);
try (InputStream inputStream = downloaded.getInputStream()) {
    Map<String, String> metadata = downloaded.getMetadata();

    logger.info("Downloaded translog for fileName = {}, metadata = {}", fileName, metadata);

    applyMetadataToCkpFile(metadata, location, generation, fileName);

    // Capture number of bytes for stats before reading
    bytesToRead = inputStream.available();
Files.copy(inputStream, filePath);
downloadStatus = true;
@@ -271,6 +284,33 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
fileTransferTracker.add(fileName, true);
}

private void applyMetadataToCkpFile(Map<String, String> metadata, Path location, String generation, String fileName) throws IOException {
String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
Path filePath = location.resolve(ckpFileName);

byte[] ckpFileBytes = convertBase64StringToCheckpointFileDataBytes(metadata.get(CHECKPOINT_OBJECT_METADATA_KEY));
if (ckpFileBytes == null) {
logger.info("If ckpFileBytes is null file should be ckp which is without metadata, for file = {}", fileName);
return;
}

// Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download
if (Files.exists(filePath)) {
Files.delete(filePath);
}

try {
Files.write(filePath, ckpFileBytes);
// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
fileTransferTracker.add(ckpFileName, true);
logger.info("Applied metadata bytes to checkpoint file for fileName = {}", fileName);
} catch (IOException e) {
logger.debug("Error in processing metadata bytes");
throw e;
}
}

public TranslogTransferMetadata readMetadata() throws IOException {
SetOnce<TranslogTransferMetadata> metadataSetOnce = new SetOnce<>();
SetOnce<IOException> exceptionSetOnce = new SetOnce<>();
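For the download half, the metadata map comes back on the same GET that returns the object body; the repository wraps this in downloadBlobWithMetadata and BlobDownloadResponse, but the underlying AWS SDK v2 shape is roughly as follows (hypothetical bucket/key names, sketch only):

import java.util.Base64;
import java.util.Map;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

// One GET yields the translog stream plus the x-amz-meta-* map holding the Base64 checkpoint.
static void getWithMetadata(S3Client s3Client) {
    ResponseInputStream<GetObjectResponse> object = s3Client.getObject(
        GetObjectRequest.builder().bucket("my-bucket").key("translog-42.tlog").build());
    Map<String, String> metadata = object.response().metadata();  // keys come back lowercased
    byte[] ckpBytes = Base64.getDecoder().decode(metadata.get("ckpfile"));
    // Write ckpBytes to the local .ckp path, then stream the body to the translog file,
    // mirroring applyMetadataToCkpFile and downloadToFS above.
}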
