[Remote Routing Table] Add write flow for remote routing table #13870

Merged
merged 66 commits into from
Jun 10, 2024
Changes from 52 commits
d5a17ed
Initial commit for RemoteRoutingTableService setup
himshikha Apr 19, 2024
9176015
Adds unit test for remote routing setup
Apr 22, 2024
7d9badd
Updating remote routing table setting name
May 7, 2024
ad7ecf5
Update remote routing table repo setting name
May 7, 2024
a66aaaf
Initial commit for index routing table manifest
Bukhtawar Apr 17, 2024
ad480ee
Changes for IndexRoutingTableHeader
Bukhtawar Apr 19, 2024
2ad70c4
Revert unintentional changes for IndexRoutingTableHeader
Bukhtawar Apr 19, 2024
acc172e
Revert unintentional changes for IndexRoutingTableHeader
Bukhtawar Apr 19, 2024
f65b102
Changes for IndexRoutingTableInputStream
Bukhtawar Apr 20, 2024
441f520
Fixing IndexRoutingTableInputStream and moving checksum to end to file
May 7, 2024
ee70dca
Add read flow for IndexRoutingTable
Arpit-Bandejiya May 8, 2024
7aeecc8
Moving routing table version from IndexRouting stream to manifest
May 13, 2024
0b70dea
Merge branch 'main' into remote_routing
himshikha May 14, 2024
7b2bc79
Refactor reader and add failure test
Arpit-Bandejiya May 14, 2024
88d266f
Fix GatewayMetaStatePersistedStateTests
Arpit-Bandejiya May 14, 2024
4c97869
Moving codec to version 2 for compatibility with manifest parser
May 14, 2024
2107372
spotless apply and fixing gradle failures
May 15, 2024
ce04f80
Merge branch 'main' into remote_routing
himshikha May 16, 2024
89c33bc
Remove unused methods
May 16, 2024
df64105
Handle routingTable repo check in JoinTaskExecutor
May 24, 2024
b80c89b
Merge branch 'main' into remote_routing
May 24, 2024
59a0e90
spotless fix
May 24, 2024
51333c9
Reverting accidental commit
May 24, 2024
418d9fd
Removing buffer logic
May 28, 2024
0400f40
Merge branch 'main' into remote_routing_input_stream
May 28, 2024
802e04f
Move repo check logic to RemoteStoreNodeAttr
May 28, 2024
7f33ba2
Move BufferedChecksum streams to libs/core
May 29, 2024
66d02d9
Merge branch 'remote_routing_input_stream' into async_write_pr
May 29, 2024
42efb38
Spotless fix
May 29, 2024
a204663
Merge branch 'remote_routing_input_stream' into async_write_pr
May 29, 2024
9b50f8a
Refactor RemoteIndexRoutingTable read
Arpit-Bandejiya May 29, 2024
a7f7cab
Add Manifest Tests and spotless fix
Arpit-Bandejiya May 29, 2024
49bf7d0
Draft changes for remote routing table write
May 29, 2024
71e151f
Fix remoteClusterServiceTests
Arpit-Bandejiya May 30, 2024
a9ab9c1
Remote store objects to implement writeable interface
May 31, 2024
4167eeb
addressing pr comments
Jun 3, 2024
4194886
Merge branch 'main' into remote_routing
Jun 3, 2024
e486165
Addressing PR comments
Jun 3, 2024
b3c18d8
test fixes
Jun 3, 2024
5d65317
Merge branch 'remote_routing_input_stream' into async_write_pr
Jun 3, 2024
bfde8a0
Merge branch 'remote_routing' into async_write_pr
Jun 3, 2024
50a2a2e
Cleaning up code
Jun 3, 2024
8ecf865
addressing PR comments
Jun 4, 2024
09782bc
Merge branch 'remote_routing_input_stream' into async_write_pr
Jun 4, 2024
dfc9042
node join based on routing table repo
Jun 4, 2024
3b75317
Merge branch 'remote_routing' into async_write_pr
Jun 4, 2024
b367d00
Merge branch 'main' into remote_routing
Jun 5, 2024
7ad7d05
rebasing
Jun 5, 2024
0e5862d
handle case in node join
Jun 5, 2024
b120ce0
Merge branch 'remote_routing' into async_write_pr
Jun 5, 2024
bc1c1c7
Adding tests
Jun 5, 2024
a27b48d
Add tests
Jun 5, 2024
fe6568c
Merge branch 'main' into async_write_pr
Jun 6, 2024
089d4ee
rebasing
Jun 6, 2024
41a5f64
spotless fix
Jun 6, 2024
df1704f
Move path generation logic to path input
Jun 7, 2024
cfd28d9
Recovering Refactor the path type and hash algorith code
Jun 9, 2024
e6f41f6
moving remoteRoutingTableService init to factory
Jun 9, 2024
b1f73f1
spotless fix
Jun 9, 2024
1abe899
PR comments
Jun 10, 2024
5d3f5b2
Merge branch 'main' into async_write_pr
himshikha Jun 10, 2024
df7339c
Fixing gradle failure
Jun 10, 2024
abd6f90
Fixing tests
Jun 10, 2024
c015d56
add log
Jun 10, 2024
a604d89
Merge branch 'main' into async_write_pr
himshikha Jun 10, 2024
13af300
fixing test failure due to merge
Jun 10, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
@@ -30,12 +30,10 @@
* GitHub history for details.
*/

package org.opensearch.index.translog;
package org.opensearch.core.common.io.stream;

import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.util.BitUtil;
import org.opensearch.core.common.io.stream.FilterStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;

import java.io.EOFException;
import java.io.IOException;
@@ -30,11 +30,10 @@
* GitHub history for details.
*/

package org.opensearch.index.translog;
package org.opensearch.core.common.io.stream;

import org.apache.lucene.store.BufferedChecksum;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.zip.CRC32;
@@ -511,11 +511,27 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
assert existingNodes.isEmpty() == false;

CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings());
if (STRICT.equals(remoteStoreCompatibilityMode)) {

DiscoveryNode existingNode = existingNodes.get(0);
List<String> reposToSkip = new ArrayList<>(1);
Optional<DiscoveryNode> remoteRoutingTableNode = existingNodes.stream()
.filter(
node -> node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
)
.findFirst();
// If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node.
// This ensures a new node with remote routing table repo is able to join the cluster.
if (remoteRoutingTableNode.isEmpty()) {
String joiningNodeRepoName = joiningNode.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
if (joiningNodeRepoName != null) {
reposToSkip.add(joiningNodeRepoName);
}
}

if (STRICT.equals(remoteStoreCompatibilityMode)) {
DiscoveryNode existingNode = remoteRoutingTableNode.orElseGet(() -> existingNodes.get(0));
if (joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode);
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode, reposToSkip);
} else {
if (existingNode.isRemoteStoreNode()) {
throw new IllegalStateException(
@@ -537,19 +553,25 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
throw new IllegalStateException(reason);
}
if (joiningNode.isRemoteStoreNode()) {
Optional<DiscoveryNode> remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode));
Optional<DiscoveryNode> remoteDN = remoteRoutingTableNode.isPresent()
? remoteRoutingTableNode
: existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode, reposToSkip));
}
}
}
}

private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
private static void ensureRemoteStoreNodesCompatibility(
DiscoveryNode joiningNode,
DiscoveryNode existingNode,
List<String> reposToSkip
) {
if (joiningNode.isRemoteStoreNode()) {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
if (existingRemoteStoreNodeAttribute.equalsWithRepoSkip(joiningRemoteStoreNodeAttribute, reposToSkip) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
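Reviewer note: the skip-list rule in this hunk can be read in isolation. The following is a minimal sketch, not the PR's code: node attributes are modelled as plain maps rather than `DiscoveryNode`, and `ROUTING_REPO_KEY` and `reposToSkip(...)` are hypothetical names chosen for illustration. It assumes, as the hunk's comment states, that the joining node's routing table repo is excluded from the comparison only when no existing node advertises one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative only: node attributes are plain maps, not DiscoveryNode instances.
final class RepoSkipJoinSketch {

    // Hypothetical key standing in for
    // RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY.
    static final String ROUTING_REPO_KEY = "routing_table_repo";

    /** Returns the repository names to leave out of the join-time equality check. */
    static List<String> reposToSkip(List<Map<String, String>> existingNodes, Map<String, String> joiningNode) {
        List<String> skip = new ArrayList<>(1);
        Optional<Map<String, String>> nodeWithRoutingRepo = existingNodes.stream()
            .filter(attrs -> attrs.get(ROUTING_REPO_KEY) != null)
            .findFirst();
        // Skip the joining node's repo only if no existing node has a routing table repo.
        if (nodeWithRoutingRepo.isEmpty()) {
            String joiningRepo = joiningNode.get(ROUTING_REPO_KEY);
            if (joiningRepo != null) {
                skip.add(joiningRepo);
            }
        }
        return skip;
    }

    public static void main(String[] args) {
        Map<String, String> joining = Map.of(ROUTING_REPO_KEY, "rt-repo");
        // No existing node has the repo, so the joining node's repo is skipped.
        System.out.println(reposToSkip(List.of(Map.of("other", "x")), joining));
        // An existing node has the repo, so nothing is skipped and the full check applies.
        System.out.println(reposToSkip(List.of(Map.of(ROUTING_REPO_KEY, "rt-repo")), joining));
    }
}
```

Under this reading, once any node in the cluster carries the routing table repo, later joiners are compared against that node with no exemptions.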
@@ -51,8 +51,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;

@@ -164,6 +166,40 @@ public boolean equalsIgnoreGenerations(@Nullable RepositoriesMetadata other) {
return true;
}

/**
* Checks if this instance and the given instance share the same repositories, with an option to skip checking a list of repos.
* This supports cases such as a joining node that carries a repository the existing nodes do not have.
* @param other other repositories metadata
* @param reposToSkip list of repos to skip check for equality
* @return {@code true} iff both instances contain the same repositories apart from differences in generations, not including repos provided in reposToSkip.
*/
public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadata other, List<String> reposToSkip) {
if (other == null) {
return false;
}
List<RepositoryMetadata> currentRepositories = repositories.stream()
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());

if (otherRepositories.size() != currentRepositories.size()) {
return false;
}
// Sort repos by name for ordered comparison
Comparator<RepositoryMetadata> compareByName = (o1, o2) -> o1.name().compareTo(o2.name());
currentRepositories.sort(compareByName);
otherRepositories.sort(compareByName);

for (int i = 0; i < currentRepositories.size(); i++) {
if (currentRepositories.get(i).equalsIgnoreGenerations(otherRepositories.get(i)) == false) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
return repositories.hashCode();
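Reviewer note: a minimal, self-contained sketch of the filter, sort, then pairwise-compare approach used by `equalsIgnoreGenerationsWithRepoSkip`. The `Repo` class is a hypothetical stand-in for `RepositoryMetadata` and compares only name and type; the real `equalsIgnoreGenerations` may compare more fields, so treat this as an illustration of the shape of the check rather than its exact semantics.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class RepoSkipEqualitySketch {

    // Hypothetical stand-in for RepositoryMetadata; generation is deliberately ignored.
    static final class Repo {
        final String name;
        final String type;
        final long generation;

        Repo(String name, String type, long generation) {
            this.name = name;
            this.type = type;
            this.generation = generation;
        }

        boolean equalsIgnoreGeneration(Repo other) {
            return name.equals(other.name) && type.equals(other.type);
        }
    }

    /** Compares two repository lists, ignoring generations and any repo named in reposToSkip. */
    static boolean equalsWithRepoSkip(List<Repo> current, List<Repo> other, List<String> reposToSkip) {
        List<Repo> a = filterAndSort(current, reposToSkip);
        List<Repo> b = filterAndSort(other, reposToSkip);
        if (a.size() != b.size()) {
            return false;
        }
        // Both lists are sorted by name, so a positional comparison is sufficient.
        for (int i = 0; i < a.size(); i++) {
            if (!a.get(i).equalsIgnoreGeneration(b.get(i))) {
                return false;
            }
        }
        return true;
    }

    private static List<Repo> filterAndSort(List<Repo> repos, List<String> reposToSkip) {
        return repos.stream()
            .filter(r -> !reposToSkip.contains(r.name))
            .sorted(Comparator.comparing((Repo r) -> r.name))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Repo> existing = List.of(new Repo("segments", "s3", 1L));
        List<Repo> joining = List.of(new Repo("routing", "s3", 1L), new Repo("segments", "s3", 7L));
        System.out.println(equalsWithRepoSkip(existing, joining, List.of("routing"))); // true
        System.out.println(equalsWithRepoSkip(existing, joining, List.of()));          // false
    }
}
```

Sorting copies of the filtered lists keeps the comparison independent of the order in which repositories were registered, at the cost of two small allocations per call.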
@@ -738,7 +738,7 @@ public boolean equals(Object o) {
IndexShardRoutingTable that = (IndexShardRoutingTable) o;

if (!shardId.equals(that.shardId)) return false;
if (!shards.equals(that.shards)) return false;
if (!new HashSet<>(shards).equals(new HashSet<>(that.shards))) return false;

return true;
}
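Reviewer note: the change above makes `equals` insensitive to the order of `shards`. The sketch below uses strings in place of `ShardRouting` (an assumption made for brevity) to show the difference between `List.equals` and comparing `HashSet` copies. One side effect to keep in mind is that a set-based comparison also ignores duplicate entries.

```java
import java.util.HashSet;
import java.util.List;

final class ShardOrderEqualitySketch {

    /** Order-insensitive comparison, mirroring the HashSet-based check in the hunk. */
    static boolean sameShardsIgnoringOrder(List<String> a, List<String> b) {
        return new HashSet<>(a).equals(new HashSet<>(b));
    }

    public static void main(String[] args) {
        List<String> primaryFirst = List.of("primary", "replica");
        List<String> replicaFirst = List.of("replica", "primary");

        // List.equals is positional, so a different order means "not equal".
        System.out.println(primaryFirst.equals(replicaFirst));                  // false
        // Set equality ignores order.
        System.out.println(sameShardsIgnoringOrder(primaryFirst, replicaFirst)); // true
        // Caveat: duplicates collapse, so these also compare as equal.
        System.out.println(sameShardsIgnoringOrder(List.of("a", "a", "b"), List.of("a", "b"))); // true
    }
}
```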
@@ -0,0 +1,238 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
* A Service which provides APIs to upload and download routing table from remote store.
*
* @opensearch.internal
*/
public class RemoteRoutingTableService extends AbstractLifecycleComponent {

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";

private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
}

private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}
};

public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(
RoutingTable before,
RoutingTable after
) {
return DiffableUtils.diff(
before.getIndicesRouting(),
after.getIndicesRouting(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER
);
}

public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
) throws IOException {

BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN);
BlobPath path = RemoteStoreEnums.PathType.HASHED_PREFIX.path(
RemoteStorePathStrategy.BasePathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(),
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64
);
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);

final String fileName = getIndexRoutingFileName();

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
new ClusterMetadataManifest.UploadedIndexMetadata(
indexRouting.getIndex().getName(),
indexRouting.getIndex().getUUID(),
path.buildAsString() + fileName,
INDEX_ROUTING_METADATA_PREFIX
)
),
ex -> latchedActionListener.onFailure(
new RemoteClusterStateService.RemoteStateTransferException(
"Exception in writing index to remote store: " + indexRouting.getIndex().toString(),
ex
)
)
);

return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener);
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(
ClusterMetadataManifest previousManifest,
List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingUploaded,
List<String> indicesRoutingToDelete
) {
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = previousManifest.getIndicesRouting()
.stream()
.collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity()));

indicesRoutingUploaded.forEach(
uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting)
);
indicesRoutingToDelete.forEach(allUploadedIndicesRouting::remove);

return new ArrayList<>(allUploadedIndicesRouting.values());
}

private void uploadIndex(
IndexRoutingTable indexRouting,
String fileName,
BlobContainer blobContainer,
ActionListener<Void> completionListener
) throws IOException {
RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting);
BytesReference bytesInput = null;
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
indexRoutingInput.writeTo(streamOutput);
bytesInput = streamOutput.bytes();
} catch (IOException e) {
throw new IOException("Failed to serialize IndexRoutingTable. ", e);
}

if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
try {
blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true);
completionListener.onResponse(null);
} catch (IOException e) {
logger.error("Failed to write IndexRoutingTable to remote store. ", e);
completionListener.onFailure(e);
}
return;
}

try (IndexInput input = new ByteArrayIndexInput("indexrouting", BytesReference.toBytes(bytesInput))) {
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
fileName,
input.length(),
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
null,
false
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
remoteTransferContainer.createWriteContext(),
completionListener
);
} catch (IOException e) {
logger.error("Failed to write IndexRoutingTable to remote store. ", e);
completionListener.onFailure(e);
}
} catch (IOException e) {
logger.error("Failed to create transfer object for IndexRoutingTable for remote store upload. ", e);
completionListener.onFailure(e);
}
}

private String getIndexRoutingFileName() {
return String.join(DELIMITER, INDEX_ROUTING_FILE_PREFIX, RemoteStoreUtils.invertLong(System.currentTimeMillis()));

}

@Override
protected void doClose() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
}

@Override
protected void doStart() {
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
}

@Override
protected void doStop() {}

}
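Reviewer note: `getAllUploadedIndicesRouting` builds the next manifest's routing entries by starting from the previous manifest, overwriting entries for newly uploaded indices, and dropping deleted ones. Below is a minimal sketch of that merge. `Uploaded` is a hypothetical stand-in for `ClusterMetadataManifest.UploadedIndexMetadata`, and the sketch uses a `LinkedHashMap` to keep the output order deterministic, which differs from the `Collectors.toMap` call in the class above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RoutingManifestMergeSketch {

    // Hypothetical stand-in for ClusterMetadataManifest.UploadedIndexMetadata.
    static final class Uploaded {
        final String indexName;
        final String uploadedFile;

        Uploaded(String indexName, String uploadedFile) {
            this.indexName = indexName;
            this.uploadedFile = uploadedFile;
        }
    }

    /** Previous entries, overridden by new uploads, minus deleted indices. */
    static List<Uploaded> merge(List<Uploaded> previous, List<Uploaded> uploaded, List<String> toDelete) {
        Map<String, Uploaded> byIndex = new LinkedHashMap<>();
        previous.forEach(u -> byIndex.put(u.indexName, u));
        // A newer upload for the same index replaces the previous entry.
        uploaded.forEach(u -> byIndex.put(u.indexName, u));
        toDelete.forEach(byIndex::remove);
        return new ArrayList<>(byIndex.values());
    }

    public static void main(String[] args) {
        List<Uploaded> previous = List.of(new Uploaded("a", "file-1"), new Uploaded("b", "file-1"));
        List<Uploaded> uploaded = List.of(new Uploaded("b", "file-2"), new Uploaded("c", "file-1"));
        for (Uploaded u : merge(previous, uploaded, List.of("a"))) {
            System.out.println(u.indexName + " -> " + u.uploadedFile);
        }
    }
}
```

With the inputs in `main`, index `a` is removed, `b` points at its newer file, and `c` is added.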