Skip to content

Commit

Permalink
Add rest, transport, service layer changes for Tiering
Browse files Browse the repository at this point in the history
Signed-off-by: Neetika Singhal <[email protected]>
  • Loading branch information
neetikasinghal committed Jun 4, 2024
1 parent 39ae32a commit 4cb5952
Show file tree
Hide file tree
Showing 15 changed files with 1,598 additions and 1 deletion.
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@
import org.opensearch.action.termvectors.TransportMultiTermVectorsAction;
import org.opensearch.action.termvectors.TransportShardMultiTermsVectorAction;
import org.opensearch.action.termvectors.TransportTermVectorsAction;
import org.opensearch.action.tiering.RestWarmTieringAction;
import org.opensearch.action.tiering.TransportWarmTieringAction;
import org.opensearch.action.tiering.WarmTieringAction;
import org.opensearch.action.update.TransportUpdateAction;
import org.opensearch.action.update.UpdateAction;
import org.opensearch.client.node.NodeClient;
Expand Down Expand Up @@ -633,6 +636,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class);
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
actions.register(WarmTieringAction.INSTANCE, TransportWarmTieringAction.class);
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
Expand Down Expand Up @@ -964,6 +968,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestNodeAttrsAction());
registerHandler.accept(new RestRepositoriesAction());
registerHandler.accept(new RestSnapshotAction());
registerHandler.accept(new RestWarmTieringAction());
registerHandler.accept(new RestTemplatesAction());

// Point in time API
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.tiering;

import org.opensearch.client.node.NodeClient;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.util.List;

import static java.util.Collections.singletonList;
import static org.opensearch.rest.RestRequest.Method.POST;

/**
* Rest Tiering API class to move index from hot to warm
*
* @opensearch.experimental
*/
public class RestWarmTieringAction extends BaseRestHandler {

@Override
public List<RestHandler.Route> routes() {
return singletonList(new RestHandler.Route(POST, "/{index}/_tier/_warm"));
}

@Override
public String getName() {
return "tiering_warm";
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
final TieringIndexRequest tieringIndexRequest = new TieringIndexRequest(request.param("index"));
tieringIndexRequest.timeout(request.paramAsTime("timeout", tieringIndexRequest.timeout()));
tieringIndexRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", tieringIndexRequest.clusterManagerNodeTimeout())
);
tieringIndexRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false));
return channel -> client.admin().cluster().execute(WarmTieringAction.INSTANCE, tieringIndexRequest, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.tiering;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.master.AcknowledgedRequest;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Represents the tiering request for indices
* to move to a different tier
*
* @opensearch.experimental
*/
@ExperimentalApi
public class TieringIndexRequest extends AcknowledgedRequest<TieringIndexRequest> implements IndicesRequest.Replaceable {

private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, true);
private boolean waitForCompletion;
public TieringIndexRequest() {
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (indices == null || indices.length == 0) {
validationException = addValidationError("Mandatory parameter - indices is missing from the request", validationException);
}
return validationException;
}

public TieringIndexRequest(String... indices) {
this.indices = indices;
}

public TieringIndexRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
waitForCompletion = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
out.writeBoolean(waitForCompletion);
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

@Override
public TieringIndexRequest indices(String... indices) {
this.indices = indices;
return this;
}

public TieringIndexRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

/**
* If this parameter is set to true the operation will wait for completion of tiering process before returning.
*
* @param waitForCompletion if true the operation will wait for completion
* @return this request
*/
public TieringIndexRequest waitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}

/**
* Returns wait for completion setting
*
* @return true if the operation will wait for completion
*/
public boolean waitForCompletion() {
return waitForCompletion;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TieringIndexRequest that = (TieringIndexRequest) o;
return clusterManagerNodeTimeout.equals(that.clusterManagerNodeTimeout)
&& timeout.equals(that.timeout)
&& Objects.equals(indicesOptions, that.indicesOptions)
&& Arrays.equals(indices, that.indices)
&& waitForCompletion == that.waitForCompletion;
}

@Override
public int hashCode() {
return Objects.hash(clusterManagerNodeTimeout, timeout, indicesOptions, waitForCompletion, Arrays.hashCode(indices));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.tiering;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.tiering.TieringClusterStateListener;
import org.opensearch.tiering.TieringService;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;

/**
* Transport Tiering API class to move index from hot to warm
*
* @opensearch.experimental
*/
public class TransportWarmTieringAction extends TransportClusterManagerNodeAction<TieringIndexRequest, AcknowledgedResponse> {

private static final Logger logger = LogManager.getLogger(TransportWarmTieringAction.class);
private final TieringService tieringService;
private final Client client;
@Inject
public TransportWarmTieringAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, TieringService tieringService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client) {
super(WarmTieringAction.NAME, transportService, clusterService, threadPool, actionFilters,
TieringIndexRequest::new, indexNameExpressionResolver);
this.client = client;
this.tieringService = tieringService;
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}

@Override
protected ClusterBlockException checkBlock(TieringIndexRequest request, ClusterState state) {
ClusterBlockException blockException =
state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
if (blockException == null) {
// Check indices level block
blockException = state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request));
}
return blockException;
}

@Override
protected void clusterManagerOperation(TieringIndexRequest request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
// Collect node stats to get node level filesystem info. The response will be used for performing some
// validations.
client.admin().cluster().prepareNodesStats().clear().addMetric(FS.metricName()).execute(
new ActionListener<NodesStatsResponse>() {
@Override
public void onResponse(NodesStatsResponse nodesStatsResponse) {
// Collect index level stats. This response is also used for validations.
client.admin().indices().prepareStats().clear().setStore(true).setIndices(request.indices()).execute(
new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
tieringService.tier(request, nodesStatsResponse, indicesStatsResponse,
ActionListener.delegateFailure(listener, (delegatedListener, acknowledgedResponse) -> {
if (request.waitForCompletion()) {
TieringClusterStateListener.createAndRegisterListener(
clusterService,
new AcknowledgedResponse(acknowledgedResponse.isAcknowledged()),
delegatedListener
);
} else {
delegatedListener.onResponse(new AcknowledgedResponse(true));
}
}));
}
@Override
public void onFailure(Exception e) {
logger.debug("Indices stats call failed with exception", e);
listener.onFailure(e);
}
});
}
@Override
public void onFailure(Exception e) {
logger.debug("Node stats call failed with exception", e);
listener.onFailure(e);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.tiering;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;

/**
* Tiering action class to move index from hot to warm
*
* @opensearch.experimental
*/
public class WarmTieringAction extends ActionType<AcknowledgedResponse> {

public static final WarmTieringAction INSTANCE = new WarmTieringAction();
public static final String NAME = "indices:admin/tiering/warm";

public WarmTieringAction() {
super(NAME, AcknowledgedResponse::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.index.IndexModule;

/**
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
Expand Down Expand Up @@ -58,6 +59,8 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
* @return {@link RoutingPool} for the given index.
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
return indexMetadata.isRemoteSnapshot() ||
IndexModule.DataLocalityType.PARTIAL.name()
.equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())) ? REMOTE_CAPABLE : LOCAL_ONLY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.tiering.HotToWarmTieringService;
import org.opensearch.transport.ProxyConnectionStrategy;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.RemoteConnectionStrategy;
Expand Down Expand Up @@ -277,6 +278,7 @@ public void apply(Settings value, Settings current, Settings previous) {
FsRepository.REPOSITORIES_CHUNK_SIZE_SETTING,
FsRepository.REPOSITORIES_COMPRESS_SETTING,
FsRepository.REPOSITORIES_LOCATION_SETTING,
HotToWarmTieringService.WARM_TIERING_MAX_SHARD_SIZE,
IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING,
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
Expand Down
Loading

0 comments on commit 4cb5952

Please sign in to comment.