Skip to content

Commit

Permalink
Changes for IndexRoutingTableInputStream
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar authored and Himshikha Gupta committed Apr 23, 2024
1 parent 933cdc2 commit f35f0dc
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.opensearch.Version;
import org.opensearch.common.io.stream.BufferedChecksumStreamInput;
import org.opensearch.common.io.stream.BufferedChecksumStreamOutput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.BufferedChecksumStreamInput;
import org.opensearch.common.io.stream.BufferedChecksumStreamOutput;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -30,20 +30,19 @@
*/
public class IndexRoutingTableHeader {

private int routingTableVersion;
private final long routingTableVersion;

private String indexName;
private final String indexName;

private Version nodeVersion;
private final Version nodeVersion;

public static final String INDEX_ROUTING_HEADER_CODEC = "index_routing_header_codec";

public static final int INITIAL_VERSION = 1;

public static final int CURRENT_VERSION = INITIAL_VERSION;


public IndexRoutingTableHeader(int routingTableVersion, String indexName, Version nodeVersion) {
public IndexRoutingTableHeader(long routingTableVersion, String indexName, Version nodeVersion) {
this.routingTableVersion = routingTableVersion;
this.indexName = indexName;
this.nodeVersion = nodeVersion;
Expand All @@ -56,11 +55,13 @@ public IndexRoutingTableHeader(int routingTableVersion, String indexName, Versio
*/
public BytesReference write() throws IOException {
BytesReference bytesReference;
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput)) {
try (
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput)
) {
CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION);
// Write version
out.writeInt(routingTableVersion);
out.writeLong(routingTableVersion);
out.writeInt(nodeVersion.id);
out.writeString(indexName);
// Checksum header
Expand All @@ -71,7 +72,6 @@ public BytesReference write() throws IOException {
return bytesReference;
}


/**
* Reads the contents on the byte array into the corresponding {@link IndexRoutingTableHeader}
* @param inBytes
Expand All @@ -81,7 +81,7 @@ public BytesReference write() throws IOException {
*/
public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOException {
try {
try(BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) {
try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) {
readHeaderVersion(in);
final int version = in.readInt();
final int nodeVersion = in.readInt();
Expand All @@ -96,7 +96,6 @@ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOExce
}
}


static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException {
// This absolutely must come first, or else reading the checksum becomes part of the checksum
long expectedChecksum = in.getChecksum();
Expand All @@ -121,7 +120,7 @@ static int readHeaderVersion(final StreamInput in) throws IOException {
return version;
}

public int getRoutingTableVersion() {
public long getRoutingTableVersion() {
return routingTableVersion;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.routingtable;

import org.opensearch.Version;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.io.stream.BufferedChecksumStreamOutput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;

public class IndexRoutingTableInputStream extends InputStream {

/**
* The buffer where data is stored.
*/
protected byte[] buf;

/**
* The number of valid bytes in the buffer.
*/
protected int count;

/**
* The buffer left over from the last fill
*/
protected byte[] leftOverBuf;

/**
* The mark position
*/
protected int markPos = -1;

/**
* The read limit
*/
protected int markLimit;

/**
* The position
*/
protected int pos;

private static final int BUFFER_SIZE = 8192;

private final IndexRoutingTableHeader indexRoutingTableHeader;

private final Iterator<IndexShardRoutingTable> shardIter;

public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion) throws IOException {
this(indexRoutingTable, version, nodeVersion, BUFFER_SIZE);
}

public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion, int size)
throws IOException {
this.buf = new byte[size];
this.shardIter = indexRoutingTable.iterator();
this.indexRoutingTableHeader = new IndexRoutingTableHeader(version, indexRoutingTable.getIndex().getName(), nodeVersion);
initialFill();
}

@Override
public int read() throws IOException {
if (pos >= count) {
maybeResizeAndFill();
if (pos >= count) return -1;
}
return buf[pos++] & 0xff;
}

private void initialFill() throws IOException {
BytesReference bytesReference = indexRoutingTableHeader.write();
buf = bytesReference.toBytesRef().bytes;
count = bytesReference.length();
fill(buf);
}

private void fill(byte[] buf) throws IOException {
if (leftOverBuf != null) {
System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length);
}
if (count < buf.length && shardIter.hasNext()) {
IndexShardRoutingTable next = shardIter.next();
BytesReference bytesRef;
try (
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput)
) {
IndexShardRoutingTable.Builder.writeTo(next, out);
// Checksum header
out.writeInt((int) out.getChecksum());
out.flush();
bytesRef = bytesStreamOutput.bytes();
}
if (bytesRef.length() < buf.length - count) {
System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, bytesRef.length());
count += bytesRef.length();
leftOverBuf = null;
} else {
System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count);
count += buf.length - count;
leftOverBuf = new byte[bytesRef.length() - count];
System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count + 1, leftOverBuf, 0, bytesRef.length() - count);
}
}
}

private void maybeResizeAndFill() throws IOException {
byte[] buffer = buf;
if (markPos == -1) pos = 0; /* no mark: throw away the buffer */
else if (pos >= buffer.length) { /* no room left in buffer */
if (markPos > 0) { /* can throw away early part of the buffer */
int sz = pos - markPos;
System.arraycopy(buffer, markPos, buffer, 0, sz);
pos = sz;
markPos = 0;
} else if (buffer.length >= markLimit) {
markPos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
} else { /* grow buffer */
int nsz = markLimit + 1;
byte[] nbuf = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
buffer = nbuf;
}
}
count = pos;
fill(buffer);
}

@Override
public void mark(int readlimit) {
markLimit = readlimit;
markPos = pos;
}

@Override
public boolean markSupported() {
return true;
}

@Override
public void reset() throws IOException {
if (markPos < 0) throw new IOException("Resetting to invalid mark");
pos = markPos;
}
}

0 comments on commit f35f0dc

Please sign in to comment.