Skip to content

Commit

Permalink
Add batch async shard fetch transport action for replica #8218
Browse files Browse the repository at this point in the history
Modify async replica shard fetch to use helper functions

Signed-off-by: sudarshan baliga <[email protected]>
  • Loading branch information
sudarshan-baliga committed Jun 29, 2023
1 parent 73b7a85 commit fb5a983
Show file tree
Hide file tree
Showing 7 changed files with 627 additions and 201 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.lease.Releasable;
import org.opensearch.index.shard.ShardId;

import java.util.Map;


/**
* This class is responsible for fetching shard data from nodes. It is analogous to AsyncShardFetch class except
* that we fetch batch of shards in this class from single transport request to a node.
* @param <T>
*
* @opensearch.internal
*/
public abstract class AsyncShardsFetchPerNode<T extends BaseNodeResponse> implements Releasable {

/**
* An action that lists the relevant shard data that needs to be fetched.
*/
public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
void list(DiscoveryNode[] nodes, Map<ShardId, String> shardIdsWithCustomDataPath, ActionListener<NodesResponse> listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -106,7 +106,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
Expand Down Expand Up @@ -223,7 +223,7 @@ public AllocateUnassignedDecision makeAllocationDecision(
}
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
Expand Down Expand Up @@ -357,7 +357,7 @@ private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
private static TransportNodesListShardStoreMetadata.StoreFilesMetadata findStore(
private static TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata findStore(
DiscoveryNode node,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data
) {
Expand All @@ -373,7 +373,7 @@ private MatchingNodes findMatchingNodes(
RoutingAllocation allocation,
boolean noMatchFailedNodes,
DiscoveryNode primaryNode,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data,
boolean explain
) {
Expand All @@ -386,7 +386,7 @@ private MatchingNodes findMatchingNodes(
&& shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
continue;
}
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
// we don't have any files at all, it is an empty index
if (storeFilesMetadata.isEmpty()) {
continue;
Expand Down Expand Up @@ -442,8 +442,8 @@ private MatchingNodes findMatchingNodes(
}

private static long computeMatchingBytes(
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata
) {
long sizeMatched = 0;
for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) {
Expand All @@ -456,18 +456,18 @@ private static long computeMatchingBytes(
}

private static boolean hasMatchingSyncId(
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore
) {
String primarySyncId = primaryStore.syncId();
return primarySyncId != null && primarySyncId.equals(replicaStore.syncId());
}

private static MatchingNode computeMatchingNode(
DiscoveryNode primaryNode,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
DiscoveryNode replicaNode,
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore
) {
final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode);
final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode);
Expand All @@ -478,7 +478,7 @@ private static MatchingNode computeMatchingNode(
}

private static boolean canPerformOperationBasedRecovery(
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores,
DiscoveryNode targetNode
) {
Expand Down
Loading

0 comments on commit fb5a983

Please sign in to comment.