From 52fa8db3263576bb7b9a94a6b8e5578ddedbc52b Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Mon, 22 Jul 2024 21:16:51 +0530 Subject: [PATCH 1/5] Fix IT tests and add check for pre-rolled up segments --- .../apache/druid/msq/exec/ControllerImpl.java | 79 +++++--- .../msq/indexing/MSQCompactionRunner.java | 79 +++++++- .../msq/indexing/MSQCompactionRunnerTest.java | 70 ++++--- .../indexing/common/task/CompactionTask.java | 14 +- .../duty/ITAutoCompactionTest.java | 177 +++++++++--------- .../indexing/ClientCompactionRunnerInfo.java | 27 --- .../druid/segment/indexing/DataSchema.java | 37 +++- .../ClientCompactionRunnerInfoTest.java | 25 +-- 8 files changed, 308 insertions(+), 200 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index a30e96860875..dad80e07b835 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1357,7 +1357,10 @@ private void postResultPartitionBoundariesForStage( * Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()}, * also drop all other segments within the replacement intervals. */ - private void publishAllSegments(final Set segments) throws IOException + private void publishAllSegments( + final Set segments, + Function, Set> compactionStateAnnotateFunction + ) throws IOException { final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); @@ -1413,7 +1416,7 @@ private void publishAllSegments(final Set segments) throws IOExcept } performSegmentPublish( context.taskActionClient(), - createOverwriteAction(taskLockType, segmentsWithTombstones) + createOverwriteAction(taskLockType, compactionStateAnnotateFunction.apply(segmentsWithTombstones)) ); } } else if (!segments.isEmpty()) { @@ -1543,6 +1546,7 @@ private void handleQueryResults( if (MSQControllerTask.isIngestion(querySpec)) { // Publish segments if needed. final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber()); + Function, Set> compactionStateAnnotateFunction = Function.identity(); @SuppressWarnings("unchecked") Set segments = (Set) queryKernel.getResultObjectForStage(finalStageId); @@ -1553,7 +1557,7 @@ private void handleQueryResults( Tasks.DEFAULT_STORE_COMPACTION_STATE ); - if (!segments.isEmpty() && storeCompactionState) { + if (storeCompactionState) { DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); if (!destination.isReplaceTimeChunks()) { // Store compaction state only for replace queries. @@ -1565,20 +1569,21 @@ private void handleQueryResults( DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema(); - ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec(); + ShardSpec shardSpec = segments.isEmpty() ? 
null : segments.stream().findFirst().get().getShardSpec(); + ClusterBy clusterBy = queryKernel.getStageDefinition(finalStageId).getClusterBy(); - Function, Set> compactionStateAnnotateFunction = addCompactionStateToSegments( + compactionStateAnnotateFunction = addCompactionStateToSegments( querySpec, context.jsonMapper(), dataSchema, shardSpec, + clusterBy, queryDef.getQueryId() ); - segments = compactionStateAnnotateFunction.apply(segments); } } log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size()); - publishAllSegments(segments); + publishAllSegments(segments, compactionStateAnnotateFunction); } else if (MSQControllerTask.isExport(querySpec)) { // Write manifest file. ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination(); @@ -1624,33 +1629,49 @@ private static Function, Set> addCompactionStateTo MSQSpec querySpec, ObjectMapper jsonMapper, DataSchema dataSchema, - ShardSpec shardSpec, + @Nullable ShardSpec shardSpec, + @Nullable ClusterBy clusterBy, String queryId ) { final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); PartitionsSpec partitionSpec; - if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) { - List partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions(); + // shardSpec is absent in the absence of segments, which happens when only tombstones are generated by an + // MSQControllerTask. + if (shardSpec != null) { + if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) { + List partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions(); + partitionSpec = new DimensionRangePartitionsSpec( + tuningConfig.getRowsPerSegment(), + null, + partitionDimensions, + false + ); + } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) { + // MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE. + partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE); + } else { + // SingleDimenionShardSpec and other shard specs are never created in MSQ. + throw new MSQException( + UnknownFault.forMessage( + StringUtils.format( + "Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].", + queryId, + shardSpec.getType() + ))); + } + } else if (clusterBy != null && !clusterBy.getColumns().isEmpty()) { partitionSpec = new DimensionRangePartitionsSpec( tuningConfig.getRowsPerSegment(), null, - partitionDimensions, + clusterBy.getColumns() + .stream() + .map(KeyColumn::columnName).collect(Collectors.toList()), false ); - } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) { - // MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE. - partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE); } else { - // SingleDimenionShardSpec and other shard specs are never created in MSQ. - throw new MSQException( - UnknownFault.forMessage( - StringUtils.format( - "Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].", - queryId, - shardSpec.getType() - ))); + partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE); } Granularity segmentGranularity = ((DataSourceMSQDestination) querySpec.getDestination()) @@ -1671,13 +1692,15 @@ private static Function, Set> addCompactionStateTo : new ClientCompactionTaskTransformSpec( dataSchema.getTransformSpec().getFilter() ).asMap(jsonMapper); - List metricsSpec = dataSchema.getAggregators() == null - ? 
null - : jsonMapper.convertValue( - dataSchema.getAggregators(), - new TypeReference>() {} - ); + List metricsSpec = Collections.emptyList(); + if (querySpec.getQuery() instanceof GroupByQuery) { + // For group-by queries, the aggregators are transformed to their combining factories in the dataschema, resulting + // in a mismatch between schema in compaction spec and the one in compaction state. Sourcing the metricsSpec + // therefore directly from the querySpec. + GroupByQuery groupByQuery = (GroupByQuery) querySpec.getQuery(); + metricsSpec = jsonMapper.convertValue(groupByQuery.getAggregatorSpecs(), new TypeReference>() {}); + } IndexSpec indexSpec = tuningConfig.getIndexSpec(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index ac43e7c864b8..756f14ae32a1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -49,7 +49,9 @@ import org.apache.druid.query.Druids; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.expression.TimestampFloorExprMacro; @@ -58,6 +60,7 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; @@ -139,7 +142,6 @@ public CompactionConfigValidationResult validateCompactionTask( )); } validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext())); - validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec())); return validationResults.stream() .filter(result -> !result.isValid()) .findFirst() @@ -159,6 +161,41 @@ public TaskStatus runCompactionTasks( TaskToolbox taskToolbox ) throws Exception { + for (Map.Entry intervalDataSchema : intervalDataSchemas.entrySet()) { + if (Boolean.valueOf(true).equals(intervalDataSchema.getValue().getHasRolledUpSegments())) { + for (AggregatorFactory aggregatorFactory : intervalDataSchema.getValue().getAggregators()) { + // Don't proceed if either: + // - aggregator factory differs from its combining factory + // - input col name is different from the output name (idempotent) + // This is a conservative check as existing rollup may have been idempotent but the aggregator provided in + // compaction spec isn't. This would get properly compacted yet fails in the below pre-check. 
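For reference, a standalone sketch of how the pre-check shown next classifies a few common aggregators, assuming stock Druid aggregator behavior (for example, a count aggregator combines into a longSum); this is an illustration, not code from the patch:

  import org.apache.druid.query.aggregation.AggregatorFactory;
  import org.apache.druid.query.aggregation.CountAggregatorFactory;
  import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

  class RollupPreCheckSketch
  {
    // Mirrors the condition below: the factory class must match its combining factory's class,
    // and its single required field (if any) must equal the aggregator's own output name.
    static boolean passesPreCheck(AggregatorFactory f)
    {
      return f.getClass().equals(f.getCombiningFactory().getClass())
             && (f.requiredFields().isEmpty()
                 || (f.requiredFields().size() == 1
                     && f.requiredFields().get(0).equals(f.getName())));
    }

    public static void main(String[] args)
    {
      // longSum of "added" into "added": re-aggregation reads the already-summed column,
      // so compacting rolled-up segments again is safe -> passes.
      System.out.println(passesPreCheck(new LongSumAggregatorFactory("added", "added")));      // true
      // longSum of "added" into "sum_added": the existing "sum_added" values would be
      // overwritten rather than combined -> rejected.
      System.out.println(passesPreCheck(new LongSumAggregatorFactory("sum_added", "added")));  // false
      // count: its combining factory is a longSum, so the classes differ; re-counting
      // rolled-up rows would under-count the original data -> rejected.
      System.out.println(passesPreCheck(new CountAggregatorFactory("count")));                 // false
    }
  }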
+ if ( + !( + aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) && + ( + aggregatorFactory.requiredFields().isEmpty() || + (aggregatorFactory.requiredFields().size() == 1 && + aggregatorFactory.requiredFields() + .get(0) + .equals(aggregatorFactory.getName())) + ) + ) + ) { + // MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from + // the aggregated column name. This is because the aggregated values would then get overwritten by new + // values and the existing values would be lost. Note that if no rollup is specified in an index spec, + // the default value is true. + String errorMsg = StringUtils.format( + "Rolled-up segments in interval[%s] for compaction not supported by MSQ engine.", + intervalDataSchema.getKey() + ); + log.error(errorMsg); + return TaskStatus.failure(compactionTask.getId(), errorMsg); + + } + } + } + } List msqControllerTasks = createMsqControllerTasks(compactionTask, intervalDataSchemas); if (msqControllerTasks.isEmpty()) { @@ -291,6 +328,10 @@ private static RowSignature getRowSignature(DataSchema dataSchema) for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) { rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName())); } + // There can be columns that are part of metricsSpec for a datasource. + for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) { + rowSignatureBuilder.add(aggregatorFactory.getName(), aggregatorFactory.getIntermediateType()); + } return rowSignatureBuilder.build(); } @@ -354,15 +395,31 @@ private static List getOrderBySpec(PartitionsSpec partitionSp private static Query buildScanQuery(CompactionTask compactionTask, Interval interval, DataSchema dataSchema) { RowSignature rowSignature = getRowSignature(dataSchema); - return new Druids.ScanQueryBuilder().dataSource(dataSchema.getDataSource()) - .columns(rowSignature.getColumnNames()) - .virtualColumns(getVirtualColumns(dataSchema, interval)) - .columnTypes(rowSignature.getColumnTypes()) - .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval))) - .legacy(false) - .filters(dataSchema.getTransformSpec().getFilter()) - .context(compactionTask.getContext()) - .build(); + Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder() + .dataSource(dataSchema.getDataSource()) + .columns(rowSignature.getColumnNames()) + .virtualColumns(getVirtualColumns(dataSchema, interval)) + .columnTypes(rowSignature.getColumnTypes()) + .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval))) + .legacy(false) + .filters(dataSchema.getTransformSpec().getFilter()) + .context(compactionTask.getContext()); + + if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) { + List orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec()); + + scanQueryBuilder.orderBy( + orderByColumnSpecs + .stream() + .map(orderByColumnSpec -> + new ScanQuery.OrderBy( + orderByColumnSpec.getDimension(), + ScanQuery.Order.fromString(orderByColumnSpec.getDirection().toString()) + )) + .collect(Collectors.toList()) + ); + } + return scanQueryBuilder.build(); } private static boolean isGroupBy(DataSchema dataSchema) @@ -470,6 +527,8 @@ private Map createMSQTaskContext(CompactionTask compactionTask, } // Similar to compaction using the native engine, don't finalize aggregations. 
context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS, false); + // Add appropriate finalization to native query context. + context.put(QueryContexts.FINALIZE_KEY, false); // Only scalar or array-type dimensions are allowed as grouping keys. context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false); return context; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index b95243f7783f..745131b56ec1 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.task.TuningConfigBuilder; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.math.expr.ExprMacroTable; @@ -54,6 +55,7 @@ import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionStrategy; @@ -61,7 +63,6 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.transform.TransformSpec; -import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.joda.time.Interval; import org.junit.Assert; @@ -73,6 +74,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class MSQCompactionRunnerTest { @@ -195,27 +197,6 @@ public void testRollupFalseWithMetricsSpecIsInValid() Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); } - @Test - public void testMSQEngineWithUnsupportedMetricsSpecIsInValid() - { - // Aggregators having different input and ouput column names are unsupported. 
- final String inputColName = "added"; - final String outputColName = "sum_added"; - CompactionTask compactionTask = createCompactionTask( - new DynamicPartitionsSpec(3, null), - null, - Collections.emptyMap(), - new ClientCompactionTaskGranularitySpec(null, null, null), - new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)} - ); - CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask); - Assert.assertFalse(validationResult.isValid()); - Assert.assertEquals( - "Different name[sum_added] and fieldName(s)[[added]] for aggregator unsupported for MSQ engine.", - validationResult.getReason() - ); - } - @Test public void testRunCompactionTasksWithEmptyTaskListFails() throws Exception { @@ -288,6 +269,10 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce ); Assert.assertNull(msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY)); Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy()); + Assert.assertEquals(PARTITION_DIMENSIONS.stream().map(col -> new ScanQuery.OrderBy( + col, + ScanQuery.Order.ASCENDING + )).collect(Collectors.toList()), ((ScanQuery) actualMSQSpec.getQuery()).getOrderBys()); } @Test @@ -358,6 +343,47 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy()); } + @Test + public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails() throws Exception + { + final String inputColName = "added"; + final String outputColName = "sum_added"; + CompactionTask compactionTask = createCompactionTask( + null, + null, + Collections.emptyMap(), + null, + new AggregatorFactory[]{ + new LongSumAggregatorFactory( + outputColName, + inputColName + ) + } + ); + DataSchema dataSchema = new DataSchema( + DATA_SOURCE, + new TimestampSpec(TIMESTAMP_COLUMN, null, null), + new DimensionsSpec(DIMENSIONS), + new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}, + new UniformGranularitySpec( + SEGMENT_GRANULARITY.getDefaultGranularity(), + null, + false, + Collections.singletonList(COMPACTION_INTERVAL) + ), + null, + true, + null, + null + ); + TaskStatus taskStatus = MSQ_COMPACTION_RUNNER.runCompactionTasks(compactionTask, Collections.singletonMap(COMPACTION_INTERVAL, dataSchema), null); + Assert.assertTrue(taskStatus.isFailure()); + Assert.assertEquals(taskStatus.getErrorMsg(), StringUtils.format( + "Rolled-up segments in interval[%s] for compaction not supported by MSQ engine.", + COMPACTION_INTERVAL + )); + } + private CompactionTask createCompactionTask( @Nullable PartitionsSpec partitionsSpec, @Nullable DimFilter dimFilter, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 8659eb0f397e..3f4d8dd1ab2a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -610,7 +610,7 @@ private static DataSchema createDataSchema( // Check index metadata & decide which values to propagate (i.e. 
carry over) for rollup & queryGranularity final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer( segments, - granularitySpec.isRollup() == null, + true, // Always need rollup to check if there are some rollup segments already present. granularitySpec.getQueryGranularity() == null, dimensionsSpec == null, metricsSpec == null @@ -671,7 +671,10 @@ private static DataSchema createDataSchema( finalDimensionsSpec, finalMetricsSpec, uniformGranularitySpec, - transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null) + transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null), + existingSegmentAnalyzer.getHasRolledUpSegments(), + null, + null ); } @@ -748,6 +751,7 @@ static class ExistingSegmentAnalyzer // For processRollup: private boolean rollup = true; + private boolean hasRolledUpSegments = false; // For processQueryGranularity: private Granularity queryGranularity; @@ -815,6 +819,11 @@ public Boolean getRollup() return rollup; } + public Boolean getHasRolledUpSegments() + { + return hasRolledUpSegments; + } + public Granularity getQueryGranularity() { if (!needQueryGranularity) { @@ -904,6 +913,7 @@ private void processRollup(final QueryableIndex index) // Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false final Boolean isIndexRollup = index.getMetadata().isRollup(); rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup); + hasRolledUpSegments = hasRolledUpSegments || Boolean.valueOf(true).equals(isIndexRollup); } private void processQueryGranularity(final QueryableIndex index) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 230a19236c16..50a2814a13ba 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -28,10 +28,10 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; -import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexing.common.task.CompactionIntervalSpec; import org.apache.druid.indexing.common.task.CompactionTask; import org.apache.druid.indexing.overlord.http.TaskPayloadResponse; @@ -328,8 +328,8 @@ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExis } } - @Test - public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics(CompactionEngine engine) throws Exception { // added = 31, count = null, sum_added = null loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS); @@ -356,7 +356,8 @@ public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))), null, new AggregatorFactory[] {new CountAggregatorFactory("count"), new 
LongSumAggregatorFactory("sum_added", "added")}, - false + false, + engine ); // should now only have 1 row after compaction // added = null, count = 2, sum_added = 62 @@ -559,8 +560,8 @@ public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception } } - @Test - public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyCanUpdateCompactionConfig(CompactionEngine engine) throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -571,9 +572,9 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception verifyQuery(INDEX_QUERIES_RESOURCE); // Dummy compaction config which will be overwritten - submitCompactionConfig(10000, NO_SKIP_OFFSET); + submitCompactionConfig(10000, NO_SKIP_OFFSET, engine); // New compaction config should overwrites the existing compaction config - submitCompactionConfig(1, NO_SKIP_OFFSET); + submitCompactionConfig(1, NO_SKIP_OFFSET, engine); LOG.info("Auto compaction test with dynamic partitioning"); @@ -584,25 +585,28 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception verifySegmentsCompacted(10, 1); checkCompactionIntervals(intervalsBeforeCompaction); - LOG.info("Auto compaction test with hash partitioning"); - - final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null); - submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false); - // 2 segments published per day after compaction. - forceTriggerAutoCompaction(4); - verifyQuery(INDEX_QUERIES_RESOURCE); - verifySegmentsCompacted(hashedPartitionsSpec, 4); - checkCompactionIntervals(intervalsBeforeCompaction); + if (engine == CompactionEngine.NATIVE) { + // HashedPartitionsSpec not supported by MSQ. + LOG.info("Auto compaction test with hash partitioning"); + + final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null); + submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine); + // 2 segments published per day after compaction. 
+ forceTriggerAutoCompaction(4); + verifyQuery(INDEX_QUERIES_RESOURCE); + verifySegmentsCompacted(hashedPartitionsSpec, 4); + checkCompactionIntervals(intervalsBeforeCompaction); + } LOG.info("Auto compaction test with range partitioning"); - final SingleDimensionPartitionsSpec rangePartitionsSpec = new SingleDimensionPartitionsSpec( + final DimensionRangePartitionsSpec rangePartitionsSpec = new DimensionRangePartitionsSpec( 5, null, - "city", + ImmutableList.of("city"), false ); - submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false); + submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(rangePartitionsSpec, 2); @@ -695,8 +699,8 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue(CompactionEngine engine) throws Exception { // Interval is "2013-08-31/2013-09-02", segment gran is DAY, // "maxRowsPerSegment": 3 @@ -732,7 +736,13 @@ public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue( Granularity newGranularity = Granularities.YEAR; // Set dropExisting to true // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig( + 1000, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(newGranularity, null, null), + true, + engine + ); List expectedIntervalAfterCompaction = new ArrayList<>(); for (String interval : intervalsBeforeCompaction) { @@ -750,7 +760,13 @@ public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue( // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", newGranularity = Granularities.MONTH; // Set dropExisting to true - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig( + 1000, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(newGranularity, null, null), + true, + engine + ); // Since dropExisting is set to true... // Again data is only in two days @@ -778,7 +794,13 @@ public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue( // compact only tombstones, so it should be a tombstone itself. newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC); // Set dropExisting to true - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig( + 1000, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(newGranularity, null, null), + true, + engine + ); // Since dropExisting is set to true... 
// The earlier 12 segments with MONTH granularity will be completely covered, overshadowed, by the @@ -804,8 +826,8 @@ public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue( } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueThenFalse() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueThenFalse(CompactionEngine engine) throws Exception { // Interval is "2013-08-31/2013-09-02", segment gran is DAY, // "maxRowsPerSegment": 3 @@ -841,7 +863,7 @@ public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueT Granularity newGranularity = Granularities.YEAR; // Set dropExisting to true // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine); List expectedIntervalAfterCompaction = new ArrayList<>(); for (String interval : intervalsBeforeCompaction) { @@ -859,7 +881,7 @@ public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueT // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", newGranularity = Granularities.MONTH; // Set dropExisting to true - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine); // Since dropExisting is set to true... // Again data is only in two days @@ -885,9 +907,13 @@ public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueT LOG.info("Auto compaction test with SEMESTER segment granularity, dropExisting is false, over tombstones"); newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC); // Set dropExisting to false - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, - null, null - ), false); + submitCompactionConfig( + 1000, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(newGranularity, null, null), + false, + engine + ); // Since dropExisting is set to false the first semester will be forced to dropExisting true // Hence, we will have two, one tombstone for the first semester and one data segment for the second. @@ -963,8 +989,8 @@ public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingFalse } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion(CompactionEngine engine) throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -974,14 +1000,14 @@ public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() throws verifySegmentsCount(4); verifyQuery(INDEX_QUERIES_RESOURCE); - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1)); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1), engine); //...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. 
(3 total) forceTriggerAutoCompaction(3); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); Granularity newGranularity = Granularities.YEAR; - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null)); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), engine); LOG.info("Auto compaction test with YEAR segment granularity"); @@ -1001,8 +1027,8 @@ public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() throws } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveSameSegmentGranularity() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveSameSegmentGranularity(CompactionEngine engine) throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -1013,7 +1039,7 @@ public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegm verifyQuery(INDEX_QUERIES_RESOURCE); // Compacted without SegmentGranularity in auto compaction config - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, engine); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -1023,7 +1049,7 @@ public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegm // Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity. // Now set auto compaction with DAY granularity in the granularitySpec Granularity newGranularity = Granularities.DAY; - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null)); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), engine); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -1033,8 +1059,8 @@ public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegm } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveDifferentSegmentGranularity() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveDifferentSegmentGranularity(CompactionEngine engine) throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -1045,7 +1071,7 @@ public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegm verifyQuery(INDEX_QUERIES_RESOURCE); // Compacted without SegmentGranularity in auto compaction config - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, engine); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -1055,7 +1081,7 @@ public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegm // Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity. 
// Now set auto compaction with DAY granularity in the granularitySpec Granularity newGranularity = Granularities.YEAR; - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null)); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), engine); forceTriggerAutoCompaction(1); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -1066,8 +1092,8 @@ public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegm } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue(CompactionEngine engine) throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -1079,7 +1105,7 @@ public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranula Granularity newGranularity = Granularities.YEAR; // Set dropExisting to true - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine); List expectedIntervalAfterCompaction = new ArrayList<>(); // We will still have one visible segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) @@ -1109,7 +1135,7 @@ public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranula coordinator.getSegmentIntervals(fullDatasourceName); // Since dropExisting is set to true... // This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine); // verify: expectedIntervalAfterCompaction = new ArrayList<>(); // The previous segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) will be @@ -1245,8 +1271,8 @@ public void testAutoCompactionDutyWithSegmentGranularityFinerAndNotAlignWithSegm } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSegment() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSegment(CompactionEngine engine) throws Exception { updateCompactionTaskSlot(1, 1, null); final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles")); @@ -1263,7 +1289,8 @@ public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSe MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null), - false + false, + engine ); // Before compaction, we have segments with the interval 2013-08-26T00:00:00.000Z/2013-09-02T00:00:00.000Z // We will compact the latest segment to MONTH. 
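The integration-test changes above and below convert tests to @Test(dataProvider = "engine") so that each auto-compaction scenario runs once per compaction engine. A minimal sketch of what such a TestNG provider might look like, with a hypothetical class name (the real provider lives in the IT test code):

  import org.apache.druid.indexer.CompactionEngine;
  import org.testng.annotations.DataProvider;

  public class EngineDataProviderSketch
  {
    // Each Object[] row becomes one invocation of a @Test(dataProvider = "engine") method,
    // so every parametrized test runs once with the native engine and once with MSQ.
    @DataProvider(name = "engine")
    public static Object[][] engine()
    {
      return new Object[][]{{CompactionEngine.NATIVE}, {CompactionEngine.MSQ}};
    }
  }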
@@ -1292,8 +1319,8 @@ public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSe } } - @Test - public void testAutoCompactionDutyWithRollup() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithRollup(CompactionEngine engine) throws Exception { final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles")); Map specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono)))); @@ -1309,7 +1336,8 @@ public void testAutoCompactionDutyWithRollup() throws Exception MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(null, null, true), - false + false, + engine ); forceTriggerAutoCompaction(2); queryAndResultFields = ImmutableMap.of( @@ -1328,8 +1356,8 @@ public void testAutoCompactionDutyWithRollup() throws Exception } } - @Test - public void testAutoCompactionDutyWithQueryGranularity() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithQueryGranularity(CompactionEngine engine) throws Exception { final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles")); Map specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, true, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono)))); @@ -1345,7 +1373,8 @@ public void testAutoCompactionDutyWithQueryGranularity() throws Exception MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(null, Granularities.DAY, null), - false + false, + engine ); forceTriggerAutoCompaction(2); queryAndResultFields = ImmutableMap.of( @@ -1364,8 +1393,8 @@ public void testAutoCompactionDutyWithQueryGranularity() throws Exception } } - @Test - public void testAutoCompactionDutyWithDimensionsSpec() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithDimensionsSpec(CompactionEngine engine) throws Exception { // Index data with dimensions "page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", // "namespace", "continent", "country", "region", "city" @@ -1392,7 +1421,8 @@ public void testAutoCompactionDutyWithDimensionsSpec() throws Exception new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))), null, null, - false + false, + engine ); forceTriggerAutoCompaction(2); @@ -1760,30 +1790,6 @@ private void submitCompactionConfig( ); } - private void submitCompactionConfig( - PartitionsSpec partitionsSpec, - Period skipOffsetFromLatest, - int maxNumConcurrentSubTasks, - UserCompactionTaskGranularityConfig granularitySpec, - UserCompactionTaskDimensionsConfig dimensionsSpec, - UserCompactionTaskTransformConfig transformSpec, - AggregatorFactory[] metricsSpec, - boolean dropExisting - ) throws Exception - { - submitCompactionConfig( - partitionsSpec, - skipOffsetFromLatest, - maxNumConcurrentSubTasks, - granularitySpec, - dimensionsSpec, - transformSpec, - metricsSpec, - dropExisting, - null - ); - } - private void submitCompactionConfig( PartitionsSpec partitionsSpec, Period skipOffsetFromLatest, @@ -1829,8 +1835,9 @@ private void submitCompactionConfig( transformSpec, !dropExisting ? 
null : new UserCompactionTaskIOConfig(true), engine, - null + ImmutableMap.of("maxNumTasks", 2) ); + LOG.info("Submitting compaction config for engine[%s].", engine == null ? CompactionEngine.NATIVE: engine); compactionResource.submitCompactionConfig(compactionConfig); // Wait for compaction config to persist diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java index ed9e22dfaa29..12e8fdabd8f8 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java @@ -32,7 +32,6 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -121,7 +120,6 @@ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEn )); } validationResults.add(validateMaxNumTasksForMSQ(newConfig.getTaskContext())); - validationResults.add(validateMetricsSpecForMSQ(newConfig.getMetricsSpec())); return validationResults.stream() .filter(result -> !result.isValid()) .findFirst() @@ -188,29 +186,4 @@ public static CompactionConfigValidationResult validateMaxNumTasksForMSQ(Map - !(aggregatorFactory.requiredFields().isEmpty() - || aggregatorFactory.requiredFields().size() == 1 - && aggregatorFactory.requiredFields() - .get(0) - .equals(aggregatorFactory.getName()))) - .findFirst() - .map(aggregatorFactory -> - new CompactionConfigValidationResult( - false, - "Different name[%s] and fieldName(s)[%s] for aggregator unsupported for MSQ engine.", - aggregatorFactory.getName(), - aggregatorFactory.requiredFields() - )).orElse(new CompactionConfigValidationResult(true, null)); - } } diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java index 1e717f5d17b6..5837868867ea 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java @@ -66,6 +66,7 @@ public class DataSchema private final GranularitySpec granularitySpec; private final TransformSpec transformSpec; private final Map parserMap; + private final Boolean hasRolledUpSegments; private final ObjectMapper objectMapper; // The below fields can be initialized lazily from parser for backward compatibility. @@ -83,6 +84,7 @@ public DataSchema( @JsonProperty("metricsSpec") AggregatorFactory[] aggregators, @JsonProperty("granularitySpec") GranularitySpec granularitySpec, @JsonProperty("transformSpec") TransformSpec transformSpec, + @JsonProperty("hasRolledUpSegments") @Nullable Boolean hasRolledUpSegments, @Deprecated @JsonProperty("parser") @Nullable Map parserMap, @JacksonInject ObjectMapper objectMapper ) @@ -108,6 +110,7 @@ public DataSchema( } this.transformSpec = transformSpec == null ? TransformSpec.NONE : transformSpec; this.parserMap = parserMap; + this.hasRolledUpSegments = hasRolledUpSegments; this.objectMapper = objectMapper; // Fail-fast if there are output name collisions. 
Note: because of the pull-from-parser magic in getDimensionsSpec, @@ -123,6 +126,30 @@ public DataSchema( } } + public DataSchema( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("timestampSpec") @Nullable TimestampSpec timestampSpec, // can be null in old task spec + @JsonProperty("dimensionsSpec") @Nullable DimensionsSpec dimensionsSpec, // can be null in old task spec + @JsonProperty("metricsSpec") AggregatorFactory[] aggregators, + @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("transformSpec") TransformSpec transformSpec, + @Deprecated @JsonProperty("parser") @Nullable Map parserMap, + @JacksonInject ObjectMapper objectMapper + ) + { + this( + dataSource, + timestampSpec, + dimensionsSpec, + aggregators, + granularitySpec, + transformSpec, + null, + parserMap, + objectMapper + ); + } + @VisibleForTesting public DataSchema( String dataSource, @@ -133,7 +160,7 @@ public DataSchema( TransformSpec transformSpec ) { - this(dataSource, timestampSpec, dimensionsSpec, aggregators, granularitySpec, transformSpec, null, null); + this(dataSource, timestampSpec, dimensionsSpec, aggregators, granularitySpec, transformSpec, null, null, null); } // old constructor for backward compatibility @@ -147,7 +174,7 @@ public DataSchema( ObjectMapper objectMapper ) { - this(dataSource, null, null, aggregators, granularitySpec, transformSpec, parserMap, objectMapper); + this(dataSource, null, null, aggregators, granularitySpec, transformSpec, null, parserMap, objectMapper); } private static void validateDatasourceName(String dataSource) @@ -361,6 +388,12 @@ public InputRowParser getParser() return inputRowParser; } + @Nullable + public Boolean getHasRolledUpSegments() + { + return hasRolledUpSegments; + } + public DataSchema withGranularitySpec(GranularitySpec granularitySpec) { return new DataSchema( diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java index f6d4a2b6e581..fb4a3714219f 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java @@ -130,7 +130,7 @@ public void testMSQEngineWithQueryGranularityAllIsValid() } @Test - public void testMSQEngineWithRollupFalseWithMetricsSpecIsInValid() + public void testMSQEngineWithRollupFalseWithMetricsSpecIsInvalid() { DataSourceCompactionConfig compactionConfig = createCompactionConfig( new DynamicPartitionsSpec(3, null), @@ -149,29 +149,6 @@ public void testMSQEngineWithRollupFalseWithMetricsSpecIsInValid() ); } - @Test - public void testMSQEngineWithUnsupportedMetricsSpecIsInValid() - { - // Aggregators having different input and ouput column names are unsupported. 
- final String inputColName = "added"; - final String outputColName = "sum_added"; - DataSourceCompactionConfig compactionConfig = createCompactionConfig( - new DynamicPartitionsSpec(3, null), - Collections.emptyMap(), - new UserCompactionTaskGranularityConfig(null, null, null), - new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)} - ); - CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig( - compactionConfig, - CompactionEngine.NATIVE - ); - Assert.assertFalse(validationResult.isValid()); - Assert.assertEquals( - "Different name[sum_added] and fieldName(s)[[added]] for aggregator unsupported for MSQ engine.", - validationResult.getReason() - ); - } - @Test public void testMSQEngineWithRollupNullWithMetricsSpecIsValid() { From 0b03f01b2046da06c5b3200f2f16dfc4af6e8a8e Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Wed, 24 Jul 2024 17:07:24 +0530 Subject: [PATCH 2/5] Address review comments --- .../msq/indexing/MSQCompactionRunner.java | 88 +++++++++++-------- .../msq/indexing/MSQCompactionRunnerTest.java | 31 ++++--- .../common/task/CompactionRunner.java | 5 +- .../indexing/common/task/CompactionTask.java | 12 +-- .../common/task/NativeCompactionRunner.java | 3 +- .../duty/ITAutoCompactionTest.java | 1 - .../segment/indexing/CombinedDataSchema.java | 65 ++++++++++++++ .../druid/segment/indexing/DataSchema.java | 37 +------- 8 files changed, 149 insertions(+), 93 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index c5526a0e042a..d5bb07f5f1f5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -66,6 +66,7 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; @@ -126,7 +127,8 @@ public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInjec */ @Override public CompactionConfigValidationResult validateCompactionTask( - CompactionTask compactionTask + CompactionTask compactionTask, + Map intervalToDataSchemaMap ) { List validationResults = new ArrayList<>(); @@ -142,12 +144,58 @@ public CompactionConfigValidationResult validateCompactionTask( )); } validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext())); + validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap)); return validationResults.stream() .filter(result -> !result.isValid()) .findFirst() .orElse(new CompactionConfigValidationResult(true, null)); } + /** + * Valides that there are no rolled-up segments where either: + *
+   * <ul>
+   * <li>aggregator factory differs from its combining factory</li>
+   * <li>input col name is different from the output name (non-idempotent)</li>
+   * </ul>
+ */ + private CompactionConfigValidationResult validateRolledUpSegments(Map intervalToDataSchemaMap) + { + for (Map.Entry intervalDataSchema : intervalToDataSchemaMap.entrySet()) { + if (intervalDataSchema.getValue() instanceof CombinedDataSchema) { + CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue(); + if (Boolean.TRUE.equals(combinedDataSchema.hasRolledUpSegments())) { + for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) { + // This is a conservative check as existing rollup may have been idempotent but the aggregator provided in + // compaction spec isn't. This would get properly compacted yet fails in the below pre-check. + if ( + !( + aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) && + ( + aggregatorFactory.requiredFields().isEmpty() || + (aggregatorFactory.requiredFields().size() == 1 && + aggregatorFactory.requiredFields() + .get(0) + .equals(aggregatorFactory.getName())) + ) + ) + ) { + // MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from + // the aggregated column name. This is because the aggregated values would then get overwritten by new + // values and the existing values would be lost. Note that if no rollup is specified in an index spec, + // the default value is true. + return new CompactionConfigValidationResult( + false, + "Rolled-up segments in interval[%s] for compaction not supported by MSQ engine.", + intervalDataSchema.getKey() + ); + } + } + } + } + } + return new CompactionConfigValidationResult(true, null); + } + @Override public CurrentSubTaskHolder getCurrentSubTaskHolder() { @@ -161,41 +209,6 @@ public TaskStatus runCompactionTasks( TaskToolbox taskToolbox ) throws Exception { - for (Map.Entry intervalDataSchema : intervalDataSchemas.entrySet()) { - if (Boolean.valueOf(true).equals(intervalDataSchema.getValue().getHasRolledUpSegments())) { - for (AggregatorFactory aggregatorFactory : intervalDataSchema.getValue().getAggregators()) { - // Don't proceed if either: - // - aggregator factory differs from its combining factory - // - input col name is different from the output name (idempotent) - // This is a conservative check as existing rollup may have been idempotent but the aggregator provided in - // compaction spec isn't. This would get properly compacted yet fails in the below pre-check. - if ( - !( - aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) && - ( - aggregatorFactory.requiredFields().isEmpty() || - (aggregatorFactory.requiredFields().size() == 1 && - aggregatorFactory.requiredFields() - .get(0) - .equals(aggregatorFactory.getName())) - ) - ) - ) { - // MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from - // the aggregated column name. This is because the aggregated values would then get overwritten by new - // values and the existing values would be lost. Note that if no rollup is specified in an index spec, - // the default value is true. 
- String errorMsg = StringUtils.format( - "Rolled-up segments in interval[%s] for compaction not supported by MSQ engine.", - intervalDataSchema.getKey() - ); - log.error(errorMsg); - return TaskStatus.failure(compactionTask.getId(), errorMsg); - - } - } - } - } List msqControllerTasks = createMsqControllerTasks(compactionTask, intervalDataSchemas); if (msqControllerTasks.isEmpty()) { @@ -525,8 +538,9 @@ private Map createMSQTaskContext(CompactionTask compactionTask, ); } // Similar to compaction using the native engine, don't finalize aggregations. + // Used for writing the data schema during segment generation phase. context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS, false); - // Add appropriate finalization to native query context. + // Add appropriate finalization to native query context i.e. for the GroupBy query context.put(QueryContexts.FINALIZE_KEY, false); // Only scalar or array-type dimensions are allowed as grouping keys. context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index 745131b56ec1..a0d65f891601 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -60,9 +60,11 @@ import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.joda.time.Interval; import org.junit.Assert; @@ -129,7 +131,7 @@ public void testHashedPartitionsSpecIsInvalid() null, null ); - Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -142,7 +144,7 @@ public void testDimensionRangePartitionsSpecIsValid() null, null ); - Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -155,7 +157,7 @@ public void testMaxTotalRowsIsInvalid() null, null ); - Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -168,7 +170,7 @@ public void testDynamicPartitionsSpecIsValid() null, null ); - Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -181,7 +183,7 @@ public void testQueryGranularityAllIsValid() new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null), null ); - 
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -194,7 +196,7 @@ public void testRollupFalseWithMetricsSpecIsInValid() new ClientCompactionTaskGranularitySpec(null, null, false), AGGREGATORS.toArray(new AggregatorFactory[0]) ); - Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -344,7 +346,7 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess } @Test - public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails() throws Exception + public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails() { final String inputColName = "added"; final String outputColName = "sum_added"; @@ -360,7 +362,7 @@ public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails() t ) } ); - DataSchema dataSchema = new DataSchema( + CombinedDataSchema dataSchema = new CombinedDataSchema( DATA_SOURCE, new TimestampSpec(TIMESTAMP_COLUMN, null, null), new DimensionsSpec(DIMENSIONS), @@ -372,13 +374,16 @@ public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails() t Collections.singletonList(COMPACTION_INTERVAL) ), null, - true, null, - null + null, + true ); - TaskStatus taskStatus = MSQ_COMPACTION_RUNNER.runCompactionTasks(compactionTask, Collections.singletonMap(COMPACTION_INTERVAL, dataSchema), null); - Assert.assertTrue(taskStatus.isFailure()); - Assert.assertEquals(taskStatus.getErrorMsg(), StringUtils.format( + CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask( + compactionTask, + Collections.singletonMap(COMPACTION_INTERVAL, dataSchema) + ); + Assert.assertFalse(validationResult.isValid()); + Assert.assertEquals(validationResult.getReason(), StringUtils.format( "Rolled-up segments in interval[%s] for compaction not supported by MSQ engine.", COMPACTION_INTERVAL )); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java index 8d30a60d04e6..0abaeed8eb27 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java @@ -57,6 +57,9 @@ TaskStatus runCompactionTasks( * Checks if the provided compaction config is supported by the runner. 
* The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask} */ - CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask); + CompactionConfigValidationResult validateCompactionTask( + CompactionTask compactionTask, + Map intervalToDataSchemaMap + ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 3f4d8dd1ab2a..804ec87160cb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -77,6 +77,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.GranularitySpec; @@ -463,7 +464,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception ); registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder()); - CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this); + CompactionConfigValidationResult supportsCompactionConfig = + compactionRunner.validateCompactionTask(this, intervalDataSchemas); if (!supportsCompactionConfig.isValid()) { throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason()); } @@ -665,16 +667,16 @@ private static DataSchema createDataSchema( finalMetricsSpec = metricsSpec; } - return new DataSchema( + return new CombinedDataSchema( dataSource, new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null), finalDimensionsSpec, finalMetricsSpec, uniformGranularitySpec, transformSpec == null ? 
null : new TransformSpec(transformSpec.getFilter(), null), - existingSegmentAnalyzer.getHasRolledUpSegments(), null, - null + null, + existingSegmentAnalyzer.hasRolledUpSegments() ); } @@ -819,7 +821,7 @@ public Boolean getRollup() return rollup; } - public Boolean getHasRolledUpSegments() + public Boolean hasRolledUpSegments() { return hasRolledUpSegments; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java index 30761b674e54..078ac44d8687 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java @@ -85,7 +85,8 @@ public CurrentSubTaskHolder getCurrentSubTaskHolder() @Override public CompactionConfigValidationResult validateCompactionTask( - CompactionTask compactionTask + CompactionTask compactionTask, + Map intervalToDataSchemaMap ) { return new CompactionConfigValidationResult(true, null); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 50a2814a13ba..e0400d3cce2e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -1837,7 +1837,6 @@ private void submitCompactionConfig( engine, ImmutableMap.of("maxNumTasks", 2) ); - LOG.info("Submitting compaction config for engine[%s].", engine == null ? CompactionEngine.NATIVE: engine); compactionResource.submitCompactionConfig(compactionConfig); // Wait for compaction config to persist diff --git a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java new file mode 100644 index 000000000000..8d0a45be91cd --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.segment.transform.TransformSpec; + +import javax.annotation.Nullable; +import java.util.Map; + +public class CombinedDataSchema extends DataSchema +{ + private final Boolean hasRolledUpSegments; + + public CombinedDataSchema( + String dataSource, + @Nullable TimestampSpec timestampSpec, + @Nullable DimensionsSpec dimensionsSpec, + AggregatorFactory[] aggregators, + GranularitySpec granularitySpec, + TransformSpec transformSpec, + @Nullable Map parserMap, + ObjectMapper objectMapper, + @Nullable Boolean hasRolledUpSegments + ) + { + super( + dataSource, + timestampSpec, + dimensionsSpec, + aggregators, + granularitySpec, + transformSpec, + parserMap, + objectMapper + ); + this.hasRolledUpSegments = hasRolledUpSegments; + } + + public Boolean hasRolledUpSegments() + { + return hasRolledUpSegments; + } +} diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java index 5837868867ea..1e717f5d17b6 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java @@ -66,7 +66,6 @@ public class DataSchema private final GranularitySpec granularitySpec; private final TransformSpec transformSpec; private final Map parserMap; - private final Boolean hasRolledUpSegments; private final ObjectMapper objectMapper; // The below fields can be initialized lazily from parser for backward compatibility. @@ -84,7 +83,6 @@ public DataSchema( @JsonProperty("metricsSpec") AggregatorFactory[] aggregators, @JsonProperty("granularitySpec") GranularitySpec granularitySpec, @JsonProperty("transformSpec") TransformSpec transformSpec, - @JsonProperty("hasRolledUpSegments") @Nullable Boolean hasRolledUpSegments, @Deprecated @JsonProperty("parser") @Nullable Map parserMap, @JacksonInject ObjectMapper objectMapper ) @@ -110,7 +108,6 @@ public DataSchema( } this.transformSpec = transformSpec == null ? TransformSpec.NONE : transformSpec; this.parserMap = parserMap; - this.hasRolledUpSegments = hasRolledUpSegments; this.objectMapper = objectMapper; // Fail-fast if there are output name collisions. 
Note: because of the pull-from-parser magic in getDimensionsSpec, @@ -126,30 +123,6 @@ public DataSchema( } } - public DataSchema( - @JsonProperty("dataSource") String dataSource, - @JsonProperty("timestampSpec") @Nullable TimestampSpec timestampSpec, // can be null in old task spec - @JsonProperty("dimensionsSpec") @Nullable DimensionsSpec dimensionsSpec, // can be null in old task spec - @JsonProperty("metricsSpec") AggregatorFactory[] aggregators, - @JsonProperty("granularitySpec") GranularitySpec granularitySpec, - @JsonProperty("transformSpec") TransformSpec transformSpec, - @Deprecated @JsonProperty("parser") @Nullable Map parserMap, - @JacksonInject ObjectMapper objectMapper - ) - { - this( - dataSource, - timestampSpec, - dimensionsSpec, - aggregators, - granularitySpec, - transformSpec, - null, - parserMap, - objectMapper - ); - } - @VisibleForTesting public DataSchema( String dataSource, @@ -160,7 +133,7 @@ public DataSchema( TransformSpec transformSpec ) { - this(dataSource, timestampSpec, dimensionsSpec, aggregators, granularitySpec, transformSpec, null, null, null); + this(dataSource, timestampSpec, dimensionsSpec, aggregators, granularitySpec, transformSpec, null, null); } // old constructor for backward compatibility @@ -174,7 +147,7 @@ public DataSchema( ObjectMapper objectMapper ) { - this(dataSource, null, null, aggregators, granularitySpec, transformSpec, null, parserMap, objectMapper); + this(dataSource, null, null, aggregators, granularitySpec, transformSpec, parserMap, objectMapper); } private static void validateDatasourceName(String dataSource) @@ -388,12 +361,6 @@ public InputRowParser getParser() return inputRowParser; } - @Nullable - public Boolean getHasRolledUpSegments() - { - return hasRolledUpSegments; - } - public DataSchema withGranularitySpec(GranularitySpec granularitySpec) { return new DataSchema( From 88cf2a573dbca039b5a92fd09d681201d0e3db33 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Fri, 26 Jul 2024 17:11:54 +0530 Subject: [PATCH 3/5] Address latest review comments and add update tests with tombstone segments. 
--- .../apache/druid/msq/exec/ControllerImpl.java | 18 ++- .../msq/indexing/MSQCompactionRunner.java | 2 +- .../apache/druid/msq/exec/MSQReplaceTest.java | 136 +++++++++++++++++- .../msq/indexing/MSQCompactionRunnerTest.java | 2 - .../indexing/common/task/CompactionTask.java | 24 ++-- .../common/task/CompactionTaskTest.java | 48 ++++--- .../segment/indexing/CombinedDataSchema.java | 17 ++- 7 files changed, 201 insertions(+), 46 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index dad80e07b835..d2d5cc657e6a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -20,7 +20,6 @@ package org.apache.druid.msq.exec; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -1696,10 +1695,21 @@ private static Function, Set> addCompactionStateTo if (querySpec.getQuery() instanceof GroupByQuery) { // For group-by queries, the aggregators are transformed to their combining factories in the dataschema, resulting - // in a mismatch between schema in compaction spec and the one in compaction state. Sourcing the metricsSpec - // therefore directly from the querySpec. + // in a mismatch between schema in compaction spec and the one in compaction state. Sourcing the original + // AggregatorFactory definition for aggregators in the dataSchema, therefore, directly from the querySpec. GroupByQuery groupByQuery = (GroupByQuery) querySpec.getQuery(); - metricsSpec = jsonMapper.convertValue(groupByQuery.getAggregatorSpecs(), new TypeReference>() {}); + // Collect all aggregators that are part of the current dataSchema, since a non-rollup query (isRollup() is false) + // moves metrics columns to dimensions in the final schema. 
+ Set aggregatorsInDataSchema = Arrays.stream(dataSchema.getAggregators()) + .map(AggregatorFactory::getName) + .collect( + Collectors.toSet()); + metricsSpec = new ArrayList<>( + groupByQuery.getAggregatorSpecs() + .stream() + .filter(aggregatorFactory -> aggregatorsInDataSchema.contains(aggregatorFactory.getName())) + .collect(Collectors.toList()) + ); } IndexSpec indexSpec = tuningConfig.getIndexSpec(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index d5bb07f5f1f5..baef816f63d2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -163,7 +163,7 @@ private CompactionConfigValidationResult validateRolledUpSegments(Map intervalDataSchema : intervalToDataSchemaMap.entrySet()) { if (intervalDataSchema.getValue() instanceof CombinedDataSchema) { CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue(); - if (Boolean.TRUE.equals(combinedDataSchema.hasRolledUpSegments())) { + if (combinedDataSchema.hasRolledUpSegments()) { for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) { // This is a conservative check as existing rollup may have been idempotent but the aggregator provided in // compaction spec isn't. This would get properly compacted yet fails in the below pre-check. diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index f6eb80b32826..1e8dc474f6e5 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -49,6 +49,7 @@ import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestTaskActionClient; import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -957,6 +958,13 @@ public void testReplaceOnFoo1WithLimit(String contextName, Map c "Using RangeShardSpec to generate segments." ) ) + .setExpectedLastCompactionState(expectedCompactionState( + queryContext, + Collections.singletonList("dim1"), + Collections.singletonList(new StringDimensionSchema("dim1")), + GranularityType.ALL, + Intervals.ETERNITY + )) .verifyResults(); } @@ -1150,7 +1158,6 @@ public void testReplaceOnFoo1RangeClusteredBySubset(String contextName, Map createDataSchemasForIntervals( @Nullable final ClientCompactionTaskTransformSpec transformSpec, @Nullable final AggregatorFactory[] metricsSpec, @Nullable final ClientCompactionTaskGranularitySpec granularitySpec, - final ServiceMetricEvent.Builder metricBuilder + final ServiceMetricEvent.Builder metricBuilder, + CompactionRunner compactionRunner ) throws IOException { final Iterable timelineSegments = retrieveRelevantTimelineHolders( @@ -551,7 +553,8 @@ static Map createDataSchemasForIntervals( metricsSpec, granularitySpec == null ? 
new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null) - : granularitySpec.withSegmentGranularity(segmentGranularityToUse) + : granularitySpec.withSegmentGranularity(segmentGranularityToUse), + compactionRunner ); intervalDataSchemaMap.put(interval, dataSchema); } @@ -576,7 +579,8 @@ static Map createDataSchemasForIntervals( dimensionsSpec, transformSpec, metricsSpec, - granularitySpec + granularitySpec, + compactionRunner ); return Collections.singletonMap(segmentProvider.interval, dataSchema); } @@ -606,13 +610,17 @@ private static DataSchema createDataSchema( @Nullable DimensionsSpec dimensionsSpec, @Nullable ClientCompactionTaskTransformSpec transformSpec, @Nullable AggregatorFactory[] metricsSpec, - @Nonnull ClientCompactionTaskGranularitySpec granularitySpec + @Nonnull ClientCompactionTaskGranularitySpec granularitySpec, + @Nullable CompactionRunner compactionRunner ) { // Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer( segments, - true, // Always need rollup to check if there are some rollup segments already present. + // For MSQ, always need rollup to check if there are some rollup segments already present. + compactionRunner == null || compactionRunner instanceof NativeCompactionRunner + ? (granularitySpec.isRollup() == null) + : true, granularitySpec.getQueryGranularity() == null, dimensionsSpec == null, metricsSpec == null @@ -674,8 +682,6 @@ private static DataSchema createDataSchema( finalMetricsSpec, uniformGranularitySpec, transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null), - null, - null, existingSegmentAnalyzer.hasRolledUpSegments() ); } @@ -821,7 +827,7 @@ public Boolean getRollup() return rollup; } - public Boolean hasRolledUpSegments() + public boolean hasRolledUpSegments() { return hasRolledUpSegments; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index f9849b1483df..3a386bc4aa79 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -749,7 +749,8 @@ public void testCreateIngestionSchema() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -810,7 +811,8 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -872,7 +874,8 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -935,7 +938,8 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1005,7 +1009,8 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1055,7 
+1060,8 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException null, customMetricsSpec, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1098,7 +1104,8 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1148,7 +1155,8 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); NativeCompactionRunner.createIngestionSpecs( @@ -1178,7 +1186,8 @@ public void testMissingMetadata() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); NativeCompactionRunner.createIngestionSpecs( @@ -1219,7 +1228,8 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException null, null, new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1263,7 +1273,8 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException null, null, new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( dataSchemasForIntervals, @@ -1308,7 +1319,8 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio new PeriodGranularity(Period.months(3), null, null), null ), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1355,7 +1367,8 @@ public void testNullGranularitySpec() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1400,7 +1413,8 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity null, null, new ClientCompactionTaskGranularitySpec(null, null, null), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1445,7 +1459,8 @@ public void testGranularitySpecWithNotNullRollup() null, null, new ClientCompactionTaskGranularitySpec(null, null, true), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1475,7 +1490,8 @@ public void testGranularitySpecWithNullRollup() null, null, new ClientCompactionTaskGranularitySpec(null, null, null), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( diff --git a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java index 8d0a45be91cd..b2cb90bc0cec 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java @@ -19,7 +19,6 @@ package org.apache.druid.segment.indexing; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.query.aggregation.AggregatorFactory; @@ 
-27,11 +26,13 @@ import org.apache.druid.segment.transform.TransformSpec; import javax.annotation.Nullable; -import java.util.Map; +/** + * Class representing the combined DataSchema of a set of segments, currently used only by Compaction. + */ public class CombinedDataSchema extends DataSchema { - private final Boolean hasRolledUpSegments; + private final boolean hasRolledUpSegments; public CombinedDataSchema( String dataSource, @@ -40,9 +41,7 @@ public CombinedDataSchema( AggregatorFactory[] aggregators, GranularitySpec granularitySpec, TransformSpec transformSpec, - @Nullable Map parserMap, - ObjectMapper objectMapper, - @Nullable Boolean hasRolledUpSegments + @Nullable boolean hasRolledUpSegments ) { super( @@ -52,13 +51,13 @@ public CombinedDataSchema( aggregators, granularitySpec, transformSpec, - parserMap, - objectMapper + null, + null ); this.hasRolledUpSegments = hasRolledUpSegments; } - public Boolean hasRolledUpSegments() + public boolean hasRolledUpSegments() { return hasRolledUpSegments; } From 496db40e26770ccbdaf5863587ef821462a84c66 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Mon, 29 Jul 2024 18:22:19 +0530 Subject: [PATCH 4/5] Add coverage test --- .../msq/indexing/MSQCompactionRunner.java | 7 +++---- .../indexing/common/task/CompactionTask.java | 2 +- .../segment/indexing/DataSchemaTest.java | 20 +++++++++++++++++++ 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index 79792ecc3696..7b4d3235dc0c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -183,9 +183,8 @@ private CompactionConfigValidationResult validateRolledUpSegments(Map Date: Mon, 29 Jul 2024 21:19:30 +0530 Subject: [PATCH 5/5] Fix test --- .../org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index 79d6f4e43af0..6c5d19572652 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -382,7 +382,7 @@ public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals(validationResult.getReason(), StringUtils.format( - "Rolled-up segments in interval[%s] for compaction not supported by MSQ engine.", + "MSQ: Rolled-up segments in compaction interval[%s].", COMPACTION_INTERVAL )); }
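
Note on the rolled-up-segment validation introduced in this series: an aggregator is treated as safe for compacting already rolled-up segments only when its combining factory is of the same class and it reads solely from the column it writes to. The standalone sketch below (not part of the patch; the wrapper class and method names are illustrative, and it assumes Druid's processing module is on the classpath) mirrors that condition from MSQCompactionRunner#validateRolledUpSegments:

import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class RollupIdempotencyCheckSketch
{
  // Mirrors the per-aggregator condition used by validateRolledUpSegments:
  // same combining-factory class, and the only required field (if any) is the aggregator's own output column.
  static boolean safeForReRollup(AggregatorFactory factory)
  {
    return factory.getClass().equals(factory.getCombiningFactory().getClass())
           && (factory.requiredFields().isEmpty()
               || (factory.requiredFields().size() == 1
                   && factory.requiredFields().get(0).equals(factory.getName())));
  }

  public static void main(String[] args)
  {
    // Reads and writes the same column, so re-aggregation is idempotent: passes the check.
    System.out.println(safeForReRollup(new LongSumAggregatorFactory("added", "added")));     // true
    // Output column differs from the input column, so re-aggregation would overwrite values: fails the check.
    System.out.println(safeForReRollup(new LongSumAggregatorFactory("sum_added", "added"))); // false
  }
}

This matches the expectation in testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails, where a sum of "added" written to "sum_added" is rejected with the "MSQ: Rolled-up segments in compaction interval[%s]." validation error.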