Skip to content

Commit

Permalink
Skip instead of throwing error on 'getValidDocIdMetadata' (apache#12360)
Browse files Browse the repository at this point in the history
* Skip instead of throwing error on 'getValidDocIdMetadata'

- The current server-side API throws an error if the snapshot
  file is not available. In this case, we should skip
  instead of throwing the error.
- Improve the API doc for validDocIds-related APIs

* Change the API 'validDocIdMetadata -> validDocIdsMetadata'
  • Loading branch information
Seunghyun Lee authored Feb 4, 2024
1 parent c9a82c4 commit 0a03c54
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@


@JsonIgnoreProperties(ignoreUnknown = true)
public class ValidDocIdMetadataInfo {
public class ValidDocIdsMetadataInfo {
private final String _segmentName;
private final long _totalValidDocs;
private final long _totalInvalidDocs;
private final long _totalDocs;
private final String _segmentCrc;
private final ValidDocIdsType _validDocIdsType;

public ValidDocIdMetadataInfo(@JsonProperty("segmentName") String segmentName,
public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName,
@JsonProperty("totalValidDocs") long totalValidDocs, @JsonProperty("totalInvalidDocs") long totalInvalidDocs,
@JsonProperty("totalDocs") long totalDocs, @JsonProperty("segmentCrc") String segmentCrc,
@JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
import org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessControlFactory;
Expand Down Expand Up @@ -951,18 +952,18 @@ public String getTableAggregateMetadata(
}

@GET
@Path("tables/{tableName}/validDocIdMetadata")
@Path("tables/{tableName}/validDocIdsMetadata")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_METADATA)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get the aggregate valid doc id metadata of all segments for a table", notes = "Get the "
+ "aggregate valid doc id metadata of all segments for a table")
public String getTableAggregateValidDocIdMetadata(
public String getTableAggregateValidDocIdsMetadata(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
@ApiParam(value = "A list of segments", allowMultiple = true) @QueryParam("segmentNames")
List<String> segmentNames,
@ApiParam(value = "Valid doc id type", example = "SNAPSHOT|IN_MEMORY|IN_MEMORY_WITH_DELETE")
@QueryParam("validDocIdsType") String validDocIdsType) {
@ApiParam(value = "Valid doc ids type")
@QueryParam("validDocIdsType") ValidDocIdsType validDocIdsType) {
LOGGER.info("Received a request to fetch aggregate valid doc id metadata for a table {}", tableName);
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == TableType.OFFLINE) {
Expand All @@ -972,21 +973,21 @@ public String getTableAggregateValidDocIdMetadata(
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);

String validDocIdMetadata;
String validDocIdsMetadata;
try {
TableMetadataReader tableMetadataReader =
new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager);
JsonNode segmentsMetadataJson =
tableMetadataReader.getAggregateValidDocIdMetadata(tableNameWithType, segmentNames, validDocIdsType,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
validDocIdMetadata = JsonUtils.objectToPrettyString(segmentsMetadataJson);
tableMetadataReader.getAggregateValidDocIdsMetadata(tableNameWithType, segmentNames,
validDocIdsType.toString(), _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
validDocIdsMetadata = JsonUtils.objectToPrettyString(segmentsMetadataJson);
} catch (InvalidConfigException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST);
} catch (IOException ioe) {
throw new ControllerApplicationException(LOGGER, "Error parsing Pinot server response: " + ioe.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, ioe);
}
return validDocIdMetadata;
return validDocIdsMetadata;
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.TableSegments;
import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.glassfish.jersey.client.ClientConfig;
Expand Down Expand Up @@ -209,7 +209,7 @@ public List<String> getSegmentMetadataFromServer(String tableNameWithType,
*
* @return segment metadata as a JSON string
*/
public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String tableNameWithType,
public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tableNameWithType,
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType) {
List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
Expand All @@ -226,7 +226,7 @@ public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String table
}
}
}
serverURLsAndBodies.add(generateValidDocIdMetadataURL(tableNameWithType, segmentsToQuery, validDocIdsType,
serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQuery, validDocIdsType,
serverToEndpoints.get(serverToSegments.getKey())));
}

Expand All @@ -239,16 +239,16 @@ public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String table
completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, tableNameWithType, false, requestHeaders,
timeoutMs, null);

List<ValidDocIdMetadataInfo> validDocIdMetadataInfos = new ArrayList<>();
List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new ArrayList<>();
int failedParses = 0;
int returnedSegmentsCount = 0;
for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
try {
String validDocIdMetadataList = streamResponse.getValue();
List<ValidDocIdMetadataInfo> validDocIdMetadataInfo =
JsonUtils.stringToObject(validDocIdMetadataList, new TypeReference<ArrayList<ValidDocIdMetadataInfo>>() {
String validDocIdsMetadataList = streamResponse.getValue();
List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfo =
JsonUtils.stringToObject(validDocIdsMetadataList, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
});
validDocIdMetadataInfos.addAll(validDocIdMetadataInfo);
validDocIdsMetadataInfos.addAll(validDocIdsMetadataInfo);
returnedSegmentsCount++;
} catch (Exception e) {
failedParses++;
Expand All @@ -261,12 +261,12 @@ public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String table
}

if (segmentNames != null && returnedSegmentsCount != segmentNames.size()) {
LOGGER.error("Unable to get validDocIdMetadata from all servers. Expected: {}, Actual: {}", segmentNames.size(),
LOGGER.error("Unable to get validDocIdsMetadata from all servers. Expected: {}, Actual: {}", segmentNames.size(),
returnedSegmentsCount);
}
LOGGER.info("Retrieved valid doc id metadata for {} segments from {} servers.", returnedSegmentsCount,
serverURLsAndBodies.size());
return validDocIdMetadataInfos;
return validDocIdsMetadataInfos;
}

/**
Expand Down Expand Up @@ -354,7 +354,7 @@ private String generateValidDocIdsBitmapURL(String tableNameWithType, String seg
return url;
}

private Pair<String, String> generateValidDocIdMetadataURL(String tableNameWithType, List<String> segmentNames,
private Pair<String, String> generateValidDocIdsMetadataURL(String tableNameWithType, List<String> segmentNames,
String validDocIdsType, String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
TableSegments tableSegments = new TableSegments(segmentNames);
Expand All @@ -365,7 +365,7 @@ private Pair<String, String> generateValidDocIdMetadataURL(String tableNameWithT
LOGGER.error("Failed to convert segment names to json request body: segmentNames={}", segmentNames);
throw new RuntimeException(e);
}
String url = String.format("%s/tables/%s/validDocIdMetadata", endpoint, tableNameWithType);
String url = String.format("%s/tables/%s/validDocIdsMetadata", endpoint, tableNameWithType);
if (validDocIdsType != null) {
url = url + "?validDocIdsType=" + validDocIdsType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.utils.JsonUtils;

Expand Down Expand Up @@ -156,9 +156,9 @@ public JsonNode getAggregateTableMetadata(String tableNameWithType, List<String>

/**
* This method retrieves the aggregated valid doc id metadata for a given table.
* @return a list of ValidDocIdMetadataInfo
* @return a list of ValidDocIdsMetadataInfo
*/
public JsonNode getAggregateValidDocIdMetadata(String tableNameWithType, List<String> segmentNames,
public JsonNode getAggregateValidDocIdsMetadata(String tableNameWithType, List<String> segmentNames,
String validDocIdsType, int timeoutMs)
throws InvalidConfigException {
final Map<String, List<String>> serverToSegments =
Expand All @@ -168,8 +168,8 @@ public JsonNode getAggregateValidDocIdMetadata(String tableNameWithType, List<St
ServerSegmentMetadataReader serverSegmentMetadataReader =
new ServerSegmentMetadataReader(_executor, _connectionManager);

List<ValidDocIdMetadataInfo> aggregateTableMetadataInfo =
serverSegmentMetadataReader.getValidDocIdMetadataFromServer(tableNameWithType, serverToSegments, endpoints,
List<ValidDocIdsMetadataInfo> aggregateTableMetadataInfo =
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, endpoints,
segmentNames, timeoutMs, validDocIdsType);
return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public static class UpsertCompactionTask {
public static final String INVALID_RECORDS_THRESHOLD_COUNT = "invalidRecordsThresholdCount";

/**
* Valid doc id type
* Valid doc ids type
*/
public static final String VALID_DOC_IDS_TYPE = "validDocIdsType";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
Expand Down Expand Up @@ -151,15 +151,15 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
validDocIdsType));
}

List<ValidDocIdMetadataInfo> validDocIdMetadataList =
serverSegmentMetadataReader.getValidDocIdMetadataFromServer(tableNameWithType, serverToSegments,
List<ValidDocIdsMetadataInfo> validDocIdsMetadataList =
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments,
serverToEndpoints, null, 60_000, validDocIdsType.toString());

Map<String, SegmentZKMetadata> completedSegmentsMap =
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity()));

SegmentSelectionResult segmentSelectionResult =
processValidDocIdMetadata(taskConfigs, completedSegmentsMap, validDocIdMetadataList);
processValidDocIdsMetadata(taskConfigs, completedSegmentsMap, validDocIdsMetadataList);

if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentSelectionResult.getSegmentsForDeletion(),
Expand Down Expand Up @@ -195,8 +195,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
}

@VisibleForTesting
public static SegmentSelectionResult processValidDocIdMetadata(Map<String, String> taskConfigs,
Map<String, SegmentZKMetadata> completedSegmentsMap, List<ValidDocIdMetadataInfo> validDocIdMetadataInfoList) {
public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, String> taskConfigs,
Map<String, SegmentZKMetadata> completedSegmentsMap, List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfoList) {
double invalidRecordsThresholdPercent = Double.parseDouble(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
Expand All @@ -205,19 +205,19 @@ public static SegmentSelectionResult processValidDocIdMetadata(Map<String, Strin
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>();
List<String> segmentsForDeletion = new ArrayList<>();
for (ValidDocIdMetadataInfo validDocIdMetadata : validDocIdMetadataInfoList) {
long totalInvalidDocs = validDocIdMetadata.getTotalInvalidDocs();
String segmentName = validDocIdMetadata.getSegmentName();
for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoList) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
String segmentName = validDocIdsMetadata.getSegmentName();

// Skip segments if the crc from zk metadata and server does not match. They may be being reloaded.
SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
if (segment.getCrc() != Long.parseLong(validDocIdMetadata.getSegmentCrc())) {
if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
LOGGER.warn(
"CRC mismatch for segment: {}, skipping it for compaction (segmentZKMetadata={}, validDocIdMetadata={})",
segmentName, segment.getCrc(), validDocIdMetadata.getSegmentCrc());
"CRC mismatch for segment: {}, skipping it for compaction (segmentZKMetadata={}, validDocIdsMetadata={})",
segmentName, segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
continue;
}
long totalDocs = validDocIdMetadata.getTotalDocs();
long totalDocs = validDocIdsMetadata.getTotalDocs();
double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
if (totalInvalidDocs == totalDocs) {
segmentsForDeletion.add(segment.getSegmentName());
Expand Down
Loading

0 comments on commit 0a03c54

Please sign in to comment.