Commit 947b47e
Enhancing metadata API to return upsert partition to primary key count map for both controller and server APIs (apache#12334)
9aman authored Feb 9, 2024
1 parent 2d41b38 commit 947b47e
Showing 7 changed files with 71 additions and 3 deletions.
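
The new metadata value is a nested map: upsert partition id to a map from server instance id to the number of primary keys that server holds for the partition. Before the diffs, a minimal sketch of that shape, using hypothetical server instance names:

import java.util.Map;

public class UpsertPrimaryKeyCountShape {
  public static void main(String[] args) {
    // Shape of the new upsertPartitionToServerPrimaryKeyCountMap field:
    // partition id -> (server instance id -> primary key count held by that server).
    // The server instance names below are hypothetical.
    Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap = Map.of(
        0, Map.of("Server_host1_8098", 1200L, "Server_host2_8098", 1180L),
        1, Map.of("Server_host1_8098", 950L));
    upsertPartitionToServerPrimaryKeyCountMap.forEach(
        (partition, serverToCount) -> System.out.println("partition " + partition + " -> " + serverToCount));
  }
}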
@@ -43,14 +43,17 @@ public class TableMetadataInfo {
private final Map<String, Double> _columnCardinalityMap;
private final Map<String, Double> _maxNumMultiValuesMap;
private final Map<String, Map<String, Double>> _columnIndexSizeMap;
private final Map<Integer, Map<String, Long>> _upsertPartitionToServerPrimaryKeyCountMap;

@JsonCreator
public TableMetadataInfo(@JsonProperty("tableName") String tableName,
@JsonProperty("diskSizeInBytes") long sizeInBytes, @JsonProperty("numSegments") long numSegments,
@JsonProperty("numRows") long numRows, @JsonProperty("columnLengthMap") Map<String, Double> columnLengthMap,
@JsonProperty("columnCardinalityMap") Map<String, Double> columnCardinalityMap,
@JsonProperty("maxNumMultiValuesMap") Map<String, Double> maxNumMultiValuesMap,
@JsonProperty("columnIndexSizeMap") Map<String, Map<String, Double>> columnIndexSizeMap) {
@JsonProperty("columnIndexSizeMap") Map<String, Map<String, Double>> columnIndexSizeMap,
@JsonProperty("upsertPartitionToServerPrimaryKeyCountMap")
Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap) {
_tableName = tableName;
_diskSizeInBytes = sizeInBytes;
_numSegments = numSegments;
@@ -59,6 +62,7 @@ public TableMetadataInfo(@JsonProperty("tableName") String tableName,
_columnCardinalityMap = columnCardinalityMap;
_maxNumMultiValuesMap = maxNumMultiValuesMap;
_columnIndexSizeMap = columnIndexSizeMap;
_upsertPartitionToServerPrimaryKeyCountMap = upsertPartitionToServerPrimaryKeyCountMap;
}

public String getTableName() {
@@ -92,4 +96,8 @@ public Map<String, Double> getMaxNumMultiValuesMap() {
public Map<String, Map<String, Double>> getColumnIndexSizeMap() {
return _columnIndexSizeMap;
}

public Map<Integer, Map<String, Long>> getUpsertPartitionToServerPrimaryKeyCountMap() {
return _upsertPartitionToServerPrimaryKeyCountMap;
}
}
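
Because the map is wired into the @JsonCreator constructor and exposed through a getter, it survives a JSON round trip between server and controller. A minimal sketch of that round trip, assuming a plain Jackson ObjectMapper and the pinot-common import path for TableMetadataInfo (Pinot's own JSON utilities may differ):

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.Map;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;

public class TableMetadataInfoRoundTrip {
  public static void main(String[] args) throws Exception {
    // Hypothetical numbers; only the new map matters for this sketch.
    Map<Integer, Map<String, Long>> upsertCounts = Map.of(0, Map.of("Server_host1_8098", 1200L));
    TableMetadataInfo info = new TableMetadataInfo("myTable_REALTIME", 1024L, 4L, 10_000L,
        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
        upsertCounts);

    ObjectMapper mapper = new ObjectMapper();
    // Be tolerant of any extra serialized properties the class may expose.
    mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    String json = mapper.writeValueAsString(info);
    TableMetadataInfo parsed = mapper.readValue(json, TableMetadataInfo.class);
    // Expected: {0={Server_host1_8098=1200}}
    System.out.println(parsed.getUpsertPartitionToServerPrimaryKeyCountMap());
  }
}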
@@ -112,6 +112,7 @@ public TableMetadataInfo getAggregatedTableMetadataFromServer(String tableNameWi
final Map<String, Double> columnCardinalityMap = new HashMap<>();
final Map<String, Double> maxNumMultiValuesMap = new HashMap<>();
final Map<String, Map<String, Double>> columnIndexSizeMap = new HashMap<>();
final Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap = new HashMap<>();
for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
try {
TableMetadataInfo tableMetadataInfo =
@@ -128,6 +129,14 @@ public TableMetadataInfo getAggregatedTableMetadataFromServer(String tableNameWi
}
return l;
}));
tableMetadataInfo.getUpsertPartitionToServerPrimaryKeyCountMap().forEach(
(partition, serverToPrimaryKeyCount) -> upsertPartitionToServerPrimaryKeyCountMap.merge(partition,
new HashMap<>(serverToPrimaryKeyCount), (l, r) -> {
for (Map.Entry<String, Long> serverToPKCount : r.entrySet()) {
l.merge(serverToPKCount.getKey(), serverToPKCount.getValue(), Long::sum);
}
return l;
}));
} catch (IOException e) {
failedParses++;
LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e);
@@ -151,7 +160,7 @@ public TableMetadataInfo getAggregatedTableMetadataFromServer(String tableNameWi

TableMetadataInfo aggregateTableMetadataInfo =
new TableMetadataInfo(tableNameWithType, totalDiskSizeInBytes, totalNumSegments, totalNumRows, columnLengthMap,
columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap);
columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap, upsertPartitionToServerPrimaryKeyCountMap);
if (failedParses != 0) {
LOGGER.warn("Failed to parse {} / {} aggregated segment metadata responses from servers.", failedParses,
serverUrls.size());
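
The merge above folds each server's nested map into a single controller-side view, summing counts whenever the same partition and server key appear in more than one response. A standalone sketch of that merge rule, fed with hypothetical per-server maps:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertCountAggregationSketch {
  public static void main(String[] args) {
    // Hypothetical per-server responses: partition id -> (server instance id -> primary key count).
    Map<Integer, Map<String, Long>> fromServer1 = Map.of(0, Map.of("Server_host1_8098", 1200L));
    Map<Integer, Map<String, Long>> fromServer2 =
        Map.of(0, Map.of("Server_host2_8098", 1180L), 1, Map.of("Server_host2_8098", 950L));

    Map<Integer, Map<String, Long>> aggregated = new HashMap<>();
    for (Map<Integer, Map<String, Long>> response : List.of(fromServer1, fromServer2)) {
      response.forEach((partition, serverToCount) -> aggregated.merge(partition, new HashMap<>(serverToCount),
          (existing, incoming) -> {
            // Same rule as the controller code above: sum counts when two responses report the same server key.
            incoming.forEach((server, count) -> existing.merge(server, count, Long::sum));
            return existing;
          }));
    }
    // Aggregated contents (map ordering may vary):
    // {0={Server_host1_8098=1200, Server_host2_8098=1180}, 1={Server_host2_8098=950}}
    System.out.println(aggregated);
  }
}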
@@ -708,6 +708,18 @@ public TableUpsertMetadataManager getTableUpsertMetadataManager() {
return _tableUpsertMetadataManager;
}

/**
* Retrieves a mapping from partition id to the primary key count for that partition.
*
* @return A {@code Map} where keys are partition ids and values are the primary key count for that partition.
*/
public Map<Integer, Long> getUpsertPartitionToPrimaryKeyCount() {
if (isUpsertEnabled()) {
return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
}
return Collections.emptyMap();
}

/**
* Validate a schema against the table config for real-time record consumption.
* Ideally, we should validate these things when schema is added or table is created, but either of these
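
Since getUpsertPartitionToPrimaryKeyCount() returns an empty map when upsert is not enabled, callers can consume the result without a separate upsert check. A minimal caller-side sketch; the helper class below is illustrative and not part of this change:

import java.util.Map;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;

public final class UpsertPrimaryKeyCountUtil {
  private UpsertPrimaryKeyCountUtil() {
  }

  /** Sums the per-partition primary key counts; yields 0 for tables without upsert (empty map). */
  public static long totalUpsertPrimaryKeys(RealtimeTableDataManager realtimeTableDataManager) {
    Map<Integer, Long> partitionToPrimaryKeyCount = realtimeTableDataManager.getUpsertPartitionToPrimaryKeyCount();
    return partitionToPrimaryKeyCount.values().stream().mapToLong(Long::longValue).sum();
  }
}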
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.helix.HelixManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -72,6 +73,11 @@ public PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionI
public void stop() {
}

@Override
public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
return Collections.emptyMap();
}

@Override
public void close()
throws IOException {
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
@@ -45,6 +46,15 @@ public void stop() {
}
}

@Override
public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
Map<Integer, Long> partitionToPrimaryKeyCount = new HashMap<>();
_partitionMetadataManagerMap.forEach(
(partitionID, upsertMetadataManager) -> partitionToPrimaryKeyCount.put(partitionID,
upsertMetadataManager.getNumPrimaryKeys()));
return partitionToPrimaryKeyCount;
}

@Override
public void close()
throws IOException {
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -47,5 +48,12 @@ void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataMana
*/
void stop();

/**
* Retrieves a mapping from partition id to the primary key count for that partition.
*
* @return A {@code Map} where keys are partition ids and values are the primary key count for that partition.
*/
Map<Integer, Long> getPartitionToPrimaryKeyCount();

boolean isPreloading();
}
@@ -80,6 +80,7 @@
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -286,9 +287,23 @@ public String getSegmentMetadata(
}
}

// fetch partition to primary key count for realtime tables that have upsert enabled
Map<Integer, Long> upsertPartitionToPrimaryKeyCountMap = new HashMap<>();
if (tableDataManager instanceof RealtimeTableDataManager) {
RealtimeTableDataManager realtimeTableDataManager = (RealtimeTableDataManager) tableDataManager;
upsertPartitionToPrimaryKeyCountMap = realtimeTableDataManager.getUpsertPartitionToPrimaryKeyCount();
}

// construct upsertPartitionToServerPrimaryKeyCountMap to populate in TableMetadataInfo
Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap = new HashMap<>();
upsertPartitionToPrimaryKeyCountMap.forEach(
(partition, primaryKeyCount) -> upsertPartitionToServerPrimaryKeyCountMap.put(partition,
Map.of(instanceDataManager.getInstanceId(), primaryKeyCount)));

TableMetadataInfo tableMetadataInfo =
new TableMetadataInfo(tableDataManager.getTableName(), totalSegmentSizeBytes, segmentDataManagers.size(),
totalNumRows, columnLengthMap, columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizesMap);
totalNumRows, columnLengthMap, columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizesMap,
upsertPartitionToServerPrimaryKeyCountMap);
return ResourceUtils.convertToJsonString(tableMetadataInfo);
}

