diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java
new file mode 100644
index 0000000000000..c3ebbe5122f99
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.guice.NestedDataModule;
+import org.apache.druid.jackson.AggregatorsModule;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+public class GroupByDeserializationBenchmark
+{
+
+  static {
+    NullHandling.initializeForTests();
+    NestedDataModule.registerHandlersAndSerde();
+    AggregatorsModule.registerComplexMetricsAndSerde();
+  }
+
+  @Param({"100", "1000"})
+  private int numDimensions;
+
+  @Param({"0", "0.25", "0.5", "0.75", "0.85", "0.95", "0.99", "1.0"})
"1.0"}) + private double primitiveToComplexDimensionRatio; + + @Param({"json", "serializablePairLongString"}) + private String complexDimensionType; + + @Param({"true", "false"}) + private boolean backwardCompatibility; + + private GroupByQuery sqlQuery; + private String serializedRow; + private GroupByQueryQueryToolChest groupByQueryQueryToolChest; + private ObjectMapper decoratedMapper; + + @Setup(Level.Trial) + public void setup() throws JsonProcessingException + { + final ObjectMapper undecoratedMapper = TestHelper.makeJsonMapper(); + undecoratedMapper.registerModules(NestedDataModule.getJacksonModulesList()); + undecoratedMapper.registerModule(new AggregatorsModule()); + final Pair sqlQueryAndResultRow = sqlQueryAndResultRow( + numDimensions, + primitiveToComplexDimensionRatio, + complexDimensionType, + undecoratedMapper + ); + sqlQuery = sqlQueryAndResultRow.lhs; + serializedRow = sqlQueryAndResultRow.rhs; + + groupByQueryQueryToolChest = new GroupByQueryQueryToolChest( + null, + () -> new GroupByQueryConfig() + { + @Override + public boolean isIntermediateResultAsMapCompat() + { + return backwardCompatibility; + } + }, + null, + null + ); + + decoratedMapper = groupByQueryQueryToolChest.decorateObjectMapper(undecoratedMapper, sqlQuery); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void deserializeResultRows(Blackhole blackhole) throws JsonProcessingException + { + blackhole.consume(decoratedMapper.readValue(serializedRow, ResultRow.class)); + } + + private static Pair sqlQueryAndResultRow( + final int numDimensions, + final double primitiveToComplexDimensionRatio, + final String complexDimensionType, + final ObjectMapper mapper + ) throws JsonProcessingException + { + final int numPrimitiveDimensions = (int) Math.floor(primitiveToComplexDimensionRatio * numDimensions); + final int numComplexDimensions = numDimensions - numPrimitiveDimensions; + + final List dimensions = new ArrayList<>(); + final List rowList = new ArrayList<>(); + + // Add timestamp + rowList.add(DateTimes.of("2000").getMillis()); + + for (int i = 0; i < numPrimitiveDimensions; ++i) { + dimensions.add( + new DefaultDimensionSpec( + StringUtils.format("primitive%d", i), + StringUtils.format("primitive%d", i), + ColumnType.STRING + ) + ); + rowList.add("foo"); + } + + for (int i = 0; i < numComplexDimensions; ++i) { + dimensions.add( + new DefaultDimensionSpec( + StringUtils.format("complex%d", i), + StringUtils.format("complex%d", i), + ColumnType.ofComplex(complexDimensionType) + ) + ); + + // Serialized version of this object is a valid value for both json and long-string pair dimensions + rowList.add(new SerializablePairLongString(1L, "test")); + } + + // Add aggregator + rowList.add(100); + + // Add post aggregator + rowList.add(10.0); + + GroupByQuery query = GroupByQuery.builder() + .setDataSource("foo") + .setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setDimensions(dimensions) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setPostAggregatorSpecs(Collections.singletonList(new ConstantPostAggregator( + "post", + 10 + ))) + .setContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, true)) + .setGranularity(Granularities.DAY) + .build(); + + return Pair.of(query, mapper.writeValueAsString(rowList)); + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java index 
diff --git a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
index bfb1dc79d788e..b02de3ce49323 100644
--- a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
+++ b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
@@ -20,8 +20,11 @@ package org.apache.druid.java.util.common.jackson;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.JsonSerializer;
@@ -122,6 +125,40 @@ public static void writeObjectUsingSerializerProvider(
     }
   }
 
+  /**
+   * Reads an object using the {@link JsonParser}. It reuses the provided {@link DeserializationContext}, which offers
+   * better performance than calling {@link JsonParser#readValueAs(Class)} because it avoids re-creating the
+   * {@link DeserializationContext} for each readValue call.
+   */
+  @Nullable
+  public static <T> T readObjectUsingDeserializationContext(
+      final JsonParser jp,
+      final DeserializationContext deserializationContext,
+      final Class<T> clazz
+  ) throws IOException
+  {
+    if (jp.currentToken() == JsonToken.VALUE_NULL) {
+      return null;
+    }
+    return deserializationContext.readValue(jp, clazz);
+  }
+
+  /**
+   * @see #readObjectUsingDeserializationContext(JsonParser, DeserializationContext, Class)
+   */
+  @Nullable
+  public static Object readObjectUsingDeserializationContext(
+      final JsonParser jp,
+      final DeserializationContext deserializationContext,
+      final JavaType javaType
+  ) throws IOException
+  {
+    if (jp.currentToken() == JsonToken.VALUE_NULL) {
+      return null;
+    }
+    return deserializationContext.readValue(jp, javaType);
+  }
+
   /**
    * Convert the given object to an array of bytes. Use when the object is
    * known serializable so that the Jackson exception can be suppressed.
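For context, the intended call pattern for these helpers looks roughly like the sketch below. The deserializer class and value type are hypothetical; the point is to reuse the DeserializationContext Jackson already built for this read instead of calling jp.readValueAs(Class), which spins up a fresh context per value:

    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.databind.DeserializationContext;
    import com.fasterxml.jackson.databind.JsonDeserializer;
    import org.apache.druid.java.util.common.jackson.JacksonUtils;

    import java.io.IOException;

    // Hypothetical custom deserializer showing the intended call pattern.
    public class ExampleLongDeserializer extends JsonDeserializer<Long>
    {
      @Override
      public Long deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException
      {
        // Reuses ctxt rather than creating a new DeserializationContext per value;
        // returns null for JSON nulls, matching the helper's contract.
        return JacksonUtils.readObjectUsingDeserializationContext(jp, ctxt, Long.class);
      }
    }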

diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
index fa394beec43a2..978b49226154c 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
@@ -88,6 +88,11 @@ public final JavaType getBySegmentResultType()
    * For most queries, this is a no-op, but it can be useful for query types that support more than one result
    * serialization format. Queries that implement this method must not modify the provided ObjectMapper, but instead
    * must return a copy.
+   * <p>
+   * Jackson's default deserialization is usually well optimised, and this method should be overridden only if there
+   * is a functional requirement to do so. Any override must be benchmarked in isolation, without other portions of
+   * the query engine executing, since modifying this method can alter the performance of queries where
+   * deserialization is a major portion of the execution.
    */
   public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final QueryType query)
   {
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
index 9950695f28ce9..dbd6f2869a75c 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
@@ -293,7 +293,6 @@ public boolean isVectorize()
     return vectorize;
   }
 
-  @SuppressWarnings("unused")
   public boolean isIntermediateResultAsMapCompat()
   {
     return intermediateResultAsMapCompat;
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
index 3f2faf3fc5620..a452052496875 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -19,23 +19,14 @@
 package org.apache.druid.query.groupby;
 
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.ObjectCodec;
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
-import org.apache.druid.data.input.Row;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
@@ -51,7 +42,6 @@
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.CacheStrategy;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.FrameSignaturePair;
@@ -82,7 +72,6 @@
 import javax.annotation.Nullable;
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Comparator;
@@ -108,6 +97,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupByQuery>
+  private final GroupByQueryConfig queryConfig;
+
   public GroupByQueryQueryToolChest(
       GroupingEngine groupingEngine,
+      Supplier<GroupByQueryConfig> queryConfigSupplier,
       GroupByQueryMetricsFactory queryMetricsFactory,
       @Merging GroupByResourcesReservationPool groupByResourcesReservationPool
   )
   {
     this.groupingEngine = groupingEngine;
+    this.queryConfig = queryConfigSupplier.get();
     this.queryMetricsFactory = queryMetricsFactory;
     this.groupByResourcesReservationPool = groupByResourcesReservationPool;
   }
 
@@ -450,96 +443,7 @@ public TypeReference<ResultRow> getResultTypeReference()
   @Override
   public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final GroupByQuery query)
   {
-    final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false);
-
-    // Serializer that writes array- or map-based rows as appropriate, based on the "resultAsArray" setting.
-    final JsonSerializer<ResultRow> serializer = new JsonSerializer<ResultRow>()
-    {
-      @Override
-      public void serialize(
-          final ResultRow resultRow,
-          final JsonGenerator jg,
-          final SerializerProvider serializers
-      ) throws IOException
-      {
-        if (resultAsArray) {
-          JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, resultRow.getArray());
-        } else {
-          JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, resultRow.toMapBasedRow(query));
-        }
-      }
-    };
-
-    // Deserializer that can deserialize either array- or map-based rows.
-    final JsonDeserializer<ResultRow> deserializer = new JsonDeserializer<ResultRow>()
-    {
-      final Class<?>[] dimensionClasses = createDimensionClasses(query);
-      boolean containsComplexDimensions = query.getDimensions()
-                                               .stream()
-                                               .anyMatch(
-                                                   dimensionSpec -> dimensionSpec.getOutputType().is(ValueType.COMPLEX)
-                                               );
-
-      @Override
-      public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException
-      {
-        if (jp.isExpectedStartObjectToken()) {
-          final Row row = jp.readValueAs(Row.class);
-          final ResultRow resultRow = ResultRow.fromLegacyRow(row, query);
-          if (containsComplexDimensions) {
-            final List<DimensionSpec> queryDimensions = query.getDimensions();
-            for (int i = 0; i < queryDimensions.size(); ++i) {
-              if (queryDimensions.get(i).getOutputType().is(ValueType.COMPLEX)) {
-                final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i;
-                resultRow.set(
-                    dimensionIndexInResultRow,
-                    objectMapper.convertValue(
-                        resultRow.get(dimensionIndexInResultRow),
-                        dimensionClasses[i]
-                    )
-                );
-              }
-            }
-          }
-          return resultRow;
-        } else {
-          Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()];
-
-          if (!jp.isExpectedStartArrayToken()) {
-            throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken());
-          }
-
-          ObjectCodec codec = jp.getCodec();
-
-          jp.nextToken();
-
-          int numObjects = 0;
-          while (jp.currentToken() != JsonToken.END_ARRAY) {
-            if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) {
-              objectArray[numObjects] = codec.readValue(jp, dimensionClasses[numObjects - query.getResultRowDimensionStart()]);
-            } else {
-              objectArray[numObjects] = codec.readValue(jp, Object.class);
-            }
-            jp.nextToken();
-            ++numObjects;
-          }
-          return ResultRow.of(objectArray);
-        }
-      }
-    };
-
-    class GroupByResultRowModule extends SimpleModule
-    {
-      private GroupByResultRowModule()
-      {
-        addSerializer(ResultRow.class, serializer);
-        addDeserializer(ResultRow.class, deserializer);
-      }
-    }
-
-    final ObjectMapper newObjectMapper = objectMapper.copy();
-    newObjectMapper.registerModule(new GroupByResultRowModule());
-    return newObjectMapper;
+    return ResultRowObjectMapperDecoratorUtil.decorateObjectMapper(objectMapper, query, queryConfig);
   }
 
   @Override
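End to end, the decorated mapper produced above is used roughly as in this sketch, which mirrors the benchmark's setup method (toolChest, query and serializedRow are assumed to exist as in GroupByDeserializationBenchmark):

    // Sketch only: decorate a base mapper for one query, then read a single row with it.
    final ObjectMapper decorated = toolChest.decorateObjectMapper(TestHelper.makeJsonMapper(), query);
    final ResultRow row = decorated.readValue(serializedRow, ResultRow.class);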
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java
new file mode 100644
index 0000000000000..60b55ce748286
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Utility class for conditional serde of {@link ResultRow} objects. Depending on the query configuration and the
+ * query dimensions, this class chooses an optimally performant method for serializing and deserializing the result
+ * rows while also preserving the dimension classes.
+ * Any modification to this class must be benchmarked properly, as it runs in a hot loop and can have a significant
+ * impact on long-running queries. See {@code GroupByDeserializationBenchmark} for existing benchmarks.
+ */
+public class ResultRowObjectMapperDecoratorUtil
+{
+  /**
+   * Decorates the provided object mapper so that it can read the result rows generated by the given query and the
+   * groupByQueryConfig. It never modifies the provided object mapper: it either returns the same mapper undecorated,
+   * or clones the object mapper before decorating it.
+   */
+  public static ObjectMapper decorateObjectMapper(
+      final ObjectMapper baseObjectMapper,
+      final GroupByQuery query,
+      final GroupByQueryConfig groupByQueryConfig
+  )
+  {
+    final JsonDeserializer<ResultRow> deserializer = getDeserializer(baseObjectMapper, query, groupByQueryConfig);
+    final JsonSerializer<ResultRow> serializer = getSerializer(query, groupByQueryConfig);
+    if (deserializer == null && serializer == null) {
+      return baseObjectMapper;
+    }
+
+    final ObjectMapper decoratedObjectMapper = baseObjectMapper.copy();
+    class GroupByResultRowModule extends SimpleModule
+    {
+      private GroupByResultRowModule()
+      {
+        if (serializer != null) {
+          addSerializer(ResultRow.class, serializer);
+        }
+        if (deserializer != null) {
+          addDeserializer(ResultRow.class, deserializer);
+        }
+      }
+    }
+    decoratedObjectMapper.registerModule(new GroupByResultRowModule());
+    return decoratedObjectMapper;
+  }
+
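+  // Illustrative expectation (review sketch, not part of the class): on the common path,
+  // an array-result-rows query with only primitive dimensions gets the same mapper instance
+  // back, skipping the cost of ObjectMapper.copy(). With hypothetical variables:
+  //
+  //   ObjectMapper base = TestHelper.makeJsonMapper();
+  //   GroupByQuery q = ...;  // sets CTX_KEY_ARRAY_RESULT_ROWS=true, primitive dims only
+  //   assert decorateObjectMapper(base, q, new GroupByQueryConfig()) == base;
+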
+  /**
+   * Returns a deserializer required for the result rows of the provided query. It returns null if no special
+   * deserialization is required, and type-unaware generic Java objects are sufficient.
+   */
+  @Nullable
+  private static JsonDeserializer<ResultRow> getDeserializer(
+      final ObjectMapper objectMapper,
+      final GroupByQuery query,
+      final GroupByQueryConfig groupByQueryConfig
+  )
+  {
+    final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false);
+    final boolean intermediateCompatMode = groupByQueryConfig.isIntermediateResultAsMapCompat();
+    final boolean arrayBasedRows = resultAsArray && !intermediateCompatMode;
+    final boolean dimensionsRequireConversion = query.getDimensions()
+                                                     .stream()
+                                                     .anyMatch(
+                                                         dimensionSpec -> dimensionRequiresConversion(dimensionSpec.getOutputType())
+                                                     );
+
+    // Most common case - when array based rows are used, and grouping is done on primitive/array/json types
+    if (arrayBasedRows && !dimensionsRequireConversion) {
+      // We can assume ResultRows are serialized and deserialized as arrays.
No need for special decoration,
+      // and we can save the overhead of making a copy of the ObjectMapper
+      return null;
+    } else if (!arrayBasedRows && !dimensionsRequireConversion) {
+      // We have to deserialize map-based rows, however we don't have to deserialize the dimensions individually.
+      // Return a deserializer that can deserialize both map- and array-based rows simultaneously
+      return new JsonDeserializer<ResultRow>()
+      {
+        @Override
+        public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException
+        {
+          if (jp.isExpectedStartObjectToken()) {
+            final Row row = jp.readValueAs(Row.class);
+            return ResultRow.fromLegacyRow(row, query);
+          } else {
+            return ResultRow.of(jp.readValueAs(Object[].class));
+          }
+        }
+      };
+
+    } else {
+      // Dimensions need to be deserialized individually because some of them require conversion to specialized types
+      return new JsonDeserializer<ResultRow>()
+      {
+        final JavaType[] javaTypes = createJavaTypesForResultRow(query);
+
+        @Override
+        public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException
+        {
+          if (jp.isExpectedStartObjectToken()) {
+            final Row row = jp.readValueAs(Row.class);
+            final ResultRow resultRow = ResultRow.fromLegacyRow(row, query);
+
+            final List<DimensionSpec> queryDimensions = query.getDimensions();
+            for (int i = 0; i < queryDimensions.size(); ++i) {
+              if (dimensionRequiresConversion(queryDimensions.get(i).getOutputType())) {
+                final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i;
+                resultRow.set(
+                    dimensionIndexInResultRow,
+                    objectMapper.convertValue(
+                        resultRow.get(dimensionIndexInResultRow),
+                        javaTypes[dimensionIndexInResultRow]
+                    )
+                );
+              }
+            }
+
+            return resultRow;
+          } else {
+            if (!jp.isExpectedStartArrayToken()) {
+              throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken());
+            }
+
+            Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()];
+            int index = 0;
+
+            while (jp.nextToken() != JsonToken.END_ARRAY) {
+              objectArray[index] = JacksonUtils.readObjectUsingDeserializationContext(jp, ctxt, javaTypes[index]);
+              ++index;
+            }
+
+            return ResultRow.of(objectArray);
+          }
+        }
+      };
+    }
+  }
+
+  /**
+   * Returns a legacy-mode-aware serializer that serializes the result rows as arrays or maps depending on the query
+   * configuration
+   */
+  @Nullable
+  private static JsonSerializer<ResultRow> getSerializer(
+      final GroupByQuery query,
+      final GroupByQueryConfig groupByQueryConfig
+  )
+  {
+    final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false);
+    final boolean intermediateCompatMode = groupByQueryConfig.isIntermediateResultAsMapCompat();
+    final boolean arrayBasedRows = resultAsArray && !intermediateCompatMode;
+    if (arrayBasedRows) {
+      return null;
+    } else {
+      if (resultAsArray) {
+        return new JsonSerializer<ResultRow>()
+        {
+          @Override
+          public void serialize(ResultRow resultRow, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+              throws IOException
+          {
+            JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, resultRow.getArray());
+          }
+        };
+
+      } else {
+        return new JsonSerializer<ResultRow>()
+        {
+          @Override
+          public void serialize(ResultRow resultRow, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+              throws IOException
+          {
+            JacksonUtils.writeObjectUsingSerializerProvider(
+                jsonGenerator,
+                serializerProvider,
+                resultRow.toMapBasedRow(query)
+            );
+          }
+        };
+      }
+    }
+  }
+
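+  // For orientation (review sketch, invented values; shapes approximate): the two row
+  // shapes handled by the serializers above look roughly like
+  //   array-based: [946684800000, "foo", {"lhs": 1, "rhs": "test"}, 100, 10.0]
+  //   map-based:   {"version": "v1", "timestamp": "2000-01-01T00:00:00.000Z",
+  //                 "event": {"primitive0": "foo", "rows": 100, "post": 10.0}}
+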
+  /**
+   * Returns true if the dimension needs to be converted from generic Java objects to the specialized column type.
+   * This applies to all complex types except JSON types, which are special in that they can work with generic Java
+   * objects without any conversion.
+   */
+  private static boolean dimensionRequiresConversion(final ColumnType dimensionType)
+  {
+    return dimensionType.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(dimensionType);
+  }
+
+  /**
+   * Creates Java types for deserializing the result row. For the timestamp, aggregators and post-aggregators, it
+   * resorts to {@code Object.class}. For dimensions requiring conversion (see
+   * {@link #dimensionRequiresConversion(ColumnType)}), it returns the Java type for the associated class of the
+   * complex object.
+   */
+  private static JavaType[] createJavaTypesForResultRow(final GroupByQuery groupByQuery)
+  {
+    final TypeFactory typeFactory = TypeFactory.defaultInstance();
+    final JavaType[] javaTypes = new JavaType[groupByQuery.getResultRowSizeWithPostAggregators()];
+    final List<DimensionSpec> dimensions = groupByQuery.getDimensions();
+    for (int i = 0; i < groupByQuery.getResultRowSizeWithPostAggregators(); ++i) {
+      if (i >= groupByQuery.getResultRowDimensionStart() && i < groupByQuery.getResultRowAggregatorStart()) {
+        DimensionSpec dimension = dimensions.get(i - groupByQuery.getResultRowDimensionStart());
+        ColumnType dimensionType = dimensions.get(i - groupByQuery.getResultRowDimensionStart()).getOutputType();
+        if (dimensionType.is(ValueType.COMPLEX)) {
+          //noinspection rawtypes
+          NullableTypeStrategy nullableTypeStrategy = dimensionType.getNullableStrategy();
+          if (!nullableTypeStrategy.groupable()) {
+            throw DruidException.defensive(
+                "Ungroupable dimension [%s] with type [%s] found in the query.",
+                dimension,
+                dimensionType
+            );
+          }
+          javaTypes[i] = typeFactory.constructType(nullableTypeStrategy.getClazz());
+        } else {
+          javaTypes[i] = typeFactory.constructType(Object.class);
+        }
+      } else {
+        javaTypes[i] = typeFactory.constructType(Object.class);
+      }
+    }
+    return javaTypes;
+  }
+}
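To make the index arithmetic above concrete, here is a hypothetical layout for a query with a timestamp, two dimensions, one aggregator and one post-aggregator (numbers illustrative):

    // Row: [timestamp, dim0, dim1, agg0, post0]
    //   getResultRowDimensionStart()          == 1   (timestamp occupies index 0)
    //   getResultRowAggregatorStart()         == 3
    //   getResultRowSizeWithPostAggregators() == 5
    // createJavaTypesForResultRow then yields:
    //   [Object, <dim0's type>, <dim1's type>, Object, Object]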
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index da8a0e046230f..0e73d5db6f480 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.ObjectCodec;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -45,6 +44,7 @@
 import org.apache.druid.java.util.common.granularity.AllGranularity;
 import org.apache.druid.java.util.common.guava.Accumulator;
 import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.ColumnSelectorPlus;
 import org.apache.druid.query.DimensionComparisonUtils;
@@ -1378,7 +1378,6 @@ public RowBasedKey deserialize(
       }
 
       jp.nextToken();
-      final ObjectCodec codec = jp.getCodec();
       final int timestampAdjustment = includeTimestamp ? 1 : 0;
       final int dimsToRead = timestampAdjustment + serdeHelpers.length;
       int dimsReadSoFar = 0;
@@ -1389,15 +1388,19 @@
             jp.currentToken() != JsonToken.END_ARRAY,
             "Unexpected end of array when deserializing timestamp from the spilled files"
         );
-        objects[dimsReadSoFar] = codec.readValue(jp, Long.class);
+        objects[dimsReadSoFar] = JacksonUtils.readObjectUsingDeserializationContext(jp, deserializationContext, Long.class);
 
         ++dimsReadSoFar;
         jp.nextToken();
       }
 
       while (jp.currentToken() != JsonToken.END_ARRAY) {
-        objects[dimsReadSoFar] =
-            codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz());
+        objects[dimsReadSoFar] = JacksonUtils.readObjectUsingDeserializationContext(
+            jp,
+            deserializationContext,
+            serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz()
+        );
+
         ++dimsReadSoFar;
         jp.nextToken();
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
index 7279ca938bd8a..d9aefd5f55e2b 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
@@ -792,7 +792,19 @@ public void testResultSerdeIntermediateResultAsMapCompat() throws Exception
         .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
         .build();
 
-    final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(null, null, null);
+    final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
+        null,
+        () -> new GroupByQueryConfig()
+        {
+          @Override
+          public boolean isIntermediateResultAsMapCompat()
+          {
+            return true;
+          }
+        },
+        null,
+        null
+    );
 
     final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
     final ObjectMapper arraysObjectMapper = toolChest.decorateObjectMapper(
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index a5dbb49bca51b..475848edbdc7a 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -363,6 +363,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
     );
     final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
         groupingEngine,
+        () -> config,
         DefaultGroupByQueryMetricsFactory.instance(),
         groupByResourcesReservationPool
     );