diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java index b26e1dc58920..f912f5e70b29 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java @@ -240,7 +240,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws "lz4-longs", ByteOrder.LITTLE_ENDIAN, CompressionFactory.LongEncodingStrategy.LONGS, - CompressionStrategy.LZ4 + CompressionStrategy.LZ4, + writeOutMedium.getCloser() ); break; case "lz4-auto": @@ -250,7 +251,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws "lz4-auto", ByteOrder.LITTLE_ENDIAN, CompressionFactory.LongEncodingStrategy.AUTO, - CompressionStrategy.LZ4 + CompressionStrategy.LZ4, + writeOutMedium.getCloser() ); break; case "none-longs": @@ -260,7 +262,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws "none-longs", ByteOrder.LITTLE_ENDIAN, CompressionFactory.LongEncodingStrategy.LONGS, - CompressionStrategy.NONE + CompressionStrategy.NONE, + writeOutMedium.getCloser() ); break; case "none-auto": @@ -270,7 +273,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws "none-auto", ByteOrder.LITTLE_ENDIAN, CompressionFactory.LongEncodingStrategy.AUTO, - CompressionStrategy.NONE + CompressionStrategy.NONE, + writeOutMedium.getCloser() ); break; case "zstd-longs": @@ -280,7 +284,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws "zstd-longs", ByteOrder.LITTLE_ENDIAN, CompressionFactory.LongEncodingStrategy.LONGS, - CompressionStrategy.ZSTD + CompressionStrategy.ZSTD, + writeOutMedium.getCloser() ); break; case "zstd-auto": @@ -290,7 +295,8 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws "zstd-auto", ByteOrder.LITTLE_ENDIAN, CompressionFactory.LongEncodingStrategy.AUTO, - CompressionStrategy.ZSTD + CompressionStrategy.ZSTD, + writeOutMedium.getCloser() ); break; default: diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java index 82709a6c4b3b..9715db49e0ed 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java @@ -29,6 +29,7 @@ import org.apache.druid.segment.generator.ColumnValueGenerator; import org.apache.druid.segment.generator.GeneratorColumnSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import java.io.BufferedReader; import java.io.File; @@ -155,12 +156,14 @@ public static void main(String[] args) throws IOException compFile.delete(); File dataFile = new File(dir, entry.getKey()); + SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium(); ColumnarFloatsSerializer writer = CompressionFactory.getFloatSerializer( "float-benchmark", - new OffHeapMemorySegmentWriteOutMedium(), + segmentWriteOutMedium, "float", ByteOrder.nativeOrder(), - compression + compression, + segmentWriteOutMedium.getCloser() ); try ( BufferedReader br = Files.newBufferedReader(dataFile.toPath(), StandardCharsets.UTF_8); diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java index 55d5f6b82bbb..b1786e82f45c 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java @@ -29,6 +29,7 @@ import org.apache.druid.segment.generator.ColumnValueGenerator; import org.apache.druid.segment.generator.GeneratorColumnSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import java.io.BufferedReader; import java.io.File; @@ -148,13 +149,15 @@ public static void main(String[] args) throws IOException compFile.delete(); File dataFile = new File(dir, entry.getKey()); + SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium(); ColumnarLongsSerializer writer = CompressionFactory.getLongSerializer( "long-benchmark", - new OffHeapMemorySegmentWriteOutMedium(), + segmentWriteOutMedium, "long", ByteOrder.nativeOrder(), encoding, - compression + compression, + segmentWriteOutMedium.getCloser() ); try ( BufferedReader br = Files.newBufferedReader(dataFile.toPath(), StandardCharsets.UTF_8); diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java index e6b6539fe38e..ef899c2f4557 100644 --- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java +++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java @@ -49,7 +49,8 @@ public class CompressedBigDecimalLongColumnSerializer implements GenericColumnSe */ public static CompressedBigDecimalLongColumnSerializer create( SegmentWriteOutMedium segmentWriteOutMedium, - String filenameBase) + String filenameBase + ) { return new CompressedBigDecimalLongColumnSerializer( CompressedVSizeColumnarIntsSerializer.create( @@ -57,13 +58,17 @@ public static CompressedBigDecimalLongColumnSerializer create( segmentWriteOutMedium, String.format(Locale.ROOT, "%s.scale", filenameBase), 16, - CompressionStrategy.LZ4), + CompressionStrategy.LZ4, + segmentWriteOutMedium.getCloser() + ), V3CompressedVSizeColumnarMultiIntsSerializer.create( "dummy", segmentWriteOutMedium, String.format(Locale.ROOT, "%s.magnitude", filenameBase), Integer.MAX_VALUE, - CompressionStrategy.LZ4)); + CompressionStrategy.LZ4 + ) + ); } private final CompressedVSizeColumnarIntsSerializer scaleWriter; diff --git a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java index 15f7f51120d1..f1d9d7c5bb4d 100644 --- a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java +++ b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java @@ -423,7 +423,8 @@ protected void setupEncodedValueWriter() throws IOException segmentWriteOutMedium, filenameBase, cardinality, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); } else { encodedValueSerializer = new VSizeColumnarIntsSerializer(segmentWriteOutMedium, cardinality); diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java index cc525cb7ba82..1b87fdb9cebd 100644 --- a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java @@ -72,7 +72,8 @@ public void open() throws IOException segmentWriteOutMedium, StringUtils.format("%s.double_column", filenameBase), byteOrder, - compression + compression, + segmentWriteOutMedium.getCloser() ); writer.open(); } diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java index 02903eb4a274..a678c695e43f 100644 --- a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java +++ b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java @@ -99,7 +99,8 @@ public void open() throws IOException segmentWriteOutMedium, StringUtils.format("%s.double_column", filenameBase), byteOrder, - compression + compression, + segmentWriteOutMedium.getCloser() ); writer.open(); nullValueBitmapWriter = new ByteBufferWriter<>( diff --git a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java index b96d520e2e2d..e1d23946d5b1 100644 --- a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java @@ -72,7 +72,8 @@ public void open() throws IOException segmentWriteOutMedium, StringUtils.format("%s.float_column", filenameBase), byteOrder, - compression + compression, + segmentWriteOutMedium.getCloser() ); writer.open(); } diff --git a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java index b5371a0aac93..5930ae081ea8 100644 --- a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java +++ b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java @@ -99,7 +99,8 @@ public void open() throws IOException segmentWriteOutMedium, StringUtils.format("%s.float_column", filenameBase), byteOrder, - compression + compression, + segmentWriteOutMedium.getCloser() ); writer.open(); nullValueBitmapWriter = new ByteBufferWriter<>( diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index 6b1ff6b31953..05c721e5c560 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -202,7 +202,6 @@ private File makeIndexFiles( mergers.add( handler.makeMerger( indexSpec, - segmentWriteOutMedium, dimFormats.get(i).toColumnCapabilities(), progress, diff --git a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java index 1f8d03bebc68..6a4bcacfbb89 100644 --- a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java @@ -80,7 +80,8 @@ public void open() throws IOException StringUtils.format("%s.long_column", filenameBase), byteOrder, encoding, - compression + compression, + segmentWriteOutMedium.getCloser() ); writer.open(); } diff --git a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java index 364a7af68252..226cd8bafb0c 100644 --- a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java +++ b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java @@ -105,7 +105,8 @@ public void open() throws IOException StringUtils.format("%s.long_column", filenameBase), byteOrder, encoding, - compression + compression, + segmentWriteOutMedium.getCloser() ); writer.open(); nullValueBitmapWriter = new ByteBufferWriter<>( diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java index 2b4612ecb3a6..8c2dfb9c028b 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java @@ -56,7 +56,8 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, - CompressionStrategy compression + CompressionStrategy compression, + Closer closer ) { this.columnName = columnName; @@ -64,11 +65,11 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri segmentWriteOutMedium, filenameBase, compression, - CompressedPools.BUFFER_SIZE + CompressedPools.BUFFER_SIZE, + closer ); this.compression = compression; CompressionStrategy.Compressor compressor = compression.getCompressor(); - Closer closer = segmentWriteOutMedium.getCloser(); this.endBuffer = compressor.allocateInBuffer(CompressedPools.BUFFER_SIZE, closer).order(byteOrder); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java index 94a3ef6319fc..5640339a316e 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java @@ -56,7 +56,8 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, - CompressionStrategy compression + CompressionStrategy compression, + Closer closer ) { this.columnName = columnName; @@ -64,11 +65,11 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial segmentWriteOutMedium, filenameBase, compression, - CompressedPools.BUFFER_SIZE + CompressedPools.BUFFER_SIZE, + closer ); this.compression = compression; CompressionStrategy.Compressor compressor = compression.getCompressor(); - Closer closer = segmentWriteOutMedium.getCloser(); this.endBuffer = compressor.allocateInBuffer(CompressedPools.BUFFER_SIZE, closer).order(byteOrder); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java index ff47a34cca46..37d468d62e49 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java @@ -19,13 +19,13 @@ package org.apache.druid.segment.data; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import javax.annotation.Nullable; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -59,17 +59,24 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ String filenameBase, ByteOrder byteOrder, CompressionFactory.LongEncodingWriter writer, - CompressionStrategy compression + CompressionStrategy compression, + Closer closer ) { this.columnName = columnName; this.sizePer = writer.getBlockSize(CompressedPools.BUFFER_SIZE); int bufferSize = writer.getNumBytes(sizePer); - this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(segmentWriteOutMedium, filenameBase, compression, bufferSize); + this.flattener = GenericIndexedWriter.ofCompressedByteBuffers( + segmentWriteOutMedium, + filenameBase, + compression, + bufferSize, + closer + ); this.writer = writer; this.compression = compression; CompressionStrategy.Compressor compressor = compression.getCompressor(); - endBuffer = compressor.allocateInBuffer(writer.getNumBytes(sizePer), segmentWriteOutMedium.getCloser()).order(byteOrder); + endBuffer = compressor.allocateInBuffer(writer.getNumBytes(sizePer), closer).order(byteOrder); writer.setBuffer(endBuffer); numInsertedForNextFlush = sizePer; } diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java index 07208160af80..d5beedcae51f 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.data; import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; @@ -60,18 +61,16 @@ public class CompressedBlockSerializer implements Serializer public CompressedBlockSerializer( SegmentWriteOutMedium segmentWriteOutMedium, - CompressionStrategy compression, - int blockSize + int blockSize, + Closer closer ) { this.segmentWriteOutMedium = segmentWriteOutMedium; this.compression = compression; this.compressor = compression.getCompressor(); - this.uncompressedDataBuffer = compressor.allocateInBuffer(blockSize, segmentWriteOutMedium.getCloser()) - .order(ByteOrder.nativeOrder()); - this.compressedDataBuffer = compressor.allocateOutBuffer(blockSize, segmentWriteOutMedium.getCloser()) - .order(ByteOrder.nativeOrder()); + this.uncompressedDataBuffer = compressor.allocateInBuffer(blockSize, closer).order(ByteOrder.nativeOrder()); + this.compressedDataBuffer = compressor.allocateOutBuffer(blockSize, closer).order(ByteOrder.nativeOrder()); } public void open() throws IOException diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java index a0442d95adfd..cc724ba1cc7a 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java @@ -58,12 +58,12 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer final String filenameBase, final int chunkFactor, final ByteOrder byteOrder, - final CompressionStrategy compression + final CompressionStrategy compression, + final Closer closer ) { this( columnName, - segmentWriteOutMedium, chunkFactor, byteOrder, compression, @@ -71,18 +71,20 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer segmentWriteOutMedium, filenameBase, compression, - chunkFactor * Integer.BYTES - ) + chunkFactor * Integer.BYTES, + closer + ), + closer ); } CompressedColumnarIntsSerializer( final String columnName, - final SegmentWriteOutMedium segmentWriteOutMedium, final int chunkFactor, final ByteOrder byteOrder, final CompressionStrategy compression, - final GenericIndexedWriter flattener + final GenericIndexedWriter flattener, + final Closer closer ) { this.columnName = columnName; @@ -90,7 +92,6 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer this.compression = compression; this.flattener = flattener; CompressionStrategy.Compressor compressor = compression.getCompressor(); - Closer closer = segmentWriteOutMedium.getCloser(); this.endBuffer = compressor.allocateInBuffer(chunkFactor * Integer.BYTES, closer).order(byteOrder); this.numInserted = 0; } diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java index cbf9211ec17f..cfc56b84c218 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.data; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.Serializer; @@ -34,12 +35,17 @@ public class CompressedLongsSerializer implements Serializer private final CompressedBlockSerializer blockSerializer; private final ByteBuffer longValueConverter = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.nativeOrder()); - public CompressedLongsSerializer(SegmentWriteOutMedium segmentWriteOutMedium, CompressionStrategy compression) + public CompressedLongsSerializer( + SegmentWriteOutMedium segmentWriteOutMedium, + CompressionStrategy compression, + Closer closer + ) { this.blockSerializer = new CompressedBlockSerializer( segmentWriteOutMedium, compression, - CompressedPools.BUFFER_SIZE + CompressedPools.BUFFER_SIZE, + closer ); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java index 6060be2cc3c0..84f4799e6d26 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java @@ -20,13 +20,13 @@ package org.apache.druid.segment.data; import org.apache.druid.common.utils.ByteUtils; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import javax.annotation.Nullable; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -51,7 +51,8 @@ public static CompressedVSizeColumnarIntsSerializer create( final SegmentWriteOutMedium segmentWriteOutMedium, final String filenameBase, final int maxValue, - final CompressionStrategy compression + final CompressionStrategy compression, + final Closer closer ) { return new CompressedVSizeColumnarIntsSerializer( @@ -61,7 +62,8 @@ public static CompressedVSizeColumnarIntsSerializer create( maxValue, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue), IndexIO.BYTE_ORDER, - compression + compression, + closer ); } @@ -84,12 +86,12 @@ public static CompressedVSizeColumnarIntsSerializer create( final int maxValue, final int chunkFactor, final ByteOrder byteOrder, - final CompressionStrategy compression + final CompressionStrategy compression, + final Closer closer ) { this( columnName, - segmentWriteOutMedium, maxValue, chunkFactor, byteOrder, @@ -98,19 +100,21 @@ public static CompressedVSizeColumnarIntsSerializer create( segmentWriteOutMedium, filenameBase, compression, - sizePer(maxValue, chunkFactor) - ) + sizePer(maxValue, chunkFactor), + closer + ), + closer ); } CompressedVSizeColumnarIntsSerializer( final String columnName, - final SegmentWriteOutMedium segmentWriteOutMedium, final int maxValue, final int chunkFactor, final ByteOrder byteOrder, final CompressionStrategy compression, - final GenericIndexedWriter flattener + final GenericIndexedWriter flattener, + final Closer closer ) { this.columnName = columnName; @@ -122,7 +126,7 @@ public static CompressedVSizeColumnarIntsSerializer create( this.flattener = flattener; this.intBuffer = ByteBuffer.allocate(Integer.BYTES).order(byteOrder); CompressionStrategy.Compressor compressor = compression.getCompressor(); - this.endBuffer = compressor.allocateInBuffer(chunkBytes, segmentWriteOutMedium.getCloser()).order(byteOrder); + this.endBuffer = compressor.allocateInBuffer(chunkBytes, closer).order(byteOrder); this.numInserted = 0; } diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java index 19aa9e445a1c..6693daa4326a 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java @@ -66,13 +66,18 @@ public void open() throws IOException { numValues = 0; currentOffset = 0; - offsetsSerializer = new CompressedLongsSerializer(segmentWriteOutMedium, compression); + offsetsSerializer = new CompressedLongsSerializer( + segmentWriteOutMedium, + compression, + segmentWriteOutMedium.getCloser() + ); offsetsSerializer.open(); valuesSerializer = new CompressedBlockSerializer( segmentWriteOutMedium, compression, - CompressedPools.BUFFER_SIZE + CompressedPools.BUFFER_SIZE, + segmentWriteOutMedium.getCloser() ); valuesSerializer.open(); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java index 10943316b707..dde6a440d9ea 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java @@ -24,6 +24,7 @@ import com.google.common.base.Supplier; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.WriteOutBytes; @@ -324,7 +325,8 @@ public static ColumnarLongsSerializer getLongSerializer( String filenameBase, ByteOrder order, LongEncodingStrategy encodingStrategy, - CompressionStrategy compressionStrategy + CompressionStrategy compressionStrategy, + Closer closer ) { if (encodingStrategy == LongEncodingStrategy.AUTO) { @@ -333,7 +335,8 @@ public static ColumnarLongsSerializer getLongSerializer( segmentWriteOutMedium, filenameBase, order, - compressionStrategy + compressionStrategy, + closer ); } else if (encodingStrategy == LongEncodingStrategy.LONGS) { if (compressionStrategy == CompressionStrategy.NONE) { @@ -349,7 +352,8 @@ public static ColumnarLongsSerializer getLongSerializer( filenameBase, order, new LongsLongEncodingWriter(order), - compressionStrategy + compressionStrategy, + closer ); } } else { @@ -379,7 +383,8 @@ public static ColumnarFloatsSerializer getFloatSerializer( SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder order, - CompressionStrategy compressionStrategy + CompressionStrategy compressionStrategy, + Closer closer ) { if (compressionStrategy == CompressionStrategy.NONE) { @@ -390,7 +395,8 @@ public static ColumnarFloatsSerializer getFloatSerializer( segmentWriteOutMedium, filenameBase, order, - compressionStrategy + compressionStrategy, + closer ); } } @@ -417,7 +423,8 @@ public static ColumnarDoublesSerializer getDoubleSerializer( SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, - CompressionStrategy compression + CompressionStrategy compression, + Closer closer ) { if (compression == CompressionStrategy.NONE) { @@ -428,7 +435,8 @@ public static ColumnarDoublesSerializer getDoubleSerializer( segmentWriteOutMedium, filenameBase, byteOrder, - compression + compression, + closer ); } } diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java index a69d645be391..8b38125322ba 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java @@ -77,13 +77,14 @@ public static GenericIndexedWriter ofCompressedByteBuffers( final SegmentWriteOutMedium segmentWriteOutMedium, final String filenameBase, final CompressionStrategy compressionStrategy, - final int bufferSize + final int bufferSize, + final Closer closer ) { GenericIndexedWriter writer = new GenericIndexedWriter<>( segmentWriteOutMedium, filenameBase, - compressedByteBuffersWriteObjectStrategy(compressionStrategy, bufferSize, segmentWriteOutMedium.getCloser()) + compressedByteBuffersWriteObjectStrategy(compressionStrategy, bufferSize, closer) ); writer.objectsSorted = false; return writer; diff --git a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java index c0f9355350ac..7403f8dfd20b 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java @@ -24,11 +24,11 @@ import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongList; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import javax.annotation.Nullable; - import java.io.IOException; import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; @@ -45,6 +45,7 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali private final String filenameBase; private final ByteOrder order; private final CompressionStrategy compression; + private final Closer closer; private int numInserted = 0; @@ -64,7 +65,8 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder order, - CompressionStrategy compression + CompressionStrategy compression, + Closer closer ) { this.columnName = columnName; @@ -72,6 +74,7 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali this.filenameBase = filenameBase; this.order = order; this.compression = compression; + this.closer = closer; } @Override @@ -141,7 +144,8 @@ private void makeDelegate() throws IOException filenameBase, order, writer, - compression + compression, + closer ); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java index f6690293012d..0fac36399d1d 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java @@ -51,7 +51,8 @@ public static V3CompressedVSizeColumnarMultiIntsSerializer create( filenameBase, CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER, IndexIO.BYTE_ORDER, - compression + compression, + segmentWriteOutMedium.getCloser() ), new CompressedVSizeColumnarIntsSerializer( columnName, @@ -60,7 +61,8 @@ public static V3CompressedVSizeColumnarMultiIntsSerializer create( maxValue, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue), IndexIO.BYTE_ORDER, - compression + compression, + segmentWriteOutMedium.getCloser() ) ); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java index efa25865fa46..aa6a71ae7545 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java @@ -29,6 +29,7 @@ import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.MutableBitmap; import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.java.util.common.logger.Logger; @@ -81,6 +82,8 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter protected final Int2ObjectRBTreeMap arrayElements = new Int2ObjectRBTreeMap<>(); + protected final Closer fieldResourceCloser = Closer.create(); + protected FixedIndexedIntWriter intermediateValueWriter; // maybe someday we allow no bitmap indexes or multi-value columns protected int flags = DictionaryEncodedColumnPartSerde.NO_FLAGS; @@ -300,6 +303,7 @@ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws I } finally { tmpWriteoutMedium.close(); + fieldResourceCloser.close(); } } @@ -312,7 +316,8 @@ private void openColumnSerializer(SegmentWriteOutMedium medium, int maxId) throw medium, columnName, maxId, - indexSpec.getDimensionCompression() + indexSpec.getDimensionCompression(), + fieldResourceCloser ); } else { encodedValueSerializer = new VSizeColumnarIntsSerializer(medium, maxId); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java index e077282f98f3..874b8b309a4c 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java @@ -95,7 +95,8 @@ protected void openValueColumnSerializer() throws IOException segmentWriteOutMedium, StringUtils.format("%s.double_column", name), ByteOrder.nativeOrder(), - indexSpec.getDimensionCompression() + indexSpec.getDimensionCompression(), + segmentWriteOutMedium.getCloser() ); doublesSerializer.open(); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java index 144e848d831d..8ccd528715bf 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java @@ -67,7 +67,8 @@ public void open() throws IOException segmentWriteOutMedium, StringUtils.format("%s.double_column", fieldName), ByteOrder.nativeOrder(), - indexSpec.getDimensionCompression() + indexSpec.getDimensionCompression(), + fieldResourceCloser ); doublesSerializer.open(); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java index bfb966365e2f..46b70d9907cb 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java @@ -97,7 +97,8 @@ protected void openValueColumnSerializer() throws IOException StringUtils.format("%s.long_column", name), ByteOrder.nativeOrder(), indexSpec.getLongEncoding(), - indexSpec.getDimensionCompression() + indexSpec.getDimensionCompression(), + segmentWriteOutMedium.getCloser() ); longsSerializer.open(); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java index 4ca317edb01d..66b5eca18d9f 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java @@ -68,7 +68,8 @@ public void open() throws IOException StringUtils.format("%s.long_column", fieldName), ByteOrder.nativeOrder(), indexSpec.getLongEncoding(), - indexSpec.getDimensionCompression() + indexSpec.getDimensionCompression(), + fieldResourceCloser ); longsSerializer.open(); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java index 2caa19ad8d62..771cdb7fb5b9 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java @@ -184,7 +184,8 @@ public void writeTo( segmentWriteOutMedium, filenameBase, dictionaryWriter.getCardinality(), - compressionToUse + compressionToUse, + segmentWriteOutMedium.getCloser() ); encodedValueSerializer.open(); diff --git a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java index 58464b2c9e66..abd88b57df06 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java @@ -342,7 +342,8 @@ public void writeTo( segmentWriteOutMedium, filenameBase, cardinality, - compressionToUse + compressionToUse, + segmentWriteOutMedium.getCloser() ); encodedValueSerializer.open(); diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java index 52dca67257ca..6e1c4229084f 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java @@ -167,7 +167,8 @@ public void testTooManyValues() throws IOException "test", CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER, byteOrder, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); serializer.open(); @@ -196,7 +197,8 @@ private void checkSerializedSizeAndData(int chunkFactor) throws Exception "test", chunkFactor, byteOrder, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); CompressedColumnarIntsSupplier supplierFromList = CompressedColumnarIntsSupplier.fromList( IntArrayList.wrap(vals), @@ -227,6 +229,7 @@ private void checkSerializedSizeAndData(int chunkFactor) throws Exception Assert.assertEquals(vals[i], columnarInts.get(i)); } CloseableUtils.closeAndWrapExceptions(columnarInts); + CloseableUtils.closeAndWrapExceptions(segmentWriteOutMedium); } private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception @@ -236,7 +239,6 @@ private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception CompressedColumnarIntsSerializer writer = new CompressedColumnarIntsSerializer( "test", - segmentWriteOutMedium, chunkFactor, byteOrder, compressionStrategy, @@ -244,8 +246,10 @@ private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception segmentWriteOutMedium, "test", compressionStrategy, - Long.BYTES * 10000 - ) + Long.BYTES * 10000, + segmentWriteOutMedium.getCloser() + ), + segmentWriteOutMedium.getCloser() ); writer.open(); diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java index 2f94a772b4d8..be472a71ed9c 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java @@ -147,7 +147,8 @@ public void testTooManyValues() throws IOException segmentWriteOutMedium, "test", order, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); serializer.open(); @@ -160,12 +161,14 @@ public void testTooManyValues() throws IOException public void testWithValues(double[] values) throws Exception { + final SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium(); ColumnarDoublesSerializer serializer = CompressionFactory.getDoubleSerializer( "test", - new OffHeapMemorySegmentWriteOutMedium(), + segmentWriteOutMedium, "test", order, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); serializer.open(); @@ -190,6 +193,9 @@ public void testWithValues(double[] values) throws Exception } testConcurrentThreadReads(supplier, doubles, values); } + finally { + segmentWriteOutMedium.close(); + } } private void tryFill(ColumnarDoubles indexed, double[] vals, final int startIndex, final int size) diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java index 14b935be2f35..02e320f46a47 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java @@ -154,7 +154,8 @@ public void testTooManyValues() throws IOException segmentWriteOutMedium, "test", order, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); serializer.open(); @@ -167,12 +168,14 @@ public void testTooManyValues() throws IOException public void testWithValues(float[] values) throws Exception { + SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium(); ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer( "test", - new OffHeapMemorySegmentWriteOutMedium(), + segmentWriteOutMedium, "test", order, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); serializer.open(); diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java index 87daaddbe42e..0fd5bbf6f890 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java @@ -21,6 +21,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -95,13 +96,15 @@ public void testFidelity() throws Exception public void testValues(long[] values) throws Exception { + SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium(); ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer( "test", - new OffHeapMemorySegmentWriteOutMedium(), + segmentWriteOutMedium, "test", order, encodingStrategy, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); serializer.open(); @@ -119,6 +122,7 @@ public void testValues(long[] values) throws Exception assertIndexMatchesVals(longs, values); longs.close(); + segmentWriteOutMedium.close(); } private void assertIndexMatchesVals(ColumnarLongs indexed, long[] vals) diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java index b643ee43d837..ba35a03bff51 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java @@ -154,7 +154,8 @@ public void testTooManyValues() throws IOException "test", order, encodingStrategy, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); serializer.open(); @@ -173,13 +174,15 @@ public void testWithValues(long[] values) throws Exception public void testValues(long[] values) throws Exception { + SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium(); ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer( "test", - new OffHeapMemorySegmentWriteOutMedium(), + segmentWriteOutMedium, "test", order, encodingStrategy, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); serializer.open(); @@ -206,6 +209,9 @@ public void testValues(long[] values) throws Exception testSupplierSerde(supplier, values); testConcurrentThreadReads(supplier, longs, values); } + finally { + segmentWriteOutMedium.close(); + } } private void tryFill(ColumnarLongs indexed, long[] vals, final int startIndex, final int size) diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java index bb5868f6df2f..c06e11c90d94 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java @@ -124,7 +124,8 @@ private void checkSerializedSizeAndData(int chunkSize) throws Exception vals.length > 0 ? Ints.max(vals) : 0, chunkSize, byteOrder, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSupplier supplierFromList = CompressedVSizeColumnarIntsSupplier.fromList( IntArrayList.wrap(vals), @@ -197,16 +198,17 @@ public void testTooManyValues() throws IOException segmentWriteOutMedium, "test", compressionStrategy, - Long.BYTES * 10000 + Long.BYTES * 10000, + segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSerializer serializer = new CompressedVSizeColumnarIntsSerializer( "test", - segmentWriteOutMedium, maxValue, maxChunkSize, byteOrder, compressionStrategy, - genericIndexed + genericIndexed, + segmentWriteOutMedium.getCloser() ); serializer.open(); @@ -233,16 +235,17 @@ private void checkV2SerializedSizeAndData(int chunkSize) throws Exception segmentWriteOutMedium, "test", compressionStrategy, - Long.BYTES * 10000 + Long.BYTES * 10000, + segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSerializer writer = new CompressedVSizeColumnarIntsSerializer( columnName, - segmentWriteOutMedium, vals.length > 0 ? Ints.max(vals) : 0, chunkSize, byteOrder, compressionStrategy, - genericIndexed + genericIndexed, + segmentWriteOutMedium.getCloser() ); writer.open(); for (int val : vals) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java index 88685c7caaf7..11609ffdc975 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java @@ -186,7 +186,8 @@ public void testLongs() throws IOException final CompressionStrategy compressionStrategy = CompressionStrategy.LZ4; CompressedLongsSerializer serializer = new CompressedLongsSerializer( writeOutMedium, - compressionStrategy + compressionStrategy, + writeOutMedium.getCloser() ); serializer.open(); @@ -204,6 +205,7 @@ public void testLongs() throws IOException serializer.writeTo(writer, smoosher); writer.close(); smoosher.close(); + writeOutMedium.close(); SmooshedFileMapper fileMapper = SmooshedFileMapper.load(tmpFile); ByteBuffer base = fileMapper.mapFile(fileNameBase); diff --git a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java index 6877416e764b..29ba49913c44 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java @@ -206,7 +206,8 @@ private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFac "offset", offsetChunkFactor, byteOrder, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer( TEST_COLUMN_NAME, @@ -215,7 +216,8 @@ private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFac maxValue, valueChunkFactor, byteOrder, - compressionStrategy + compressionStrategy, + segmentWriteOutMedium.getCloser() ); V3CompressedVSizeColumnarMultiIntsSerializer writer = new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter); @@ -271,7 +273,6 @@ private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkF try (SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium()) { CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer( TEST_COLUMN_NAME, - segmentWriteOutMedium, offsetChunkFactor, byteOrder, compressionStrategy, @@ -279,24 +280,27 @@ private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkF segmentWriteOutMedium, "offset", compressionStrategy, - Long.BYTES * 250000 - ) + Long.BYTES * 250000, + segmentWriteOutMedium.getCloser() + ), + segmentWriteOutMedium.getCloser() ); GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers( segmentWriteOutMedium, "value", compressionStrategy, - Long.BYTES * 250000 + Long.BYTES * 250000, + segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer( TEST_COLUMN_NAME, - segmentWriteOutMedium, maxValue, valueChunkFactor, byteOrder, compressionStrategy, - genericIndexed + genericIndexed, + segmentWriteOutMedium.getCloser() ); V3CompressedVSizeColumnarMultiIntsSerializer writer = new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter); @@ -347,7 +351,6 @@ private void generateV2SerializedSizeAndData( ) { CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer( TEST_COLUMN_NAME, - segmentWriteOutMedium, offsetChunkFactor, byteOrder, compressionStrategy, @@ -355,24 +358,27 @@ private void generateV2SerializedSizeAndData( segmentWriteOutMedium, "offset", compressionStrategy, - Long.BYTES * 250000 - ) + Long.BYTES * 250000, + segmentWriteOutMedium.getCloser() + ), + segmentWriteOutMedium.getCloser() ); GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers( segmentWriteOutMedium, "value", compressionStrategy, - Long.BYTES * 250000 + Long.BYTES * 250000, + segmentWriteOutMedium.getCloser() ); CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer( TEST_COLUMN_NAME, - segmentWriteOutMedium, maxValue, valueChunkFactor, byteOrder, compressionStrategy, - genericIndexed + genericIndexed, + segmentWriteOutMedium.getCloser() ); V3CompressedVSizeColumnarMultiIntsSerializer writer = new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter);