Skip to content

Commit

Permalink
Enhance Broker reducer to handle expression format change (apache#11762)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Oct 12, 2023
1 parent 314cc03 commit ad0d2b1
Show file tree
Hide file tree
Showing 13 changed files with 736 additions and 329 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
Expand All @@ -47,9 +45,12 @@
@SuppressWarnings({"rawtypes", "unchecked"})
public class AggregationDataTableReducer implements DataTableReducer {
private final QueryContext _queryContext;
private final AggregationFunction[] _aggregationFunctions;

AggregationDataTableReducer(QueryContext queryContext) {
public AggregationDataTableReducer(QueryContext queryContext) {
_queryContext = queryContext;
_aggregationFunctions = _queryContext.getAggregationFunctions();
assert _aggregationFunctions != null;
}

/**
Expand All @@ -59,11 +60,11 @@ public class AggregationDataTableReducer implements DataTableReducer {
public void reduceAndSetResults(String tableName, DataSchema dataSchema,
Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
assert dataSchema != null;
dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(_queryContext, dataSchema);

if (dataTableMap.isEmpty()) {
DataSchema resultTableSchema =
new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema()).getResultDataSchema();
new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema(dataSchema)).getResultDataSchema();
brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList()));
return;
}
Expand All @@ -78,9 +79,7 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema,

private void reduceWithIntermediateResult(DataSchema dataSchema, Collection<DataTable> dataTables,
BrokerResponseNative brokerResponseNative) {
AggregationFunction[] aggregationFunctions = _queryContext.getAggregationFunctions();
assert aggregationFunctions != null;
int numAggregationFunctions = aggregationFunctions.length;
int numAggregationFunctions = _aggregationFunctions.length;
Object[] intermediateResults = new Object[numAggregationFunctions];
for (DataTable dataTable : dataTables) {
for (int i = 0; i < numAggregationFunctions; i++) {
Expand All @@ -100,25 +99,23 @@ private void reduceWithIntermediateResult(DataSchema dataSchema, Collection<Data
if (mergedIntermediateResult == null) {
intermediateResults[i] = intermediateResultToMerge;
} else {
intermediateResults[i] = aggregationFunctions[i].merge(mergedIntermediateResult, intermediateResultToMerge);
intermediateResults[i] = _aggregationFunctions[i].merge(mergedIntermediateResult, intermediateResultToMerge);
}
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(i);
}
}
Object[] finalResults = new Object[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
AggregationFunction aggregationFunction = aggregationFunctions[i];
AggregationFunction aggregationFunction = _aggregationFunctions[i];
Comparable result = aggregationFunction.extractFinalResult(intermediateResults[i]);
finalResults[i] = result == null ? null : aggregationFunction.getFinalResultColumnType().convert(result);
}
brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
brokerResponseNative.setResultTable(reduceToResultTable(getPrePostAggregationDataSchema(dataSchema), finalResults));
}

private void reduceWithFinalResult(DataSchema dataSchema, DataTable dataTable,
BrokerResponseNative brokerResponseNative) {
AggregationFunction[] aggregationFunctions = _queryContext.getAggregationFunctions();
assert aggregationFunctions != null;
int numAggregationFunctions = aggregationFunctions.length;
int numAggregationFunctions = _aggregationFunctions.length;
Object[] finalResults = new Object[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
Expand All @@ -133,50 +130,42 @@ private void reduceWithFinalResult(DataSchema dataSchema, DataTable dataTable,
finalResults[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataType, 0, i);
}
}
brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
brokerResponseNative.setResultTable(reduceToResultTable(dataSchema, finalResults));
}

/**
* Applies post-aggregation and result rewriting to the final aggregation results, and wraps them in a ResultTable.
*/
private ResultTable reduceToResultTable(Object[] finalResults) {
PostAggregationHandler postAggregationHandler =
new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema());
DataSchema dataSchema = postAggregationHandler.getResultDataSchema();
private ResultTable reduceToResultTable(DataSchema dataSchema, Object[] finalResults) {
PostAggregationHandler postAggregationHandler = new PostAggregationHandler(_queryContext, dataSchema);
DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
Object[] row = postAggregationHandler.getResult(finalResults);

RewriterResult resultRewriterResult =
ResultRewriteUtils.rewriteResult(dataSchema, Collections.singletonList(row));
dataSchema = resultRewriterResult.getDataSchema();
ResultRewriteUtils.rewriteResult(resultDataSchema, Collections.singletonList(row));
resultDataSchema = resultRewriterResult.getDataSchema();
List<Object[]> rows = resultRewriterResult.getRows();

ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
ColumnDataType[] columnDataTypes = resultDataSchema.getColumnDataTypes();
int numColumns = columnDataTypes.length;
for (Object[] rewrittenRow : rows) {
for (int j = 0; j < numColumns; j++) {
rewrittenRow[j] = columnDataTypes[j].format(rewrittenRow[j]);
}
}

return new ResultTable(dataSchema, rows);
return new ResultTable(resultDataSchema, rows);
}

/**
* Constructs the DataSchema for the rows before the post-aggregation (SQL mode).
*/
private DataSchema getPrePostAggregationDataSchema() {
List<Pair<AggregationFunction, FilterContext>> filteredAggregationFunctions =
_queryContext.getFilteredAggregationFunctions();
assert filteredAggregationFunctions != null;
int numColumns = filteredAggregationFunctions.size();
String[] columnNames = new String[numColumns];
ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
for (int i = 0; i < numColumns; i++) {
Pair<AggregationFunction, FilterContext> pair = filteredAggregationFunctions.get(i);
AggregationFunction aggregationFunction = pair.getLeft();
columnNames[i] = AggregationFunctionUtils.getResultColumnName(aggregationFunction, pair.getRight());
columnDataTypes[i] = aggregationFunction.getFinalResultColumnType();
private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
int numAggregationFunctions = _aggregationFunctions.length;
ColumnDataType[] columnDataTypes = new ColumnDataType[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
columnDataTypes[i] = _aggregationFunctions[i].getFinalResultColumnType();
}
return new DataSchema(columnNames, columnDataTypes);
return new DataSchema(dataSchema.getColumnNames(), columnDataTypes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
*/
package org.apache.pinot.core.query.reduce;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
Expand All @@ -38,6 +42,8 @@
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -46,6 +52,8 @@
*/
@ThreadSafe
public class BrokerReduceService extends BaseReduceService {
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);

public BrokerReduceService(PinotConfiguration config) {
super(config);
}
Expand All @@ -65,7 +73,9 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
BrokerResponseNative brokerResponseNative = new BrokerResponseNative();

// Cache a data schema from data tables (try to cache one with data rows associated with it).
DataSchema cachedDataSchema = null;
DataSchema dataSchemaFromEmptyDataTable = null;
DataSchema dataSchemaFromNonEmptyDataTable = null;
List<ServerRoutingInstance> serversWithConflictingDataSchema = new ArrayList<>();

// Process server response metadata.
Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
Expand All @@ -83,12 +93,22 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
} else {
// Try to cache the data schema from a data table with data rows inside; otherwise cache one from an empty data table.
if (dataTable.getNumberOfRows() == 0) {
if (cachedDataSchema == null) {
cachedDataSchema = dataSchema;
if (dataSchemaFromEmptyDataTable == null) {
dataSchemaFromEmptyDataTable = dataSchema;
}
iterator.remove();
} else {
cachedDataSchema = dataSchema;
if (dataSchemaFromNonEmptyDataTable == null) {
dataSchemaFromNonEmptyDataTable = dataSchema;
} else {
// Remove data tables with conflicting data schema.
// NOTE: Only compare the column data types, since the column names (string representation of expression)
// can change across different versions.
if (!Arrays.equals(dataSchema.getColumnDataTypes(), dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) {
serversWithConflictingDataSchema.add(entry.getKey());
iterator.remove();
}
}
}
}
}
Expand All @@ -99,8 +119,23 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
// Set execution statistics and Update broker metrics.
aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);

// Report the servers with conflicting data schema.
if (!serversWithConflictingDataSchema.isEmpty()) {
String errorMessage =
String.format("%s: responses for table: %s from servers: %s got dropped due to data schema inconsistency.",
QueryException.MERGE_RESPONSE_ERROR.getMessage(), tableName, serversWithConflictingDataSchema);
LOGGER.warn(errorMessage);
if (brokerMetrics != null) {
brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
}
brokerResponseNative.addToExceptions(
new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
}

// NOTE: When there is no cached data schema, it means that all servers encountered exceptions. In that case, return
// the response with metadata only.
DataSchema cachedDataSchema =
dataSchemaFromNonEmptyDataTable != null ? dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable;
if (cachedDataSchema == null) {
return brokerResponseNative;
}
Expand All @@ -124,8 +159,7 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
if (gapfillType == null) {
throw new BadQueryRequestException("Nested query is not supported without gapfill");
}
BaseGapfillProcessor gapfillProcessor =
GapfillProcessorFactory.getGapfillProcessor(queryContext, gapfillType);
BaseGapfillProcessor gapfillProcessor = GapfillProcessorFactory.getGapfillProcessor(queryContext, gapfillType);
gapfillProcessor.process(brokerResponseNative);
}

Expand Down
Loading

0 comments on commit ad0d2b1

Please sign in to comment.