diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java index e6f1a26c863c..c3ebbe5122f9 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.benchmark; import com.fasterxml.jackson.core.JsonProcessingException; @@ -53,12 +72,10 @@ public class GroupByDeserializationBenchmark AggregatorsModule.registerComplexMetricsAndSerde(); } -// @Param({"100", "1000"}) - @Param({"100"}) + @Param({"100", "1000"}) private int numDimensions; -// @Param({"0", "0.25", "0.5", "0.75", "0.85", "0.95", "0.99", "1.0"}) - @Param({"0", "0.5", "1.0"}) + @Param({"0", "0.25", "0.5", "0.75", "0.85", "0.95", "0.99", "1.0"}) private double primitiveToComplexDimensionRatio; @Param({"json", "serializablePairLongString"}) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java index 937580acdb81..705c6962c75f 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java @@ -123,7 +123,7 @@ public static void writeObjectUsingSerializerProvider( serializer.serialize(o, jsonGenerator, serializers); } } - + public static T readObjectUsingDeserializationContext( final JsonParser jp, final DeserializationContext deserializationContext, @@ -133,6 +133,15 @@ public static T readObjectUsingDeserializationContext( return deserializationContext.readValue(jp, clazz); } + public static Object readObjectUsingDeserializationContext( + final JsonParser jp, + final DeserializationContext deserializationContext, + final JavaType javaType + ) throws IOException + { + return deserializationContext.readValue(jp, javaType); + } + /** * Convert the given object to an array of bytes. Use when the object is * known serializable so that the Jackson exception can be suppressed. diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index 9950695f28ce..dbd6f2869a75 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -293,7 +293,6 @@ public boolean isVectorize() return vectorize; } - @SuppressWarnings("unused") public boolean isIntermediateResultAsMapCompat() { return intermediateResultAsMapCompat; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index b09c8182d046..e978e332e394 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -19,23 +19,14 @@ package org.apache.druid.query.groupby; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.inject.Inject; -import org.apache.druid.data.input.Row; import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.allocation.MemoryAllocatorFactory; @@ -51,7 +42,6 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.DataSource; import org.apache.druid.query.FrameSignaturePair; @@ -82,7 +72,6 @@ import javax.annotation.Nullable; import java.io.Closeable; -import java.io.IOException; import java.util.ArrayList; import java.util.BitSet; import java.util.Comparator; @@ -454,104 +443,7 @@ public TypeReference getResultTypeReference() @Override public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final GroupByQuery query) { - final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false); - - return objectMapper; - -// // Serializer that writes array- or map-based rows as appropriate, based on the "resultAsArray" setting. -// final JsonSerializer serializer = new JsonSerializer() -// { -// @Override -// public void serialize( -// final ResultRow resultRow, -// final JsonGenerator jg, -// final SerializerProvider serializers -// ) throws IOException -// { -// if (resultAsArray) { -// JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, resultRow.getArray()); -// } else { -// JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, resultRow.toMapBasedRow(query)); -// } -// } -// }; -// -// // Deserializer that can deserialize either array- or map-based rows. -// final JsonDeserializer deserializer = new JsonDeserializer() -// { -// final Class[] dimensionClasses = createDimensionClasses(query); -// boolean containsComplexDimensions = query.getDimensions() -// .stream() -// .anyMatch( -// dimensionSpec -> dimensionSpec.getOutputType().is(ValueType.COMPLEX) -// ); -// -// @Override -// public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException -// { -// if (jp.isExpectedStartObjectToken()) { -// final Row row = jp.readValueAs(Row.class); -// final ResultRow resultRow = ResultRow.fromLegacyRow(row, query); -// if (containsComplexDimensions) { -// final List queryDimensions = query.getDimensions(); -// for (int i = 0; i < queryDimensions.size(); ++i) { -// if (queryDimensions.get(i).getOutputType().is(ValueType.COMPLEX)) { -// final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i; -// resultRow.set( -// dimensionIndexInResultRow, -// objectMapper.convertValue( -// resultRow.get(dimensionIndexInResultRow), -// dimensionClasses[i] -// ) -// ); -// } -// } -// } -// return resultRow; -// } else { -// Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; -// -// if (!jp.isExpectedStartArrayToken()) { -// throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); -// } -// -// jp.nextToken(); -// -// int numObjects = 0; -// while (jp.currentToken() != JsonToken.END_ARRAY) { -// if (numObjects >= query.getResultRowDimensionStart() && numObjects < query.getResultRowAggregatorStart()) { -// objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( -// jp, -// ctxt, -// dimensionClasses[numObjects - query.getResultRowDimensionStart()] -// ); -// } else { -// objectArray[numObjects] = JacksonUtils.readObjectUsingDeserializationContext( -// jp, -// ctxt, -// Object.class -// ); -// } -// jp.nextToken(); -// ++numObjects; -// } -// return ResultRow.of(objectArray); -// } -// } -// }; -// -// class GroupByResultRowModule extends SimpleModule -// { -// private GroupByResultRowModule() -// { -// addSerializer(ResultRow.class, serializer); -// addDeserializer(ResultRow.class, deserializer); -// } -// } -// -// final ObjectMapper newObjectMapper = objectMapper.copy(); -// newObjectMapper.registerModule(new GroupByResultRowModule()); -// return newObjectMapper; + return ResultRowObjectMapperDecoratorUtil.decorateObjectMapper(objectMapper, query, queryConfig); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java new file mode 100644 index 000000000000..9f951923913c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/ResultRowObjectMapperDecoratorUtil.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.type.TypeFactory; +import org.apache.druid.data.input.Row; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.NullableTypeStrategy; +import org.apache.druid.segment.column.ValueType; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.List; + +public class ResultRowObjectMapperDecoratorUtil +{ + public static ObjectMapper decorateObjectMapper( + final ObjectMapper baseObjectMapper, + final GroupByQuery query, + final GroupByQueryConfig groupByQueryConfig + ) + { + final JsonDeserializer deserializer = getDeserializer(baseObjectMapper, query, groupByQueryConfig); + final JsonSerializer serializer = getSerializer(query, groupByQueryConfig); + if (deserializer == null && serializer == null) { + return baseObjectMapper; + } + + final ObjectMapper decoratedObjectMapper = baseObjectMapper.copy(); + class GroupByResultRowModule extends SimpleModule + { + private GroupByResultRowModule() + { + if (serializer != null) { + addSerializer(ResultRow.class, serializer); + } + if (deserializer != null) { + addDeserializer(ResultRow.class, deserializer); + } + } + } + decoratedObjectMapper.registerModule(new GroupByResultRowModule()); + return decoratedObjectMapper; + } + + @Nullable + private static JsonDeserializer getDeserializer( + final ObjectMapper objectMapper, + final GroupByQuery query, + final GroupByQueryConfig groupByQueryConfig + ) + { + final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false); + final boolean intermediateCompatMode = groupByQueryConfig.isIntermediateResultAsMapCompat(); + final boolean arrayBasedRows = resultAsArray && !intermediateCompatMode; + final boolean dimensionsRequireConversion = query.getDimensions() + .stream() + .anyMatch( + dimensionSpec -> dimensionRequiresConversion(dimensionSpec.getOutputType()) + ); + + if (arrayBasedRows && !dimensionsRequireConversion) { + // We can assume ResultRow are serialized and deserialized as arrays. No need for special decoration, + // and we can save the overhead of making a copy of the ObjectMapper + return null; + } else if (!arrayBasedRows && !dimensionsRequireConversion) { + // Have to deserialize map based rows, however don't have to deserialize the dimensions individually + // Deserializer that can deserialize either array- or map-based rows. + return new JsonDeserializer() + { + @Override + public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException + { + if (jp.isExpectedStartObjectToken()) { + final Row row = jp.readValueAs(Row.class); + return ResultRow.fromLegacyRow(row, query); + } else { + return ResultRow.of(jp.readValueAs(Object[].class)); + } + } + }; + + } else { + // Have to deserialize the dimensions carefully + return new JsonDeserializer() + { + final JavaType[] javaTypes = createJavaTypesForResultRow(query); + + @Override + public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException + { + if (jp.isExpectedStartObjectToken()) { + final Row row = jp.readValueAs(Row.class); + final ResultRow resultRow = ResultRow.fromLegacyRow(row, query); + + final List queryDimensions = query.getDimensions(); + for (int i = 0; i < queryDimensions.size(); ++i) { + if (dimensionRequiresConversion(queryDimensions.get(i).getOutputType())) { + final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i; + resultRow.set( + dimensionIndexInResultRow, + objectMapper.convertValue( + resultRow.get(dimensionIndexInResultRow), + javaTypes[dimensionIndexInResultRow] + ) + ); + } + } + + return resultRow; + } else { + if (!jp.isExpectedStartArrayToken()) { + throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken()); + } + + Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()]; + int index = 0; + + while (jp.nextToken() != JsonToken.END_ARRAY) { + objectArray[index] = JacksonUtils.readObjectUsingDeserializationContext(jp, ctxt, javaTypes[index]); + ++index; + jp.nextToken(); + } + + return ResultRow.of(objectArray); + } + } + }; + } + } + + @Nullable + private static JsonSerializer getSerializer( + final GroupByQuery query, + final GroupByQueryConfig groupByQueryConfig + ) + { + final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false); + final boolean intermediateCompatMode = groupByQueryConfig.isIntermediateResultAsMapCompat(); + final boolean arrayBasedRows = resultAsArray && !intermediateCompatMode; + if (arrayBasedRows) { + return null; + } else { + if (resultAsArray) { + return new JsonSerializer() + { + @Override + public void serialize(ResultRow resultRow, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + throws IOException + { + JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, resultRow.getArray()); + } + }; + + } else { + return new JsonSerializer() + { + @Override + public void serialize(ResultRow resultRow, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + throws IOException + { + JacksonUtils.writeObjectUsingSerializerProvider( + jsonGenerator, + serializerProvider, + resultRow.toMapBasedRow(query) + ); + } + }; + } + } + } + + private static boolean dimensionRequiresConversion(final ColumnType dimensionType) + { + return dimensionType.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(dimensionType); + } + + private static JavaType[] createJavaTypesForResultRow(final GroupByQuery groupByQuery) + { + final TypeFactory typeFactory = TypeFactory.defaultInstance(); + final JavaType[] javaTypes = new JavaType[groupByQuery.getResultRowSizeWithPostAggregators()]; + final List dimensions = groupByQuery.getDimensions(); + for (int i = 0; i < groupByQuery.getResultRowSizeWithPostAggregators(); ++i) { + if (i >= groupByQuery.getResultRowDimensionStart() && i < groupByQuery.getResultRowAggregatorStart()) { + DimensionSpec dimension = dimensions.get(i - groupByQuery.getResultRowDimensionStart()); + ColumnType dimensionType = dimensions.get(i - groupByQuery.getResultRowDimensionStart()).getOutputType(); + if (dimensionType.is(ValueType.COMPLEX)) { + //noinspection rawtypes + NullableTypeStrategy nullableTypeStrategy = dimensionType.getNullableStrategy(); + if (!nullableTypeStrategy.groupable()) { + throw DruidException.defensive( + "Ungroupable dimension [%s] with type [%s] found in the query.", + dimension, + dimensionType + ); + } + javaTypes[i] = typeFactory.constructType(nullableTypeStrategy.getClazz()); + } else { + javaTypes[i] = typeFactory.constructType(Object.class); + } + } else { + javaTypes[i] = typeFactory.constructType(Object.class); + } + } + return javaTypes; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index cc51c8a7cd70..0e73d5db6f48 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonValue; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.core.ObjectCodec; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.ObjectMapper;