diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 30cc388f1d9d6..cad8fdfd83155 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -37,7 +37,6 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContextDeserializer; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.joda.time.DateTimeZone; import java.io.IOException; @@ -189,20 +188,7 @@ public ByteOrder deserialize(JsonParser jp, DeserializationContext ctxt) throws ); addDeserializer(ResponseContext.class, new ResponseContextDeserializer()); - addSerializer(RowsAndColumns.class, new JsonSerializer() - { - @Override - public void serialize( - RowsAndColumns value, - JsonGenerator gen, - SerializerProvider serializers - ) throws IOException - { - // It would be really cool if jackson offered an output stream that would allow us to push bytes - // through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute - // back to Jackson at some point. - gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer()); - } - }); + addSerializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsSerializer()); + addDeserializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsDeserializer()); } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java index d18f6c252c1ad..f86f91be18bea 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java @@ -21,7 +21,6 @@ import com.google.common.base.Function; import org.apache.druid.error.DruidException; -import org.apache.druid.frame.Frame; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -31,10 +30,8 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.segment.Segment; -import org.apache.druid.segment.column.RowSignature; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -100,19 +97,8 @@ public Sequence apply( @Override public RowsAndColumns apply(@Nullable RowsAndColumns input) { - // This is interim code to force a materialization by synthesizing the wire transfer - // that will need to naturally happen as we flesh out this code more. For now, we - // materialize the bytes on-heap and then read them back in as a frame. if (input instanceof LazilyDecoratedRowsAndColumns) { - final WireTransferable wire = WireTransferable.fromRAC(input); - final byte[] frameBytes = wire.bytesToTransfer(); - - RowSignature.Builder sigBob = RowSignature.builder(); - for (String column : input.getColumnNames()) { - sigBob.add(column, input.findColumn(column).toAccessor().getType()); - } - - return new ColumnBasedFrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build()); + return input.as(FrameRowsAndColumns.class); } return input; } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java index 61f6855cd01c4..d83f56c7ba5f3 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java @@ -79,14 +79,4 @@ public Column findColumn(String name) } return retVal; } - - @Override - @SuppressWarnings("unchecked") - public T as(Class clazz) - { - if (AppendableRowsAndColumns.class.equals(clazz)) { - return (T) this; - } - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java index c6ced60849d45..3f70f82a2537b 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java @@ -141,13 +141,6 @@ public Column findColumn(String name) } } - @Nullable - @Override - public T as(Class clazz) - { - return null; - } - private class ConcatedidColumn implements Column { diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java index 6fa74660f7df7..46fda857516f4 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java @@ -61,7 +61,7 @@ public T as(Class clazz) if (CursorFactory.class == clazz) { return (T) cursorFactory; } - return null; + return RowsAndColumns.super.as(clazz); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java index dd0c7dab1cdae..56647e0f56873 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java @@ -21,7 +21,6 @@ import org.apache.druid.query.rowsandcols.column.Column; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; @@ -44,11 +43,4 @@ public Column findColumn(String name) { return null; } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java index a05b31dc2cb45..bb35f6837976b 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java @@ -39,10 +39,10 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator; import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; @@ -150,16 +150,10 @@ public RowsAndColumnsDecorator toRowsAndColumnsDecorator() @SuppressWarnings("unused") @SemanticCreator - public WireTransferable toWireTransferable() + public FrameRowsAndColumns toFrameRowsAndColumns() { - return () -> { - final Pair materialized = materialize(); - if (materialized == null) { - return new byte[]{}; - } else { - return materialized.lhs; - } - }; + maybeMaterialize(); + return base.as(FrameRowsAndColumns.class); } private void maybeMaterialize() diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java index abb3d4649b1ac..8cfadecb4dd2e 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java @@ -23,7 +23,6 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.LimitedColumn; -import javax.annotation.Nullable; import java.util.Collection; public class LimitedRowsAndColumns implements RowsAndColumns @@ -66,12 +65,4 @@ public Column findColumn(String name) return new LimitedColumn(column, start, end); } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } - } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java index 29f092f67440d..d6bc1026a98df 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java @@ -164,7 +164,7 @@ public T as(Class clazz) if (AppendableRowsAndColumns.class.equals(clazz)) { return (T) new AppendableMapOfColumns(this); } - return null; + return RowsAndColumns.super.as(clazz); } public static class Builder diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java index f1793f8fd0e4d..e64f086edd7f7 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java @@ -164,11 +164,4 @@ public int compareRows(int lhsRowNum, int rhsRowNum) ); } } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index 7b6a1f6215d38..a34d0e463c070 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -19,12 +19,30 @@ package org.apache.druid.query.rowsandcols; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.channel.ByteTracker; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable; +import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; import java.util.Collection; /** @@ -110,6 +128,72 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input) * @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had * through a local implementation of the interface. */ + @SuppressWarnings("unchecked") @Nullable - T as(Class clazz); + default T as(Class clazz) + { + if (clazz.isInstance(this)) { + return (T) this; + } + return null; + } + + /** + * Serializer for {@link RowsAndColumns} by converting the instance to {@link FrameRowsAndColumns} + */ + class RowsAndColumnsSerializer extends StdSerializer + { + public RowsAndColumnsSerializer() + { + super(RowsAndColumns.class); + } + + @Override + public void serialize( + RowsAndColumns rac, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider + ) throws IOException + { + FrameRowsAndColumns frameRAC = rac.as(FrameRowsAndColumns.class); + if (frameRAC == null) { + throw DruidException.defensive("Unable to serialize RAC"); + } + JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature()); + + Frame frame = frameRAC.getFrame(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + frame.writeTo(Channels.newChannel(baos), false, null, ByteTracker.unboundedTracker()); + + jsonGenerator.writeBinary(baos.toByteArray()); + } + } + + /** + * Deserializer for {@link RowsAndColumns} returning as an instance of {@link FrameRowsAndColumns} + */ + class RowsAndColumnsDeserializer extends StdDeserializer + { + public RowsAndColumnsDeserializer() + { + super(RowsAndColumns.class); + } + + @Override + public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException + { + RowSignature sig = jsonParser.readValueAs(RowSignature.class); + jsonParser.nextValue(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + jsonParser.readBinaryValue(baos); + Frame frame = Frame.wrap(baos.toByteArray()); + if (frame.type() == FrameType.COLUMNAR) { + return new ColumnBasedFrameRowsAndColumns(frame, sig); + } else { + return new RowBasedFrameRowsAndColumns(frame, sig); + } + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java new file mode 100644 index 0000000000000..5295326c86225 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.concrete; + +import com.google.common.base.Objects; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.segment.CloseableShapeshifter; +import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.LinkedHashMap; + +public abstract class AbstractFrameRowsAndColumns implements FrameRowsAndColumns, AutoCloseable, CloseableShapeshifter +{ + final Frame frame; + final RowSignature signature; + final LinkedHashMap colCache = new LinkedHashMap<>(); + + public AbstractFrameRowsAndColumns(Frame frame, RowSignature signature) + { + this.frame = frame; + this.signature = signature; + } + + @Override + public Frame getFrame() + { + return frame; + } + + @Override + public RowSignature getSignature() + { + return signature; + } + + @Override + public Collection getColumnNames() + { + return signature.getColumnNames(); + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public T as(Class clazz) + { + if (CursorFactory.class.equals(clazz)) { + return (T) FrameReader.create(signature).makeCursorFactory(frame); + } + return FrameRowsAndColumns.super.as(clazz); + } + + @Override + public void close() + { + // nothing to close + } + + @Override + public int hashCode() + { + return Objects.hashCode(frame, signature); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof AbstractFrameRowsAndColumns)) { + return false; + } + AbstractFrameRowsAndColumns otherFrame = (AbstractFrameRowsAndColumns) o; + + return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java index e99a3f7f31392..c4a4577dc1af1 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java @@ -19,44 +19,21 @@ package org.apache.druid.query.rowsandcols.concrete; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; -import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.read.columnar.FrameColumnReaders; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.segment.CloseableShapeshifter; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; -import java.util.Collection; -import java.util.LinkedHashMap; -public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +public class ColumnBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns { - private final Frame frame; - private final RowSignature signature; - private final LinkedHashMap colCache = new LinkedHashMap<>(); - public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature) { - this.frame = FrameType.COLUMNAR.ensureType(frame); - this.signature = signature; - } - - @Override - public Collection getColumnNames() - { - return signature.getColumnNames(); - } - - @Override - public int numRows() - { - return frame.numRows(); + super(FrameType.COLUMNAR.ensureType(frame), signature); } @Nullable @@ -71,28 +48,17 @@ public Column findColumn(String name) } else { final ColumnType columnType = signature .getColumnType(columnIndex) - .orElseThrow(() -> new ISE("just got the id, why is columnType not there?")); + .orElseThrow( + () -> DruidException.defensive( + "just got the id [%s][%s], why is columnType not there?", + columnIndex, + name + ) + ); colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame)); } } return colCache.get(name); } - - @SuppressWarnings("unchecked") - @Nullable - @Override - public T as(Class clazz) - { - if (CursorFactory.class.equals(clazz)) { - return (T) FrameReader.create(signature).makeCursorFactory(frame); - } - return null; - } - - @Override - public void close() - { - // nothing to close - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java similarity index 67% rename from processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java rename to processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index a7d55f5992939..022a0f91ac160 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -17,21 +17,15 @@ * under the License. */ -package org.apache.druid.query.rowsandcols.semantic; +package org.apache.druid.query.rowsandcols.concrete; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.frame.Frame; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.segment.column.RowSignature; -public interface WireTransferable +public interface FrameRowsAndColumns extends RowsAndColumns { - static WireTransferable fromRAC(RowsAndColumns rac) - { - WireTransferable retVal = rac.as(WireTransferable.class); - if (retVal == null) { - throw new ISE("Rac[%s] cannot be transferred over the wire", rac.getClass()); - } - return retVal; - } + Frame getFrame(); - byte[] bytesToTransfer(); + RowSignature getSignature(); } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java index 865a24e5d6da5..c702c210775ca 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java @@ -24,40 +24,17 @@ import org.apache.druid.frame.FrameType; import org.apache.druid.frame.field.FieldReader; import org.apache.druid.frame.field.FieldReaders; -import org.apache.druid.frame.read.FrameReader; -import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.segment.CloseableShapeshifter; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; -import java.util.Collection; -import java.util.LinkedHashMap; -public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +public class RowBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns { - private final Frame frame; - private final RowSignature signature; - private final LinkedHashMap colCache = new LinkedHashMap<>(); - public RowBasedFrameRowsAndColumns(Frame frame, RowSignature signature) { - this.frame = FrameType.ROW_BASED.ensureType(frame); - this.signature = signature; - } - - @Override - public Collection getColumnNames() - { - return signature.getColumnNames(); - } - - @Override - public int numRows() - { - return frame.numRows(); + super(FrameType.ROW_BASED.ensureType(frame), signature); } @Nullable @@ -86,21 +63,4 @@ public Column findColumn(String name) } return colCache.get(name); } - - @SuppressWarnings("unchecked") - @Nullable - @Override - public T as(Class clazz) - { - if (CursorFactory.class.equals(clazz)) { - return (T) FrameReader.create(signature).makeCursorFactory(frame); - } - return null; - } - - @Override - public void close() - { - // nothing to close - } } diff --git a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java index 989d137770ee3..92c8a2cb29898 100644 --- a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java +++ b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java @@ -22,12 +22,18 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.query.Query; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Assert; @@ -35,6 +41,8 @@ import java.util.Arrays; +import static org.junit.Assert.assertEquals; + /** * */ @@ -102,4 +110,22 @@ public void testUnknownTypeWithUnknownService() throws JsonProcessingException } Assert.fail("We expect InvalidTypeIdException to be thrown"); } + + @Test + public void testColumnBasedFrameRowsAndColumns() throws Exception + { + DefaultObjectMapper om = new DefaultObjectMapper("test"); + + MapOfColumnsRowsAndColumns input = (MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) + ))); + + ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input); + byte[] bytes = om.writeValueAsBytes(frc); + + ColumnBasedFrameRowsAndColumns frc2 = (ColumnBasedFrameRowsAndColumns) om.readValue(bytes, RowsAndColumns.class); + assertEquals(frc, frc2); + } } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java index 422c87c8b7c60..16cd44e870ba1 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java @@ -21,7 +21,6 @@ import org.apache.druid.query.rowsandcols.column.Column; -import javax.annotation.Nullable; import java.util.Collection; public class NoAsRowsAndColumns implements RowsAndColumns @@ -50,12 +49,4 @@ public Column findColumn(String name) { return rac.findColumn(name); } - - @Nullable - @Override - public T as(Class clazz) - { - // Pretend like this doesn't implement any semantic interfaces - return null; - } } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java index acfcbe6f83ed0..f6a10e0114647 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java @@ -37,7 +37,15 @@ public ColumnBasedFrameRowsAndColumnsTest() public static ColumnBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) { - LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null); + LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns( + input, + null, + null, + null, + OffsetLimit.limit(Integer.MAX_VALUE), + null, + null + ); rac.numRows(); // materialize return (ColumnBasedFrameRowsAndColumns) rac.getBase(); diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java index f90c2ea191729..41295f4801763 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.rowsandcols.semantic; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -32,6 +33,9 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest; import org.apache.druid.segment.ArrayListSegment; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; @@ -214,6 +218,39 @@ public void testDecorationWithListOfResultRows() } } + @Test + public void testDecoratorWithColumnBasedFrameRAC() + { + RowSignature siggy = RowSignature.builder() + .add("colA", ColumnType.LONG) + .add("colB", ColumnType.LONG) + .build(); + + Object[][] vals = new Object[][]{ + {1L, 4L}, + {2L, -4L}, + {3L, 3L}, + {4L, -3L}, + {5L, 4L}, + {6L, 82L}, + {7L, -90L}, + {8L, 4L}, + {9L, 0L}, + {10L, 0L} + }; + + MapOfColumnsRowsAndColumns input = MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) + ) + ); + + ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input); + + validateDecorated(frc, siggy, vals, null, null, OffsetLimit.NONE, null); + } + private void validateDecorated( RowsAndColumns base, RowSignature siggy, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index 2477ac38dec14..d02d302437b87 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -109,7 +109,6 @@ public boolean featureAvailable(EngineFeature feature) case ALLOW_TOP_LEVEL_UNION_ALL: case TIME_BOUNDARY_QUERY: case GROUPBY_IMPLICITLY_SORTS: - case WINDOW_LEAF_OPERATOR: return true; case CAN_INSERT: case CAN_REPLACE: @@ -117,6 +116,7 @@ public boolean featureAvailable(EngineFeature feature) case WRITE_EXTERNAL_DATA: case SCAN_ORDER_BY_NON_TIME: case SCAN_NEEDS_SIGNATURE: + case WINDOW_LEAF_OPERATOR: return false; default: throw SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(), feature); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index a8dcc35ea7add..732681de238aa 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -16086,6 +16086,7 @@ public void testScanAndSortOnJoin() .run(); } + @NotYetSupported(Modes.UNSUPPORTED_DATASOURCE) @Test public void testWindowingOverJoin() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index ccf459e743e7d..b03938b6ee963 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -266,7 +266,7 @@ public void testFailure_partitionByMVD() ); assertEquals( - "Encountered a multi value column [v0]. Window processing does not support MVDs. " + "Encountered a multi value column. Window processing does not support MVDs. " + "Consider using UNNEST or MV_TO_ARRAY.", e.getMessage() ); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java index da1431f433d43..f0c48ff44f2f4 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java @@ -89,6 +89,7 @@ enum Modes RESULT_MISMATCH(AssertionError.class, "(assertResulEquals|AssertionError: column content mismatch)"), LONG_CASTING(AssertionError.class, "expected: java.lang.Long"), UNSUPPORTED_NULL_ORDERING(DruidException.class, "(A|DE)SCENDING ordering with NULLS (LAST|FIRST)"), + UNSUPPORTED_DATASOURCE(DruidException.class, "WindowOperatorQuery must run on top of a query or inline data source"), UNION_WITH_COMPLEX_OPERAND(DruidException.class, "Only Table and Values are supported as inputs for Union"), UNION_MORE_STRICT_ROWTYPE_CHECK(DruidException.class, "Row signature mismatch in Union inputs"), JOIN_CONDITION_NOT_PUSHED_CONDITION(DruidException.class, "SQL requires a join with '.*' condition"), diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest index 87873d44c485c..104cb0d2422d6 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest @@ -2,7 +2,7 @@ type: "operatorValidation" sql: | SELECT - countryIsoCode, + countryIsoCode, CAST (FLOOR(__time TO HOUR) AS BIGINT) t, SUM(delta) delta, SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) ROWS BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta