Skip to content

Commit

Permalink
Add rest endpoint for remote store restore
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Jul 6, 2022
1 parent 289c2d3 commit e68643c
Show file tree
Hide file tree
Showing 13 changed files with 544 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,8 @@ public void testApiNamingConventions() throws Exception {
"nodes.hot_threads",
"nodes.usage",
"nodes.reload_secure_settings",
"search_shards", };
"search_shards",
"remote_store.restore", };
List<String> booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password");
Set<String> deprecatedMethods = new HashSet<>();
deprecatedMethods.add("indices.force_merge");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"remote_store.restore":{
"documentation":{
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/remote-store#restore",
"description":"Restores from remote store."
},
"stability":"experimental",
"url":{
"paths":[
{
"path":"/_remotestore/_restore",
"methods":[
"POST"
]
}
]
},
"params":{
"master_timeout":{
"type":"time",
"description":"Explicit operation timeout for connection to master node",
"deprecated":{
"version":"2.0.0",
"description":"To support inclusive language, use 'cluster_manager_timeout' instead."
}
},
"cluster_manager_timeout":{
"type":"time",
"description":"Explicit operation timeout for connection to cluster-manager node"
},
"wait_for_completion":{
"type":"boolean",
"description":"Should this request wait until the operation has completed before returning",
"default":false
}
},
"body":{
"description":"A comma separated list of index IDs",
"required":true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
---
"Check if index exists and created with remote_store=true":
- skip:
version: " - 2.99.99"
reason: "remote store restore API only supported since 3.0.0"

- do:
indices.create:
index: "index1"
body:
settings:
number_of_shards: 1
number_of_replicas: 2

- do:
remote_store.restore:
body:
indices: "index1,index2"

- match: { remote_store.indices: []}
- match: { remote_store.shards.total: 0 }
- match: { remote_store.shards.successful: 0 }
- match: { remote_store.shards.failed : 0 }
---
"Error no indices":
- skip:
version: " - 2.99.99"
reason: "remote store restore API only supported since 3.0.0"

- do:
catch: bad_request
remote_store.restore:
body:
indices: ""

- match: { status: 400 }
- match: { error.type: action_request_validation_exception}
- match: { error.reason: "Validation Failed: 1: indices are missing;"}
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.opensearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
import org.opensearch.action.admin.cluster.remote.RemoteInfoAction;
import org.opensearch.action.admin.cluster.remote.TransportRemoteInfoAction;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction;
import org.opensearch.action.admin.cluster.remotestore.restore.TransportRestoreRemoteStoreAction;
import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction;
import org.opensearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction;
import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction;
Expand Down Expand Up @@ -308,6 +310,7 @@
import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction;
import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction;
import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestSnapshotsStatusAction;
import org.opensearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
Expand Down Expand Up @@ -657,6 +660,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class);
actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class);

// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);

return unmodifiableMap(actions.getRegistry());
}

Expand Down Expand Up @@ -842,6 +848,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
}
}
registerHandler.accept(new RestCatAction(catActions));

// Remote Store APIs
registerHandler.accept(new RestRestoreRemoteStoreAction());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.RestoreInProgress;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.index.shard.ShardId;
import org.opensearch.snapshots.RestoreInfo;
import org.opensearch.snapshots.RestoreService;

import static org.opensearch.snapshots.RestoreService.restoreInProgress;

/**
* Transport listener for cluster state updates
*
* @opensearch.internal
*/
public class RemoteStoreRestoreClusterStateListener implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(RemoteStoreRestoreClusterStateListener.class);

private final ClusterService clusterService;
private final String uuid;
private final ActionListener<RestoreRemoteStoreResponse> listener;

private RemoteStoreRestoreClusterStateListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreRemoteStoreResponse> listener
) {
this.clusterService = clusterService;
this.uuid = response.getUuid();
this.listener = listener;

}

@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a cluster-manager failure after a restore has been started, this listener might not be registered
// on the current cluster-manager and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreRemoteStoreResponse((RestoreInfo) null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed remote store restore state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(
"remote_store",
prevEntry.indices(),
shards.size(),
+shards.size() - RestoreService.failedShards(shards)
);
RestoreRemoteStoreResponse response = new RestoreRemoteStoreResponse(ri);
logger.debug("restore from remote store completed");
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
}
}

/**
* Creates a cluster state listener and registers it with the cluster service. The listener passed as a
* parameter will be called when the restore is complete.
*/
public static void createAndRegisterListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreRemoteStoreResponse> listener
) {
clusterService.addListener(new RemoteStoreRestoreClusterStateListener(clusterService, response, listener));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionType;

/**
* Restore remote store action
*
* @opensearch.internal
*/
public class RestoreRemoteStoreAction extends ActionType<RestoreRemoteStoreResponse> {

public static final RestoreRemoteStoreAction INSTANCE = new RestoreRemoteStoreAction();
public static final String NAME = "cluster:admin/remotestore/restore";

private RestoreRemoteStoreAction() {
super(NAME, RestoreRemoteStoreResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ConstructingObjectParser;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.rest.RestStatus;
import org.opensearch.snapshots.RestoreInfo;

import java.io.IOException;
import java.util.Objects;

import static org.opensearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Contains information about remote store restores
*
* @opensearch.internal
*/
public class RestoreRemoteStoreResponse extends ActionResponse implements ToXContentObject {

@Nullable
private final RestoreInfo restoreInfo;

public RestoreRemoteStoreResponse(@Nullable RestoreInfo restoreInfo) {
this.restoreInfo = restoreInfo;
}

public RestoreRemoteStoreResponse(StreamInput in) throws IOException {
super(in);
restoreInfo = RestoreInfo.readOptionalRestoreInfo(in);
}

/**
* Returns restore information if remote store restore was completed before this method returned, null otherwise
*
* @return restore information or null
*/
public RestoreInfo getRestoreInfo() {
return restoreInfo;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(restoreInfo);
}

public RestStatus status() {
if (restoreInfo == null) {
return RestStatus.ACCEPTED;
}
return restoreInfo.status();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (restoreInfo != null) {
builder.field("remote_store");
restoreInfo.toXContent(builder, params);
} else {
builder.field("accepted", true);
}
builder.endObject();
return builder;
}

public static final ConstructingObjectParser<RestoreRemoteStoreResponse, Void> PARSER = new ConstructingObjectParser<>(
"restore_remote_store",
true,
v -> {
RestoreInfo restoreInfo = (RestoreInfo) v[0];
Boolean accepted = (Boolean) v[1];
assert (accepted == null && restoreInfo != null) || (accepted != null && accepted && restoreInfo == null) : "accepted: ["
+ accepted
+ "], restoreInfo: ["
+ restoreInfo
+ "]";
return new RestoreRemoteStoreResponse(restoreInfo);
}
);

static {
PARSER.declareObject(
optionalConstructorArg(),
(parser, context) -> RestoreInfo.fromXContent(parser),
new ParseField("remote_store")
);
PARSER.declareBoolean(optionalConstructorArg(), new ParseField("accepted"));
}

public static RestoreRemoteStoreResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RestoreRemoteStoreResponse that = (RestoreRemoteStoreResponse) o;
return Objects.equals(restoreInfo, that.restoreInfo);
}

@Override
public int hashCode() {
return Objects.hash(restoreInfo);
}

@Override
public String toString() {
return "RestoreRemoteStoreResponse{" + "restoreInfo=" + restoreInfo + '}';
}
}
Loading

0 comments on commit e68643c

Please sign in to comment.