From 49196ec7dfb63f7444ae5128183ce209f247456b Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 18 Sep 2023 20:16:40 +0530 Subject: [PATCH] Add table indexes API (#11576) * Add table indexes API * Add tests * Lint fix * Address comments * Fix test --------- Co-authored-by: Saurabh Dubey --- .../server/TableIndexMetadataResponse.java | 44 ++++++++++++ .../resources/PinotTableRestletResource.java | 68 +++++++++++++++++++ .../tests/OfflineClusterIntegrationTest.java | 34 ++++++++++ .../server/api/resources/TablesResource.java | 49 +++++++++++++ .../pinot/server/api/TablesResourceTest.java | 37 ++++++++++ 5 files changed, 232 insertions(+) create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/response/server/TableIndexMetadataResponse.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/server/TableIndexMetadataResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/server/TableIndexMetadataResponse.java new file mode 100644 index 000000000000..eeb836c91939 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/server/TableIndexMetadataResponse.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.response.server; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; + + +public class TableIndexMetadataResponse { + private final long _totalOnlineSegments; + private final Map> _columnToIndexesCount; + + @JsonCreator + public TableIndexMetadataResponse(@JsonProperty("totalOnlineSegments") long totalOnlineSegments, + @JsonProperty("columnToIndexesCount") Map> columnToIndexesCount) { + _totalOnlineSegments = totalOnlineSegments; + _columnToIndexesCount = columnToIndexesCount; + } + + public long getTotalOnlineSegments() { + return _totalOnlineSegments; + } + + public Map> getColumnToIndexesCount() { + return _columnToIndexesCount; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index c79b6e4d040a..0cf4e4f8add2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -81,6 +81,7 @@ import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.response.server.TableIndexMetadataResponse; import org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.ControllerConf; @@ -964,6 +965,73 @@ public String getTableAggregateMetadata( return segmentsMetadata; } + @GET + @Path("tables/{tableName}/indexes") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_METADATA) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get the aggregate index details of all segments for a table", notes = "Get the aggregate " + + "index details of all segments for a table") + public String getTableIndexes( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr) { + LOGGER.info("Received a request to fetch aggregate metadata for a table {}", tableName); + TableType tableType = Constants.validateTableType(tableTypeStr); + String tableNameWithType = + ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); + + String tableIndexMetadata; + try { + JsonNode segmentsMetadataJson = getAggregateIndexMetadataFromServer(tableNameWithType); + tableIndexMetadata = JsonUtils.objectToPrettyString(segmentsMetadataJson); + } catch (InvalidConfigException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST); + } catch (IOException ioe) { + throw new ControllerApplicationException(LOGGER, "Error parsing Pinot server response: " + ioe.getMessage(), + Response.Status.INTERNAL_SERVER_ERROR, ioe); + } + return tableIndexMetadata; + } + + private JsonNode getAggregateIndexMetadataFromServer(String tableNameWithType) + throws InvalidConfigException, JsonProcessingException { + final Map> serverToSegments = + _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); + + BiMap serverEndPoints = + _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()); + CompletionServiceHelper completionServiceHelper = + new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints); + + List serverUrls = new ArrayList<>(); + BiMap endpointsToServers = serverEndPoints.inverse(); + for (String endpoint : endpointsToServers.keySet()) { + String segmentIndexesEndpoint = endpoint + String.format("/tables/%s/indexes", tableNameWithType); + serverUrls.add(segmentIndexesEndpoint); + } + + CompletionServiceHelper.CompletionServiceResponse serviceResponse = + completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000); + + int totalSegments = 0; + Map> columnToIndexCountMap = new HashMap<>(); + for (Map.Entry streamResponse : serviceResponse._httpResponses.entrySet()) { + String responseString = streamResponse.getValue(); + TableIndexMetadataResponse response = JsonUtils.stringToObject(responseString, TableIndexMetadataResponse.class); + totalSegments += response.getTotalOnlineSegments(); + response.getColumnToIndexesCount().forEach((col, indexToCount) -> { + Map indexCountMap = columnToIndexCountMap.computeIfAbsent(col, c -> new HashMap<>()); + indexToCount.forEach((indexName, count) -> { + indexCountMap.merge(indexName, count, Integer::sum); + }); + }); + } + + TableIndexMetadataResponse tableIndexMetadataResponse = + new TableIndexMetadataResponse(totalSegments, columnToIndexCountMap); + + return JsonUtils.objectToJsonNode(tableIndexMetadataResponse); + } + /** * This is a helper method to get the metadata for all segments for a given table name. * @param tableNameWithType name of the table along with its type diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 0be7e109c634..89d2d2449551 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -54,12 +54,14 @@ import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.response.server.TableIndexMetadataResponse; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator; +import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.config.table.IndexingConfig; @@ -2911,6 +2913,38 @@ public void testHardcodedServerPartitionedSqlQueries() super.testHardcodedServerPartitionedSqlQueries(); } + @Test + public void testIndexMetadataAPI() + throws Exception { + TableIndexMetadataResponse tableIndexMetadataResponse = + JsonUtils.stringToObject(sendGetRequest(getControllerBaseApiUrl() + "/tables/mytable/indexes?type=OFFLINE"), + TableIndexMetadataResponse.class); + + getInvertedIndexColumns().forEach(column -> { + Assert.assertEquals( + (long) tableIndexMetadataResponse.getColumnToIndexesCount().get(column).get(StandardIndexes.INVERTED_ID), + tableIndexMetadataResponse.getTotalOnlineSegments()); + }); + + getNoDictionaryColumns().forEach(column -> { + Assert.assertEquals( + (long) tableIndexMetadataResponse.getColumnToIndexesCount().get(column).get(StandardIndexes.DICTIONARY_ID), + 0); + }); + + getRangeIndexColumns().forEach(column -> { + Assert.assertEquals( + (long) tableIndexMetadataResponse.getColumnToIndexesCount().get(column).get(StandardIndexes.RANGE_ID), + tableIndexMetadataResponse.getTotalOnlineSegments()); + }); + + getBloomFilterColumns().forEach(column -> { + Assert.assertEquals( + (long) tableIndexMetadataResponse.getColumnToIndexesCount().get(column).get(StandardIndexes.BLOOM_FILTER_ID), + tableIndexMetadataResponse.getTotalOnlineSegments()); + }); + } + @Test public void testAggregateMetadataAPI() throws IOException { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index 0e261e143ddd..b08833b966d1 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -62,6 +62,7 @@ import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.response.server.TableIndexMetadataResponse; import org.apache.pinot.common.restlet.resources.ResourceUtils; import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo; import org.apache.pinot.common.restlet.resources.TableMetadataInfo; @@ -83,6 +84,8 @@ import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.IndexService; import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.server.access.AccessControlFactory; @@ -93,6 +96,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.stream.ConsumerPartitionState; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.slf4j.Logger; @@ -285,6 +289,51 @@ public String getSegmentMetadata( return ResourceUtils.convertToJsonString(tableMetadataInfo); } + @GET + @Encoded + @Path("/tables/{tableName}/indexes") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Provide index metadata", notes = "Provide index details for the table") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error", + response = ErrorInfo.class), @ApiResponse(code = 404, message = "Table or segment not found", response = + ErrorInfo.class) + }) + public String getTableIndexes( + @ApiParam(value = "Table name including type", required = true, example = "myTable_OFFLINE") + @PathParam("tableName") String tableName) + throws Exception { + TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName); + List allSegments = tableDataManager.acquireAllSegments(); + try { + int totalSegmentCount = 0; + Map> columnToIndexesCount = new HashMap<>(); + for (SegmentDataManager segmentDataManager : allSegments) { + if (segmentDataManager instanceof RealtimeSegmentDataManager) { + // REALTIME segments may not have indexes since not all indexes have mutable implementations + continue; + } + totalSegmentCount++; + IndexSegment segment = segmentDataManager.getSegment(); + segment.getColumnNames().forEach(col -> { + columnToIndexesCount.putIfAbsent(col, new HashMap<>()); + DataSource colDataSource = segment.getDataSource(col); + IndexService.getInstance().getAllIndexes().forEach(idxType -> { + int count = colDataSource.getIndex(idxType) != null ? 1 : 0; + columnToIndexesCount.get(col).merge(idxType.getId(), count, Integer::sum); + }); + }); + } + TableIndexMetadataResponse tableIndexMetadataResponse = + new TableIndexMetadataResponse(totalSegmentCount, columnToIndexesCount); + return JsonUtils.objectToString(tableIndexMetadataResponse); + } finally { + for (SegmentDataManager segmentDataManager : allSegments) { + tableDataManager.releaseSegment(segmentDataManager); + } + } + } + @GET @Encoded @Path("/tables/{tableName}/segments/{segmentName}/metadata") diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java index 7e7c4b7fbaae..488350aa1773 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java @@ -22,9 +22,12 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.ws.rs.core.Response; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.response.server.TableIndexMetadataResponse; import org.apache.pinot.common.restlet.resources.TableMetadataInfo; import org.apache.pinot.common.restlet.resources.TableSegments; import org.apache.pinot.common.restlet.resources.TablesList; @@ -35,6 +38,8 @@ import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.IndexService; import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; @@ -109,6 +114,38 @@ public void getSegments() Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); } + @Test + public void getTableIndexes() + throws Exception { + String tableIndexesPath = + "/tables/" + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TABLE_NAME) + "/indexes"; + + JsonNode jsonResponse = JsonUtils.stringToJsonNode(_webTarget.path(tableIndexesPath).request().get(String.class)); + TableIndexMetadataResponse tableIndexMetadataResponse = + JsonUtils.jsonNodeToObject(jsonResponse, TableIndexMetadataResponse.class); + Assert.assertNotNull(tableIndexMetadataResponse); + Assert.assertEquals(tableIndexMetadataResponse.getTotalOnlineSegments(), _offlineIndexSegments.size()); + + Map> columnToIndexCountMap = new HashMap<>(); + for (ImmutableSegment segment : _offlineIndexSegments) { + segment.getColumnNames().forEach(colName -> { + DataSource dataSource = segment.getDataSource(colName); + columnToIndexCountMap.putIfAbsent(colName, new HashMap<>()); + IndexService.getInstance().getAllIndexes().forEach(indexType -> { + int count = dataSource.getIndex(indexType) != null ? 1 : 0; + columnToIndexCountMap.get(colName).merge(indexType.getId(), count, Integer::sum); + }); + }); + } + + Assert.assertEquals(tableIndexMetadataResponse.getColumnToIndexesCount(), columnToIndexCountMap); + + // No such table + Response response = _webTarget.path("/tables/noSuchTable/indexes").request().get(Response.class); + Assert.assertNotNull(response); + Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + } + @Test public void getTableMetadata() throws Exception {