From 9efd85bf49fa0d961732ecd5fa6dd5232fc4baf4 Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Mon, 13 May 2024 18:08:24 +0530 Subject: [PATCH] Moving routing table version from IndexRouting stream to manifest --- .../remote/ClusterMetadataManifest.java | 33 ++++++++++- .../routingtable/IndexRoutingTableHeader.java | 59 +++++-------------- .../IndexRoutingTableInputStream.java | 9 ++- .../remote/ClusterMetadataManifestTests.java | 29 +++++++++ .../IndexRoutingTableHeaderTests.java | 14 ++--- .../IndexRoutingTableInputStreamTests.java | 12 ++-- 6 files changed, 90 insertions(+), 66 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 0279f8e0fd805..5b7192c624fed 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -49,6 +49,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { private static final ParseField INDICES_FIELD = new ParseField("indices"); private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid"); private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed"); + private static final ParseField ROUTING_TABLE_VERSION_FIELD = new ParseField("routing_table_version"); private static final ParseField INDICES_ROUTING_FIELD = new ParseField("indices_routing"); private static long term(Object[] fields) { @@ -99,8 +100,12 @@ private static String globalMetadataFileName(Object[] fields) { return (String) fields[11]; } + private static long routingTableVersion(Object[] fields) { + return (long) fields[12]; + } + private static List indicesRouting(Object[] fields) { - return (List) fields[12]; + return (List) fields[13]; } private static final ConstructingObjectParser PARSER_V0 = new ConstructingObjectParser<>( @@ -154,6 +159,7 @@ private static List indicesRouting(Object[] fields) { indices(fields), previousClusterUUID(fields), clusterUUIDCommitted(fields), + routingTableVersion(fields), indicesRouting(fields) ) ); @@ -187,6 +193,7 @@ private static void declareParser(ConstructingObjectParser= CODEC_V2) { + parser.declareLong(ConstructingObjectParser.constructorArg(), ROUTING_TABLE_VERSION_FIELD); parser.declareObjectArray( ConstructingObjectParser.constructorArg(), (p, c) -> UploadedIndexMetadata.fromXContent(p), @@ -207,6 +214,7 @@ private static void declareParser(ConstructingObjectParser indicesRouting; public List getIndices() { @@ -257,6 +265,10 @@ public String getGlobalMetadataFileName() { return globalMetadataFileName; } + public long getRoutingTableVersion() { + return routingTableVersion; + } + public List getIndicesRouting() { return indicesRouting; } @@ -276,7 +288,7 @@ public ClusterMetadataManifest( boolean clusterUUIDCommitted ) { this(clusterTerm, version, clusterUUID, stateUUID, opensearchVersion, nodeId, committed, codecVersion, - globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, new ArrayList<>()); + globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, -1, new ArrayList<>()); } public ClusterMetadataManifest( @@ -292,6 +304,7 @@ public ClusterMetadataManifest( List indices, String previousClusterUUID, boolean clusterUUIDCommitted, + long routingTableVersion, List indicesRouting ) { this.clusterTerm = clusterTerm; @@ -306,6 +319,7 @@ public ClusterMetadataManifest( this.indices = Collections.unmodifiableList(indices); this.previousClusterUUID = previousClusterUUID; this.clusterUUIDCommitted = clusterUUIDCommitted; + this.routingTableVersion = routingTableVersion; this.indicesRouting = Collections.unmodifiableList(indicesRouting); } @@ -323,14 +337,17 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_14_0)) { this.codecVersion = in.readInt(); this.globalMetadataFileName = in.readString(); + this.routingTableVersion = in.readLong(); this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); } else if (in.getVersion().onOrAfter(Version.V_2_12_0)) { this.codecVersion = in.readInt(); this.globalMetadataFileName = in.readString(); + this.routingTableVersion = -1; this.indicesRouting = null; } else { this.codecVersion = CODEC_V0; // Default codec this.globalMetadataFileName = null; + this.routingTableVersion = -1; this.indicesRouting = null; } } @@ -368,6 +385,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName()); } if (onOrAfterCodecVersion(CODEC_V2)) { + builder.field(ROUTING_TABLE_VERSION_FIELD.getPreferredName(), getRoutingTableVersion()); builder.startArray(INDICES_ROUTING_FIELD.getPreferredName()); { for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) { @@ -398,6 +416,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(globalMetadataFileName); } if (out.getVersion().onOrAfter(Version.V_2_14_0)) { + out.writeLong(routingTableVersion); out.writeCollection(indicesRouting); } } @@ -423,6 +442,7 @@ public boolean equals(Object o) { && Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted) && Objects.equals(globalMetadataFileName, that.globalMetadataFileName) && Objects.equals(codecVersion, that.codecVersion) + && Objects.equals(routingTableVersion, that.routingTableVersion) && Objects.equals(indicesRouting, that.indicesRouting); } @@ -441,6 +461,7 @@ public int hashCode() { committed, previousClusterUUID, clusterUUIDCommitted, + routingTableVersion, indicesRouting ); } @@ -481,6 +502,7 @@ public static class Builder { private String previousClusterUUID; private boolean committed; private boolean clusterUUIDCommitted; + private long routingTableVersion; private List indicesRouting; public Builder indices(List indices) { @@ -488,6 +510,11 @@ public Builder indices(List indices) { return this; } + public Builder routingTableVersion(long routingTableVersion) { + this.routingTableVersion = routingTableVersion; + return this; + } + public Builder indicesRouting(List indicesRouting) { this.indicesRouting = indicesRouting; return this; @@ -573,6 +600,7 @@ public Builder(ClusterMetadataManifest manifest) { this.indices = new ArrayList<>(manifest.indices); this.previousClusterUUID = manifest.previousClusterUUID; this.clusterUUIDCommitted = manifest.clusterUUIDCommitted; + this.routingTableVersion = manifest.routingTableVersion; this.indicesRouting = new ArrayList<>(manifest.indicesRouting); } @@ -590,6 +618,7 @@ public ClusterMetadataManifest build() { indices, previousClusterUUID, clusterUUIDCommitted, + routingTableVersion, indicesRouting ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java index e29ce5a79dc02..bc99fea9b5c09 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java @@ -14,12 +14,6 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.InputStreamDataInput; import org.apache.lucene.store.OutputStreamDataOutput; -import org.opensearch.Version; -import org.opensearch.common.io.stream.BufferedChecksumStreamInput; -import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -31,52 +25,27 @@ */ public class IndexRoutingTableHeader { - private final long routingTableVersion; - - private final String indexName; - - private final Version nodeVersion; - public static final String INDEX_ROUTING_HEADER_CODEC = "index_routing_header_codec"; - public static final int INITIAL_VERSION = 1; - public static final int CURRENT_VERSION = INITIAL_VERSION; + private final String indexName; - public IndexRoutingTableHeader(long routingTableVersion, String indexName, Version nodeVersion) { - this.routingTableVersion = routingTableVersion; + public IndexRoutingTableHeader(String indexName) { this.indexName = indexName; - this.nodeVersion = nodeVersion; - } - - /** - * Returns the bytes reference for the {@link IndexRoutingTableHeader} - * @throws IOException - */ - public void write(StreamOutput out) throws IOException { - CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION); - // Write version - out.writeLong(routingTableVersion); - out.writeInt(nodeVersion.id); - out.writeString(indexName); - - out.flush(); } /** * Reads the contents on the byte array into the corresponding {@link IndexRoutingTableHeader} + * * @param in * @return IndexRoutingTableHeader * @throws IOException */ - public static IndexRoutingTableHeader read(BufferedChecksumStreamInput in) throws IOException { + public static IndexRoutingTableHeader read(StreamInput in) throws IOException { try { - readHeaderVersion(in); - final long version = in.readLong(); - final int nodeVersion = in.readInt(); - final String name = in.readString(); - assert version >= 0 : "Version must be non-negative [" + version + "]"; - return new IndexRoutingTableHeader(version, name, Version.fromId(nodeVersion)); + readHeaderVersion(in); + final String name = in.readString(); + return new IndexRoutingTableHeader(name); } catch (EOFException e) { throw new IOException("index routing header truncated", e); } @@ -92,15 +61,19 @@ static int readHeaderVersion(final StreamInput in) throws IOException { return version; } - public long getRoutingTableVersion() { - return routingTableVersion; + /** + * Returns the bytes reference for the {@link IndexRoutingTableHeader} + * + * @throws IOException + */ + public void write(StreamOutput out) throws IOException { + CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION); + out.writeString(indexName); + out.flush(); } public String getIndexName() { return indexName; } - public Version getNodeVersion() { - return nodeVersion; - } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java index 40e5908d5c65c..5332a8a87c9a4 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStream.java @@ -8,7 +8,6 @@ package org.opensearch.gateway.remote.routingtable; -import org.opensearch.Version; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.common.io.stream.BufferedChecksumStreamOutput; @@ -58,15 +57,15 @@ public class IndexRoutingTableInputStream extends InputStream { private final BytesStreamOutput bytesStreamOutput; private final BufferedChecksumStreamOutput out; - public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion) throws IOException { - this(indexRoutingTable, version, nodeVersion, BUFFER_SIZE); + public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable) throws IOException { + this(indexRoutingTable, BUFFER_SIZE); } - public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion, int size) + public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size) throws IOException { this.buf = new byte[size]; this.shardIter = indexRoutingTable.iterator(); - this.indexRoutingTableHeader = new IndexRoutingTableHeader(version, indexRoutingTable.getIndex().getName(), nodeVersion); + this.indexRoutingTableHeader = new IndexRoutingTableHeader(indexRoutingTable.getIndex().getName()); this.bytesStreamOutput = new BytesStreamOutput(); this.out = new BufferedChecksumStreamOutput(bytesStreamOutput); diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 6c9a3201656d7..73933141ddf23 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -233,6 +233,35 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { } } + public void testClusterMetadataManifestXContentV2() throws IOException { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( + 1L, + 1L, + "test-cluster-uuid", + "test-state-uuid", + Version.CURRENT, + "test-node-id", + false, + ClusterMetadataManifest.CODEC_V2, + "test-metadata", + Collections.singletonList(uploadedIndexMetadata), + "prev-cluster-uuid", + true, + 1L, + Collections.singletonList(uploadedIndexMetadata) + ); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser); + assertEquals(originalManifest, fromXContentManifest); + } + } + private List randomUploadedIndexMetadataList() { final int size = randomIntBetween(1, 10); final List uploadedIndexMetadataList = new ArrayList<>(size); diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java index 068db554b4226..0f42508615a26 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java @@ -8,28 +8,22 @@ package org.opensearch.gateway.remote.routingtable; -import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.store.InputStreamDataInput; -import org.opensearch.Version; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; public class IndexRoutingTableHeaderTests extends OpenSearchTestCase { - public void testWrite() throws IOException { + public void testIndexRoutingTableHeader() throws IOException { + IndexRoutingTableHeader header = new IndexRoutingTableHeader("dummyIndex"); BytesStreamOutput out = new BytesStreamOutput(); - IndexRoutingTableHeader header = new IndexRoutingTableHeader(1, "dummyIndex", Version.V_3_0_0); header.write(out); BytesStreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes); - CodecUtil.checkHeader(new InputStreamDataInput(in),IndexRoutingTableHeader.INDEX_ROUTING_HEADER_CODEC, IndexRoutingTableHeader.INITIAL_VERSION, IndexRoutingTableHeader.CURRENT_VERSION ); - assertEquals(1, in.readLong()); - assertEquals(Version.V_3_0_0.id, in.readInt()); - assertEquals("dummyIndex", in.readString()); + IndexRoutingTableHeader headerRead = IndexRoutingTableHeader.read(in); + assertEquals("dummyIndex", headerRead.getIndexName()); } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamTests.java index 3957d4602201d..b10a60943798a 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableInputStreamTests.java @@ -33,9 +33,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; -public class IndexRoutingTableInputStreamTests extends ReplicationTrackerTestCase { +public class IndexRoutingTableInputStreamTests extends OpenSearchTestCase { - public void testRoutingTableInputStream() throws IOException { + public void testRoutingTableInputStream(){ Metadata metadata = Metadata.builder() .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) .build(); @@ -44,14 +44,14 @@ public void testRoutingTableInputStream() throws IOException { initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { try { - logger.info("IndexShardRoutingTables: {}", indexShardRoutingTables); - InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables, - initialRoutingTable.version(), Version.CURRENT); + InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables); IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream); Map indexShardRoutingTableMap = reader.read(); - logger.info("indexShardRoutingTableMap: {}", indexShardRoutingTableMap); + assertEquals(1, indexShardRoutingTableMap.size()); + assertNotNull(indexShardRoutingTableMap.get("test")); + assertEquals(2,indexShardRoutingTableMap.get("test").shards().size()); } catch (IOException e) { throw new RuntimeException(e); }