Enable compaction ITs on MSQ engine #16778

Merged 7 commits on Jul 30, 2024
Changes from 3 commits
@@ -1357,7 +1357,10 @@ private void postResultPartitionBoundariesForStage(
* Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
* also drop all other segments within the replacement intervals.
*/
private void publishAllSegments(final Set<DataSegment> segments) throws IOException
private void publishAllSegments(
final Set<DataSegment> segments,
Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction
) throws IOException
{
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) querySpec.getDestination();
@@ -1413,7 +1416,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOException
}
performSegmentPublish(
context.taskActionClient(),
createOverwriteAction(taskLockType, segmentsWithTombstones)
createOverwriteAction(taskLockType, compactionStateAnnotateFunction.apply(segmentsWithTombstones))
);
}
} else if (!segments.isEmpty()) {
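For context, here is a minimal standalone sketch (hypothetical class and method names) of the kind of annotation function that is now threaded into publishAllSegments, so that the tombstones created for the replaced intervals are annotated along with the regular segments. It assumes DataSegment#withLastCompactionState as used by native compaction; it is not this PR's own code.

```java
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;

// Hypothetical sketch: build a function that stamps a pre-computed CompactionState onto every
// segment it is given. Passing it into publishAllSegments() means the tombstones generated there
// for the replaced time chunks carry the compaction state too, not only the final-stage segments.
final class CompactionStateAnnotationSketch
{
  static Function<Set<DataSegment>, Set<DataSegment>> annotateWith(final CompactionState state)
  {
    return segments -> segments.stream()
                               .map(segment -> segment.withLastCompactionState(state)) // assumed API, mirrors native compaction
                               .collect(Collectors.toSet());
  }
}
```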
@@ -1543,6 +1546,7 @@ private void handleQueryResults(
if (MSQControllerTask.isIngestion(querySpec)) {
// Publish segments if needed.
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();

@SuppressWarnings("unchecked")
Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
@@ -1553,7 +1557,7 @@
Tasks.DEFAULT_STORE_COMPACTION_STATE
);

if (!segments.isEmpty() && storeCompactionState) {
if (storeCompactionState) {
DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
if (!destination.isReplaceTimeChunks()) {
// Store compaction state only for replace queries.
@@ -1565,20 +1569,21 @@
DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();

ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec();
ClusterBy clusterBy = queryKernel.getStageDefinition(finalStageId).getClusterBy();

Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = addCompactionStateToSegments(
compactionStateAnnotateFunction = addCompactionStateToSegments(
querySpec,
context.jsonMapper(),
dataSchema,
shardSpec,
clusterBy,
queryDef.getQueryId()
);
segments = compactionStateAnnotateFunction.apply(segments);
}
}
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
publishAllSegments(segments);
publishAllSegments(segments, compactionStateAnnotateFunction);
} else if (MSQControllerTask.isExport(querySpec)) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
@@ -1624,33 +1629,49 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
MSQSpec querySpec,
ObjectMapper jsonMapper,
DataSchema dataSchema,
ShardSpec shardSpec,
@Nullable ShardSpec shardSpec,
@Nullable ClusterBy clusterBy,
String queryId
)
{
final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
PartitionsSpec partitionSpec;

if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
// shardSpec is absent when there are no segments, which happens when an MSQControllerTask generates
// only tombstones.
if (shardSpec != null) {
if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
partitionDimensions,
false
);
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// MSQ tasks don't use maxTotalRows. Hence using Long.MAX_VALUE.
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
} else {
// SingleDimensionShardSpec and other shard specs are never created in MSQ.
throw new MSQException(
UnknownFault.forMessage(
StringUtils.format(
"Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
queryId,
shardSpec.getType()
)));
}
} else if (clusterBy != null && !clusterBy.getColumns().isEmpty()) {
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
partitionDimensions,
clusterBy.getColumns()
.stream()
.map(KeyColumn::columnName).collect(Collectors.toList()),
false
);
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE.
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
} else {
// SingleDimenionShardSpec and other shard specs are never created in MSQ.
throw new MSQException(
UnknownFault.forMessage(
StringUtils.format(
"Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
queryId,
shardSpec.getType()
)));
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
}

Granularity segmentGranularity = ((DataSourceMSQDestination) querySpec.getDestination())
Expand All @@ -1671,13 +1692,15 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
: new ClientCompactionTaskTransformSpec(
dataSchema.getTransformSpec().getFilter()
).asMap(jsonMapper);
List<Object> metricsSpec = dataSchema.getAggregators() == null
? null
: jsonMapper.convertValue(
dataSchema.getAggregators(),
new TypeReference<List<Object>>() {}
);
List<Object> metricsSpec = Collections.emptyList();

if (querySpec.getQuery() instanceof GroupByQuery) {
// For group-by queries, the aggregators are transformed to their combining factories in the data schema,
// resulting in a mismatch between the schema in the compaction spec and the one in the compaction state.
// The metricsSpec is therefore sourced directly from the querySpec.
GroupByQuery groupByQuery = (GroupByQuery) querySpec.getQuery();
metricsSpec = jsonMapper.convertValue(groupByQuery.getAggregatorSpecs(), new TypeReference<List<Object>>() {});
}

IndexSpec indexSpec = tuningConfig.getIndexSpec();

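A small illustration of the combining-factory mismatch described in the comment above, assuming standard Druid longSum behavior where the combining factory aggregates over its own output column. This is why the metricsSpec is taken from the GroupByQuery rather than from the generated data schema.

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

// Illustrative only: a longSum declared as longSum(added, delta) in the compaction spec combines,
// in the final stage's data schema, as longSum(added, added) -- a different aggregator, hence the
// mismatch if the compaction state were sourced from the schema.
final class CombiningFactoryMismatchSketch
{
  public static void main(String[] args)
  {
    AggregatorFactory specAggregator = new LongSumAggregatorFactory("added", "delta");
    AggregatorFactory combining = specAggregator.getCombiningFactory();

    System.out.println(specAggregator.requiredFields()); // [delta]
    System.out.println(combining.requiredFields());      // [added]
    System.out.println(specAggregator.equals(combining)); // false
  }
}
```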
@@ -49,7 +49,9 @@
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.expression.TimestampFloorExprMacro;
@@ -58,11 +60,13 @@
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
@@ -123,7 +127,8 @@ public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInjec
*/
@Override
public CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
)
{
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
@@ -139,13 +144,58 @@ public CompactionConfigValidationResult validateCompactionTask(
));
}
validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
.orElse(new CompactionConfigValidationResult(true, null));
}

/**
* Validates that there are no rolled-up segments where either:
* <ul>
* <li>the aggregator factory differs from its combining factory, or</li>
* <li>the input column name differs from the output name (non-idempotent)</li>
* </ul>
*/
private CompactionConfigValidationResult validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
{
for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalToDataSchemaMap.entrySet()) {
if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue();
if (Boolean.TRUE.equals(combinedDataSchema.hasRolledUpSegments())) {
for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) {
// This is a conservative check: the existing rollup may have been idempotent even though the aggregator
// provided in the compaction spec is not. Such segments would compact correctly but still fail this pre-check.
if (
!(
aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) &&
(
aggregatorFactory.requiredFields().isEmpty() ||
(aggregatorFactory.requiredFields().size() == 1 &&
aggregatorFactory.requiredFields()
.get(0)
.equals(aggregatorFactory.getName()))
)
)
) {
// MSQ doesn't support rolling up already rolled-up segments when the aggregate's output column name differs
// from the input column name. This is because the aggregated values would get overwritten by new values and
// the existing values would be lost. Note that if no rollup is specified in an index spec, the default value
// is true.
return new CompactionConfigValidationResult(
false,
"Rolled-up segments in interval[%s] for compaction not supported by MSQ engine.",
intervalDataSchema.getKey()
);
}
}
}
}
}
return new CompactionConfigValidationResult(true, null);
}
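To make the condition above concrete, here is a standalone sketch of the same idempotency check applied to two common aggregators: a longSum over its own output column passes, while count fails because its combining factory is a longSum (a different class). The class and method names here are illustrative only.

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

// Illustrative only: mirrors the check in validateRolledUpSegments above.
final class RollupIdempotencyCheckSketch
{
  static boolean isIdempotent(AggregatorFactory factory)
  {
    return factory.getClass().equals(factory.getCombiningFactory().getClass())
           && (factory.requiredFields().isEmpty()
               || (factory.requiredFields().size() == 1
                   && factory.requiredFields().get(0).equals(factory.getName())));
  }

  public static void main(String[] args)
  {
    // longSum("added", "added") combines with the same class over the same column -> allowed.
    System.out.println(isIdempotent(new LongSumAggregatorFactory("added", "added")));
    // count("cnt") combines into a longSum over "cnt", a different class -> rejected.
    System.out.println(isIdempotent(new CountAggregatorFactory("cnt")));
  }
}
```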

@Override
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
@@ -291,6 +341,10 @@ private static RowSignature getRowSignature(DataSchema dataSchema)
for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) {
rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName()));
}
// Columns that are part of the datasource's metricsSpec also need to be added to the row signature.
for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) {
rowSignatureBuilder.add(aggregatorFactory.getName(), aggregatorFactory.getIntermediateType());
}
return rowSignatureBuilder.build();
}
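A brief sketch of what the added loop contributes to the row signature: metric columns are registered under their intermediate (unfinalized) types alongside the dimensions. The aggregator and column names below are made up for illustration.

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

// Illustrative only: count contributes a LONG column and doubleSum a DOUBLE column,
// following AggregatorFactory#getIntermediateType.
final class MetricsInRowSignatureSketch
{
  public static void main(String[] args)
  {
    RowSignature.Builder builder = RowSignature.builder().add("isRobot", ColumnType.STRING);
    for (AggregatorFactory factory : new AggregatorFactory[]{
        new CountAggregatorFactory("cnt"),
        new DoubleSumAggregatorFactory("sum_added", "added")
    }) {
      builder.add(factory.getName(), factory.getIntermediateType());
    }
    // Expect isRobot as STRING, cnt as LONG, sum_added as DOUBLE.
    System.out.println(builder.build());
  }
}
```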

@@ -354,14 +408,30 @@ private static List<OrderByColumnSpec> getOrderBySpec(PartitionsSpec partitionSp
private static Query<?> buildScanQuery(CompactionTask compactionTask, Interval interval, DataSchema dataSchema)
{
RowSignature rowSignature = getRowSignature(dataSchema);
return new Druids.ScanQueryBuilder().dataSource(dataSchema.getDataSource())
.columns(rowSignature.getColumnNames())
.virtualColumns(getVirtualColumns(dataSchema, interval))
.columnTypes(rowSignature.getColumnTypes())
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval)))
.filters(dataSchema.getTransformSpec().getFilter())
.context(compactionTask.getContext())
.build();
Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder()
.dataSource(dataSchema.getDataSource())
.columns(rowSignature.getColumnNames())
.virtualColumns(getVirtualColumns(dataSchema, interval))
.columnTypes(rowSignature.getColumnTypes())
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval)))
.filters(dataSchema.getTransformSpec().getFilter())
.context(compactionTask.getContext());

if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
List<OrderByColumnSpec> orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());

scanQueryBuilder.orderBy(
orderByColumnSpecs
.stream()
.map(orderByColumnSpec ->
new ScanQuery.OrderBy(
orderByColumnSpec.getDimension(),
ScanQuery.Order.fromString(orderByColumnSpec.getDirection().toString())
))
.collect(Collectors.toList())
);
}
return scanQueryBuilder.build();
}
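To illustrate the new ordering logic, a standalone sketch of how a range-partitioned compaction's partition dimensions would surface as scan-query ordering, assuming getOrderBySpec() maps each partition dimension to an ascending order. The dimension names are hypothetical.

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.query.scan.ScanQuery;

// Illustrative only: each partition dimension becomes a ScanQuery.OrderBy entry, so rows arrive
// sorted on the range-partitioning key during segment generation.
final class ScanOrderingSketch
{
  public static void main(String[] args)
  {
    DimensionRangePartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(
        5_000_000,
        null,
        ImmutableList.of("countryName", "cityName"),
        false
    );
    partitionsSpec.getPartitionDimensions().forEach(
        dim -> System.out.println(new ScanQuery.OrderBy(dim, ScanQuery.Order.ASCENDING))
    );
  }
}
```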

private static boolean isGroupBy(DataSchema dataSchema)
@@ -468,7 +538,10 @@ private Map<String, Object> createMSQTaskContext(CompactionTask compactionTask,
);
}
// Similar to compaction using the native engine, don't finalize aggregations.
// This is used when writing the data schema during the segment generation phase.
context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS, false);
// Add the appropriate finalization setting to the native query context, i.e. for the GroupBy query.
context.put(QueryContexts.FINALIZE_KEY, false);
// Only scalar or array-type dimensions are allowed as grouping keys.
context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false);
return context;