From fb7103cceff52944c29ca365e56b3ebb23d2e5be Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Tue, 20 Aug 2024 10:30:24 +0530 Subject: [PATCH] Change dimensionToSchemaMap to dimensionSchemas and override ARRAY_INGEST_MODE to array (#16909) A follow-up PR for #16864. Just renames dimensionToSchemaMap to dimensionSchemas and always overrides ARRAY_INGEST_MODE context value to array for MSQ compaction. --- .../apache/druid/msq/exec/ControllerImpl.java | 20 +++++++++---------- .../msq/indexing/MSQCompactionRunner.java | 3 ++- .../destination/DataSourceMSQDestination.java | 16 +++++++-------- .../DataSourceMSQDestinationTest.java | 2 +- 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 278a85685dd1..4310619f81f1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1951,7 +1951,7 @@ private static DataSchema makeDataSchemaForIngestion( columnMappings, isRollupQuery, querySpec.getQuery(), - destination.getDimensionToSchemaMap() + destination.getDimensionSchemas() ); return new DataSchema( @@ -2127,11 +2127,11 @@ private static DimensionSchema getDimensionSchema( final String outputColumnName, @Nullable final ColumnType queryType, QueryContext context, - @Nullable Map dimensionToSchemaMap + @Nullable Map dimensionSchemas ) { - if (dimensionToSchemaMap != null && dimensionToSchemaMap.containsKey(outputColumnName)) { - return dimensionToSchemaMap.get(outputColumnName); + if (dimensionSchemas != null && dimensionSchemas.containsKey(outputColumnName)) { + return dimensionSchemas.get(outputColumnName); } // In case of ingestion, or when metrics are converted to dimensions when compaction is performed without rollup, // we won't have an entry in the map. For those cases, use the default config. @@ -2150,7 +2150,7 @@ private static Pair, List> makeDimensio final ColumnMappings columnMappings, final boolean isRollupQuery, final Query query, - @Nullable final Map dimensionToSchemaMap + @Nullable final Map dimensionSchemas ) { // Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to @@ -2237,13 +2237,13 @@ private static Pair, List> makeDimensio outputColumnName, type, query.context(), - dimensionToSchemaMap + dimensionSchemas ); } else { // complex columns only if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) { dimensions.add( - getDimensionSchema(outputColumnName, type, query.context(), dimensionToSchemaMap) + getDimensionSchema(outputColumnName, type, query.context(), dimensionSchemas) ); } else if (!isRollupQuery) { aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName())); @@ -2255,7 +2255,7 @@ private static Pair, List> makeDimensio outputColumnName, type, query.context(), - dimensionToSchemaMap + dimensionSchemas ); } } @@ -2283,14 +2283,14 @@ private static void populateDimensionsAndAggregators( String outputColumn, ColumnType type, QueryContext context, - Map dimensionToSchemaMap + Map dimensionSchemas ) { if (outputColumnAggregatorFactories.containsKey(outputColumn)) { aggregators.add(outputColumnAggregatorFactories.get(outputColumn)); } else { dimensions.add( - getDimensionSchema(outputColumn, type, context, dimensionToSchemaMap) + getDimensionSchema(outputColumn, type, context, dimensionSchemas) ); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index aeb91a6c2538..fa011429763f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -503,7 +503,8 @@ private Map createMSQTaskContext(CompactionTask compactionTask, context.putIfAbsent(QueryContexts.FINALIZE_KEY, false); // Only scalar or array-type dimensions are allowed as grouping keys. context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false); - context.putIfAbsent(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array"); + // Always override CTX_ARRAY_INGEST_MODE since it can otherwise lead to mixed ARRAY and MVD types for a column. + context.put(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array"); return context; } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java index 74be1329467b..6276c588d9d3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java @@ -52,7 +52,7 @@ public class DataSourceMSQDestination implements MSQDestination private final List replaceTimeChunks; @Nullable - private final Map dimensionToSchemaMap; + private final Map dimensionSchemas; @JsonCreator public DataSourceMSQDestination( @@ -60,14 +60,14 @@ public DataSourceMSQDestination( @JsonProperty("segmentGranularity") Granularity segmentGranularity, @JsonProperty("segmentSortOrder") @Nullable List segmentSortOrder, @JsonProperty("replaceTimeChunks") @Nullable List replaceTimeChunks, - @JsonProperty("dimensionToSchemaMap") @Nullable Map dimensionToSchemaMap + @JsonProperty("dimensionSchemas") @Nullable Map dimensionSchemas ) { this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.segmentGranularity = Preconditions.checkNotNull(segmentGranularity, "segmentGranularity"); this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : Collections.emptyList(); this.replaceTimeChunks = replaceTimeChunks; - this.dimensionToSchemaMap = dimensionToSchemaMap; + this.dimensionSchemas = dimensionSchemas; if (replaceTimeChunks != null) { // Verify that if replaceTimeChunks is provided, it is nonempty. @@ -138,9 +138,9 @@ public List getReplaceTimeChunks() @Nullable @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) - public Map getDimensionToSchemaMap() + public Map getDimensionSchemas() { - return dimensionToSchemaMap; + return dimensionSchemas; } /** @@ -177,13 +177,13 @@ public boolean equals(Object o) && Objects.equals(segmentGranularity, that.segmentGranularity) && Objects.equals(segmentSortOrder, that.segmentSortOrder) && Objects.equals(replaceTimeChunks, that.replaceTimeChunks) - && Objects.equals(dimensionToSchemaMap, that.dimensionToSchemaMap); + && Objects.equals(dimensionSchemas, that.dimensionSchemas); } @Override public int hashCode() { - return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionToSchemaMap); + return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionSchemas); } @Override @@ -194,7 +194,7 @@ public String toString() ", segmentGranularity=" + segmentGranularity + ", segmentSortOrder=" + segmentSortOrder + ", replaceTimeChunks=" + replaceTimeChunks + - ", dimensionToSchemaMap=" + dimensionToSchemaMap + + ", dimensionSchemas=" + dimensionSchemas + '}'; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java index 242c00213e26..331430181a95 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java @@ -35,7 +35,7 @@ public class DataSourceMSQDestinationTest public void testEquals() { EqualsVerifier.forClass(DataSourceMSQDestination.class) - .withNonnullFields("dataSource", "segmentGranularity", "segmentSortOrder", "dimensionToSchemaMap") + .withNonnullFields("dataSource", "segmentGranularity", "segmentSortOrder", "dimensionSchemas") .withPrefabValues( Map.class, ImmutableMap.of(