Enable compaction ITs on MSQ engine #16778

Merged (7 commits, Jul 30, 2024)
Changes from 2 commits
@@ -1357,7 +1357,10 @@ private void postResultPartitionBoundariesForStage(
* Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
* also drop all other segments within the replacement intervals.
*/
private void publishAllSegments(final Set<DataSegment> segments) throws IOException
private void publishAllSegments(
final Set<DataSegment> segments,
Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction
) throws IOException
{
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) querySpec.getDestination();
@@ -1413,7 +1416,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
}
performSegmentPublish(
context.taskActionClient(),
createOverwriteAction(taskLockType, segmentsWithTombstones)
createOverwriteAction(taskLockType, compactionStateAnnotateFunction.apply(segmentsWithTombstones))
);
}
} else if (!segments.isEmpty()) {
@@ -1543,6 +1546,7 @@ private void handleQueryResults(
if (MSQControllerTask.isIngestion(querySpec)) {
// Publish segments if needed.
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();

@SuppressWarnings("unchecked")
Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
@@ -1553,7 +1557,7 @@
Tasks.DEFAULT_STORE_COMPACTION_STATE
);

if (!segments.isEmpty() && storeCompactionState) {
if (storeCompactionState) {
DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
if (!destination.isReplaceTimeChunks()) {
// Store compaction state only for replace queries.
@@ -1565,20 +1569,21 @@
DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();

ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec();
ClusterBy clusterBy = queryKernel.getStageDefinition(finalStageId).getClusterBy();

Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = addCompactionStateToSegments(
compactionStateAnnotateFunction = addCompactionStateToSegments(
querySpec,
context.jsonMapper(),
dataSchema,
shardSpec,
clusterBy,
queryDef.getQueryId()
);
segments = compactionStateAnnotateFunction.apply(segments);
}
}
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
publishAllSegments(segments);
publishAllSegments(segments, compactionStateAnnotateFunction);
} else if (MSQControllerTask.isExport(querySpec)) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
@@ -1624,33 +1629,49 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
MSQSpec querySpec,
ObjectMapper jsonMapper,
DataSchema dataSchema,
ShardSpec shardSpec,
@Nullable ShardSpec shardSpec,
@Nullable ClusterBy clusterBy,
String queryId
)
{
final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
PartitionsSpec partitionSpec;

if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
// shardSpec is null when the query produces no segments, which happens when an MSQControllerTask generates
// only tombstones.
if (shardSpec != null) {
if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
partitionDimensions,
false
);
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE.
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
} else {
// SingleDimensionShardSpec and other shard specs are never created in MSQ.
throw new MSQException(
UnknownFault.forMessage(
StringUtils.format(
"Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
queryId,
shardSpec.getType()
)));
}
} else if (clusterBy != null && !clusterBy.getColumns().isEmpty()) {
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
partitionDimensions,
clusterBy.getColumns()
.stream()
.map(KeyColumn::columnName).collect(Collectors.toList()),
false
);
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE.
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
} else {
// SingleDimensionShardSpec and other shard specs are never created in MSQ.
throw new MSQException(
UnknownFault.forMessage(
StringUtils.format(
"Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
queryId,
shardSpec.getType()
)));
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
}

Granularity segmentGranularity = ((DataSourceMSQDestination) querySpec.getDestination())
@@ -1671,13 +1692,15 @@
: new ClientCompactionTaskTransformSpec(
dataSchema.getTransformSpec().getFilter()
).asMap(jsonMapper);
List<Object> metricsSpec = dataSchema.getAggregators() == null
? null
: jsonMapper.convertValue(
dataSchema.getAggregators(),
new TypeReference<List<Object>>() {}
);
List<Object> metricsSpec = Collections.emptyList();

if (querySpec.getQuery() instanceof GroupByQuery) {
// For group-by queries, the aggregators are transformed to their combining factories in the DataSchema, resulting
// in a mismatch between the schema in the compaction spec and the one in the compaction state. The metricsSpec is
// therefore sourced directly from the querySpec.
GroupByQuery groupByQuery = (GroupByQuery) querySpec.getQuery();
metricsSpec = jsonMapper.convertValue(groupByQuery.getAggregatorSpecs(), new TypeReference<List<Object>>() {});
}

IndexSpec indexSpec = tuningConfig.getIndexSpec();

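To make the new branch structure in addCompactionStateToSegments easier to follow, here is a minimal standalone sketch of the partition-spec selection it performs. The class name, the string-based shard-spec type, and the returned description strings are stand-ins invented for illustration, not Druid classes; only the decision order mirrors the change above: a range shard spec keeps its partition dimensions, a numbered shard spec maps to a dynamic spec, a missing shard spec (tombstone-only run) falls back to the CLUSTERED BY columns if any, and everything else defaults to a dynamic spec.

```java
import java.util.List;

// Illustrative only: names and return values are stand-ins, not Druid classes.
public class PartitionsSpecSelectionSketch
{
  static String choosePartitionsSpec(
      String shardSpecType,            // null when the query produced no segments
      List<String> partitionDimensions,
      List<String> clusterByColumns,   // from the stage's ClusterBy
      int rowsPerSegment
  )
  {
    if (shardSpecType != null) {
      if ("range".equals(shardSpecType)) {
        // Range-partitioned output keeps its partition dimensions.
        return "DimensionRangePartitionsSpec(rows=" + rowsPerSegment + ", dims=" + partitionDimensions + ")";
      } else if ("numbered".equals(shardSpecType)) {
        // MSQ tasks don't use maxTotalRows, hence Long.MAX_VALUE in the real code.
        return "DynamicPartitionsSpec(rows=" + rowsPerSegment + ", maxTotalRows=Long.MAX_VALUE)";
      } else {
        // Other shard-spec types are never created by MSQ.
        throw new IllegalArgumentException("Unsupported shard spec type: " + shardSpecType);
      }
    } else if (clusterByColumns != null && !clusterByColumns.isEmpty()) {
      // Tombstone-only replace: no segments, so fall back to the CLUSTERED BY columns.
      return "DimensionRangePartitionsSpec(rows=" + rowsPerSegment + ", dims=" + clusterByColumns + ")";
    }
    return "DynamicPartitionsSpec(rows=" + rowsPerSegment + ", maxTotalRows=Long.MAX_VALUE)";
  }

  public static void main(String[] args)
  {
    System.out.println(choosePartitionsSpec("range", List.of("dim1"), null, 3_000_000));
    System.out.println(choosePartitionsSpec(null, null, List.of("dim1"), 3_000_000));
    System.out.println(choosePartitionsSpec(null, null, List.of(), 3_000_000));
  }
}
```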
@@ -49,7 +49,9 @@
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.expression.TimestampFloorExprMacro;
@@ -58,6 +60,7 @@
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
@@ -139,7 +142,6 @@ public CompactionConfigValidationResult validateCompactionTask(
));
}
validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
@@ -159,6 +161,41 @@ public TaskStatus runCompactionTasks(
TaskToolbox taskToolbox
) throws Exception
{
for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalDataSchemas.entrySet()) {
if (Boolean.valueOf(true).equals(intervalDataSchema.getValue().getHasRolledUpSegments())) {
for (AggregatorFactory aggregatorFactory : intervalDataSchema.getValue().getAggregators()) {
// Don't proceed if either:
// - the aggregator factory differs from its combining factory, or
// - the input column name differs from the output name (i.e. the aggregation is not idempotent).
// This is a conservative check: the existing rollup may have been idempotent even though the aggregator in the
// compaction spec isn't, in which case the data would compact correctly yet still fail this pre-check.
if (
!(
aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) &&
(
aggregatorFactory.requiredFields().isEmpty() ||
(aggregatorFactory.requiredFields().size() == 1 &&
aggregatorFactory.requiredFields()
.get(0)
.equals(aggregatorFactory.getName()))
)
)
) {
// MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from
// the aggregated column name. This is because the aggregated values would then get overwritten by new
// values and the existing values would be lost. Note that if no rollup is specified in an index spec,
// the default value is true.
String errorMsg = StringUtils.format(
"Rolled-up segments in interval[%s] for compaction not supported by MSQ engine.",
intervalDataSchema.getKey()
);
log.error(errorMsg);
return TaskStatus.failure(compactionTask.getId(), errorMsg);

}
}
}
}
List<MSQControllerTask> msqControllerTasks = createMsqControllerTasks(compactionTask, intervalDataSchemas);

if (msqControllerTasks.isEmpty()) {
@@ -291,6 +328,10 @@ private static RowSignature getRowSignature(DataSchema dataSchema)
for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) {
rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName()));
}
// The datasource may also have metric columns defined in its metricsSpec; include them in the signature.
for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) {
rowSignatureBuilder.add(aggregatorFactory.getName(), aggregatorFactory.getIntermediateType());
}
return rowSignatureBuilder.build();
}
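As an aside, here is a small standalone illustration (not part of the PR) of the row-signature change above, assuming the Druid processing module is on the classpath; the column names "string_dim", "sum_added", and "added" are made up for the example.

```java
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

public class RowSignatureWithMetricsSketch
{
  public static void main(String[] args)
  {
    RowSignature signature = RowSignature.builder()
                                         // dimensions, as before
                                         .add("__time", ColumnType.LONG)
                                         .add("string_dim", ColumnType.STRING)
                                         // plus each metric under its output name, typed by its
                                         // intermediate (non-finalized) type, e.g. LONG for a longSum
                                         .add("sum_added", new LongSumAggregatorFactory("sum_added", "added").getIntermediateType())
                                         .build();
    // Prints the three columns with their types.
    System.out.println(signature);
  }
}
```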

@@ -354,14 +395,30 @@ private static List<OrderByColumnSpec> getOrderBySpec(PartitionsSpec partitionSp
private static Query<?> buildScanQuery(CompactionTask compactionTask, Interval interval, DataSchema dataSchema)
{
RowSignature rowSignature = getRowSignature(dataSchema);
return new Druids.ScanQueryBuilder().dataSource(dataSchema.getDataSource())
.columns(rowSignature.getColumnNames())
.virtualColumns(getVirtualColumns(dataSchema, interval))
.columnTypes(rowSignature.getColumnTypes())
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval)))
.filters(dataSchema.getTransformSpec().getFilter())
.context(compactionTask.getContext())
.build();
Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder()
.dataSource(dataSchema.getDataSource())
.columns(rowSignature.getColumnNames())
.virtualColumns(getVirtualColumns(dataSchema, interval))
.columnTypes(rowSignature.getColumnTypes())
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval)))
.filters(dataSchema.getTransformSpec().getFilter())
.context(compactionTask.getContext());

if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
List<OrderByColumnSpec> orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());

scanQueryBuilder.orderBy(
orderByColumnSpecs
.stream()
.map(orderByColumnSpec ->
new ScanQuery.OrderBy(
orderByColumnSpec.getDimension(),
ScanQuery.Order.fromString(orderByColumnSpec.getDirection().toString())
))
.collect(Collectors.toList())
);
}
return scanQueryBuilder.build();
}

private static boolean isGroupBy(DataSchema dataSchema)
@@ -469,6 +526,8 @@ private Map<String, Object> createMSQTaskContext(CompactionTask compactionTask,
}
// Similar to compaction using the native engine, don't finalize aggregations.
context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS, false);
// Set the same non-finalized behavior on the native query context.
context.put(QueryContexts.FINALIZE_KEY, false);
// Only scalar or array-type dimensions are allowed as grouping keys.
context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false);
return context;
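Before moving to the tests, here is a minimal sketch of the per-aggregator condition that the rolled-up-segment guard in runCompactionTasks applies. The helper name isCompatibleWithRolledUpSegments is invented for illustration; the two LongSumAggregatorFactory examples show a case that passes (the aggregator reads the column it writes to) and the case that now fails the compaction run.

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class RollupAggregatorCheckSketch
{
  // The aggregator must equal its combining form and must read only from the column it writes to;
  // otherwise re-rolling up existing rolled-up segments could overwrite the stored aggregates.
  static boolean isCompatibleWithRolledUpSegments(AggregatorFactory factory)
  {
    boolean sameAsCombining = factory.getClass().equals(factory.getCombiningFactory().getClass());
    boolean readsOwnColumn = factory.requiredFields().isEmpty()
                             || (factory.requiredFields().size() == 1
                                 && factory.requiredFields().get(0).equals(factory.getName()));
    return sameAsCombining && readsOwnColumn;
  }

  public static void main(String[] args)
  {
    // "sum_added" summed onto itself: safe to apply again on rolled-up data.
    System.out.println(isCompatibleWithRolledUpSegments(new LongSumAggregatorFactory("sum_added", "sum_added"))); // true
    // "added" summed into "sum_added": the existing "sum_added" values would be lost, so the task fails.
    System.out.println(isCompatibleWithRolledUpSegments(new LongSumAggregatorFactory("sum_added", "added")));     // false
  }
}
```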
@@ -42,6 +42,7 @@
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.math.expr.ExprMacroTable;
@@ -54,14 +55,14 @@
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -73,6 +74,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MSQCompactionRunnerTest
{
@@ -195,27 +197,6 @@ public void testRollupFalseWithMetricsSpecIsInValid()
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}

@Test
public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
{
// Aggregators having different input and output column names are unsupported.
final String inputColName = "added";
final String outputColName = "sum_added";
CompactionTask compactionTask = createCompactionTask(
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"Different name[sum_added] and fieldName(s)[[added]] for aggregator unsupported for MSQ engine.",
validationResult.getReason()
);
}

@Test
public void testRunCompactionTasksWithEmptyTaskListFails() throws Exception
{
@@ -288,6 +269,10 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce
);
Assert.assertNull(msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY));
Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy());
Assert.assertEquals(PARTITION_DIMENSIONS.stream().map(col -> new ScanQuery.OrderBy(
col,
ScanQuery.Order.ASCENDING
)).collect(Collectors.toList()), ((ScanQuery) actualMSQSpec.getQuery()).getOrderBys());
}

@Test
@@ -358,6 +343,47 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess
Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy());
}

@Test
public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails() throws Exception
{
final String inputColName = "added";
final String outputColName = "sum_added";
CompactionTask compactionTask = createCompactionTask(
null,
null,
Collections.emptyMap(),
null,
new AggregatorFactory[]{
new LongSumAggregatorFactory(
outputColName,
inputColName
)
}
);
DataSchema dataSchema = new DataSchema(
DATA_SOURCE,
new TimestampSpec(TIMESTAMP_COLUMN, null, null),
new DimensionsSpec(DIMENSIONS),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)},
new UniformGranularitySpec(
SEGMENT_GRANULARITY.getDefaultGranularity(),
null,
false,
Collections.singletonList(COMPACTION_INTERVAL)
),
null,
true,
null,
null
);
TaskStatus taskStatus = MSQ_COMPACTION_RUNNER.runCompactionTasks(compactionTask, Collections.singletonMap(COMPACTION_INTERVAL, dataSchema), null);
Assert.assertTrue(taskStatus.isFailure());
Assert.assertEquals(taskStatus.getErrorMsg(), StringUtils.format(
"Rolled-up segments in interval[%s] for compaction not supported by MSQ engine.",
COMPACTION_INTERVAL
));
}

private CompactionTask createCompactionTask(
@Nullable PartitionsSpec partitionsSpec,
@Nullable DimFilter dimFilter,
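Finally, a short sketch of the ordering behaviour that the new assertion in testMSQControllerTaskSpecWithScanIsValid checks: with a range partitions spec, buildScanQuery now adds one ascending ScanQuery.OrderBy per partition dimension. The helper name below is hypothetical, and the example assumes a Druid dependency on the classpath.

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.druid.query.scan.ScanQuery;

public class ScanOrderBySketch
{
  // One ascending OrderBy per range-partition dimension, matching the test's expectation.
  static List<ScanQuery.OrderBy> orderByForPartitionDimensions(List<String> partitionDimensions)
  {
    return partitionDimensions.stream()
                              .map(dim -> new ScanQuery.OrderBy(dim, ScanQuery.Order.ASCENDING))
                              .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    orderByForPartitionDimensions(List.of("string_dim")).forEach(System.out::println);
  }
}
```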