Skip to content

Commit

Permalink
Change dimensionToSchemaMap to dimensionSchemas and override ARRAY_INGEST_MODE to array (#16909)

Browse files Browse the repository at this point in the history

A follow-up PR for #16864. Just renames dimensionToSchemaMap to dimensionSchemas and always overrides ARRAY_INGEST_MODE context value to array for MSQ compaction.
  • Loading branch information
gargvishesh authored Aug 20, 2024
1 parent 2198001 commit fb7103c
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1951,7 +1951,7 @@ private static DataSchema makeDataSchemaForIngestion(
columnMappings,
isRollupQuery,
querySpec.getQuery(),
destination.getDimensionToSchemaMap()
destination.getDimensionSchemas()
);

return new DataSchema(
Expand Down Expand Up @@ -2127,11 +2127,11 @@ private static DimensionSchema getDimensionSchema(
final String outputColumnName,
@Nullable final ColumnType queryType,
QueryContext context,
@Nullable Map<String, DimensionSchema> dimensionToSchemaMap
@Nullable Map<String, DimensionSchema> dimensionSchemas
)
{
if (dimensionToSchemaMap != null && dimensionToSchemaMap.containsKey(outputColumnName)) {
return dimensionToSchemaMap.get(outputColumnName);
if (dimensionSchemas != null && dimensionSchemas.containsKey(outputColumnName)) {
return dimensionSchemas.get(outputColumnName);
}
// In case of ingestion, or when metrics are converted to dimensions when compaction is performed without rollup,
// we won't have an entry in the map. For those cases, use the default config.
Expand All @@ -2150,7 +2150,7 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
final ColumnMappings columnMappings,
final boolean isRollupQuery,
final Query<?> query,
@Nullable final Map<String, DimensionSchema> dimensionToSchemaMap
@Nullable final Map<String, DimensionSchema> dimensionSchemas
)
{
// Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to
Expand Down Expand Up @@ -2237,13 +2237,13 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
outputColumnName,
type,
query.context(),
dimensionToSchemaMap
dimensionSchemas
);
} else {
// complex columns only
if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) {
dimensions.add(
getDimensionSchema(outputColumnName, type, query.context(), dimensionToSchemaMap)
getDimensionSchema(outputColumnName, type, query.context(), dimensionSchemas)
);
} else if (!isRollupQuery) {
aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName()));
Expand All @@ -2255,7 +2255,7 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
outputColumnName,
type,
query.context(),
dimensionToSchemaMap
dimensionSchemas
);
}
}
Expand Down Expand Up @@ -2283,14 +2283,14 @@ private static void populateDimensionsAndAggregators(
String outputColumn,
ColumnType type,
QueryContext context,
Map<String, DimensionSchema> dimensionToSchemaMap
Map<String, DimensionSchema> dimensionSchemas
)
{
if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
aggregators.add(outputColumnAggregatorFactories.get(outputColumn));
} else {
dimensions.add(
getDimensionSchema(outputColumn, type, context, dimensionToSchemaMap)
getDimensionSchema(outputColumn, type, context, dimensionSchemas)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,8 @@ private Map<String, Object> createMSQTaskContext(CompactionTask compactionTask,
context.putIfAbsent(QueryContexts.FINALIZE_KEY, false);
// Only scalar or array-type dimensions are allowed as grouping keys.
context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false);
context.putIfAbsent(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array");
// Always override CTX_ARRAY_INGEST_MODE since it can otherwise lead to mixed ARRAY and MVD types for a column.
context.put(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array");
return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ public class DataSourceMSQDestination implements MSQDestination
private final List<Interval> replaceTimeChunks;

@Nullable
private final Map<String, DimensionSchema> dimensionToSchemaMap;
private final Map<String, DimensionSchema> dimensionSchemas;

@JsonCreator
public DataSourceMSQDestination(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("segmentSortOrder") @Nullable List<String> segmentSortOrder,
@JsonProperty("replaceTimeChunks") @Nullable List<Interval> replaceTimeChunks,
@JsonProperty("dimensionToSchemaMap") @Nullable Map<String, DimensionSchema> dimensionToSchemaMap
@JsonProperty("dimensionSchemas") @Nullable Map<String, DimensionSchema> dimensionSchemas
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.segmentGranularity = Preconditions.checkNotNull(segmentGranularity, "segmentGranularity");
this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : Collections.emptyList();
this.replaceTimeChunks = replaceTimeChunks;
this.dimensionToSchemaMap = dimensionToSchemaMap;
this.dimensionSchemas = dimensionSchemas;

if (replaceTimeChunks != null) {
// Verify that if replaceTimeChunks is provided, it is nonempty.
Expand Down Expand Up @@ -138,9 +138,9 @@ public List<Interval> getReplaceTimeChunks()
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<String, DimensionSchema> getDimensionToSchemaMap()
public Map<String, DimensionSchema> getDimensionSchemas()
{
return dimensionToSchemaMap;
return dimensionSchemas;
}

/**
Expand Down Expand Up @@ -177,13 +177,13 @@ public boolean equals(Object o)
&& Objects.equals(segmentGranularity, that.segmentGranularity)
&& Objects.equals(segmentSortOrder, that.segmentSortOrder)
&& Objects.equals(replaceTimeChunks, that.replaceTimeChunks)
&& Objects.equals(dimensionToSchemaMap, that.dimensionToSchemaMap);
&& Objects.equals(dimensionSchemas, that.dimensionSchemas);
}

@Override
public int hashCode()
{
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionToSchemaMap);
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionSchemas);
}

@Override
Expand All @@ -194,7 +194,7 @@ public String toString()
", segmentGranularity=" + segmentGranularity +
", segmentSortOrder=" + segmentSortOrder +
", replaceTimeChunks=" + replaceTimeChunks +
", dimensionToSchemaMap=" + dimensionToSchemaMap +
", dimensionSchemas=" + dimensionSchemas +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class DataSourceMSQDestinationTest
public void testEquals()
{
EqualsVerifier.forClass(DataSourceMSQDestination.class)
.withNonnullFields("dataSource", "segmentGranularity", "segmentSortOrder", "dimensionToSchemaMap")
.withNonnullFields("dataSource", "segmentGranularity", "segmentSortOrder", "dimensionSchemas")
.withPrefabValues(
Map.class,
ImmutableMap.of(
Expand Down

0 comments on commit fb7103c

Please sign in to comment.