Skip to content

Commit

Permalink
Add table indexes API (apache#11576)
Browse files Browse the repository at this point in the history
* Add table indexes API

* Add tests

* Lint fix

* Address comments

* Fix test

---------

Co-authored-by: Saurabh Dubey <[email protected]>
  • Loading branch information
saurabhd336 and Saurabh Dubey authored Sep 18, 2023
1 parent b1b070f commit 49196ec
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.response.server;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;


/**
 * JSON payload describing how many segments of a table carry each index, broken down per column.
 *
 * <p>Both fields are final. The map is stored and returned by reference (it is not copied), so callers that
 * keep mutating the map they passed in will be mutating this object's view as well.
 */
public class TableIndexMetadataResponse {
  private final long _totalOnlineSegments;
  private final Map<String, Map<String, Integer>> _columnToIndexesCount;

  /**
   * Creates a response; also used by Jackson when deserializing.
   *
   * @param totalOnlineSegments value bound to the {@code totalOnlineSegments} JSON property
   * @param columnToIndexesCount value bound to the {@code columnToIndexesCount} JSON property: column name to
   *     (index id to count)
   */
  @JsonCreator
  public TableIndexMetadataResponse(
      @JsonProperty("totalOnlineSegments") long totalOnlineSegments,
      @JsonProperty("columnToIndexesCount") Map<String, Map<String, Integer>> columnToIndexesCount) {
    this._totalOnlineSegments = totalOnlineSegments;
    this._columnToIndexesCount = columnToIndexesCount;
  }

  /** @return the segment count supplied at construction */
  public long getTotalOnlineSegments() {
    return this._totalOnlineSegments;
  }

  /** @return the per-column index counts supplied at construction (same map instance, not a copy) */
  public Map<String, Map<String, Integer>> getColumnToIndexesCount() {
    return this._columnToIndexesCount;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
import org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
Expand Down Expand Up @@ -964,6 +965,73 @@ public String getTableAggregateMetadata(
return segmentsMetadata;
}

/**
 * Returns the index details of a table, aggregated over the responses of the servers hosting its segments, as
 * a pretty-printed JSON string.
 *
 * @param tableName name of the table, taken from the request path
 * @param tableTypeStr optional table type from the {@code type} query parameter; validated through
 *     {@code Constants.validateTableType}
 * @return pretty-printed JSON of the aggregated index metadata
 * @throws ControllerApplicationException with {@code BAD_REQUEST} when the aggregation raises
 *     {@code InvalidConfigException}, or {@code INTERNAL_SERVER_ERROR} when a server response cannot be
 *     parsed or the result cannot be serialized
 */
@GET
@Path("tables/{tableName}/indexes")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_METADATA)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get the aggregate index details of all segments for a table", notes = "Get the aggregate "
    + "index details of all segments for a table")
public String getTableIndexes(
    @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
    @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr) {
  // Say "index metadata" so this endpoint can be told apart from the segment metadata endpoint in logs.
  LOGGER.info("Received a request to fetch aggregate index metadata for table {}", tableName);
  TableType tableType = Constants.validateTableType(tableTypeStr);
  // Only the first resolved table name is queried.
  String tableNameWithType =
      ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);

  try {
    JsonNode tableIndexMetadataJson = getAggregateIndexMetadataFromServer(tableNameWithType);
    return JsonUtils.objectToPrettyString(tableIndexMetadataJson);
  } catch (InvalidConfigException e) {
    // Keep the original exception as the cause so the stack trace is not lost.
    throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e);
  } catch (IOException ioe) {
    throw new ControllerApplicationException(LOGGER, "Error parsing Pinot server response: " + ioe.getMessage(),
        Response.Status.INTERNAL_SERVER_ERROR, ioe);
  }
}

/**
 * Asks every server hosting segments of the table for its index metadata and sums the results.
 *
 * <p>Only responses present in the helper's {@code _httpResponses} map are aggregated; this method does not
 * itself inspect or report servers that produced no response.
 *
 * @param tableNameWithType name of the table along with its type
 * @return JSON tree of a {@link TableIndexMetadataResponse} holding the summed segment count and, per column,
 *     the summed count for each index id
 * @throws InvalidConfigException propagated from resolving the server admin endpoints
 * @throws JsonProcessingException if a server response cannot be parsed into a
 *     {@link TableIndexMetadataResponse}
 */
private JsonNode getAggregateIndexMetadataFromServer(String tableNameWithType)
    throws InvalidConfigException, JsonProcessingException {
  final Map<String, List<String>> serverToSegments =
      _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);

  BiMap<String, String> serverEndPoints =
      _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
  CompletionServiceHelper completionServiceHelper =
      new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);

  // The endpoints are the values of the server -> endpoint BiMap; no inverse view is needed to list them.
  List<String> serverUrls = new ArrayList<>(serverEndPoints.size());
  for (String endpoint : serverEndPoints.values()) {
    serverUrls.add(endpoint + String.format("/tables/%s/indexes", tableNameWithType));
  }

  // NOTE(review): 10000 is presumably the request timeout in milliseconds - confirm against
  // CompletionServiceHelper.doMultiGetRequest.
  CompletionServiceHelper.CompletionServiceResponse serviceResponse =
      completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);

  // Accumulate as long: the per-server count is a long, and "int += long" would silently narrow the sum.
  long totalSegments = 0;
  Map<String, Map<String, Integer>> columnToIndexCountMap = new HashMap<>();
  for (String responseString : serviceResponse._httpResponses.values()) {
    TableIndexMetadataResponse response = JsonUtils.stringToObject(responseString, TableIndexMetadataResponse.class);
    totalSegments += response.getTotalOnlineSegments();
    response.getColumnToIndexesCount().forEach((col, indexToCount) -> {
      Map<String, Integer> indexCountMap = columnToIndexCountMap.computeIfAbsent(col, c -> new HashMap<>());
      indexToCount.forEach((indexName, count) -> indexCountMap.merge(indexName, count, Integer::sum));
    });
  }

  TableIndexMetadataResponse tableIndexMetadataResponse =
      new TableIndexMetadataResponse(totalSegments, columnToIndexCountMap);

  return JsonUtils.objectToJsonNode(tableIndexMetadataResponse);
}

/**
* This is a helper method to get the metadata for all segments for a given table name.
* @param tableNameWithType name of the table along with its type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.IndexingConfig;
Expand Down Expand Up @@ -2911,6 +2913,38 @@ public void testHardcodedServerPartitionedSqlQueries()
super.testHardcodedServerPartitionedSqlQueries();
}

/**
 * Fetches the controller's index metadata for the OFFLINE table {@code mytable} and checks the reported
 * counts: inverted, range and bloom filter columns must report one index per online segment, and
 * no-dictionary columns must report zero dictionaries.
 */
@Test
public void testIndexMetadataAPI()
    throws Exception {
  String indexesUrl = getControllerBaseApiUrl() + "/tables/mytable/indexes?type=OFFLINE";
  TableIndexMetadataResponse indexMetadata =
      JsonUtils.stringToObject(sendGetRequest(indexesUrl), TableIndexMetadataResponse.class);
  long onlineSegments = indexMetadata.getTotalOnlineSegments();

  // Every segment is expected to carry an inverted index for these columns.
  getInvertedIndexColumns().forEach(column -> Assert.assertEquals(
      indexMetadata.getColumnToIndexesCount().get(column).get(StandardIndexes.INVERTED_ID).longValue(),
      onlineSegments));

  // No segment may carry a dictionary for a no-dictionary column.
  getNoDictionaryColumns().forEach(column -> Assert.assertEquals(
      indexMetadata.getColumnToIndexesCount().get(column).get(StandardIndexes.DICTIONARY_ID).longValue(),
      0L));

  // Every segment is expected to carry a range index for these columns.
  getRangeIndexColumns().forEach(column -> Assert.assertEquals(
      indexMetadata.getColumnToIndexesCount().get(column).get(StandardIndexes.RANGE_ID).longValue(),
      onlineSegments));

  // Every segment is expected to carry a bloom filter for these columns.
  getBloomFilterColumns().forEach(column -> Assert.assertEquals(
      indexMetadata.getColumnToIndexesCount().get(column).get(StandardIndexes.BLOOM_FILTER_ID).longValue(),
      onlineSegments));
}

@Test
public void testAggregateMetadataAPI()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
import org.apache.pinot.common.restlet.resources.ResourceUtils;
import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
Expand All @@ -83,6 +84,8 @@
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.IndexService;
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.server.access.AccessControlFactory;
Expand All @@ -93,6 +96,7 @@
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -285,6 +289,51 @@ public String getSegmentMetadata(
return ResourceUtils.convertToJsonString(tableMetadataInfo);
}

/**
 * Reports, for the given table on this server, how many of its segments carry each index, per column.
 *
 * <p>Segments whose manager is a {@code RealtimeSegmentDataManager} are skipped and not counted. For every
 * other acquired segment, each column gets an entry for every index type known to {@code IndexService}:
 * incremented by 1 when the column's data source has that index, by 0 otherwise. All acquired segments are
 * released before returning, including on failure.
 *
 * @param tableName table name including type
 * @return JSON string of a {@link TableIndexMetadataResponse}
 * @throws Exception if the table lookup or the JSON serialization fails
 */
@GET
@Encoded
@Path("/tables/{tableName}/indexes")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Provide index metadata", notes = "Provide index details for the table")
@ApiResponses(value = {
    @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error",
    response = ErrorInfo.class), @ApiResponse(code = 404, message = "Table or segment not found", response =
    ErrorInfo.class)
})
public String getTableIndexes(
    @ApiParam(value = "Table name including type", required = true, example = "myTable_OFFLINE")
    @PathParam("tableName") String tableName)
    throws Exception {
  TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
  List<SegmentDataManager> acquiredSegments = tableDataManager.acquireAllSegments();
  try {
    int countedSegments = 0;
    Map<String, Map<String, Integer>> indexCountsByColumn = new HashMap<>();
    for (SegmentDataManager acquired : acquiredSegments) {
      // REALTIME segments may not have indexes since not all indexes have mutable implementations
      if (!(acquired instanceof RealtimeSegmentDataManager)) {
        countedSegments++;
        IndexSegment segment = acquired.getSegment();
        for (String col : segment.getColumnNames()) {
          Map<String, Integer> countsForColumn = indexCountsByColumn.computeIfAbsent(col, k -> new HashMap<>());
          DataSource colDataSource = segment.getDataSource(col);
          // Merge a 0 for absent indexes too, so every index id shows up under every column.
          IndexService.getInstance().getAllIndexes().forEach(idxType ->
              countsForColumn.merge(idxType.getId(), colDataSource.getIndex(idxType) == null ? 0 : 1, Integer::sum));
        }
      }
    }
    return JsonUtils.objectToString(new TableIndexMetadataResponse(countedSegments, indexCountsByColumn));
  } finally {
    // Release everything that was acquired, including the segments that were skipped above.
    for (SegmentDataManager acquired : acquiredSegments) {
      tableDataManager.releaseSegment(acquired);
    }
  }
}

@GET
@Encoded
@Path("/tables/{tableName}/segments/{segmentName}/metadata")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.TableSegments;
import org.apache.pinot.common.restlet.resources.TablesList;
Expand All @@ -35,6 +38,8 @@
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.IndexService;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
Expand Down Expand Up @@ -109,6 +114,38 @@ public void getSegments()
Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
}

/**
 * Calls the server's table indexes endpoint for the OFFLINE test table and compares the response with counts
 * recomputed directly from the loaded offline segments; also checks that an unknown table yields 404.
 */
@Test
public void getTableIndexes()
    throws Exception {
  String tableIndexesPath =
      "/tables/" + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TABLE_NAME) + "/indexes";

  String rawResponse = _webTarget.path(tableIndexesPath).request().get(String.class);
  JsonNode jsonResponse = JsonUtils.stringToJsonNode(rawResponse);
  TableIndexMetadataResponse tableIndexMetadataResponse =
      JsonUtils.jsonNodeToObject(jsonResponse, TableIndexMetadataResponse.class);
  Assert.assertNotNull(tableIndexMetadataResponse);
  Assert.assertEquals(tableIndexMetadataResponse.getTotalOnlineSegments(), _offlineIndexSegments.size());

  // Rebuild the expected counts the same way the endpoint does: 1 per segment that has the index, else 0.
  Map<String, Map<String, Integer>> expectedCounts = new HashMap<>();
  for (ImmutableSegment segment : _offlineIndexSegments) {
    for (String colName : segment.getColumnNames()) {
      DataSource dataSource = segment.getDataSource(colName);
      Map<String, Integer> countsForColumn = expectedCounts.computeIfAbsent(colName, k -> new HashMap<>());
      IndexService.getInstance().getAllIndexes().forEach(indexType ->
          countsForColumn.merge(indexType.getId(), dataSource.getIndex(indexType) == null ? 0 : 1, Integer::sum));
    }
  }

  Assert.assertEquals(tableIndexMetadataResponse.getColumnToIndexesCount(), expectedCounts);

  // No such table
  Response response = _webTarget.path("/tables/noSuchTable/indexes").request().get(Response.class);
  Assert.assertNotNull(response);
  Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
}

@Test
public void getTableMetadata()
throws Exception {
Expand Down

0 comments on commit 49196ec

Please sign in to comment.