Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serde for ColumnBasedRowsAndColumns to fix window queries without group by #16658

Merged
merged 29 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4e61156
existing stuff
kgyrtkirk Oct 23, 2023
0a07180
hint
kgyrtkirk Oct 24, 2023
fd94bea
Merge remote-tracking branch 'apache/master' into windowing-fixes-not…
kgyrtkirk Oct 24, 2023
05b0d4d
yesterday stuff
kgyrtkirk Oct 25, 2023
e9342cb
some remaining changes
kgyrtkirk Nov 7, 2023
ef66a89
refactor
sreemanamala Jun 26, 2024
b915675
refactor
sreemanamala Jun 26, 2024
67b2e44
checkstyle
sreemanamala Jun 26, 2024
94aa07e
refactor
sreemanamala Jun 26, 2024
f6863a8
test & refactor
sreemanamala Jun 26, 2024
c70b7e4
fix inspection
sreemanamala Jun 26, 2024
ca5bc74
Merge branch 'master' into window-fix-nontransferable
sreemanamala Jun 26, 2024
0cd28bf
FramedRowsAndColumns
sreemanamala Jul 5, 2024
6288da9
Merge branch 'master' into window-fix-nontransferable
sreemanamala Jul 22, 2024
6c42fd0
Frame RAC serializers
sreemanamala Jul 24, 2024
9c6cecb
refactor
sreemanamala Jul 24, 2024
16f44b2
refactor
sreemanamala Jul 25, 2024
b2e2498
refactor
sreemanamala Jul 29, 2024
86d00ec
Merge branch 'master' into window-fix-nontransferable
sreemanamala Sep 3, 2024
b19d2b3
checksty;e
sreemanamala Sep 3, 2024
2ac9509
retire WireTransferable
sreemanamala Sep 8, 2024
b925341
default RowsAndColumns::as
sreemanamala Sep 8, 2024
b0b606a
refactors
sreemanamala Sep 8, 2024
98805d8
bug
sreemanamala Sep 9, 2024
b0b6479
Merge branch 'master' into window-fix-nontransferable
sreemanamala Sep 10, 2024
ce2be7a
comments and conflicts
sreemanamala Sep 10, 2024
c37ad95
Merge branch 'master' into window-fix-nontransferable
sreemanamala Sep 12, 2024
151f6d2
refactor
sreemanamala Sep 14, 2024
f1dc27a
comments
sreemanamala Sep 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContextDeserializer;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.joda.time.DateTimeZone;

import java.io.IOException;
Expand Down Expand Up @@ -189,20 +188,7 @@ public ByteOrder deserialize(JsonParser jp, DeserializationContext ctxt) throws
);
addDeserializer(ResponseContext.class, new ResponseContextDeserializer());

addSerializer(RowsAndColumns.class, new JsonSerializer<RowsAndColumns>()
{
@Override
public void serialize(
RowsAndColumns value,
JsonGenerator gen,
SerializerProvider serializers
) throws IOException
{
// It would be really cool if jackson offered an output stream that would allow us to push bytes
// through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute
// back to Jackson at some point.
gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer());
sreemanamala marked this conversation as resolved.
Show resolved Hide resolved
}
});
addSerializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsSerializer());
addDeserializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsDeserializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,31 @@

package org.apache.druid.query.rowsandcols;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.channel.ByteTracker;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.Collection;

/**
Expand Down Expand Up @@ -112,4 +131,66 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input)
*/
@Nullable
<T> T as(Class<T> clazz);

/**
* Serializer for {@link RowsAndColumns} by converting the instance to {@link FrameRowsAndColumns}
*/
class RowsAndColumnsSerializer extends StdSerializer<RowsAndColumns>
{
public RowsAndColumnsSerializer()
{
super(RowsAndColumns.class);
}

@Override
public void serialize(
RowsAndColumns rac,
JsonGenerator jsonGenerator,
SerializerProvider serializerProvider
) throws IOException
{
FrameRowsAndColumns frameRAC = rac.as(FrameRowsAndColumns.class);
if (frameRAC == null) {
throw DruidException.defensive("Unable to serialize RAC");
}
JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature());

Frame frame = frameRAC.getFrame();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
frame.writeTo(
Channels.newChannel(baos),
false,
ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compression is false; meanwhile a compressionBuffer is being allocated at every call - is that required?
if its needed - would it be possible to reuse the buffer later?

Copy link
Contributor Author

@sreemanamala sreemanamala Sep 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are not trying to compress, I made the buffer null
Lets discuss on this, If compression can improve the performance, I can work on follow-up PR to do that work

ByteTracker.unboundedTracker()
);

jsonGenerator.writeBinary(baos.toByteArray());
}
}

/**
* Deserializer for {@link RowsAndColumns} returning as an instance of {@link FrameRowsAndColumns}
*/
class RowsAndColumnsDeserializer extends StdDeserializer<RowsAndColumns>
{
public RowsAndColumnsDeserializer()
{
super(RowsAndColumns.class);
}

@Override
public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException
{
RowSignature sig = jsonParser.readValueAs(RowSignature.class);
jsonParser.nextValue();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
jsonParser.readBinaryValue(baos);
Frame frame = Frame.wrap(baos.toByteArray());
return (frame.type() == FrameType.COLUMNAR)
? new ColumnBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig)
: new RowBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig);
sreemanamala marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,21 @@

package org.apache.druid.query.rowsandcols.concrete;

import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.read.columnar.FrameColumnReaders;
import org.apache.druid.frame.segment.FrameStorageAdapter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.LinkedHashMap;

public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
public class ColumnBasedFrameRowsAndColumns extends FrameRowsAndColumns
{
private final Frame frame;
private final RowSignature signature;
private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();

public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
{
this.frame = FrameType.COLUMNAR.ensureType(frame);
this.signature = signature;
}

@Override
public Collection<String> getColumnNames()
{
return signature.getColumnNames();
}

@Override
public int numRows()
{
return frame.numRows();
super(FrameType.COLUMNAR.ensureType(frame), signature);
}

@Nullable
Expand All @@ -73,28 +48,17 @@ public Column findColumn(String name)
} else {
final ColumnType columnType = signature
.getColumnType(columnIndex)
.orElseThrow(() -> new ISE("just got the id, why is columnType not there?"));
.orElseThrow(
() -> DruidException.defensive(
"just got the id [%s][%s], why is columnType not there?",
columnIndex,
name
)
);

colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame));
}
}
return colCache.get(name);
}

@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
if (StorageAdapter.class.equals(clazz)) {
return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY);
}
return null;
}

@Override
public void close()
{
// nothing to close
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.rowsandcols.concrete;

import com.google.common.base.Objects;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.segment.FrameStorageAdapter;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.LinkedHashMap;

public abstract class FrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
{
final Frame frame;
final RowSignature signature;
final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();

public FrameRowsAndColumns(Frame frame, RowSignature signature)
{
this.frame = frame;
this.signature = signature;
}

public Frame getFrame()
{
return frame;
}

public RowSignature getSignature()
{
return signature;
}

@Override
public Collection<String> getColumnNames()
{
return signature.getColumnNames();
}

@Override
public int numRows()
{
return frame.numRows();
}

@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
if (StorageAdapter.class.equals(clazz)) {
return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY);
}
if (FrameRowsAndColumns.class.equals(clazz)) {
return (T) this;
}
return null;
sreemanamala marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void close()
{
// nothing to close
}

@Override
public int hashCode()
{
return Objects.hashCode(frame, signature);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof FrameRowsAndColumns)) {
return false;
}
FrameRowsAndColumns otherFrame = (FrameRowsAndColumns) o;

return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,17 @@
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.field.FieldReader;
import org.apache.druid.frame.field.FieldReaders;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.segment.FrameStorageAdapter;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.LinkedHashMap;

public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
public class RowBasedFrameRowsAndColumns extends FrameRowsAndColumns
{
private final Frame frame;
private final RowSignature signature;
private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();

public RowBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
{
this.frame = FrameType.ROW_BASED.ensureType(frame);
this.signature = signature;
}

@Override
public Collection<String> getColumnNames()
{
return signature.getColumnNames();
}

@Override
public int numRows()
{
return frame.numRows();
super(FrameType.ROW_BASED.ensureType(frame), signature);
}

@Nullable
Expand Down Expand Up @@ -88,21 +63,4 @@ public Column findColumn(String name)
}
return colCache.get(name);
}

@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
if (StorageAdapter.class.equals(clazz)) {
return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY);
}
return null;
}

@Override
public void close()
{
// nothing to close
}
}
Loading
Loading