Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serde for ColumnBasedRowsAndColumns to fix window queries without group by #16658

Merged
merged 29 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4e61156
existing stuff
kgyrtkirk Oct 23, 2023
0a07180
hint
kgyrtkirk Oct 24, 2023
fd94bea
Merge remote-tracking branch 'apache/master' into windowing-fixes-not…
kgyrtkirk Oct 24, 2023
05b0d4d
yesterday stuff
kgyrtkirk Oct 25, 2023
e9342cb
some remaining changes
kgyrtkirk Nov 7, 2023
ef66a89
refactor
sreemanamala Jun 26, 2024
b915675
refactor
sreemanamala Jun 26, 2024
67b2e44
checkstyle
sreemanamala Jun 26, 2024
94aa07e
refactor
sreemanamala Jun 26, 2024
f6863a8
test & refactor
sreemanamala Jun 26, 2024
c70b7e4
fix inspection
sreemanamala Jun 26, 2024
ca5bc74
Merge branch 'master' into window-fix-nontransferable
sreemanamala Jun 26, 2024
0cd28bf
FramedRowsAndColumns
sreemanamala Jul 5, 2024
6288da9
Merge branch 'master' into window-fix-nontransferable
sreemanamala Jul 22, 2024
6c42fd0
Frame RAC serializers
sreemanamala Jul 24, 2024
9c6cecb
refactor
sreemanamala Jul 24, 2024
16f44b2
refactor
sreemanamala Jul 25, 2024
b2e2498
refactor
sreemanamala Jul 29, 2024
86d00ec
Merge branch 'master' into window-fix-nontransferable
sreemanamala Sep 3, 2024
b19d2b3
checksty;e
sreemanamala Sep 3, 2024
2ac9509
retire WireTransferable
sreemanamala Sep 8, 2024
b925341
default RowsAndColumns::as
sreemanamala Sep 8, 2024
b0b606a
refactors
sreemanamala Sep 8, 2024
98805d8
bug
sreemanamala Sep 9, 2024
b0b6479
Merge branch 'master' into window-fix-nontransferable
sreemanamala Sep 10, 2024
ce2be7a
comments and conflicts
sreemanamala Sep 10, 2024
c37ad95
Merge branch 'master' into window-fix-nontransferable
sreemanamala Sep 12, 2024
151f6d2
refactor
sreemanamala Sep 14, 2024
f1dc27a
comments
sreemanamala Sep 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.channel.ByteTracker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
Expand All @@ -37,11 +41,16 @@
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContextDeserializer;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.DateTimeZone;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;

/**
*
Expand Down Expand Up @@ -201,7 +210,54 @@ public void serialize(
// It would be really cool if jackson offered an output stream that would allow us to push bytes
// through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute
// back to Jackson at some point.
gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer());
sreemanamala marked this conversation as resolved.
Show resolved Hide resolved

if (value instanceof ColumnBasedFrameRowsAndColumns) {

ColumnBasedFrameRowsAndColumns frc = (ColumnBasedFrameRowsAndColumns) value;
JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers, frc.getSignature());

this.writeFrameToGenerator(frc.getFrame(), gen);
} else if (value instanceof RowBasedFrameRowsAndColumns) {
RowBasedFrameRowsAndColumns frc = (RowBasedFrameRowsAndColumns) value;
JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers, frc.getSignature());

this.writeFrameToGenerator(frc.getFrame(), gen);
} else {
throw DruidException.defensive("expected frame RowsAndColumns");
}

}

private void writeFrameToGenerator(Frame frame, JsonGenerator generator) throws IOException
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
frame.writeTo(
Channels.newChannel(baos),
false,
ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())),
ByteTracker.unboundedTracker()
);

generator.writeBinary(baos.toByteArray());
}
});

addDeserializer(RowsAndColumns.class, new JsonDeserializer<RowsAndColumns>()
{
@Override
public RowsAndColumns deserialize(
JsonParser p,
DeserializationContext ctxt
) throws IOException
{
RowSignature sig = p.readValueAs(RowSignature.class);
p.nextValue();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
p.readBinaryValue(baos);
Frame frame = Frame.wrap(baos.toByteArray());
return (frame.type() == FrameType.COLUMNAR)
? new ColumnBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig)
: new RowBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.query.operator;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableMap;
Expand All @@ -36,6 +37,8 @@
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.NullColumn;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

Expand All @@ -45,7 +48,6 @@

public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndColumns, WindowOperatorQuery>
{

@Override
@SuppressWarnings("unchecked")
public QueryRunner<RowsAndColumns> mergeResults(QueryRunner<RowsAndColumns> runner)
Expand Down Expand Up @@ -196,4 +198,16 @@ public Sequence run(
return baseQueryRunner.run(queryPlus, responseContext);
}
}

@Override
public ObjectMapper decorateObjectMapper(ObjectMapper objectMapper, WindowOperatorQuery query)
{
ObjectMapper om = super.decorateObjectMapper(objectMapper, query).copy();

om.registerSubtypes(ColumnBasedFrameRowsAndColumns.class);
om.registerSubtypes(RowBasedFrameRowsAndColumns.class);

return om;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.apache.druid.query.rowsandcols.concrete;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.read.FrameReader;
Expand All @@ -44,12 +47,27 @@ public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoClose
private final RowSignature signature;
private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();

public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
@JsonCreator
sreemanamala marked this conversation as resolved.
Show resolved Hide resolved
public ColumnBasedFrameRowsAndColumns(
@JsonProperty("frame") Frame frame,
@JsonProperty("signature") RowSignature signature)
{
this.frame = FrameType.COLUMNAR.ensureType(frame);
this.signature = signature;
}

@JsonProperty("frame")
public Frame getFrame()
{
return frame;
}

@JsonProperty("signature")
public RowSignature getSignature()
{
return signature;
}

@Override
public Collection<String> getColumnNames()
{
Expand Down Expand Up @@ -102,4 +120,24 @@ public void close()
{
// nothing to close
}

@Override
public int hashCode()
{
return Objects.hashCode(frame, signature);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof ColumnBasedFrameRowsAndColumns)) {
return false;
}
ColumnBasedFrameRowsAndColumns otherFrame = (ColumnBasedFrameRowsAndColumns) o;

return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.apache.druid.query.rowsandcols.concrete;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.read.FrameReader;
Expand All @@ -28,12 +31,14 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.LinkedHashMap;

Expand All @@ -43,7 +48,10 @@ public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseabl
private final RowSignature signature;
private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();

public RowBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
@JsonCreator
public RowBasedFrameRowsAndColumns(
@JsonProperty("frame") Frame frame,
@JsonProperty("signature") RowSignature signature)
{
this.frame = FrameType.ROW_BASED.ensureType(frame);
this.signature = signature;
Expand All @@ -55,6 +63,18 @@ public Collection<String> getColumnNames()
return signature.getColumnNames();
}

@JsonProperty("frame")
public Frame getFrame()
{
return frame;
}

@JsonProperty("signature")
public RowSignature getSignature()
{
return signature;
}

@Override
public int numRows()
{
Expand Down Expand Up @@ -88,6 +108,9 @@ public <T> T as(Class<T> clazz)
if (StorageAdapter.class.equals(clazz)) {
return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY);
}
if (WireTransferable.class.equals(clazz)) {
return (T) this;
}
return null;
}

Expand All @@ -96,4 +119,24 @@ public void close()
{
// nothing to close
}

@Override
public int hashCode()
{
return Objects.hashCode(frame, signature);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof RowBasedFrameRowsAndColumns)) {
return false;
}
RowBasedFrameRowsAndColumns otherFrame = (RowBasedFrameRowsAndColumns) o;

return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,27 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.Query;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;

import static org.junit.Assert.assertEquals;

/**
*
*/
Expand Down Expand Up @@ -102,4 +110,23 @@ public void testUnknownTypeWithUnknownService() throws JsonProcessingException
}
Assert.fail("We expect InvalidTypeIdException to be thrown");
}

@Test
public void testColumnBasedFrameRowsAndColumns() throws Exception
{
DefaultObjectMapper om = new DefaultObjectMapper("test");

MapOfColumnsRowsAndColumns input = (MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
"colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
)));

ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input);
byte[] bytes = om.writeValueAsBytes(frc);

ColumnBasedFrameRowsAndColumns frc2 = (ColumnBasedFrameRowsAndColumns) om.readValue(bytes, RowsAndColumns.class);
assertEquals(frc, frc2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ public static ColumnBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColum

return (ColumnBasedFrameRowsAndColumns) rac.getBase();
}

sreemanamala marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading