diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java index 87d94dc1147d1..04a0f1868b64f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java @@ -15,12 +15,12 @@ import org.apache.lucene.store.InputStreamDataInput; import org.apache.lucene.store.OutputStreamDataOutput; import org.opensearch.Version; +import org.opensearch.common.io.stream.BufferedChecksumStreamInput; +import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.BufferedChecksumStreamInput; -import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; import java.io.EOFException; import java.io.IOException; @@ -30,11 +30,11 @@ */ public class IndexRoutingTableHeader { - private int routingTableVersion; + private final long routingTableVersion; - private String indexName; + private final String indexName; - private Version nodeVersion; + private final Version nodeVersion; public static final String INDEX_ROUTING_HEADER_CODEC = "index_routing_header_codec"; @@ -42,8 +42,7 @@ public class IndexRoutingTableHeader { public static final int CURRENT_VERSION = INITIAL_VERSION; - - public IndexRoutingTableHeader(int routingTableVersion, String indexName, Version nodeVersion) { + public IndexRoutingTableHeader(long routingTableVersion, String indexName, Version nodeVersion) { this.routingTableVersion = routingTableVersion; this.indexName = indexName; this.nodeVersion = nodeVersion; @@ -56,11 +55,13 @@ public IndexRoutingTableHeader(int routingTableVersion, String indexName, Versio */ public BytesReference write() throws IOException { BytesReference bytesReference; - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); - BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput)) { + try ( + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput) + ) { CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION); // Write version - out.writeInt(routingTableVersion); + out.writeLong(routingTableVersion); out.writeInt(nodeVersion.id); out.writeString(indexName); // Checksum header @@ -71,7 +72,6 @@ public BytesReference write() throws IOException { return bytesReference; } - /** * Reads the contents on the byte array into the corresponding {@link IndexRoutingTableHeader} * @param inBytes @@ -81,7 +81,7 @@ public BytesReference write() throws IOException { */ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOException { try { - try(BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) { + try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) { readHeaderVersion(in); final int version = in.readInt(); final int nodeVersion = in.readInt(); @@ -96,7 +96,6 @@ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOExce } } - static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException { // This absolutely must come first, or else reading the checksum becomes part of the checksum long expectedChecksum = in.getChecksum(); @@ -121,7 +120,7 @@ static int readHeaderVersion(final StreamInput in) throws IOException { return version; } - public int getRoutingTableVersion() { + public long getRoutingTableVersion() { return routingTableVersion; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java new file mode 100644 index 0000000000000..ac65232e9a24d --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java @@ -0,0 +1,157 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.opensearch.Version; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; + +public class IndexRoutingTableInputStream extends InputStream { + + /** + * The buffer where data is stored. + */ + protected byte[] buf; + + /** + * The number of valid bytes in the buffer. + */ + protected int count; + + /** + * The buffer left over from the last fill + */ + protected byte[] leftOverBuf; + + /** + * The mark position + */ + protected int markPos = -1; + + /** + * The read limit + */ + protected int markLimit; + + /** + * The position + */ + protected int pos; + + private static final int BUFFER_SIZE = 8192; + + private final IndexRoutingTableHeader indexRoutingTableHeader; + + private final Iterator shardIter; + + public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion) throws IOException { + this(indexRoutingTable, version, nodeVersion, BUFFER_SIZE); + } + + public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion, int size) + throws IOException { + this.buf = new byte[size]; + this.shardIter = indexRoutingTable.iterator(); + this.indexRoutingTableHeader = new IndexRoutingTableHeader(version, indexRoutingTable.getIndex().getName(), nodeVersion); + initialFill(); + } + + @Override + public int read() throws IOException { + if (pos >= count) { + maybeResizeAndFill(); + if (pos >= count) return -1; + } + return buf[pos++] & 0xff; + } + + private void initialFill() throws IOException { + BytesReference bytesReference = indexRoutingTableHeader.write(); + buf = bytesReference.toBytesRef().bytes; + count = bytesReference.length(); + fill(buf); + } + + private void fill(byte[] buf) throws IOException { + if (leftOverBuf != null) { + System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length); + } + if (count < buf.length && shardIter.hasNext()) { + IndexShardRoutingTable next = shardIter.next(); + BytesReference bytesRef; + try ( + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput) + ) { + IndexShardRoutingTable.Builder.writeTo(next, out); + // Checksum header + out.writeInt((int) out.getChecksum()); + out.flush(); + bytesRef = bytesStreamOutput.bytes(); + } + if (bytesRef.length() < buf.length - count) { + System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, bytesRef.length()); + count += bytesRef.length(); + leftOverBuf = null; + } else { + System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count); + count += buf.length - count; + leftOverBuf = new byte[bytesRef.length() - count]; + System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count + 1, leftOverBuf, 0, bytesRef.length() - count); + } + } + } + + private void maybeResizeAndFill() throws IOException { + byte[] buffer = buf; + if (markPos == -1) pos = 0; /* no mark: throw away the buffer */ + else if (pos >= buffer.length) { /* no room left in buffer */ + if (markPos > 0) { /* can throw away early part of the buffer */ + int sz = pos - markPos; + System.arraycopy(buffer, markPos, buffer, 0, sz); + pos = sz; + markPos = 0; + } else if (buffer.length >= markLimit) { + markPos = -1; /* buffer got too big, invalidate mark */ + pos = 0; /* drop buffer contents */ + } else { /* grow buffer */ + int nsz = markLimit + 1; + byte[] nbuf = new byte[nsz]; + System.arraycopy(buffer, 0, nbuf, 0, pos); + buffer = nbuf; + } + } + count = pos; + fill(buffer); + } + + @Override + public void mark(int readlimit) { + markLimit = readlimit; + markPos = pos; + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void reset() throws IOException { + if (markPos < 0) throw new IOException("Resetting to invalid mark"); + pos = markPos; + } +}