From 1163d1de4e383c100fe836ebfb3086c4b1f2b399 Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Tue, 7 May 2024 16:11:06 +0530 Subject: [PATCH] Fixing IndexRoutingTableInputStream and moving checksum to end to file Signed-off-by: Himshikha Gupta --- .../remote/ClusterMetadataManifest.java | 15 +++-- .../routingtable/IndexRoutingTableHeader.java | 32 ++--------- .../IndexRoutingTableInputStream.java | 57 ++++++++++++------- .../IndexRoutingTableHeaderTests.java | 35 ++++++++++++ 4 files changed, 86 insertions(+), 53 deletions(-) create mode 100644 server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 4e5891d154de0..0279f8e0fd805 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -276,7 +276,7 @@ public ClusterMetadataManifest( boolean clusterUUIDCommitted ) { this(clusterTerm, version, clusterUUID, stateUUID, opensearchVersion, nodeId, committed, codecVersion, - globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, null); + globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, new ArrayList<>()); } public ClusterMetadataManifest( @@ -355,7 +355,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startArray(INDICES_FIELD.getPreferredName()); { for (UploadedIndexMetadata uploadedIndexMetadata : indices) { + builder.startObject(); uploadedIndexMetadata.toXContent(builder, params); + builder.endObject(); } } builder.endArray(); @@ -369,9 +371,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startArray(INDICES_ROUTING_FIELD.getPreferredName()); { for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) { + builder.startObject(); uploadedIndexMetadata.toXContent(builder, params); + builder.endObject(); } } + builder.endArray(); } return builder; } @@ -391,7 +396,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeInt(codecVersion); out.writeString(globalMetadataFileName); - } else if (out.getVersion().onOrAfter(Version.V_2_14_0)) { + } + if (out.getVersion().onOrAfter(Version.V_2_14_0)) { out.writeCollection(indicesRouting); } } @@ -659,11 +665,10 @@ public String getIndexUUID() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject() + return builder .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) - .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()) - .endObject(); + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java index 04a0f1868b64f..23ab700d5a34f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java @@ -21,6 +21,7 @@ import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import java.io.EOFException; import java.io.IOException; @@ -50,26 +51,16 @@ public IndexRoutingTableHeader(long routingTableVersion, String indexName, Versi /** * Returns the bytes reference for the {@link IndexRoutingTableHeader} - * @return the {@link BytesReference} * @throws IOException */ - public BytesReference write() throws IOException { - BytesReference bytesReference; - try ( - BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); - BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput) - ) { + public void write(StreamOutput out) throws IOException { CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION); // Write version out.writeLong(routingTableVersion); out.writeInt(nodeVersion.id); out.writeString(indexName); - // Checksum header - out.writeInt((int) out.getChecksum()); + out.flush(); - bytesReference = bytesStreamOutput.bytes(); - } - return bytesReference; } /** @@ -83,10 +74,9 @@ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOExce try { try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) { readHeaderVersion(in); - final int version = in.readInt(); + final long version = in.readLong(); final int nodeVersion = in.readInt(); final String name = in.readString(); - verifyChecksum(in); assert version >= 0 : "Version must be non-negative [" + version + "]"; assert in.readByte() == -1 : "Header is not fully read"; return new IndexRoutingTableHeader(version, name, Version.fromId(nodeVersion)); @@ -96,20 +86,6 @@ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOExce } } - static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException { - // This absolutely must come first, or else reading the checksum becomes part of the checksum - long expectedChecksum = in.getChecksum(); - long readChecksum = Integer.toUnsignedLong(in.readInt()); - if (readChecksum != expectedChecksum) { - throw new IOException( - "checksum verification failed - expected: 0x" - + Long.toHexString(expectedChecksum) - + ", got: 0x" - + Long.toHexString(readChecksum) - ); - } - } - static int readHeaderVersion(final StreamInput in) throws IOException { final int version; try { diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java index ac65232e9a24d..40e5908d5c65c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java @@ -54,8 +54,9 @@ public class IndexRoutingTableInputStream extends InputStream { private static final int BUFFER_SIZE = 8192; private final IndexRoutingTableHeader indexRoutingTableHeader; - private final Iterator shardIter; + private final BytesStreamOutput bytesStreamOutput; + private final BufferedChecksumStreamOutput out; public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion) throws IOException { this(indexRoutingTable, version, nodeVersion, BUFFER_SIZE); @@ -66,7 +67,10 @@ public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long ve this.buf = new byte[size]; this.shardIter = indexRoutingTable.iterator(); this.indexRoutingTableHeader = new IndexRoutingTableHeader(version, indexRoutingTable.getIndex().getName(), nodeVersion); - initialFill(); + this.bytesStreamOutput = new BytesStreamOutput(); + this.out = new BufferedChecksumStreamOutput(bytesStreamOutput); + + initialFill(indexRoutingTable.shards().size()); } @Override @@ -78,39 +82,52 @@ public int read() throws IOException { return buf[pos++] & 0xff; } - private void initialFill() throws IOException { - BytesReference bytesReference = indexRoutingTableHeader.write(); - buf = bytesReference.toBytesRef().bytes; - count = bytesReference.length(); + private void initialFill(int shardCount) throws IOException { + indexRoutingTableHeader.write(out); + out.writeVInt(shardCount); + + System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0 , buf, 0, bytesStreamOutput.bytes().length()); + count = bytesStreamOutput.bytes().length(); + bytesStreamOutput.reset(); fill(buf); } private void fill(byte[] buf) throws IOException { if (leftOverBuf != null) { - System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length); + if(leftOverBuf.length > buf.length - count) { + // leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in leftOverBuf. + System.arraycopy(leftOverBuf, 0, buf, count, buf.length - count); + byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)]; + System.arraycopy(leftOverBuf, buf.length - count , tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count)); + leftOverBuf = tempLeftOverBuffer; + count = buf.length - count; + } else { + System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length); + count += leftOverBuf.length; + leftOverBuf = null; + } } + if (count < buf.length && shardIter.hasNext()) { IndexShardRoutingTable next = shardIter.next(); - BytesReference bytesRef; - try ( - BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); - BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput) - ) { - IndexShardRoutingTable.Builder.writeTo(next, out); - // Checksum header - out.writeInt((int) out.getChecksum()); - out.flush(); - bytesRef = bytesStreamOutput.bytes(); + IndexShardRoutingTable.Builder.writeTo(next, out); + //Add checksum for the file after all shards are done + if(!shardIter.hasNext()) { + out.writeLong(out.getChecksum()); } + out.flush(); + BytesReference bytesRef = bytesStreamOutput.bytes(); + bytesStreamOutput.reset(); + if (bytesRef.length() < buf.length - count) { System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, bytesRef.length()); count += bytesRef.length(); leftOverBuf = null; } else { System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count); - count += buf.length - count; - leftOverBuf = new byte[bytesRef.length() - count]; - System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count + 1, leftOverBuf, 0, bytesRef.length() - count); + leftOverBuf = new byte[bytesRef.length() - (buf.length - count)]; + System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count , leftOverBuf, 0, bytesRef.length() - (buf.length - count)); + count = buf.length; } } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java new file mode 100644 index 0000000000000..068db554b4226 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.InputStreamDataInput; +import org.opensearch.Version; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class IndexRoutingTableHeaderTests extends OpenSearchTestCase { + + public void testWrite() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + IndexRoutingTableHeader header = new IndexRoutingTableHeader(1, "dummyIndex", Version.V_3_0_0); + header.write(out); + + BytesStreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes); + CodecUtil.checkHeader(new InputStreamDataInput(in),IndexRoutingTableHeader.INDEX_ROUTING_HEADER_CODEC, IndexRoutingTableHeader.INITIAL_VERSION, IndexRoutingTableHeader.CURRENT_VERSION ); + assertEquals(1, in.readLong()); + assertEquals(Version.V_3_0_0.id, in.readInt()); + assertEquals("dummyIndex", in.readString()); + } + +}