Skip to content

Commit

Permalink
Introduce checkpoint data as string in TranslogFileSnapshot as metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
Sandeep Kumawat committed Apr 9, 2024
1 parent 9b0f578 commit d65cdb3
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Base64;
import java.util.Objects;

/**
Expand Down Expand Up @@ -153,12 +155,42 @@ public boolean equals(Object o) {
public static final class TranslogFileSnapshot extends TransferFileSnapshot {

private final long generation;
private Path checkpointFilePath;
private String checkpointDataAsString;

public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum) throws IOException {
super(path, primaryTerm, checksum);
this.generation = generation;
}

public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum, Path checkpointPath) throws IOException {
super(path, primaryTerm, checksum);
this.generation = generation;
this.checkpointFilePath = checkpointPath;
}

public String provideCheckpointDataAsString() throws IOException {
this.checkpointDataAsString = buildCheckpointDataAsBase64String(checkpointFilePath);
return checkpointDataAsString;
}

static String buildCheckpointDataAsBase64String(Path checkpointFilePath) throws IOException {
try (FileChannel fileChannel = FileChannel.open(checkpointFilePath, StandardOpenOption.READ)) {
assert fileChannel.size() < 1500 : "checkpoint file size of more then 1.5KB size, can't be stored as metadata";
ByteBuffer buffer = ByteBuffer.allocate((int) fileChannel.size());
fileChannel.read(buffer);
buffer.flip();
return Base64.getEncoder().encodeToString(buffer.array());
}
}

public static byte[] convertBase64StringToCheckpointFileDataBytes(String base64CheckpointString) {
if (base64CheckpointString == null) {
return null;
}
return Base64.getDecoder().decode(base64CheckpointString);
}

public long getGeneration() {
return generation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ public TranslogCheckpointTransferSnapshot build() throws IOException {
Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration));
generations.add(readerGeneration);
translogTransferSnapshot.add(
new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath, reader.getTranslogChecksum()),
new TranslogFileSnapshot(
readerPrimaryTerm,
readerGeneration,
translogPath,
reader.getTranslogChecksum(),
checkpointPath
),
new CheckpointFileSnapshot(
readerPrimaryTerm,
checkpointGeneration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@

package org.opensearch.index.translog.transfer;

import org.opensearch.index.translog.Translog;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.After;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Base64;

public class FileSnapshotTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -69,6 +73,63 @@ public void testFileSnapshotContent() throws IOException {
}
}

public void testBuildCheckpointDataAsBase64String() throws IOException {
Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX);
Files.writeString(file, "hello_world_with_checkpoint_file_data");
Files.writeString(file, "hello_world_with_checkpoint_file_data-2");
Files.writeString(file, "hello_world_with_checkpoint_file_data-4");
Files.writeString(file, "213123123");

fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null);

assertFileSnapshotProperties(file);
String encodedString = FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(file);

// Assert
assertNotNull(encodedString);
byte[] decoded = Base64.getDecoder().decode(encodedString);
assertArrayEquals(Files.readAllBytes(file), decoded);
}

public void testBuildCheckpointDataAsBase64StringWhenPathIsNull() throws IOException {
Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX);
Files.writeString(file, "hello_world_with_checkpoint_file_data");

fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null);

assertFileSnapshotProperties(file);

assertThrows(NullPointerException.class, () -> FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(null));
}

public void testConvertCheckpointBase64StringToBytes() throws IOException {
Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX);
Files.writeString(file, "test-hello_world_with_checkpoint_file_data");

fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null);

assertFileSnapshotProperties(file);
String encodedString = FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(file);

byte[] decodedBytes = FileSnapshot.TranslogFileSnapshot.convertBase64StringToCheckpointFileDataBytes(encodedString);
assertNotNull(encodedString);
assertArrayEquals("test-hello_world_with_checkpoint_file_data".getBytes(), decodedBytes);
}

public void testBuildCheckpointDataAsBase64String_whenFileSizeGreaterThan2KB_shouldThrowAssertionError() throws IOException {
Path file = createTempFile(Translog.TRANSLOG_FILE_PREFIX + 10, Translog.CHECKPOINT_SUFFIX);
byte[] data = new byte[2048]; // 2KB

fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null);

assertFileSnapshotProperties(file);

ByteBuffer buffer = ByteBuffer.wrap(data);
Files.write(file, buffer.array(), StandardOpenOption.WRITE);

assertThrows(AssertionError.class, () -> FileSnapshot.TranslogFileSnapshot.buildCheckpointDataAsBase64String(file));
}

private void assertFileSnapshotProperties(Path file) throws IOException {
assertEquals(file.getFileName().toString(), fileSnapshot.getName());
assertEquals(Files.size(file), fileSnapshot.getContentLength());
Expand Down

0 comments on commit d65cdb3

Please sign in to comment.