Skip to content

Commit

Permalink
Check if data is read in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Nov 15, 2024
1 parent 0a67a2b commit 61bfcdc
Showing 1 changed file with 77 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.core.JsonParser;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CountingInputStream;
import io.trino.client.spooling.DataAttributes;
import io.trino.client.spooling.EncodedQueryData;
import io.trino.client.spooling.Segment;
Expand All @@ -33,6 +34,7 @@
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static io.trino.client.JsonResultRows.createJsonFactory;
import static io.trino.client.spooling.Segment.inlined;
Expand Down Expand Up @@ -83,6 +85,32 @@ public void testInlineJsonNodeMaterialization()
}
}

@Test
public void testEagerInlineJsonNodeScanningMaterialization()
throws Exception
{
CountingInputStream stream = new CountingInputStream(new ByteArrayInputStream("[[2137], [1337]]".getBytes(UTF_8)));
try (ResultRowsDecoder decoder = new ResultRowsDecoder(); JsonParser parser = createJsonFactory().createParser(stream)) {
Iterator<List<Object>> iterator = decoder.toRows(fromQueryData(new JsonQueryData(parser.readValueAsTree()))).iterator();
assertThat(stream.getCount()).isEqualTo(16);
iterator.next();
assertThat(stream.getCount()).isEqualTo(16);
}
}

@Test
public void testLazyInlineJsonNodeScanningMaterialization()
throws Exception
{
CountingInputStream stream = new CountingInputStream(new ByteArrayInputStream("[[2137], [1337]]".getBytes(UTF_8)));
try (ResultRowsDecoder decoder = new ResultRowsDecoder(); JsonParser parser = createJsonFactory().createParser(stream)) {
Iterator<List<Object>> iterator = decoder.toRows(fromQueryData(new JsonQueryData(parser.readValueAsTree()))).iterator();
assertThat(stream.getCount()).isEqualTo(16);
iterator.next();
assertThat(stream.getCount()).isEqualTo(16);
}
}

@Test
public void testSpooledJsonMaterialization()
throws Exception
Expand Down Expand Up @@ -112,6 +140,34 @@ public void testSpooledJsonNodeMaterialization()
assertThat(loaded.get()).isEqualTo(2);
}

@Test
public void testSpooledJsonNodeScanningMaterialization()
throws Exception
{
String data = IntStream.range(0, 2500)
.mapToObj(Integer::toString)
.reduce("[", (a, b) -> a + "[" + b + "],", String::concat) + "[1337]]";
CountingInputStream stream = new CountingInputStream(new ByteArrayInputStream(data.getBytes(UTF_8)));
try (ResultRowsDecoder decoder = new ResultRowsDecoder(loaderFromStream(stream))) {
Iterator<List<Object>> iterator = decoder.toRows(fromSegments(spooledSegment())).iterator();
assertThat(stream.getCount()).isEqualTo(0);
iterator.next();
assertThat(stream.getCount()).isEqualTo(8000); // Jackson reads data in 8K chunks
for (int i = 0; i < 1200; i++) {
iterator.next();
}
assertThat(stream.getCount()).isEqualTo(8000);
for (int i = 0; i < 1200; i++) {
iterator.next();
}
assertThat(stream.getCount()).isEqualTo(16000);
for (int i = 0; i < 100; i++) {
iterator.next();
}
assertThat(stream.getCount()).isEqualTo(data.length());
}
}

@Test
public void testLazySpooledMaterialization()
throws Exception
Expand Down Expand Up @@ -188,6 +244,27 @@ private static List<List<Object>> eagerlyMaterialize(Iterable<List<Object>> valu
return ImmutableList.copyOf(values);
}

private static SegmentLoader loaderFromStream(InputStream stream)
{
return new SegmentLoader() {
@Override
public InputStream load(SpooledSegment segment)
{
return stream;
}

@Override
public void acknowledge(SpooledSegment segment)
{
}

@Override
public void close()
{
}
};
}

private static QueryResults fromQueryData(QueryData queryData)
{
return new QueryResults(
Expand Down

0 comments on commit 61bfcdc

Please sign in to comment.