Enable compaction ITs on MSQ engine (apache#16778)
As a follow-up to apache#16291, this commit enables a subset of the existing native compaction ITs on the MSQ engine.

In the process, the following changes have been introduced in the MSQ compaction flow:
- Populate `metricsSpec` in `CompactionState` from `querySpec` in `MSQControllerTask` instead of `dataSchema` (see the sketch after this list)
- Add a check for pre-rolled-up segments having an `AggregatorFactory` with different input and output column names
- Fix the missing cluster-by clause in scan queries
- Add annotation of `CompactionState` to tombstone segments
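
To make the first bullet concrete, here is a minimal sketch (not part of the commit; the `sum_x`/`x` column names are hypothetical) of why the combining form held by the generated data schema does not round-trip to the user-supplied `metricsSpec`:

```java
// Minimal sketch: MSQ's segment-generation dataSchema holds combining aggregator factories,
// so persisting dataSchema.getAggregators() into CompactionState would not match the
// metricsSpec the user supplied. Column names "sum_x" and "x" are made up for illustration.
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class CombiningFactoryMismatchSketch
{
  public static void main(String[] args)
  {
    // Aggregator as written in an ingest/compaction spec: reads raw column "x", writes "sum_x".
    AggregatorFactory specAggregator = new LongSumAggregatorFactory("sum_x", "x");

    // Its combining form, as used when merging already-aggregated rows: reads and writes "sum_x".
    AggregatorFactory combining = specAggregator.getCombiningFactory();

    System.out.println(specAggregator.requiredFields()); // [x]
    System.out.println(combining.requiredFields());      // [sum_x]
    // Storing the combining form in CompactionState would therefore diverge from the original
    // spec, which is why the commit sources metricsSpec from the querySpec instead.
  }
}
```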
gargvishesh authored and sreemanamala committed Aug 6, 2024
1 parent ebb5aa2 commit d4e603d
Showing 13 changed files with 573 additions and 234 deletions.
@@ -20,7 +20,6 @@
package org.apache.druid.msq.exec;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -1357,7 +1356,10 @@ private void postResultPartitionBoundariesForStage(
* Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
* also drop all other segments within the replacement intervals.
*/
private void publishAllSegments(final Set<DataSegment> segments) throws IOException
private void publishAllSegments(
final Set<DataSegment> segments,
Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction
) throws IOException
{
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) querySpec.getDestination();
@@ -1413,7 +1415,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
}
performSegmentPublish(
context.taskActionClient(),
createOverwriteAction(taskLockType, segmentsWithTombstones)
createOverwriteAction(taskLockType, compactionStateAnnotateFunction.apply(segmentsWithTombstones))
);
}
} else if (!segments.isEmpty()) {
@@ -1543,6 +1545,7 @@ private void handleQueryResults(
if (MSQControllerTask.isIngestion(querySpec)) {
// Publish segments if needed.
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();

@SuppressWarnings("unchecked")
Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
@@ -1553,7 +1556,7 @@
Tasks.DEFAULT_STORE_COMPACTION_STATE
);

if (!segments.isEmpty() && storeCompactionState) {
if (storeCompactionState) {
DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
if (!destination.isReplaceTimeChunks()) {
// Store compaction state only for replace queries.
@@ -1565,20 +1568,21 @@
DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();

ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec();
ClusterBy clusterBy = queryKernel.getStageDefinition(finalStageId).getClusterBy();

Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = addCompactionStateToSegments(
compactionStateAnnotateFunction = addCompactionStateToSegments(
querySpec,
context.jsonMapper(),
dataSchema,
shardSpec,
clusterBy,
queryDef.getQueryId()
);
segments = compactionStateAnnotateFunction.apply(segments);
}
}
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
publishAllSegments(segments);
publishAllSegments(segments, compactionStateAnnotateFunction);
} else if (MSQControllerTask.isExport(querySpec)) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
@@ -1624,33 +1628,49 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
MSQSpec querySpec,
ObjectMapper jsonMapper,
DataSchema dataSchema,
ShardSpec shardSpec,
@Nullable ShardSpec shardSpec,
@Nullable ClusterBy clusterBy,
String queryId
)
{
final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
PartitionsSpec partitionSpec;

if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
// shardSpec is null when there are no segments, which happens when an MSQControllerTask generates only
// tombstones.
if (shardSpec != null) {
if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
partitionDimensions,
false
);
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// MSQ tasks don't use maxTotalRows. Hence using Long.MAX_VALUE.
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
} else {
// SingleDimensionShardSpec and other shard specs are never created in MSQ.
throw new MSQException(
UnknownFault.forMessage(
StringUtils.format(
"Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
queryId,
shardSpec.getType()
)));
}
} else if (clusterBy != null && !clusterBy.getColumns().isEmpty()) {
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
partitionDimensions,
clusterBy.getColumns()
.stream()
.map(KeyColumn::columnName).collect(Collectors.toList()),
false
);
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// MSQ tasks don't use maxTotalRows. Hence using Long.MAX_VALUE.
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
} else {
// SingleDimensionShardSpec and other shard specs are never created in MSQ.
throw new MSQException(
UnknownFault.forMessage(
StringUtils.format(
"Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
queryId,
shardSpec.getType()
)));
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
}

Granularity segmentGranularity = ((DataSourceMSQDestination) querySpec.getDestination())
@@ -1671,13 +1691,26 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
: new ClientCompactionTaskTransformSpec(
dataSchema.getTransformSpec().getFilter()
).asMap(jsonMapper);
List<Object> metricsSpec = dataSchema.getAggregators() == null
? null
: jsonMapper.convertValue(
dataSchema.getAggregators(),
new TypeReference<List<Object>>() {}
);

List<Object> metricsSpec = Collections.emptyList();

if (querySpec.getQuery() instanceof GroupByQuery) {
// For group-by queries, the aggregators are transformed to their combining factories in the dataSchema, resulting
// in a mismatch between the schema in the compaction spec and the one in the compaction state. The original
// AggregatorFactory definitions for the dataSchema aggregators are therefore sourced directly from the querySpec.
GroupByQuery groupByQuery = (GroupByQuery) querySpec.getQuery();
// Collect all aggregators that are part of the current dataSchema, since a non-rollup query (isRollup() is false)
// moves metrics columns to dimensions in the final schema.
Set<String> aggregatorsInDataSchema = Arrays.stream(dataSchema.getAggregators())
.map(AggregatorFactory::getName)
.collect(
Collectors.toSet());
metricsSpec = new ArrayList<>(
groupByQuery.getAggregatorSpecs()
.stream()
.filter(aggregatorFactory -> aggregatorsInDataSchema.contains(aggregatorFactory.getName()))
.collect(Collectors.toList())
);
}

IndexSpec indexSpec = tuningConfig.getIndexSpec();

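
The tombstone-only path above means there may be no segments and hence no `ShardSpec` to inspect. Below is a minimal sketch (hypothetical helper name, not the committed code) of the fallback that derives the recorded partitioning from the stage's `ClusterBy` instead:

```java
// Minimal sketch of the partitioning fallback: prefer ClusterBy columns when no ShardSpec is
// available (e.g. a replace query that produced only tombstones), else use dynamic partitioning.
import java.util.List;
import java.util.stream.Collectors;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;

public class ClusterByPartitionsSpecSketch
{
  // clusterBy may be null; rowsPerSegment corresponds to tuningConfig.getRowsPerSegment().
  static PartitionsSpec fromClusterBy(ClusterBy clusterBy, int rowsPerSegment)
  {
    if (clusterBy != null && !clusterBy.getColumns().isEmpty()) {
      List<String> partitionDimensions = clusterBy.getColumns()
                                                  .stream()
                                                  .map(KeyColumn::columnName)
                                                  .collect(Collectors.toList());
      return new DimensionRangePartitionsSpec(rowsPerSegment, null, partitionDimensions, false);
    }
    // MSQ tasks don't use maxTotalRows, hence Long.MAX_VALUE.
    return new DynamicPartitionsSpec(rowsPerSegment, Long.MAX_VALUE);
  }
}
```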
@@ -49,7 +49,9 @@
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.expression.TimestampFloorExprMacro;
@@ -58,11 +60,13 @@
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
@@ -123,7 +127,8 @@ public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInjec
*/
@Override
public CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
)
{
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
@@ -139,13 +144,57 @@ public CompactionConfigValidationResult validateCompactionTask(
));
}
validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
.orElse(CompactionConfigValidationResult.success());
}

/**
* Validates that there are no rolled-up segments where either:
* <ul>
* <li>the aggregator factory differs from its combining factory</li>
* <li>the input column name differs from the output name (non-idempotent)</li>
* </ul>
*/
private CompactionConfigValidationResult validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
{
for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalToDataSchemaMap.entrySet()) {
if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue();
if (combinedDataSchema.hasRolledUpSegments()) {
for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) {
// This is a conservative check: the existing rollup may have been idempotent even though the aggregator provided in
// the compaction spec isn't, in which case the data would compact correctly yet still fail this pre-check.
if (
!(
aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) &&
(
aggregatorFactory.requiredFields().isEmpty() ||
(aggregatorFactory.requiredFields().size() == 1 &&
aggregatorFactory.requiredFields()
.get(0)
.equals(aggregatorFactory.getName()))
)
)
) {
// MSQ doesn't support rolling up already rolled-up segments when the aggregator's output column name differs from
// its input column name. This is because the existing aggregated values would get overwritten by newly computed
// values and be lost. Note that if no rollup is specified in an index spec, the default value is true.
return CompactionConfigValidationResult.failure(
"MSQ: Rolled-up segments in compaction interval[%s].",
intervalDataSchema.getKey()
);
}
}
}
}
}
return CompactionConfigValidationResult.success();
}

@Override
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
@@ -291,6 +340,10 @@ private static RowSignature getRowSignature(DataSchema dataSchema)
for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) {
rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName()));
}
// There can also be columns that are part of the metricsSpec for a datasource.
for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) {
rowSignatureBuilder.add(aggregatorFactory.getName(), aggregatorFactory.getIntermediateType());
}
return rowSignatureBuilder.build();
}

@@ -354,14 +407,30 @@ private static List<OrderByColumnSpec> getOrderBySpec(PartitionsSpec partitionSp
private static Query<?> buildScanQuery(CompactionTask compactionTask, Interval interval, DataSchema dataSchema)
{
RowSignature rowSignature = getRowSignature(dataSchema);
return new Druids.ScanQueryBuilder().dataSource(dataSchema.getDataSource())
.columns(rowSignature.getColumnNames())
.virtualColumns(getVirtualColumns(dataSchema, interval))
.columnTypes(rowSignature.getColumnTypes())
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval)))
.filters(dataSchema.getTransformSpec().getFilter())
.context(compactionTask.getContext())
.build();
Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder()
.dataSource(dataSchema.getDataSource())
.columns(rowSignature.getColumnNames())
.virtualColumns(getVirtualColumns(dataSchema, interval))
.columnTypes(rowSignature.getColumnTypes())
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval)))
.filters(dataSchema.getTransformSpec().getFilter())
.context(compactionTask.getContext());

if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
List<OrderByColumnSpec> orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());

scanQueryBuilder.orderBy(
orderByColumnSpecs
.stream()
.map(orderByColumnSpec ->
new ScanQuery.OrderBy(
orderByColumnSpec.getDimension(),
ScanQuery.Order.fromString(orderByColumnSpec.getDirection().toString())
))
.collect(Collectors.toList())
);
}
return scanQueryBuilder.build();
}

private static boolean isGroupBy(DataSchema dataSchema)
@@ -468,7 +537,10 @@ private Map<String, Object> createMSQTaskContext(CompactionTask compactionTask,
);
}
// Similar to compaction using the native engine, don't finalize aggregations.
// Used for writing the data schema during the segment generation phase.
context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS, false);
// Add appropriate finalization to the native query context, i.e. for the GroupBy query.
context.put(QueryContexts.FINALIZE_KEY, false);
// Only scalar or array-type dimensions are allowed as grouping keys.
context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false);
return context;
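
To make the new pre-check tangible, here is a minimal sketch (hypothetical class and column names) of the idempotency condition that `validateRolledUpSegments` applies to each aggregator of a rolled-up segment:

```java
// Minimal sketch: an aggregator is accepted for already rolled-up segments only when its
// combining factory is the same class and it reads the same column it writes.
import java.util.List;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class RollupIdempotencyCheckSketch
{
  static boolean isIdempotent(AggregatorFactory factory)
  {
    List<String> fields = factory.requiredFields();
    return factory.getClass().equals(factory.getCombiningFactory().getClass())
           && (fields.isEmpty()
               || (fields.size() == 1 && fields.get(0).equals(factory.getName())));
  }

  public static void main(String[] args)
  {
    // "sum_x" re-aggregating the already-summed "sum_x" column: safe to compact with rollup.
    System.out.println(isIdempotent(new LongSumAggregatorFactory("sum_x", "sum_x"))); // true
    // "sum_x" reading raw column "x": rejected for rolled-up segments by the new validation.
    System.out.println(isIdempotent(new LongSumAggregatorFactory("sum_x", "x")));     // false
  }
}
```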