Skip to content

Commit

Permalink
Reset discovery nodes in most transport node actions request. (#15131) (
Browse files Browse the repository at this point in the history
#15674)

Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
SwethaGuptha authored Sep 5, 2024
1 parent cd77e41 commit 4589765
Show file tree
Hide file tree
Showing 25 changed files with 87 additions and 112 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))[SnapshotV2] Snapshot Status API changes (#15409))
- ClusterManagerTaskThrottler Improvements ([#15508](https://github.com/opensearch-project/OpenSearch/pull/15508))
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))
- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public NodesHotThreadsRequest(StreamInput in) throws IOException {
* threads for all nodes is used.
*/
public NodesHotThreadsRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

public int threads() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public NodesInfoRequest(StreamInput in) throws IOException {
* for all nodes will be returned.
*/
public NodesInfoRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
all();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
private final Set<String> requestedMetrics = new HashSet<>();

public NodesStatsRequest() {
super((String[]) null);
super(false, (String[]) null);
}

public NodesStatsRequest(StreamInput in) throws IOException {
Expand Down Expand Up @@ -90,7 +90,7 @@ public NodesStatsRequest(StreamInput in) throws IOException {
* for all nodes will be returned.
*/
public NodesStatsRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public NodesUsageRequest(StreamInput in) throws IOException {
* passed, usage for all nodes will be returned.
*/
public NodesUsageRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(String[] nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

public Request snapshots(Snapshot[] snapshots) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public ClusterStatsRequest(StreamInput in) throws IOException {
* based on all nodes will be returned.
*/
public ClusterStatsRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

public boolean useAggregatedNodeLevelResponses() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public FindDanglingIndexRequest(StreamInput in) throws IOException {
}

public FindDanglingIndexRequest(String indexUUID) {
super(Strings.EMPTY_ARRAY);
super(false, Strings.EMPTY_ARRAY);
this.indexUUID = indexUUID;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ public ListDanglingIndicesRequest(StreamInput in) throws IOException {
}

public ListDanglingIndicesRequest() {
super(Strings.EMPTY_ARRAY);
super(false, Strings.EMPTY_ARRAY);
this.indexUUID = null;
}

public ListDanglingIndicesRequest(String indexUUID) {
super(Strings.EMPTY_ARRAY);
super(false, Strings.EMPTY_ARRAY);
this.indexUUID = indexUUID;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class GetAllPitNodesRequest extends BaseNodesRequest<GetAllPitNodesReques

@Inject
public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) {
super(concreteNodes);
super(false, concreteNodes);
}

public GetAllPitNodesRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
*/
private boolean includeDiscoveryNodes = true;

private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
Expand All @@ -88,11 +89,22 @@ protected BaseNodesRequest(String... nodesIds) {
this.nodesIds = nodesIds;
}

protected BaseNodesRequest(boolean includeDiscoveryNodes, String... nodesIds) {
this.nodesIds = nodesIds;
this.includeDiscoveryNodes = includeDiscoveryNodes;
}

protected BaseNodesRequest(DiscoveryNode... concreteNodes) {
this.nodesIds = null;
this.concreteNodes = concreteNodes;
}

protected BaseNodesRequest(boolean includeDiscoveryNodes, DiscoveryNode... concreteNodes) {
this.nodesIds = null;
this.concreteNodes = concreteNodes;
this.includeDiscoveryNodes = includeDiscoveryNodes;
}

public final String[] nodesIds() {
return nodesIds;
}
Expand Down Expand Up @@ -127,10 +139,6 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void setIncludeDiscoveryNodes(boolean value) {
includeDiscoveryNodes = value;
}

public boolean getIncludeDiscoveryNodes() {
return includeDiscoveryNodes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,16 @@ class AsyncAction {
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
this.concreteNodes = request.concreteNodes();

if (request.getIncludeDiscoveryNodes() == false) {
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we
// remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class,
// we remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// the number of concrete nodes in the memory.
request.setConcreteNodes(null);
}
}

void start() {
final DiscoveryNode[] nodes = this.concreteNodes;
if (nodes.length == 0) {
if (this.concreteNodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
return;
Expand All @@ -260,9 +258,9 @@ void start() {
if (request.timeout() != null) {
builder.withTimeout(request.timeout());
}
for (int i = 0; i < nodes.length; i++) {
for (int i = 0; i < this.concreteNodes.length; i++) {
final int idx = i;
final DiscoveryNode node = nodes[i];
final DiscoveryNode node = this.concreteNodes[i];
final String nodeId = node.getId();
try {
TransportRequest nodeRequest = newNodeRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) {
super(nodes);
super(false, nodes);
this.shardId = Objects.requireNonNull(shardId);
this.customDataPath = Objects.requireNonNull(customDataPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(DiscoveryNode[] nodes, Map<ShardId, ShardAttributes> shardAttributes) {
super(nodes);
super(false, nodes);
this.shardAttributes = Objects.requireNonNull(shardAttributes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) {
super(nodes);
super(false, nodes);
this.shardId = Objects.requireNonNull(shardId);
this.customDataPath = Objects.requireNonNull(customDataPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(Map<ShardId, ShardAttributes> shardAttributes, DiscoveryNode[] nodes) {
super(nodes);
super(false, nodes);
this.shardAttributes = Objects.requireNonNull(shardAttributes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.setIncludeDiscoveryNodes(false);
clusterStatsRequest.useAggregatedNodeLevelResponses(true);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);
nodesInfoRequest.setIncludeDiscoveryNodes(false);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.setIncludeDiscoveryNodes(false);
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
Expand All @@ -138,7 +137,6 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) {
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,48 +33,12 @@

public class TransportClusterStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(true);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand All @@ -88,7 +52,6 @@ public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionO
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand Down
Loading

0 comments on commit 4589765

Please sign in to comment.