Skip to content

Commit

Permalink
Initial commit for RemoteRoutingTableService setup
Browse files Browse the repository at this point in the history
Signed-off-by: Himshikha Gupta <[email protected]>
  • Loading branch information
himshikha authored and Himshikha Gupta committed Apr 19, 2024
1 parent 0282e64 commit d5a17ed
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
* A Service which provides APIs to upload and download routing table from remote store.
*
* @opensearch.internal
*/
public class RemoteRoutingTableService implements Closeable {

/**
* Cluster setting to specify if routing table should be published to remote store
*/
public static final Setting<Boolean> REMOTE_ROUTING_TABLE_ENABLED_SETTING = Setting.boolSetting(
"cluster.remote_store.routing.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Final
);
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.clusterSettings = clusterSettings;
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> writeFullRoutingTable(ClusterState clusterState, String previousClusterUUID) {
return null;
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> writeIncrementalMetadata(
ClusterState previousClusterState,
ClusterState clusterState,
ClusterMetadataManifest previousManifest) {
return null;
}

public RoutingTable getLatestRoutingTable(String clusterName, String clusterUUID) {
return null;
}

public RoutingTable getIncrementalRoutingTable(ClusterState previousClusterState, ClusterMetadataManifest previousManifest, String clusterName, String clusterUUID) {
return null;
}

private void deleteStaleRoutingTable(String clusterName, String clusterUUID, int manifestsToRetain) {
}

@Override
public void close() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
}

public void start() {
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Package containing class to perform operations on remote routing table */
package org.opensearch.cluster.routing.remote;
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
Expand Down Expand Up @@ -732,7 +733,10 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,

// Remote Routing table settings
RemoteRoutingTableService.REMOTE_ROUTING_TABLE_ENABLED_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ protected FeatureFlagSettings(
FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING,
FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING,
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
FeatureFlags.PLUGGABLE_CACHE_SETTING
FeatureFlags.PLUGGABLE_CACHE_SETTING,
FeatureFlags.REMOTE_ROUTING_TABLE_EXPERIMENTAL_SETTING
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public class FeatureFlags {
*/
public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled";

/**
* Gates the functionality of remote routing table.
*/
public static final String REMOTE_ROUTING_TABLE_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.routing.enabled";

public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_STORE_MIGRATION_EXPERIMENTAL,
false,
Expand All @@ -93,14 +98,22 @@ public class FeatureFlags {

public static final Setting<Boolean> PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope);

public static final Setting<Boolean> REMOTE_ROUTING_TABLE_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_ROUTING_TABLE_EXPERIMENTAL,
false,
Property.NodeScope
);


private static final List<Setting<Boolean>> ALL_FEATURE_FLAG_SETTINGS = List.of(
REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
EXTENSIONS_SETTING,
IDENTITY_SETTING,
TELEMETRY_SETTING,
DATETIME_FORMATTER_CACHING_SETTING,
WRITEABLE_REMOTE_INDEX_SETTING,
PLUGGABLE_CACHE_SETTING
PLUGGABLE_CACHE_SETTING,
REMOTE_ROUTING_TABLE_EXPERIMENTAL_SETTING
);
/**
* Should store the settings from opensearch.yml.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
Expand Down Expand Up @@ -64,6 +65,7 @@
import java.util.stream.Collectors;

import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -162,6 +164,7 @@ public class RemoteClusterStateService implements Closeable {
private final ThreadPool threadpool;
private BlobStoreRepository blobStoreRepository;
private BlobStoreTransferService blobStoreTransferService;
private RemoteRoutingTableService remoteRoutingTableService;
private volatile TimeValue slowWriteLoggingThreshold;

private volatile TimeValue indexMetadataUploadTimeout;
Expand Down Expand Up @@ -206,6 +209,11 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();

if(isRemoteRoutingTableEnabled(settings)) {
this.remoteRoutingTableService = new RemoteRoutingTableService(repositoriesService,
settings, clusterSettings);
}
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand Down Expand Up @@ -570,6 +578,9 @@ public void close() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
if(this.remoteRoutingTableService != null) {
this.remoteRoutingTableService.close();
}
}

public void start() {
Expand All @@ -581,6 +592,9 @@ public void start() {
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
if(this.remoteRoutingTableService != null) {
this.remoteRoutingTableService.start();
}
}

private ClusterMetadataManifest uploadManifest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -27,6 +29,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.common.util.FeatureFlags.REMOTE_ROUTING_TABLE_EXPERIMENTAL;

/**
* This is an abstraction for validating and storing information specific to remote backed storage nodes.
*
Expand All @@ -45,6 +49,8 @@ public class RemoteStoreNodeAttribute {
+ "."
+ CryptoMetadata.SETTINGS_KEY;
public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings.";
public static final String REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.routing.repository";

private final RepositoriesMetadata repositoriesMetadata;

/**
Expand Down Expand Up @@ -151,6 +157,10 @@ private Set<String> getValidatedRepositoryNames(DiscoveryNode node) {
} else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
}
if (node.getAttributes().containsKey(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)){
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY));
}

return repositoryNames;
}

Expand Down Expand Up @@ -181,6 +191,16 @@ public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
&& isRemoteClusterStateAttributePresent(settings);
}

public static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}

public static boolean isRemoteRoutingTableEnabled(Settings settings) {
return FeatureFlags.isEnabled(REMOTE_ROUTING_TABLE_EXPERIMENTAL) && RemoteRoutingTableService.REMOTE_ROUTING_TABLE_ENABLED_SETTING.get(settings)
&& isRemoteRoutingTableAttributePresent(settings);
}

public RepositoriesMetadata getRepositoriesMetadata() {
return this.repositoriesMetadata;
}
Expand Down

0 comments on commit d5a17ed

Please sign in to comment.