Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #16658 to 31.0.0 #17102

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add serde for ColumnBasedRowsAndColumns to fix window queries without…
… group by (#16658)

Register a Ser-De for RowsAndColumns so that the window operator query running on leaf operators would be transferred properly on the wire. Would fix the empty response given by window queries without group by on the native engine.
  • Loading branch information
sreemanamala committed Sep 18, 2024
commit 70356b351ff7060d381c9a4a81dc7fbce13d51ca
Original file line number Diff line number Diff line change
@@ -37,7 +37,6 @@
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContextDeserializer;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.joda.time.DateTimeZone;

import java.io.IOException;
@@ -189,20 +188,7 @@ public ByteOrder deserialize(JsonParser jp, DeserializationContext ctxt) throws
);
addDeserializer(ResponseContext.class, new ResponseContextDeserializer());

addSerializer(RowsAndColumns.class, new JsonSerializer<RowsAndColumns>()
{
@Override
public void serialize(
RowsAndColumns value,
JsonGenerator gen,
SerializerProvider serializers
) throws IOException
{
// It would be really cool if jackson offered an output stream that would allow us to push bytes
// through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute
// back to Jackson at some point.
gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer());
}
});
addSerializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsSerializer());
addDeserializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsDeserializer());
}
}
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@

import com.google.common.base.Function;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -31,10 +30,8 @@
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.Interval;

import javax.annotation.Nullable;
@@ -100,19 +97,8 @@ public Sequence<RowsAndColumns> apply(
@Override
public RowsAndColumns apply(@Nullable RowsAndColumns input)
{
// This is interim code to force a materialization by synthesizing the wire transfer
// that will need to naturally happen as we flesh out this code more. For now, we
// materialize the bytes on-heap and then read them back in as a frame.
if (input instanceof LazilyDecoratedRowsAndColumns) {
final WireTransferable wire = WireTransferable.fromRAC(input);
final byte[] frameBytes = wire.bytesToTransfer();

RowSignature.Builder sigBob = RowSignature.builder();
for (String column : input.getColumnNames()) {
sigBob.add(column, input.findColumn(column).toAccessor().getType());
}

return new ColumnBasedFrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build());
return input.as(FrameRowsAndColumns.class);
}
return input;
}
Original file line number Diff line number Diff line change
@@ -79,14 +79,4 @@ public Column findColumn(String name)
}
return retVal;
}

@Override
@SuppressWarnings("unchecked")
public <T> T as(Class<T> clazz)
{
if (AppendableRowsAndColumns.class.equals(clazz)) {
return (T) this;
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -141,13 +141,6 @@ public Column findColumn(String name)
}
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}

private class ConcatedidColumn implements Column
{

Original file line number Diff line number Diff line change
@@ -61,7 +61,7 @@ public <T> T as(Class<T> clazz)
if (CursorFactory.class == clazz) {
return (T) cursorFactory;
}
return null;
return RowsAndColumns.super.as(clazz);
}

@Override
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@

import org.apache.druid.query.rowsandcols.column.Column;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;

@@ -44,11 +43,4 @@ public Column findColumn(String name)
{
return null;
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}
}
Original file line number Diff line number Diff line change
@@ -39,10 +39,10 @@
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator;
import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
@@ -150,16 +150,10 @@ public RowsAndColumnsDecorator toRowsAndColumnsDecorator()

@SuppressWarnings("unused")
@SemanticCreator
public WireTransferable toWireTransferable()
public FrameRowsAndColumns toFrameRowsAndColumns()
{
return () -> {
final Pair<byte[], RowSignature> materialized = materialize();
if (materialized == null) {
return new byte[]{};
} else {
return materialized.lhs;
}
};
maybeMaterialize();
return base.as(FrameRowsAndColumns.class);
}

private void maybeMaterialize()
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.LimitedColumn;

import javax.annotation.Nullable;
import java.util.Collection;

public class LimitedRowsAndColumns implements RowsAndColumns
@@ -66,12 +65,4 @@ public Column findColumn(String name)

return new LimitedColumn(column, start, end);
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}

}
Original file line number Diff line number Diff line change
@@ -164,7 +164,7 @@ public <T> T as(Class<T> clazz)
if (AppendableRowsAndColumns.class.equals(clazz)) {
return (T) new AppendableMapOfColumns(this);
}
return null;
return RowsAndColumns.super.as(clazz);
}

public static class Builder
Original file line number Diff line number Diff line change
@@ -164,11 +164,4 @@ public int compareRows(int lhsRowNum, int rhsRowNum)
);
}
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}
}
Original file line number Diff line number Diff line change
@@ -19,12 +19,30 @@

package org.apache.druid.query.rowsandcols;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.channel.ByteTracker;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.Collection;

/**
@@ -110,6 +128,72 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input)
* @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had
* through a local implementation of the interface.
*/
@SuppressWarnings("unchecked")
@Nullable
<T> T as(Class<T> clazz);
default <T> T as(Class<T> clazz)
{
if (clazz.isInstance(this)) {
return (T) this;
}
return null;
}

/**
* Serializer for {@link RowsAndColumns} by converting the instance to {@link FrameRowsAndColumns}
*/
class RowsAndColumnsSerializer extends StdSerializer<RowsAndColumns>
{
public RowsAndColumnsSerializer()
{
super(RowsAndColumns.class);
}

@Override
public void serialize(
RowsAndColumns rac,
JsonGenerator jsonGenerator,
SerializerProvider serializerProvider
) throws IOException
{
FrameRowsAndColumns frameRAC = rac.as(FrameRowsAndColumns.class);
if (frameRAC == null) {
throw DruidException.defensive("Unable to serialize RAC");
}
JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature());

Frame frame = frameRAC.getFrame();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
frame.writeTo(Channels.newChannel(baos), false, null, ByteTracker.unboundedTracker());

jsonGenerator.writeBinary(baos.toByteArray());
}
}

/**
* Deserializer for {@link RowsAndColumns} returning as an instance of {@link FrameRowsAndColumns}
*/
class RowsAndColumnsDeserializer extends StdDeserializer<RowsAndColumns>
{
public RowsAndColumnsDeserializer()
{
super(RowsAndColumns.class);
}

@Override
public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException
{
RowSignature sig = jsonParser.readValueAs(RowSignature.class);
jsonParser.nextValue();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
jsonParser.readBinaryValue(baos);
Frame frame = Frame.wrap(baos.toByteArray());
if (frame.type() == FrameType.COLUMNAR) {
return new ColumnBasedFrameRowsAndColumns(frame, sig);
} else {
return new RowBasedFrameRowsAndColumns(frame, sig);
}
}
}
}
Loading
Loading