From e28424ea25871d59d3c8e96cc8a5ea5d666690b3 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Wed, 4 Sep 2024 16:28:04 +0530 Subject: [PATCH] Enable rollup on multi-value dimensions for compaction with MSQ engine (#16937) Currently compaction with MSQ engine doesn't work for rollup on multi-value dimensions (MVDs), the reason being the default behaviour of grouping on MVD dimensions to unnest the dimension values; for instance grouping on `[s1,s2]` with aggregate `a` will result in two rows: `` and ``. This change enables rollup on MVDs (without unnest) by converting MVDs to Arrays before rollup using virtual columns, and then converting them back to MVDs using post aggregators. If segment schema is available to the compaction task (when it ends up downloading segments to get existing dimensions/metrics/granularity), it selectively does the MVD-Array conversion only for known multi-valued columns; else it conservatively performs this conversion for all `string` columns. --- .../msq/indexing/MSQCompactionRunner.java | 186 +++++++++++++----- .../msq/indexing/MSQCompactionRunnerTest.java | 79 ++++++-- .../indexing/common/task/CompactionTask.java | 67 ++++++- .../common/task/CompactionTaskTest.java | 70 +++++-- .../duty/ITAutoCompactionTest.java | 57 +++++- .../json/wikipedia_index_data1.json | 6 +- .../json/wikipedia_index_data2.json | 4 +- .../json/wikipedia_index_data3.json | 6 +- .../indexer/wikipedia_index_task.json | 1 + .../segment/indexing/CombinedDataSchema.java | 67 +++++++ .../segment/indexing/DataSchemaTest.java | 22 +++ 11 files changed, 461 insertions(+), 104 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index 5457e04286c9..dc19e33282cf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -54,19 +54,21 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; -import org.apache.druid.query.expression.TimestampFloorExprMacro; -import org.apache.druid.query.expression.TimestampParseExprMacro; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; @@ -82,6 
+84,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -92,12 +95,14 @@ public class MSQCompactionRunner implements CompactionRunner private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; private final ObjectMapper jsonMapper; + private final ExprMacroTable exprMacroTable; private final Injector injector; // Needed as output column name while grouping in the scenario of: // a) no query granularity -- to specify an output name for the time dimension column since __time is a reserved name. // b) custom query granularity -- to create a virtual column containing the rounded-off row timestamp. // In both cases, the new column is converted back to __time later using columnMappings. public static final String TIME_VIRTUAL_COLUMN = "__vTime"; + public static final String ARRAY_VIRTUAL_COLUMN_PREFIX = "__vArray_"; @JsonIgnore private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( @@ -108,9 +113,14 @@ public class MSQCompactionRunner implements CompactionRunner @JsonCreator - public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInject Injector injector) + public MSQCompactionRunner( + @JacksonInject final ObjectMapper jsonMapper, + @JacksonInject final ExprMacroTable exprMacroTable, + @JacksonInject final Injector injector + ) { this.jsonMapper = jsonMapper; + this.exprMacroTable = exprMacroTable; this.injector = injector; } @@ -192,11 +202,12 @@ public List createMsqControllerTasks( Query query; Interval interval = intervalDataSchema.getKey(); DataSchema dataSchema = intervalDataSchema.getValue(); + Map inputColToVirtualCol = getVirtualColumns(dataSchema, interval); if (isGroupBy(dataSchema)) { - query = buildGroupByQuery(compactionTask, interval, dataSchema); + query = buildGroupByQuery(compactionTask, interval, dataSchema, inputColToVirtualCol); } else { - query = buildScanQuery(compactionTask, interval, dataSchema); + query = buildScanQuery(compactionTask, interval, dataSchema, inputColToVirtualCol); } QueryContext compactionTaskContext = new QueryContext(compactionTask.getContext()); @@ -308,7 +319,10 @@ private static RowSignature getRowSignature(DataSchema dataSchema) return rowSignatureBuilder.build(); } - private static List getAggregateDimensions(DataSchema dataSchema) + private static List getAggregateDimensions( + DataSchema dataSchema, + Map inputColToVirtualCol + ) { List dimensionSpecs = new ArrayList<>(); @@ -319,14 +333,22 @@ private static List getAggregateDimensions(DataSchema dataSchema) // The changed granularity would result in a new virtual column that needs to be aggregated upon. dimensionSpecs.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, TIME_VIRTUAL_COLUMN, ColumnType.LONG)); } - - dimensionSpecs.addAll(dataSchema.getDimensionsSpec().getDimensions().stream() - .map(dim -> new DefaultDimensionSpec( - dim.getName(), - dim.getName(), - dim.getColumnType() - )) - .collect(Collectors.toList())); + // If virtual columns are created from dimensions, replace dimension columns names with virtual column names. 
+ dimensionSpecs.addAll( + dataSchema.getDimensionsSpec().getDimensions().stream() + .map(dim -> { + String dimension = dim.getName(); + ColumnType colType = dim.getColumnType(); + if (inputColToVirtualCol.containsKey(dim.getName())) { + VirtualColumn virtualColumn = inputColToVirtualCol.get(dimension); + dimension = virtualColumn.getOutputName(); + if (virtualColumn instanceof ExpressionVirtualColumn) { + colType = ((ExpressionVirtualColumn) virtualColumn).getOutputType(); + } + } + return new DefaultDimensionSpec(dimension, dimension, colType); + }) + .collect(Collectors.toList())); return dimensionSpecs; } @@ -365,13 +387,19 @@ private static List getOrderBySpec(PartitionsSpec partitionSp return Collections.emptyList(); } - private static Query buildScanQuery(CompactionTask compactionTask, Interval interval, DataSchema dataSchema) + private static Query buildScanQuery( + CompactionTask compactionTask, + Interval interval, + DataSchema dataSchema, + Map inputColToVirtualCol + ) { RowSignature rowSignature = getRowSignature(dataSchema); + VirtualColumns virtualColumns = VirtualColumns.create(new ArrayList<>(inputColToVirtualCol.values())); Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder() .dataSource(dataSchema.getDataSource()) .columns(rowSignature.getColumnNames()) - .virtualColumns(getVirtualColumns(dataSchema, interval)) + .virtualColumns(virtualColumns) .columnTypes(rowSignature.getColumnTypes()) .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval))) .filters(dataSchema.getTransformSpec().getFilter()) @@ -416,51 +444,115 @@ private static boolean isQueryGranularityEmptyOrNone(DataSchema dataSchema) } /** - * Creates a virtual timestamp column to create a new __time field according to the provided queryGranularity, as - * queryGranularity field itself is mandated to be ALL in MSQControllerTask. + * Conditionally creates below virtual columns + *
+   * <ul>
+   * <li> timestamp column (for custom queryGranularity): converts __time field in line with the provided
+   * queryGranularity, since the queryGranularity field itself in MSQControllerTask is mandated to be ALL.</li>
+   * <li> mv_to_array columns (for group-by queries): temporary columns that convert MVD columns to array to enable
+   * grouping on them without unnesting.</li>
+   * </ul>
*/ - private static VirtualColumns getVirtualColumns(DataSchema dataSchema, Interval interval) + private Map getVirtualColumns(DataSchema dataSchema, Interval interval) { - if (isQueryGranularityEmptyOrNone(dataSchema)) { - return VirtualColumns.EMPTY; + Map inputColToVirtualCol = new HashMap<>(); + if (!isQueryGranularityEmptyOrNone(dataSchema)) { + // Round-off time field according to provided queryGranularity + String timeVirtualColumnExpr; + if (dataSchema.getGranularitySpec() + .getQueryGranularity() + .equals(Granularities.ALL)) { + // For ALL query granularity, all records in a segment are assigned the interval start timestamp of the segment. + // It's the same behaviour in native compaction. + timeVirtualColumnExpr = StringUtils.format("timestamp_parse('%s')", interval.getStart()); + } else { + PeriodGranularity periodQueryGranularity = (PeriodGranularity) dataSchema.getGranularitySpec() + .getQueryGranularity(); + // Round off the __time column according to the required granularity. + timeVirtualColumnExpr = + StringUtils.format( + "timestamp_floor(\"%s\", '%s')", + ColumnHolder.TIME_COLUMN_NAME, + periodQueryGranularity.getPeriod().toString() + ); + } + inputColToVirtualCol.put(ColumnHolder.TIME_COLUMN_NAME, new ExpressionVirtualColumn( + TIME_VIRTUAL_COLUMN, + timeVirtualColumnExpr, + ColumnType.LONG, + exprMacroTable + )); } - String virtualColumnExpr; - if (dataSchema.getGranularitySpec() - .getQueryGranularity() - .equals(Granularities.ALL)) { - // For ALL query granularity, all records in a segment are assigned the interval start timestamp of the segment. - // It's the same behaviour in native compaction. - virtualColumnExpr = StringUtils.format("timestamp_parse('%s')", interval.getStart()); - } else { - PeriodGranularity periodQueryGranularity = (PeriodGranularity) dataSchema.getGranularitySpec() - .getQueryGranularity(); - // Round of the __time column according to the required granularity. - virtualColumnExpr = - StringUtils.format( - "timestamp_floor(\"%s\", '%s')", - ColumnHolder.TIME_COLUMN_NAME, - periodQueryGranularity.getPeriod().toString() - ); + if (isGroupBy(dataSchema)) { + // Convert MVDs to arrays for grouping to avoid unnest, assuming all string cols to be MVDs. + Set multiValuedColumns = dataSchema.getDimensionsSpec() + .getDimensions() + .stream() + .filter(dim -> dim.getColumnType().equals(ColumnType.STRING)) + .map(DimensionSchema::getName) + .collect(Collectors.toSet()); + if (dataSchema instanceof CombinedDataSchema && + ((CombinedDataSchema) dataSchema).getMultiValuedDimensions() != null) { + // Filter actual MVDs from schema info. 
+ Set multiValuedColumnsFromSchema = + ((CombinedDataSchema) dataSchema).getMultiValuedDimensions(); + multiValuedColumns = multiValuedColumns.stream() + .filter(multiValuedColumnsFromSchema::contains) + .collect(Collectors.toSet()); + } + + for (String dim : multiValuedColumns) { + String virtualColumnExpr = StringUtils.format("mv_to_array(\"%s\")", dim); + inputColToVirtualCol.put( + dim, + new ExpressionVirtualColumn( + ARRAY_VIRTUAL_COLUMN_PREFIX + dim, + virtualColumnExpr, + ColumnType.STRING_ARRAY, + exprMacroTable + ) + ); + } } - return VirtualColumns.create(new ExpressionVirtualColumn( - TIME_VIRTUAL_COLUMN, - virtualColumnExpr, - ColumnType.LONG, - new ExprMacroTable(ImmutableList.of(new TimestampFloorExprMacro(), new TimestampParseExprMacro())) - )); + return inputColToVirtualCol; } - private static Query buildGroupByQuery(CompactionTask compactionTask, Interval interval, DataSchema dataSchema) + private Query buildGroupByQuery( + CompactionTask compactionTask, + Interval interval, + DataSchema dataSchema, + Map inputColToVirtualCol + ) { DimFilter dimFilter = dataSchema.getTransformSpec().getFilter(); + VirtualColumns virtualColumns = VirtualColumns.create(new ArrayList<>(inputColToVirtualCol.values())); + + // Convert MVDs converted to arrays back to MVDs, with the same name as the input column. + // This is safe since input column names no longer exist at post-aggregation stage. + List postAggregators = + inputColToVirtualCol.entrySet() + .stream() + .filter(entry -> !entry.getKey().equals(ColumnHolder.TIME_COLUMN_NAME)) + .map( + entry -> + new ExpressionPostAggregator( + entry.getKey(), + StringUtils.format("array_to_mv(\"%s\")", entry.getValue().getOutputName()), + null, + ColumnType.STRING, + exprMacroTable + ) + ) + .collect(Collectors.toList()); + GroupByQuery.Builder builder = new GroupByQuery.Builder() .setDataSource(new TableDataSource(compactionTask.getDataSource())) - .setVirtualColumns(getVirtualColumns(dataSchema, interval)) + .setVirtualColumns(virtualColumns) .setDimFilter(dimFilter) .setGranularity(new AllGranularity()) - .setDimensions(getAggregateDimensions(dataSchema)) + .setDimensions(getAggregateDimensions(dataSchema, inputColToVirtualCol)) .setAggregatorSpecs(Arrays.asList(dataSchema.getAggregators())) + .setPostAggregatorSpecs(postAggregators) .setContext(compactionTask.getContext()) .setInterval(interval); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index 09c5ae477182..34092d061b2a 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -20,7 +20,6 @@ package org.apache.druid.msq.indexing; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -44,7 +43,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.GranularityType; -import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import 
org.apache.druid.msq.kernel.WorkerAssignmentStrategy; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -52,19 +50,27 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.joda.time.Interval; import org.junit.Assert; import org.junit.BeforeClass; @@ -73,8 +79,10 @@ import javax.annotation.Nullable; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -83,20 +91,25 @@ public class MSQCompactionRunnerTest private static final String DATA_SOURCE = "dataSource"; private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-07-01"); - private static final String TIMESTAMP_COLUMN = "timestamp"; + private static final String TIMESTAMP_COLUMN = ColumnHolder.TIME_COLUMN_NAME; private static final int TARGET_ROWS_PER_SEGMENT = 100000; private static final GranularityType SEGMENT_GRANULARITY = GranularityType.HOUR; private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR; private static List PARTITION_DIMENSIONS; - private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, null); + private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, false); + private static final StringDimensionSchema MV_STRING_DIMENSION = new StringDimensionSchema("mv_string_dim", null, null); private static final LongDimensionSchema LONG_DIMENSION = new LongDimensionSchema("long_dim"); - private static final List DIMENSIONS = ImmutableList.of(STRING_DIMENSION, LONG_DIMENSION); + private static final List DIMENSIONS = ImmutableList.of( + STRING_DIMENSION, + LONG_DIMENSION, + MV_STRING_DIMENSION + ); private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0"); private static final AggregatorFactory AGG2 = new LongSumAggregatorFactory("sum_added", "sum_added"); private static final List AGGREGATORS = ImmutableList.of(AGG1, AGG2); - private static 
final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(JSON_MAPPER, null); + private static final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(JSON_MAPPER, TestExprMacroTable.INSTANCE, null); @BeforeClass public static void setupClass() @@ -110,11 +123,6 @@ public static void setupClass() ); PARTITION_DIMENSIONS = Collections.singletonList(stringDimensionSchema.getName()); - - JSON_MAPPER.setInjectableValues(new InjectableValues.Std().addValue( - ExprMacroTable.class, - LookupEnabledTestExprMacroTable.INSTANCE - )); } @Test @@ -296,7 +304,10 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce actualMSQSpec.getDestination() ); - Assert.assertEquals(dimFilter, actualMSQSpec.getQuery().getFilter()); + Assert.assertTrue(actualMSQSpec.getQuery() instanceof ScanQuery); + ScanQuery scanQuery = (ScanQuery) actualMSQSpec.getQuery(); + + Assert.assertEquals(dimFilter, scanQuery.getFilter()); Assert.assertEquals( JSON_MAPPER.writeValueAsString(SEGMENT_GRANULARITY.toString()), msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY) @@ -305,7 +316,7 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy()); Assert.assertEquals( PARTITION_DIMENSIONS.stream().map(OrderBy::ascending).collect(Collectors.toList()), - ((ScanQuery) actualMSQSpec.getQuery()).getOrderBys() + scanQuery.getOrderBys() ); } @@ -322,7 +333,10 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess null ); - DataSchema dataSchema = new DataSchema( + Set multiValuedDimensions = new HashSet<>(); + multiValuedDimensions.add(MV_STRING_DIMENSION.getName()); + + CombinedDataSchema dataSchema = new CombinedDataSchema( DATA_SOURCE, new TimestampSpec(TIMESTAMP_COLUMN, null, null), new DimensionsSpec(DIMENSIONS), @@ -332,7 +346,8 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess QUERY_GRANULARITY.getDefaultGranularity(), Collections.singletonList(COMPACTION_INTERVAL) ), - new TransformSpec(dimFilter, Collections.emptyList()) + new TransformSpec(dimFilter, Collections.emptyList()), + multiValuedDimensions ); @@ -367,7 +382,10 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess actualMSQSpec.getDestination() ); - Assert.assertEquals(dimFilter, actualMSQSpec.getQuery().getFilter()); + Assert.assertTrue(actualMSQSpec.getQuery() instanceof GroupByQuery); + GroupByQuery groupByQuery = (GroupByQuery) actualMSQSpec.getQuery(); + + Assert.assertEquals(dimFilter, groupByQuery.getFilter()); Assert.assertEquals( JSON_MAPPER.writeValueAsString(SEGMENT_GRANULARITY.toString()), msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY) @@ -377,6 +395,31 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY) ); Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy()); + + + // Since only MV_STRING_DIMENSION is indicated to be MVD by the CombinedSchema, conversion to array should happen + // only for that column. 
+ List expectedDimensionSpec = DIMENSIONS.stream() + .filter(dim -> !MV_STRING_DIMENSION.getName() + .equals(dim.getName())) + .map(dim -> new DefaultDimensionSpec( + dim.getName(), + dim.getName(), + dim.getColumnType() + )) + .collect( + Collectors.toList()); + expectedDimensionSpec.add( + new DefaultDimensionSpec(MSQCompactionRunner.TIME_VIRTUAL_COLUMN, + MSQCompactionRunner.TIME_VIRTUAL_COLUMN, + ColumnType.LONG) + ); + String mvToArrayStringDim = MSQCompactionRunner.ARRAY_VIRTUAL_COLUMN_PREFIX + MV_STRING_DIMENSION.getName(); + expectedDimensionSpec.add(new DefaultDimensionSpec(mvToArrayStringDim, mvToArrayStringDim, ColumnType.STRING_ARRAY)); + MatcherAssert.assertThat( + expectedDimensionSpec, + Matchers.containsInAnyOrder(groupByQuery.getDimensions().toArray(new DimensionSpec[0])) + ); } private CompactionTask createCompactionTask( @@ -408,7 +451,7 @@ private CompactionTask createCompactionTask( .transformSpec(transformSpec) .granularitySpec(granularitySpec) .metricsSpec(metricsSpec) - .compactionRunner(new MSQCompactionRunner(JSON_MAPPER, null)) + .compactionRunner(MSQ_COMPACTION_RUNNER) .context(context); return builder.build(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index ab7736b047e1..73c8a35405c4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -77,8 +77,10 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.GranularitySpec; @@ -462,7 +464,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception transformSpec, metricsSpec, granularitySpec, - getMetricBuilder() + getMetricBuilder(), + !(compactionRunner instanceof NativeCompactionRunner) ); registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder()); @@ -488,7 +491,8 @@ static Map createDataSchemasForIntervals( @Nullable final ClientCompactionTaskTransformSpec transformSpec, @Nullable final AggregatorFactory[] metricsSpec, @Nullable final ClientCompactionTaskGranularitySpec granularitySpec, - final ServiceMetricEvent.Builder metricBuilder + final ServiceMetricEvent.Builder metricBuilder, + boolean needMultiValuedColumns ) throws IOException { final Iterable timelineSegments = retrieveRelevantTimelineHolders( @@ -552,7 +556,8 @@ static Map createDataSchemasForIntervals( metricsSpec, granularitySpec == null ? 
new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null) - : granularitySpec.withSegmentGranularity(segmentGranularityToUse) + : granularitySpec.withSegmentGranularity(segmentGranularityToUse), + needMultiValuedColumns ); intervalDataSchemaMap.put(interval, dataSchema); } @@ -577,7 +582,8 @@ static Map createDataSchemasForIntervals( dimensionsSpec, transformSpec, metricsSpec, - granularitySpec + granularitySpec, + needMultiValuedColumns ); return Collections.singletonMap(segmentProvider.interval, dataSchema); } @@ -607,7 +613,8 @@ private static DataSchema createDataSchema( @Nullable DimensionsSpec dimensionsSpec, @Nullable ClientCompactionTaskTransformSpec transformSpec, @Nullable AggregatorFactory[] metricsSpec, - @Nonnull ClientCompactionTaskGranularitySpec granularitySpec + @Nonnull ClientCompactionTaskGranularitySpec granularitySpec, + boolean needMultiValuedColumns ) { // Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity @@ -616,7 +623,8 @@ private static DataSchema createDataSchema( granularitySpec.isRollup() == null, granularitySpec.getQueryGranularity() == null, dimensionsSpec == null, - metricsSpec == null + metricsSpec == null, + needMultiValuedColumns ); final Stopwatch stopwatch = Stopwatch.createStarted(); @@ -668,13 +676,14 @@ private static DataSchema createDataSchema( finalMetricsSpec = metricsSpec; } - return new DataSchema( + return new CombinedDataSchema( dataSource, new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null), finalDimensionsSpec, finalMetricsSpec, uniformGranularitySpec, - transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null) + transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null), + existingSegmentAnalyzer.getMultiValuedDimensions() ); } @@ -748,6 +757,7 @@ static class ExistingSegmentAnalyzer private final boolean needQueryGranularity; private final boolean needDimensionsSpec; private final boolean needMetricsSpec; + private final boolean needMultiValuedDimensions; // For processRollup: private boolean rollup = true; @@ -761,13 +771,15 @@ static class ExistingSegmentAnalyzer // For processMetricsSpec: private final Set> aggregatorFactoryLists = new HashSet<>(); + private Set multiValuedDimensions; ExistingSegmentAnalyzer( final Iterable>>> segmentsIterable, final boolean needRollup, final boolean needQueryGranularity, final boolean needDimensionsSpec, - final boolean needMetricsSpec + final boolean needMetricsSpec, + final boolean needMultiValuedDimensions ) { this.segmentsIterable = segmentsIterable; @@ -775,15 +787,26 @@ static class ExistingSegmentAnalyzer this.needQueryGranularity = needQueryGranularity; this.needDimensionsSpec = needDimensionsSpec; this.needMetricsSpec = needMetricsSpec; + this.needMultiValuedDimensions = needMultiValuedDimensions; + } + + private boolean shouldFetchSegments() + { + // Don't fetch segments for just needMultiValueDimensions + return needRollup || needQueryGranularity || needDimensionsSpec || needMetricsSpec; } public void fetchAndProcessIfNeeded() { - if (!needRollup && !needQueryGranularity && !needDimensionsSpec && !needMetricsSpec) { + if (!shouldFetchSegments()) { // Nothing to do; short-circuit and don't fetch segments. 
return; } + if (needMultiValuedDimensions) { + multiValuedDimensions = new HashSet<>(); + } + final List>>> segments = sortSegmentsListNewestFirst(); for (Pair>> segmentPair : segments) { @@ -804,6 +827,7 @@ public void fetchAndProcessIfNeeded() processQueryGranularity(index); processDimensionsSpec(index); processMetricsSpec(index); + processMultiValuedDimensions(index); } } } @@ -890,6 +914,11 @@ public AggregatorFactory[] getMetricsSpec() return mergedAggregators; } + public Set getMultiValuedDimensions() + { + return multiValuedDimensions; + } + /** * Sort {@link #segmentsIterable} in order, such that we look at later segments prior to earlier ones. Useful when * analyzing dimensions, as it allows us to take the latest value we see, and therefore prefer types from more @@ -983,6 +1012,24 @@ private void processMetricsSpec(final QueryableIndex index) } } + private void processMultiValuedDimensions(final QueryableIndex index) + { + if (!needMultiValuedDimensions) { + return; + } + for (String dimension : index.getAvailableDimensions()) { + if (isMultiValuedDimension(index, dimension)) { + multiValuedDimensions.add(dimension); + } + } + } + + private boolean isMultiValuedDimension(final QueryableIndex index, final String col) + { + ColumnCapabilities columnCapabilities = index.getColumnCapabilities(col); + return columnCapabilities != null && columnCapabilities.hasMultipleValues().isTrue(); + } + static Granularity compareWithCurrent(Granularity queryGranularity, Granularity current) { if (queryGranularity == null && current != null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 84a1457ad914..c1bf649980f6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -116,6 +116,7 @@ import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.BatchIOConfig; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; @@ -749,7 +750,8 @@ public void testCreateIngestionSchema() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -810,7 +812,8 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -872,7 +875,8 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -935,7 +939,8 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1005,7 +1010,8 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List 
ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1055,7 +1061,8 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException null, customMetricsSpec, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1098,7 +1105,8 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1148,7 +1156,8 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); NativeCompactionRunner.createIngestionSpecs( @@ -1178,7 +1187,8 @@ public void testMissingMetadata() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); NativeCompactionRunner.createIngestionSpecs( @@ -1219,7 +1229,8 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException null, null, new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1263,7 +1274,8 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException null, null, new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( dataSchemasForIntervals, @@ -1308,7 +1320,8 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio new PeriodGranularity(Period.months(3), null, null), null ), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1355,7 +1368,8 @@ public void testNullGranularitySpec() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1400,7 +1414,8 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity null, null, new ClientCompactionTaskGranularitySpec(null, null, null), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1445,7 +1460,8 @@ public void testGranularitySpecWithNotNullRollup() null, null, new ClientCompactionTaskGranularitySpec(null, null, true), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1475,7 +1491,8 @@ public void testGranularitySpecWithNullRollup() null, null, new ClientCompactionTaskGranularitySpec(null, null, null), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1495,6 +1512,27 @@ public void testGranularitySpecWithNullRollup() } } + @Test + public void testMultiValuedDimensionsProcessing() + throws IOException + { + final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( + toolbox, + LockGranularity.TIME_CHUNK, + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), + null, + null, + null, + new ClientCompactionTaskGranularitySpec(null, null, null), + METRIC_BUILDER, + true + ); + for (DataSchema dataSchema : dataSchemasForIntervals.values()) { + Assert.assertTrue(dataSchema 
instanceof CombinedDataSchema); + Assert.assertTrue(((CombinedDataSchema) dataSchema).getMultiValuedDimensions().isEmpty()); + } + } + @Test public void testChooseFinestGranularityWithNulls() { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 31a6bccffc72..e95d09bd5082 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -526,7 +526,7 @@ public void testAutoCompactionPreservesCreateBitmapIndexInDimensionSchema(Compac try (final Closeable ignored = unloader(fullDatasourceName)) { final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); intervalsBeforeCompaction.sort(null); - // 4 segments across 2 days (4 total)... + // 4 segments across 2 days (4 total) verifySegmentsCount(4); verifyQuery(INDEX_QUERIES_RESOURCE); @@ -548,13 +548,59 @@ public void testAutoCompactionPreservesCreateBitmapIndexInDimensionSchema(Compac true, engine ); - //...compacted into 1 segment for the entire year. + // Compacted into 1 segment for the entire year. forceTriggerAutoCompaction(1); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); verifySegmentsCompactedDimensionSchema(dimensionSchemas); } } + @Test(dataProvider = "engine") + public void testAutoCompactionRollsUpMultiValueDimensionsWithoutUnnest(CompactionEngine engine) throws Exception + { + loadData(INDEX_TASK); + try (final Closeable ignored = unloader(fullDatasourceName)) { + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + // 4 segments across 2 days (4 total) + verifySegmentsCount(4); + verifyQuery(INDEX_QUERIES_RESOURCE); + + LOG.info("Auto compaction test with YEAR segment granularity, DAY query granularity, dropExisting is true"); + + List dimensionSchemas = ImmutableList.of( + new StringDimensionSchema("language", null, true), + new StringDimensionSchema("tags", DimensionSchema.MultiValueHandling.SORTED_ARRAY, true) + ); + + submitCompactionConfig( + MAX_ROWS_PER_SEGMENT_COMPACTED, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(Granularities.YEAR, Granularities.DAY, true), + new UserCompactionTaskDimensionsConfig(dimensionSchemas), + null, + new AggregatorFactory[] {new LongSumAggregatorFactory("added", "added")}, + true, + engine + ); + // Compacted into 1 segment for the entire year. + forceTriggerAutoCompaction(1); + Map queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", + "added", + "%%EXPECTED_COUNT_RESULT%%", + 1, + "%%EXPECTED_SCAN_RESULT%%", + ImmutableList.of( + ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516))) + ) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); + verifySegmentsCompactedDimensionSchema(dimensionSchemas); + } + } + @Test public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception { @@ -636,10 +682,11 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig(CompactionEngine eng final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null); submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine); - // 2 segments published per day after compaction. 
- forceTriggerAutoCompaction(4); + // 3 segments for both 2013-08-31 and 2013-09-01. (Note that numShards guarantees max shards but not exact + // number of final shards, since some shards may end up empty.) + forceTriggerAutoCompaction(6); verifyQuery(INDEX_QUERIES_RESOURCE); - verifySegmentsCompacted(hashedPartitionsSpec, 4); + verifySegmentsCompacted(hashedPartitionsSpec, 6); checkCompactionIntervals(intervalsBeforeCompaction); } diff --git a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data1.json b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data1.json index 169796cd7468..30fef106aafb 100644 --- a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data1.json +++ b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data1.json @@ -1,3 +1,3 @@ -{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} -{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} -{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} \ No newline at end of file +{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "tags": ["t1", "t2"], "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} +{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "tags": ["t1", "t2"], "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} +{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "tags": ["t3", "t4", "t5"], "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} \ No newline at end of file diff --git a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data2.json b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data2.json index 62e270113d51..d13b6a37bbcc 100644 --- a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data2.json +++ b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data2.json @@ -1,3 +1,3 @@ -{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", 
"user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} +{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "tags": ["t6"], "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} {"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9} -{"timestamp": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} \ No newline at end of file +{"timestamp": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "tags": ["t1", "t2"], "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} \ No newline at end of file diff --git a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data3.json b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data3.json index 28e0762f84cf..a8f5c2ec292e 100644 --- a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data3.json +++ b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data3.json @@ -1,4 +1,4 @@ -{"timestamp": "2013-09-01T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} -{"timestamp": "2013-09-01T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} -{"timestamp": "2013-09-01T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} +{"timestamp": "2013-09-01T03:32:45Z", "page": "Striker Eureka", "language" : "en", "tags": ["t1", "t2"], "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} +{"timestamp": "2013-09-01T07:11:21Z", "page": 
"Cherno Alpha", "language" : "ru", "tags": ["t3", "t4", "t5"], "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} +{"timestamp": "2013-09-01T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "tags": ["t6"], "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} {"timestamp": "2013-09-01T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9} \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json index 00bf7721f2b7..00005c7b65aa 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json @@ -11,6 +11,7 @@ "dimensions": [ "page", {"type": "string", "name": "language", "createBitmapIndex": false}, + "tags", "user", "unpatrolled", "newPage", diff --git a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java new file mode 100644 index 000000000000..14deba7725af --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.indexing; + +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.segment.transform.TransformSpec; + +import javax.annotation.Nullable; +import java.util.Set; + +/** + * Class representing the combined DataSchema of a set of segments, currently used only by Compaction. 
+ */ +public class CombinedDataSchema extends DataSchema +{ + private final Set multiValuedDimensions; + + public CombinedDataSchema( + String dataSource, + @Nullable TimestampSpec timestampSpec, + @Nullable DimensionsSpec dimensionsSpec, + AggregatorFactory[] aggregators, + GranularitySpec granularitySpec, + TransformSpec transformSpec, + @Nullable Set multiValuedDimensions + ) + { + super( + dataSource, + timestampSpec, + dimensionsSpec, + aggregators, + granularitySpec, + transformSpec, + null, + null + ); + this.multiValuedDimensions = multiValuedDimensions; + } + + @Nullable + public Set getMultiValuedDimensions() + { + return multiValuedDimensions; + } + +} diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java index 32b2a3830fdf..90297dd4af9d 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java @@ -779,4 +779,26 @@ public void testWithDimensionSpec() Assert.assertSame(oldSchema.getParserMap(), newSchema.getParserMap()); } + + @Test + public void testCombinedDataSchemaSetsMultiValuedColumnsInfo() + { + Set multiValuedDimensions = ImmutableSet.of("dimA"); + + CombinedDataSchema schema = new CombinedDataSchema( + IdUtilsTest.VALID_ID_CHARS, + new TimestampSpec("time", "auto", null), + DimensionsSpec.builder() + .setDimensions( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimA", "dimB", "metric1")) + ) + .setDimensionExclusions(ImmutableList.of("dimC")) + .build(), + null, + new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))), + null, + multiValuedDimensions + ); + Assert.assertEquals(ImmutableSet.of("dimA"), schema.getMultiValuedDimensions()); + } }