diff --git a/processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java b/processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java index cc05dea993e5..ab6797a7c003 100644 --- a/processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java +++ b/processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java @@ -88,11 +88,12 @@ public static RowKeyComparisonRunLengths create(final List keyColumns ); } - ColumnType columnType = rowSignature.getColumnType(keyColumn.columnName()) - .orElseThrow(() -> DruidException.defensive("Need column types")); + ColumnType columnType = + rowSignature.getColumnType(keyColumn.columnName()) + .orElseThrow(() -> DruidException.defensive("No type for column[%s]", keyColumn.columnName())); // First key column to be processed - if (runLengthEntryBuilders.size() == 0) { + if (runLengthEntryBuilders.isEmpty()) { final boolean isByteComparable = isByteComparable(columnType); runLengthEntryBuilders.add( new RunLengthEntryBuilder(isByteComparable, keyColumn.order()) diff --git a/processing/src/test/java/org/apache/druid/frame/TestArrayCursorFactory.java b/processing/src/test/java/org/apache/druid/frame/TestArrayCursorFactory.java index 2a6116bfddff..4bde3d31fe0c 100644 --- a/processing/src/test/java/org/apache/druid/frame/TestArrayCursorFactory.java +++ b/processing/src/test/java/org/apache/druid/frame/TestArrayCursorFactory.java @@ -48,9 +48,12 @@ */ public class TestArrayCursorFactory extends QueryableIndexCursorFactory { + private final RowSignature signature; + public TestArrayCursorFactory(QueryableIndex index) { super(index); + this.signature = computeRowSignature(index); } @Override @@ -81,15 +84,31 @@ public void close() }; } - @Override public RowSignature getRowSignature() + { + return signature; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + final ColumnCapabilities ourType = getRowSignature().getColumnCapabilities(column); + if (ourType != null) { + return ColumnCapabilitiesImpl.copyOf(super.getColumnCapabilities(column)).setType(ourType.toColumnType()); + } else { + return super.getColumnCapabilities(column); + } + } + + private static RowSignature computeRowSignature(final QueryableIndex index) { final RowSignature.Builder builder = RowSignature.builder(); builder.addTimeColumn(); - for (final String column : super.getRowSignature().getColumnNames()) { - ColumnCapabilities columnCapabilities = super.getColumnCapabilities(column); + for (final String column : new QueryableIndexCursorFactory(index).getRowSignature().getColumnNames()) { + ColumnCapabilities columnCapabilities = index.getColumnCapabilities(column); ColumnType columnType = columnCapabilities == null ? null : columnCapabilities.toColumnType(); //change MV strings columns to Array if (columnType != null @@ -103,18 +122,6 @@ public RowSignature getRowSignature() return builder.build(); } - @Nullable - @Override - public ColumnCapabilities getColumnCapabilities(String column) - { - final ColumnCapabilities ourType = getRowSignature().getColumnCapabilities(column); - if (ourType != null) { - return ColumnCapabilitiesImpl.copyOf(super.getColumnCapabilities(column)).setType(ourType.toColumnType()); - } else { - return super.getColumnCapabilities(column); - } - } - private class DecoratedCursor implements Cursor { private final Cursor cursor; diff --git a/processing/src/test/java/org/apache/druid/frame/file/FrameFileTest.java b/processing/src/test/java/org/apache/druid/frame/file/FrameFileTest.java index 9c92eb14e3e5..c916a458564c 100644 --- a/processing/src/test/java/org/apache/druid/frame/file/FrameFileTest.java +++ b/processing/src/test/java/org/apache/druid/frame/file/FrameFileTest.java @@ -39,6 +39,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory; import org.apache.druid.testing.InitializedNullHandlingTest; import org.hamcrest.Matchers; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; import org.junit.Before; @@ -49,17 +50,28 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.math.RoundingMode; +import java.nio.file.Files; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.function.Function; import java.util.stream.IntStream; @RunWith(Parameterized.class) public class FrameFileTest extends InitializedNullHandlingTest { + /** + * Static cache of generated frame files, to speed up tests. Cleared in {@link #afterClass()}. + */ + private static final Map FRAME_FILES = new HashMap<>(); + // Partition every 99 rows if "partitioned" is true. private static final int PARTITION_SIZE = 99; @@ -122,6 +134,7 @@ int getRowCount() }; abstract CursorFactory getCursorFactory(); + abstract int getRowCount(); } @@ -195,38 +208,21 @@ public void setUp() throws IOException { cursorFactory = adapterType.getCursorFactory(); rowCount = adapterType.getRowCount(); + file = temporaryFolder.newFile(); - if (partitioned) { - // Partition every PARTITION_SIZE rows. - file = FrameTestUtil.writeFrameFileWithPartitions( - FrameSequenceBuilder.fromCursorFactory(cursorFactory).frameType(frameType).maxRowsPerFrame(maxRowsPerFrame).frames().map( - new Function>() - { - private int rows = 0; - - @Override - public IntObjectPair apply(final Frame frame) - { - final int partitionNum = rows / PARTITION_SIZE; - rows += frame.numRows(); - return IntObjectPair.of( - partitionNum >= SKIP_PARTITION ? partitionNum + 1 : partitionNum, - frame - ); - } - } - ), - temporaryFolder.newFile() - ); - - } else { - file = FrameTestUtil.writeFrameFile( - FrameSequenceBuilder.fromCursorFactory(cursorFactory).frameType(frameType).maxRowsPerFrame(maxRowsPerFrame).frames(), - temporaryFolder.newFile() - ); + try (final OutputStream out = Files.newOutputStream(file.toPath())) { + final FrameFileKey frameFileKey = new FrameFileKey(adapterType, frameType, maxRowsPerFrame, partitioned); + final byte[] frameFileBytes = FRAME_FILES.computeIfAbsent(frameFileKey, FrameFileTest::computeFrameFile); + out.write(frameFileBytes); } } + @AfterClass + public static void afterClass() + { + FRAME_FILES.clear(); + } + @Test public void test_numFrames() throws IOException { @@ -414,4 +410,107 @@ private static int countRows(final CursorFactory cursorFactory) return FrameTestUtil.readRowsFromCursorFactory(cursorFactory, RowSignature.empty(), false) .accumulate(0, (i, in) -> i + 1); } + + /** + * Returns bytes, in frame file format, corresponding to the given {@link FrameFileKey}. + */ + private static byte[] computeFrameFile(final FrameFileKey frameFileKey) + { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + try { + if (frameFileKey.partitioned) { + // Partition every PARTITION_SIZE rows. + FrameTestUtil.writeFrameFileWithPartitions( + FrameSequenceBuilder.fromCursorFactory(frameFileKey.adapterType.getCursorFactory()) + .frameType(frameFileKey.frameType) + .maxRowsPerFrame(frameFileKey.maxRowsPerFrame) + .frames() + .map( + new Function>() + { + private int rows = 0; + + @Override + public IntObjectPair apply(final Frame frame) + { + final int partitionNum = rows / PARTITION_SIZE; + rows += frame.numRows(); + return IntObjectPair.of( + partitionNum >= SKIP_PARTITION ? partitionNum + 1 : partitionNum, + frame + ); + } + } + ), + baos + ); + } else { + FrameTestUtil.writeFrameFile( + FrameSequenceBuilder.fromCursorFactory(frameFileKey.adapterType.getCursorFactory()) + .frameType(frameFileKey.frameType) + .maxRowsPerFrame(frameFileKey.maxRowsPerFrame) + .frames(), + baos + ); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + + return baos.toByteArray(); + } + + /** + * Key for {@link #FRAME_FILES}, and input to {@link #computeFrameFile(FrameFileKey)}. + */ + private static class FrameFileKey + { + final AdapterType adapterType; + final FrameType frameType; + final int maxRowsPerFrame; + final boolean partitioned; + + public FrameFileKey(AdapterType adapterType, FrameType frameType, int maxRowsPerFrame, boolean partitioned) + { + this.adapterType = adapterType; + this.frameType = frameType; + this.maxRowsPerFrame = maxRowsPerFrame; + this.partitioned = partitioned; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FrameFileKey that = (FrameFileKey) o; + return maxRowsPerFrame == that.maxRowsPerFrame + && partitioned == that.partitioned + && adapterType == that.adapterType + && frameType == that.frameType; + } + + @Override + public int hashCode() + { + return Objects.hash(adapterType, frameType, maxRowsPerFrame, partitioned); + } + + @Override + public String toString() + { + return "FrameFileKey{" + + "adapterType=" + adapterType + + ", frameType=" + frameType + + ", maxRowsPerFrame=" + maxRowsPerFrame + + ", partitioned=" + partitioned + + '}'; + } + } } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java index 2149d6cbf1c7..7a885af49c54 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java @@ -29,13 +29,7 @@ import org.apache.druid.frame.FrameType; import org.apache.druid.frame.allocation.ArenaMemoryAllocator; import org.apache.druid.frame.channel.BlockingQueueFrameChannel; -import org.apache.druid.frame.channel.ByteTracker; -import org.apache.druid.frame.channel.ReadableFileFrameChannel; import org.apache.druid.frame.channel.ReadableFrameChannel; -import org.apache.druid.frame.channel.WritableFrameChannel; -import org.apache.druid.frame.channel.WritableFrameFileChannel; -import org.apache.druid.frame.file.FrameFile; -import org.apache.druid.frame.file.FrameFileWriter; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.ClusterByPartition; import org.apache.druid.frame.key.ClusterByPartitions; @@ -47,6 +41,7 @@ import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.testutil.FrameSequenceBuilder; import org.apache.druid.frame.testutil.FrameTestUtil; +import org.apache.druid.frame.write.FrameWriters; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -62,8 +57,10 @@ import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -73,12 +70,12 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -228,6 +225,15 @@ public void testLimitHint() throws Exception @RunWith(Parameterized.class) public static class ParameterizedCasesTest extends InitializedNullHandlingTest { + private static CursorFactory CURSOR_FACTORY; + private static RowSignature CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER; + + /** + * Static cache of sorted versions of the {@link #CURSOR_FACTORY} dataset, to speed up tests. + * Cleared in {@link #tearDownClass()}. + */ + private static final Map>> SORTED_TEST_ROWS = new HashMap<>(); + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -241,7 +247,6 @@ public static class ParameterizedCasesTest extends InitializedNullHandlingTest private final boolean partitionsDeferred; private final long limitHint; - private CursorFactory cursorFactory; private RowSignature signature; private FrameProcessorExecutor exec; private List inputChannels; @@ -285,11 +290,12 @@ public static Iterable constructorFeeder() { final List constructors = new ArrayList<>(); - for (int maxRowsPerFrame : new int[]{Integer.MAX_VALUE, 50, 1}) { + // Add some constructors for testing maxRowsPerFrame > 1. Later on, we'll add some for maxRowsPerFrame = 1. + for (int maxRowsPerFrame : new int[]{Integer.MAX_VALUE, 50}) { for (int maxBytesPerFrame : new int[]{20_000, 2_000_000}) { for (int numChannels : new int[]{1, 3}) { - for (int maxActiveProcessors : new int[]{1, 2, 4}) { - for (int maxChannelsPerProcessor : new int[]{2, 3, 8}) { + for (int maxActiveProcessors : new int[]{1, 3}) { + for (int maxChannelsPerProcessor : new int[]{2, 7}) { for (int numThreads : new int[]{1, 3}) { for (boolean isComposedStorage : new boolean[]{true, false}) { for (boolean partitionsDeferred : new boolean[]{true, false}) { @@ -317,16 +323,51 @@ public static Iterable constructorFeeder() } } + // Add some constructors for testing maxRowsPerFrame = 1. This isn't part of the full matrix since it's quite + // slow, but we still want to exercise it a bit. + for (boolean isComposedStorage : new boolean[]{true, false}) { + for (long limitHint : new long[]{SuperSorter.UNLIMITED, 3, 1_000}) { + constructors.add( + new Object[]{ + 1 /* maxRowsPerFrame */, + 20_000 /* maxBytesPerFrame */, + 3 /* numChannels */, + 2 /* maxActiveProcessors */, + 3 /* maxChannelsPerProcessor */, + 1 /* numThreads */, + isComposedStorage, + false /* partitionsDeferred */, + limitHint + } + ); + } + } + return constructors; } + @BeforeClass + public static void setUpClass() + { + CURSOR_FACTORY = new QueryableIndexCursorFactory(TestIndex.getNoRollupMMappedTestIndex()); + CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER = + FrameSequenceBuilder.signatureWithRowNumber(CURSOR_FACTORY.getRowSignature()); + } + + @AfterClass + public static void tearDownClass() + { + CURSOR_FACTORY = null; + CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER = null; + SORTED_TEST_ROWS.clear(); + } + @Before public void setUp() { exec = new FrameProcessorExecutor( MoreExecutors.listeningDecorator(Execs.multiThreaded(numThreads, getClass().getSimpleName() + "[%d]")) ); - cursorFactory = new QueryableIndexCursorFactory(TestIndex.getNoRollupMMappedTestIndex()); } @After @@ -352,15 +393,15 @@ private void setUpInputChannels(final ClusterBy clusterBy) throws Exception } final FrameSequenceBuilder frameSequenceBuilder = - FrameSequenceBuilder.fromCursorFactory(cursorFactory) + FrameSequenceBuilder.fromCursorFactory(CURSOR_FACTORY) .maxRowsPerFrame(maxRowsPerFrame) .sortBy(clusterBy.getColumns()) .allocator(ArenaMemoryAllocator.create(ByteBuffer.allocate(maxBytesPerFrame))) .frameType(FrameType.ROW_BASED) .populateRowNumber(); - inputChannels = makeFileChannels(frameSequenceBuilder.frames(), temporaryFolder.newFolder(), numChannels); - signature = frameSequenceBuilder.signature(); + inputChannels = makeRoundRobinChannels(frameSequenceBuilder.frames(), numChannels); + signature = FrameWriters.sortableSignature(CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER, clusterBy.getColumns()); frameReader = FrameReader.create(signature); } @@ -411,7 +452,7 @@ private void verifySuperSorter( Assert.assertEquals(clusterByPartitions.size(), outputChannels.getAllChannels().size()); Assert.assertEquals(Double.valueOf(1.0), superSorterProgressTracker.snapshot().getProgressDigest()); - final int[] clusterByPartColumns = clusterBy.getColumns().stream().mapToInt( + final int[] clusterByColumns = clusterBy.getColumns().stream().mapToInt( part -> signature.indexOf(part.columnName()) ).toArray(); @@ -427,33 +468,36 @@ private void verifySuperSorter( frameReader ).forEach( row -> { - final Object[] array = new Object[clusterByPartColumns.length]; + final Object[] array = new Object[clusterByColumns.length]; for (int i = 0; i < array.length; i++) { - array[i] = row.get(clusterByPartColumns[i]); + array[i] = row.get(clusterByColumns[i]); } final RowKey key = createKey(clusterBy, array); - Assert.assertTrue( - StringUtils.format( - "Key %s >= partition %,d start %s", - keyReader.read(key), - partitionNumber, - partition.getStart() == null ? null : keyReader.read(partition.getStart()) - ), - partition.getStart() == null || keyComparator.compare(key, partition.getStart()) >= 0 - ); - - Assert.assertTrue( - StringUtils.format( - "Key %s < partition %,d end %s", - keyReader.read(key), - partitionNumber, - partition.getEnd() == null ? null : keyReader.read(partition.getEnd()) - ), - partition.getEnd() == null || keyComparator.compare(key, partition.getEnd()) < 0 - ); + if (!(partition.getStart() == null || keyComparator.compare(key, partition.getStart()) >= 0)) { + // Defer formatting of error message until it's actually needed + Assert.fail( + StringUtils.format( + "Key %s >= partition %,d start %s", + keyReader.read(key), + partitionNumber, + partition.getStart() == null ? null : keyReader.read(partition.getStart()) + ) + ); + } + + if (!(partition.getEnd() == null || keyComparator.compare(key, partition.getEnd()) < 0)) { + Assert.fail( + StringUtils.format( + "Key %s < partition %,d end %s", + keyReader.read(key), + partitionNumber, + partition.getEnd() == null ? null : keyReader.read(partition.getEnd()) + ) + ); + } readRows.add(row); } @@ -464,21 +508,9 @@ private void verifySuperSorter( MatcherAssert.assertThat(readRows.size(), Matchers.greaterThanOrEqualTo(Ints.checkedCast(limitHint))); } - final Sequence> expectedRows = Sequences.sort( - FrameTestUtil.readRowsFromCursorFactory(cursorFactory, signature, true), - Comparator.comparing( - row -> { - final Object[] array = new Object[clusterByPartColumns.length]; - - for (int i = 0; i < array.length; i++) { - array[i] = row.get(clusterByPartColumns[i]); - } - - return createKey(clusterBy, array); - }, - keyComparator - ) - ).limit(limitHint == SuperSorter.UNLIMITED ? Long.MAX_VALUE : readRows.size()); + final Sequence> expectedRows = + Sequences.simple(getOrComputeSortedTestRows(clusterBy)) + .limit(limitHint == SuperSorter.UNLIMITED ? Long.MAX_VALUE : readRows.size()); FrameTestUtil.assertRowsEqual(expectedRows, Sequences.simple(readRows)); } @@ -724,29 +756,63 @@ private RowKey createKey(final ClusterBy clusterBy, final Object... objects) final RowSignature keySignature = KeyTestUtils.createKeySignature(clusterBy.getColumns(), signature); return KeyTestUtils.createKey(keySignature, objects); } + + /** + * Retrieve sorted test rows from {@link #SORTED_TEST_ROWS}, or else compute using + * {@link #computeSortedTestRows(ClusterBy)}. + */ + private static List> getOrComputeSortedTestRows(final ClusterBy clusterBy) + { + return SORTED_TEST_ROWS.computeIfAbsent(clusterBy, SuperSorterTest.ParameterizedCasesTest::computeSortedTestRows); + } + + /** + * Sort test rows from {@link TestIndex#getNoRollupMMappedTestIndex()} by the given {@link ClusterBy}. + */ + private static List> computeSortedTestRows(final ClusterBy clusterBy) + { + final QueryableIndexCursorFactory cursorFactory = + new QueryableIndexCursorFactory(TestIndex.getNoRollupMMappedTestIndex()); + final RowSignature signature = + FrameWriters.sortableSignature( + FrameSequenceBuilder.signatureWithRowNumber(cursorFactory.getRowSignature()), + clusterBy.getColumns() + ); + final RowSignature keySignature = KeyTestUtils.createKeySignature(clusterBy.getColumns(), signature); + final int[] clusterByColumns = + clusterBy.getColumns().stream().mapToInt(part -> signature.indexOf(part.columnName())).toArray(); + final Comparator keyComparator = clusterBy.keyComparator(keySignature); + + return Sequences.sort( + FrameTestUtil.readRowsFromCursorFactory(cursorFactory, signature, true), + Comparator.comparing( + row -> { + final Object[] array = new Object[clusterByColumns.length]; + + for (int i = 0; i < array.length; i++) { + array[i] = row.get(clusterByColumns[i]); + } + + return KeyTestUtils.createKey(keySignature, array); + }, + keyComparator + ) + ).toList(); + } } - private static List makeFileChannels( + /** + * Distribute frames round-robin to some number of channels. + */ + private static List makeRoundRobinChannels( final Sequence frames, - final File tmpDir, final int numChannels ) throws IOException { - final List files = new ArrayList<>(); - final List writableChannels = new ArrayList<>(); + final List channels = new ArrayList<>(numChannels); for (int i = 0; i < numChannels; i++) { - final File file = new File(tmpDir, StringUtils.format("channel-%d", i)); - files.add(file); - writableChannels.add( - new WritableFrameFileChannel( - FrameFileWriter.open( - Channels.newChannel(Files.newOutputStream(file.toPath())), - null, - ByteTracker.unboundedTracker() - ) - ) - ); + channels.add(new BlockingQueueFrameChannel(2000) /* enough even for 1 row per frame; dataset has < 2000 rows */); } frames.forEach( @@ -758,7 +824,7 @@ private static List makeFileChannels( public void accept(final Frame frame) { try { - writableChannels.get(i % writableChannels.size()).write(frame); + channels.get(i % channels.size()).writable().write(frame); } catch (IOException e) { throw new RuntimeException(e); @@ -771,20 +837,11 @@ public void accept(final Frame frame) final List retVal = new ArrayList<>(); - for (int i = 0; i < writableChannels.size(); i++) { - WritableFrameChannel writableChannel = writableChannels.get(i); - writableChannel.close(); - retVal.add(new ReadableFileFrameChannel(FrameFile.open(files.get(i), null))); + for (final BlockingQueueFrameChannel channel : channels) { + channel.writable().close(); + retVal.add(channel.readable()); } return retVal; } - - private static long countSequence(final Sequence sequence) - { - return sequence.accumulate( - 0L, - (accumulated, in) -> accumulated + 1 - ); - } } diff --git a/processing/src/test/java/org/apache/druid/frame/testutil/FrameSequenceBuilder.java b/processing/src/test/java/org/apache/druid/frame/testutil/FrameSequenceBuilder.java index d3fbffd7b4af..1cb6298b8b10 100644 --- a/processing/src/test/java/org/apache/druid/frame/testutil/FrameSequenceBuilder.java +++ b/processing/src/test/java/org/apache/druid/frame/testutil/FrameSequenceBuilder.java @@ -67,6 +67,17 @@ public static FrameSequenceBuilder fromCursorFactory(final CursorFactory cursorF return new FrameSequenceBuilder(cursorFactory); } + /** + * Returns what {@link #signature()} would return if {@link #populateRowNumber()} is set. + */ + public static RowSignature signatureWithRowNumber(final RowSignature signature) + { + return RowSignature.builder() + .addAll(signature) + .add(FrameTestUtil.ROW_NUMBER_COLUMN, ColumnType.LONG) + .build(); + } + public FrameSequenceBuilder frameType(final FrameType frameType) { this.frameType = frameType; @@ -108,10 +119,7 @@ public RowSignature signature() final RowSignature baseSignature; if (populateRowNumber) { - baseSignature = RowSignature.builder() - .addAll(cursorFactory.getRowSignature()) - .add(FrameTestUtil.ROW_NUMBER_COLUMN, ColumnType.LONG) - .build(); + baseSignature = signatureWithRowNumber(cursorFactory.getRowSignature()); } else { baseSignature = cursorFactory.getRowSignature(); } diff --git a/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java b/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java index c75a57a86990..2bb8789740c4 100644 --- a/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java +++ b/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java @@ -56,9 +56,10 @@ import javax.annotation.Nullable; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.channels.Channels; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -79,12 +80,14 @@ private FrameTestUtil() public static File writeFrameFile(final Sequence frames, final File file) throws IOException { - try ( - final FileOutputStream fos = new FileOutputStream(file); - final FrameFileWriter writer = FrameFileWriter.open( - Channels.newChannel(fos), null, ByteTracker.unboundedTracker() - ) - ) { + writeFrameFile(frames, Files.newOutputStream(file.toPath())); + return file; + } + + public static void writeFrameFile(final Sequence frames, final OutputStream out) throws IOException + { + try (final FrameFileWriter writer = + FrameFileWriter.open(Channels.newChannel(out), null, ByteTracker.unboundedTracker())) { frames.forEach( frame -> { try { @@ -96,17 +99,15 @@ public static File writeFrameFile(final Sequence frames, final File file) } ); } - - return file; } - public static File writeFrameFileWithPartitions( + public static void writeFrameFileWithPartitions( final Sequence> framesWithPartitions, - final File file + final OutputStream out ) throws IOException { try (final FrameFileWriter writer = FrameFileWriter.open( - Channels.newChannel(new FileOutputStream(file)), + Channels.newChannel(out), null, ByteTracker.unboundedTracker() )) { @@ -121,8 +122,6 @@ public static File writeFrameFileWithPartitions( } ); } - - return file; } public static void assertRowsEqual(final Sequence> expected, final Sequence> actual)