Skip to content

Commit

Permalink
decouple column serializer compression closers from SegmentWriteoutMe…
Browse files Browse the repository at this point in the history
…dium to optionally allow serializers to release direct memory allocated for compression earlier than when segment is completed (#16076)
  • Loading branch information
clintropolis authored Mar 11, 2024
1 parent 8084f22 commit 313da98
Show file tree
Hide file tree
Showing 39 changed files with 224 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws
"lz4-longs",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.LONGS,
CompressionStrategy.LZ4
CompressionStrategy.LZ4,
writeOutMedium.getCloser()
);
break;
case "lz4-auto":
Expand All @@ -250,7 +251,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws
"lz4-auto",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.AUTO,
CompressionStrategy.LZ4
CompressionStrategy.LZ4,
writeOutMedium.getCloser()
);
break;
case "none-longs":
Expand All @@ -260,7 +262,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws
"none-longs",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.LONGS,
CompressionStrategy.NONE
CompressionStrategy.NONE,
writeOutMedium.getCloser()
);
break;
case "none-auto":
Expand All @@ -270,7 +273,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws
"none-auto",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.AUTO,
CompressionStrategy.NONE
CompressionStrategy.NONE,
writeOutMedium.getCloser()
);
break;
case "zstd-longs":
Expand All @@ -280,7 +284,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws
"zstd-longs",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.LONGS,
CompressionStrategy.ZSTD
CompressionStrategy.ZSTD,
writeOutMedium.getCloser()
);
break;
case "zstd-auto":
Expand All @@ -290,7 +295,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws
"zstd-auto",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.AUTO,
CompressionStrategy.ZSTD
CompressionStrategy.ZSTD,
writeOutMedium.getCloser()
);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.segment.generator.ColumnValueGenerator;
import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import java.io.BufferedReader;
import java.io.File;
Expand Down Expand Up @@ -155,12 +156,14 @@ public static void main(String[] args) throws IOException
compFile.delete();
File dataFile = new File(dir, entry.getKey());

SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium();
ColumnarFloatsSerializer writer = CompressionFactory.getFloatSerializer(
"float-benchmark",
new OffHeapMemorySegmentWriteOutMedium(),
segmentWriteOutMedium,
"float",
ByteOrder.nativeOrder(),
compression
compression,
segmentWriteOutMedium.getCloser()
);
try (
BufferedReader br = Files.newBufferedReader(dataFile.toPath(), StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.segment.generator.ColumnValueGenerator;
import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import java.io.BufferedReader;
import java.io.File;
Expand Down Expand Up @@ -148,13 +149,15 @@ public static void main(String[] args) throws IOException
compFile.delete();
File dataFile = new File(dir, entry.getKey());

SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium();
ColumnarLongsSerializer writer = CompressionFactory.getLongSerializer(
"long-benchmark",
new OffHeapMemorySegmentWriteOutMedium(),
segmentWriteOutMedium,
"long",
ByteOrder.nativeOrder(),
encoding,
compression
compression,
segmentWriteOutMedium.getCloser()
);
try (
BufferedReader br = Files.newBufferedReader(dataFile.toPath(), StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,26 @@ public class CompressedBigDecimalLongColumnSerializer implements GenericColumnSe
*/
public static CompressedBigDecimalLongColumnSerializer create(
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase)
String filenameBase
)
{
return new CompressedBigDecimalLongColumnSerializer(
CompressedVSizeColumnarIntsSerializer.create(
"dummy",
segmentWriteOutMedium,
String.format(Locale.ROOT, "%s.scale", filenameBase),
16,
CompressionStrategy.LZ4),
CompressionStrategy.LZ4,
segmentWriteOutMedium.getCloser()
),
V3CompressedVSizeColumnarMultiIntsSerializer.create(
"dummy",
segmentWriteOutMedium,
String.format(Locale.ROOT, "%s.magnitude", filenameBase),
Integer.MAX_VALUE,
CompressionStrategy.LZ4));
CompressionStrategy.LZ4
)
);
}

private final CompressedVSizeColumnarIntsSerializer scaleWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,8 @@ protected void setupEncodedValueWriter() throws IOException
segmentWriteOutMedium,
filenameBase,
cardinality,
compressionStrategy
compressionStrategy,
segmentWriteOutMedium.getCloser()
);
} else {
encodedValueSerializer = new VSizeColumnarIntsSerializer(segmentWriteOutMedium, cardinality);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public void open() throws IOException
segmentWriteOutMedium,
StringUtils.format("%s.double_column", filenameBase),
byteOrder,
compression
compression,
segmentWriteOutMedium.getCloser()
);
writer.open();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public void open() throws IOException
segmentWriteOutMedium,
StringUtils.format("%s.double_column", filenameBase),
byteOrder,
compression
compression,
segmentWriteOutMedium.getCloser()
);
writer.open();
nullValueBitmapWriter = new ByteBufferWriter<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public void open() throws IOException
segmentWriteOutMedium,
StringUtils.format("%s.float_column", filenameBase),
byteOrder,
compression
compression,
segmentWriteOutMedium.getCloser()
);
writer.open();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public void open() throws IOException
segmentWriteOutMedium,
StringUtils.format("%s.float_column", filenameBase),
byteOrder,
compression
compression,
segmentWriteOutMedium.getCloser()
);
writer.open();
nullValueBitmapWriter = new ByteBufferWriter<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ private File makeIndexFiles(
mergers.add(
handler.makeMerger(
indexSpec,

segmentWriteOutMedium,
dimFormats.get(i).toColumnCapabilities(),
progress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void open() throws IOException
StringUtils.format("%s.long_column", filenameBase),
byteOrder,
encoding,
compression
compression,
segmentWriteOutMedium.getCloser()
);
writer.open();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public void open() throws IOException
StringUtils.format("%s.long_column", filenameBase),
byteOrder,
encoding,
compression
compression,
segmentWriteOutMedium.getCloser()
);
writer.open();
nullValueBitmapWriter = new ByteBufferWriter<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,20 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression
CompressionStrategy compression,
Closer closer
)
{
this.columnName = columnName;
this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
filenameBase,
compression,
CompressedPools.BUFFER_SIZE
CompressedPools.BUFFER_SIZE,
closer
);
this.compression = compression;
CompressionStrategy.Compressor compressor = compression.getCompressor();
Closer closer = segmentWriteOutMedium.getCloser();
this.endBuffer = compressor.allocateInBuffer(CompressedPools.BUFFER_SIZE, closer).order(byteOrder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,20 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression
CompressionStrategy compression,
Closer closer
)
{
this.columnName = columnName;
this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
filenameBase,
compression,
CompressedPools.BUFFER_SIZE
CompressedPools.BUFFER_SIZE,
closer
);
this.compression = compression;
CompressionStrategy.Compressor compressor = compression.getCompressor();
Closer closer = segmentWriteOutMedium.getCloser();
this.endBuffer = compressor.allocateInBuffer(CompressedPools.BUFFER_SIZE, closer).order(byteOrder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

package org.apache.druid.segment.data;

import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -59,17 +59,24 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
String filenameBase,
ByteOrder byteOrder,
CompressionFactory.LongEncodingWriter writer,
CompressionStrategy compression
CompressionStrategy compression,
Closer closer
)
{
this.columnName = columnName;
this.sizePer = writer.getBlockSize(CompressedPools.BUFFER_SIZE);
int bufferSize = writer.getNumBytes(sizePer);
this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(segmentWriteOutMedium, filenameBase, compression, bufferSize);
this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
filenameBase,
compression,
bufferSize,
closer
);
this.writer = writer;
this.compression = compression;
CompressionStrategy.Compressor compressor = compression.getCompressor();
endBuffer = compressor.allocateInBuffer(writer.getNumBytes(sizePer), segmentWriteOutMedium.getCloser()).order(byteOrder);
endBuffer = compressor.allocateInBuffer(writer.getNumBytes(sizePer), closer).order(byteOrder);
writer.setBuffer(endBuffer);
numInsertedForNextFlush = sizePer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment.data;

import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
Expand Down Expand Up @@ -60,18 +61,16 @@ public class CompressedBlockSerializer implements Serializer

public CompressedBlockSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,

CompressionStrategy compression,
int blockSize
int blockSize,
Closer closer
)
{
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.compression = compression;
this.compressor = compression.getCompressor();
this.uncompressedDataBuffer = compressor.allocateInBuffer(blockSize, segmentWriteOutMedium.getCloser())
.order(ByteOrder.nativeOrder());
this.compressedDataBuffer = compressor.allocateOutBuffer(blockSize, segmentWriteOutMedium.getCloser())
.order(ByteOrder.nativeOrder());
this.uncompressedDataBuffer = compressor.allocateInBuffer(blockSize, closer).order(ByteOrder.nativeOrder());
this.compressedDataBuffer = compressor.allocateOutBuffer(blockSize, closer).order(ByteOrder.nativeOrder());
}

public void open() throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,39 +58,40 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
final String filenameBase,
final int chunkFactor,
final ByteOrder byteOrder,
final CompressionStrategy compression
final CompressionStrategy compression,
final Closer closer
)
{
this(
columnName,
segmentWriteOutMedium,
chunkFactor,
byteOrder,
compression,
GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
filenameBase,
compression,
chunkFactor * Integer.BYTES
)
chunkFactor * Integer.BYTES,
closer
),
closer
);
}

CompressedColumnarIntsSerializer(
final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final int chunkFactor,
final ByteOrder byteOrder,
final CompressionStrategy compression,
final GenericIndexedWriter<ByteBuffer> flattener
final GenericIndexedWriter<ByteBuffer> flattener,
final Closer closer
)
{
this.columnName = columnName;
this.chunkFactor = chunkFactor;
this.compression = compression;
this.flattener = flattener;
CompressionStrategy.Compressor compressor = compression.getCompressor();
Closer closer = segmentWriteOutMedium.getCloser();
this.endBuffer = compressor.allocateInBuffer(chunkFactor * Integer.BYTES, closer).order(byteOrder);
this.numInserted = 0;
}
Expand Down
Loading

0 comments on commit 313da98

Please sign in to comment.