Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serde for ColumnBasedRowsAndColumns to fix window queries without group by #16658

Merged
merged 29 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4e61156
existing stuff
kgyrtkirk Oct 23, 2023
0a07180
hint
kgyrtkirk Oct 24, 2023
fd94bea
Merge remote-tracking branch 'apache/master' into windowing-fixes-not…
kgyrtkirk Oct 24, 2023
05b0d4d
yesterday stuff
kgyrtkirk Oct 25, 2023
e9342cb
some remaining changes
kgyrtkirk Nov 7, 2023
ef66a89
refactor
sreemanamala Jun 26, 2024
b915675
refactor
sreemanamala Jun 26, 2024
67b2e44
checkstyle
sreemanamala Jun 26, 2024
94aa07e
refactor
sreemanamala Jun 26, 2024
f6863a8
test & refactor
sreemanamala Jun 26, 2024
c70b7e4
fix inspection
sreemanamala Jun 26, 2024
ca5bc74
Merge branch 'master' into window-fix-nontransferable
sreemanamala Jun 26, 2024
0cd28bf
FramedRowsAndColumns
sreemanamala Jul 5, 2024
6288da9
Merge branch 'master' into window-fix-nontransferable
sreemanamala Jul 22, 2024
6c42fd0
Frame RAC serializers
sreemanamala Jul 24, 2024
9c6cecb
refactor
sreemanamala Jul 24, 2024
16f44b2
refactor
sreemanamala Jul 25, 2024
b2e2498
refactor
sreemanamala Jul 29, 2024
86d00ec
Merge branch 'master' into window-fix-nontransferable
sreemanamala Sep 3, 2024
b19d2b3
checksty;e
sreemanamala Sep 3, 2024
2ac9509
retire WireTransferable
sreemanamala Sep 8, 2024
b925341
default RowsAndColumns::as
sreemanamala Sep 8, 2024
b0b606a
refactors
sreemanamala Sep 8, 2024
98805d8
bug
sreemanamala Sep 9, 2024
b0b6479
Merge branch 'master' into window-fix-nontransferable
sreemanamala Sep 10, 2024
ce2be7a
comments and conflicts
sreemanamala Sep 10, 2024
c37ad95
Merge branch 'master' into window-fix-nontransferable
sreemanamala Sep 12, 2024
151f6d2
refactor
sreemanamala Sep 14, 2024
f1dc27a
comments
sreemanamala Sep 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContextDeserializer;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.joda.time.DateTimeZone;

import java.io.IOException;
Expand Down Expand Up @@ -189,20 +188,7 @@ public ByteOrder deserialize(JsonParser jp, DeserializationContext ctxt) throws
);
addDeserializer(ResponseContext.class, new ResponseContextDeserializer());

addSerializer(RowsAndColumns.class, new JsonSerializer<RowsAndColumns>()
{
@Override
public void serialize(
RowsAndColumns value,
JsonGenerator gen,
SerializerProvider serializers
) throws IOException
{
// It would be really cool if jackson offered an output stream that would allow us to push bytes
// through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute
// back to Jackson at some point.
gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer());
sreemanamala marked this conversation as resolved.
Show resolved Hide resolved
}
});
addSerializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsSerializer());
addDeserializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsDeserializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.base.Function;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
Expand All @@ -31,10 +30,8 @@
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.Interval;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -100,19 +97,8 @@ public Sequence<RowsAndColumns> apply(
@Override
public RowsAndColumns apply(@Nullable RowsAndColumns input)
{
// This is interim code to force a materialization by synthesizing the wire transfer
// that will need to naturally happen as we flesh out this code more. For now, we
// materialize the bytes on-heap and then read them back in as a frame.
if (input instanceof LazilyDecoratedRowsAndColumns) {
final WireTransferable wire = WireTransferable.fromRAC(input);
final byte[] frameBytes = wire.bytesToTransfer();

RowSignature.Builder sigBob = RowSignature.builder();
for (String column : input.getColumnNames()) {
sigBob.add(column, input.findColumn(column).toAccessor().getType());
}

return new ColumnBasedFrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build());
return input.as(FrameRowsAndColumns.class);
}
return input;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,4 @@ public Column findColumn(String name)
}
return retVal;
}

@Override
@SuppressWarnings("unchecked")
public <T> T as(Class<T> clazz)
{
if (AppendableRowsAndColumns.class.equals(clazz)) {
return (T) this;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,6 @@ public Column findColumn(String name)
}
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}

private class ConcatedidColumn implements Column
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.druid.query.rowsandcols.column.Column;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;

Expand All @@ -44,11 +43,4 @@ public Column findColumn(String name)
{
return null;
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator;
import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
Expand Down Expand Up @@ -150,16 +150,13 @@ public RowsAndColumnsDecorator toRowsAndColumnsDecorator()

@SuppressWarnings("unused")
@SemanticCreator
public WireTransferable toWireTransferable()
public FrameRowsAndColumns toFrameRowsAndColumns()
{
return () -> {
final Pair<byte[], RowSignature> materialized = materialize();
if (materialized == null) {
return new byte[]{};
} else {
return materialized.lhs;
}
};
maybeMaterialize();
if (base instanceof FrameRowsAndColumns) {
return (FrameRowsAndColumns) base;
}
return null;
sreemanamala marked this conversation as resolved.
Show resolved Hide resolved
}

private void maybeMaterialize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.LimitedColumn;

import javax.annotation.Nullable;
import java.util.Collection;

public class LimitedRowsAndColumns implements RowsAndColumns
Expand Down Expand Up @@ -66,12 +65,4 @@ public Column findColumn(String name)

return new LimitedColumn(column, start, end);
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public <T> T as(Class<T> clazz)
if (AppendableRowsAndColumns.class.equals(clazz)) {
return (T) new AppendableMapOfColumns(this);
}
return null;
return RowsAndColumns.super.as(clazz);
}

public static class Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,4 @@ public int compareRows(int lhsRowNum, int rhsRowNum)
);
}
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,31 @@

package org.apache.druid.query.rowsandcols;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.channel.ByteTracker;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.Arrays;
import java.util.Collection;

/**
Expand Down Expand Up @@ -110,6 +129,77 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input)
* @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had
* through a local implementation of the interface.
*/
@SuppressWarnings("unchecked")
@Nullable
<T> T as(Class<T> clazz);
default <T> T as(Class<T> clazz)
{
if (Arrays.asList(getClass().getInterfaces()).contains(clazz)) {
return (T) this;
}
Class<?> superClass = this.getClass().getSuperclass();
while (superClass != null) {
if (superClass.equals(clazz)) {
return (T) this;
}
superClass = superClass.getSuperclass();
}
return null;
sreemanamala marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Serializer for {@link RowsAndColumns} by converting the instance to {@link FrameRowsAndColumns}
*/
class RowsAndColumnsSerializer extends StdSerializer<RowsAndColumns>
{
public RowsAndColumnsSerializer()
{
super(RowsAndColumns.class);
}

@Override
public void serialize(
RowsAndColumns rac,
JsonGenerator jsonGenerator,
SerializerProvider serializerProvider
) throws IOException
{
FrameRowsAndColumns frameRAC = rac.as(FrameRowsAndColumns.class);
if (frameRAC == null) {
throw DruidException.defensive("Unable to serialize RAC");
}
JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature());

Frame frame = frameRAC.getFrame();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
frame.writeTo(Channels.newChannel(baos), false, null, ByteTracker.unboundedTracker());

jsonGenerator.writeBinary(baos.toByteArray());
}
}

/**
* Deserializer for {@link RowsAndColumns} returning as an instance of {@link FrameRowsAndColumns}
*/
class RowsAndColumnsDeserializer extends StdDeserializer<RowsAndColumns>
{
public RowsAndColumnsDeserializer()
{
super(RowsAndColumns.class);
}

@Override
public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException
{
RowSignature sig = jsonParser.readValueAs(RowSignature.class);
jsonParser.nextValue();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
jsonParser.readBinaryValue(baos);
Frame frame = Frame.wrap(baos.toByteArray());
return (frame.type() == FrameType.COLUMNAR)
? new ColumnBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig)
: new RowBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig);
sreemanamala marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public <T> T as(Class<T> clazz)
if (StorageAdapter.class == clazz) {
return (T) storageAdapter;
}
return null;
return RowsAndColumns.super.as(clazz);
}

@Override
Expand Down
Loading
Loading