Skip to content

Commit

Permalink
When reading server to segments map, exclude OFFLINE segments (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Oct 17, 2023
1 parent 51335a2 commit 7247da8
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -661,13 +661,13 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus(
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
if (singleSegmentName != null) {
// No need to query servers where this segment is not supposed to be hosted
serverToSegments = new HashMap<>();
List<String> segmentList = Arrays.asList(singleSegmentName);
serverToSegments = new TreeMap<>();
List<String> segmentList = Collections.singletonList(singleSegmentName);
_pinotHelixResourceManager.getServers(tableNameWithType, singleSegmentName).forEach(server -> {
serverToSegments.put(server, segmentList);
});
} else {
serverToSegments = _pinotHelixResourceManager.getServerToOnlineSegmentsMap(tableNameWithType);
serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
}

BiMap<String, String> serverEndPoints =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2651,29 +2651,8 @@ public void toggleQueryQuotaStateForBroker(String brokerInstanceName, String sta
}

/**
* Returns a map from server instance to list of online segments it serves for the given table.
*/
public Map<String, List<String>> getServerToOnlineSegmentsMap(String tableNameWithType) {
Map<String, List<String>> serverToSegmentsMap = new TreeMap<>();
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
if (idealState == null) {
throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
}
for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
String segmentName = entry.getKey();
for (Map.Entry<String, String> e : entry.getValue().entrySet()) {
String server = e.getKey();
String status = e.getValue();
if (status.equals(SegmentStateModel.CONSUMING) || status.equals(SegmentStateModel.ONLINE)) {
serverToSegmentsMap.computeIfAbsent(server, key -> new ArrayList<>()).add(segmentName);
}
}
}
return serverToSegmentsMap;
}

/**
* Returns a map from server instance to list of segments it serves for the given table.
* Returns a map from server instance to list of segments it serves for the given table. Ignore OFFLINE segments from
* the ideal state because they are not supposed to be served.
*/
public Map<String, List<String>> getServerToSegmentsMap(String tableNameWithType) {
Map<String, List<String>> serverToSegmentsMap = new TreeMap<>();
Expand All @@ -2683,15 +2662,18 @@ public Map<String, List<String>> getServerToSegmentsMap(String tableNameWithType
}
for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
String segmentName = entry.getKey();
for (String server : entry.getValue().keySet()) {
serverToSegmentsMap.computeIfAbsent(server, key -> new ArrayList<>()).add(segmentName);
for (Map.Entry<String, String> instanceStateEntry : entry.getValue().entrySet()) {
if (!instanceStateEntry.getValue().equals(SegmentStateModel.OFFLINE)) {
serverToSegmentsMap.computeIfAbsent(instanceStateEntry.getKey(), key -> new ArrayList<>()).add(segmentName);
}
}
}
return serverToSegmentsMap;
}

/**
* Returns a set of server instances for a given table and segment
* Returns a set of server instances for a given table and segment. Ignore OFFLINE segments from the ideal state
* because they are not supposed to be served.
*/
public Set<String> getServers(String tableNameWithType, String segmentName) {
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
Expand All @@ -2701,7 +2683,13 @@ public Set<String> getServers(String tableNameWithType, String segmentName) {
Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
Preconditions.checkState(instanceStateMap != null, "Segment: {} does not exist in the ideal state of table: {}",
segmentName, tableNameWithType);
return instanceStateMap.keySet();
Set<String> servers = new TreeSet<>();
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
if (!entry.getValue().equals(SegmentStateModel.OFFLINE)) {
servers.add(entry.getKey());
}
}
return servers;
}

/**
Expand All @@ -2722,15 +2710,9 @@ public Set<String> getConsumingSegments(String tableNameWithType) {
return consumingSegments;
}

/**
* Utility function to return set of servers corresponding to a given segment.
*/
@Deprecated
public Set<String> getServersForSegment(String tableNameWithType, String segmentName) {
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
if (idealState == null) {
throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
}
return new HashSet<>(idealState.getInstanceStateMap(segmentName).keySet());
return getServers(tableNameWithType, segmentName);
}

public synchronized Map<String, String> getSegmentsCrcForTable(String tableNameWithType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ public TableStatus.IngestionStatus getIngestionStatus(String tableNameWithType,
}

// Check if any responses are missing
Set<String> serversForSegment = _pinotHelixResourceManager.getServersForSegment(tableNameWithType, segmentName);
if (serversForSegment.size() != consumingSegmentInfoList.size()) {
Set<String> servers = _pinotHelixResourceManager.getServers(tableNameWithType, segmentName);
if (servers.size() != consumingSegmentInfoList.size()) {
Set<String> serversResponded =
consumingSegmentInfoList.stream().map(c -> c._serverName).collect(Collectors.toSet());
serversForSegment.removeAll(serversResponded);
servers.removeAll(serversResponded);
String errorMessage =
"Not all servers responded for segment: " + segmentName + " Missing servers : " + serversForSegment;
"Not all servers responded for segment: " + segmentName + " Missing servers : " + servers;
return TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNHEALTHY, errorMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -197,11 +197,11 @@ private BiMap<String, String> serverEndpoints(String... servers) {
private void mockSetup(final String[] servers, final Set<String> consumingSegments)
throws InvalidConfigException {
when(_helix.getServerToSegmentsMap(anyString())).thenAnswer(invocationOnMock -> subsetOfServerSegments(servers));
when(_helix.getServers(anyString(), anyString())).thenAnswer(
invocationOnMock -> new TreeSet<>(Arrays.asList(servers)));
when(_helix.getDataInstanceAdminEndpoints(ArgumentMatchers.anySet())).thenAnswer(
invocationOnMock -> serverEndpoints(servers));
when(_helix.getConsumingSegments(anyString())).thenAnswer(invocationOnMock -> consumingSegments);
when(_helix.getServersForSegment(anyString(), anyString())).thenAnswer(
invocationOnMock -> new HashSet<>(Arrays.asList(servers)));
}

private ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner(final String[] servers,
Expand Down

0 comments on commit 7247da8

Please sign in to comment.