Skip to content

Commit

Permalink
* improve test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
zachjsh committed Jul 30, 2024
1 parent 90e0f12 commit 709fd16
Showing 1 changed file with 189 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
Expand Down Expand Up @@ -192,6 +193,121 @@ public void testTimestampFromHeader() throws IOException
}
}

@Test
public void testRawSample() throws IOException
{
KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);

final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
"bar",
"foo"
)
)
),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);

final int numExpectedIterations = 1;
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
int numActualIterations = 0;
while (iterator.hasNext()) {

final InputRowListPlusRawValues rawValues = iterator.next();
Assert.assertEquals(1, rawValues.getInputRows().size());
InputRow row = rawValues.getInputRows().get(0);
// Payload verifications
// this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
// but test reading them anyway since it isn't technically illegal

Assert.assertEquals(
String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));

Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());

numActualIterations++;
}

Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}

@Test
public void testProcessesSampleTimestampFromHeader() throws IOException
{
KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);

final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("kinesis.newts.timestamp", "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
ImmutableList.of(
"bar",
"foo"
)
)
),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);

final int numExpectedIterations = 1;
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
int numActualIterations = 0;
while (iterator.hasNext()) {

final InputRowListPlusRawValues rawValues = iterator.next();
Assert.assertEquals(1, rawValues.getInputRows().size());
InputRow row = rawValues.getInputRows().get(0);
// Payload verifications
// this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
// but test reading them anyway since it isn't technically illegal

Assert.assertEquals(DateTimes.of(String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS)), row.getTimestamp());
Assert.assertEquals(
String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));

Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());

numActualIterations++;
}

Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}

@Test
public void testWithMultipleMixedRecordsTimestampFromHeader() throws IOException
{
Expand Down Expand Up @@ -438,15 +554,18 @@ public void testMissingTimestampThrowsException() throws IOException
}

@Test
public void testWithSchemaDiscovery() throws IOException
public void testWithSchemaDiscoveryKinesisTimestampExcluded() throws IOException
{
KinesisRecordEntity inputEntity =
makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);

final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
DimensionsSpec.builder()
.useSchemaDiscovery(true)
.setDimensionExclusions(ImmutableList.of("kinesis.newts.timestamp"))
.build(),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
Expand All @@ -461,7 +580,6 @@ public void testWithSchemaDiscovery() throws IOException
final InputRow row = iterator.next();
List<String> expectedDimensions = Arrays.asList(
"foo",
"kinesis.newts.timestamp",
"root_baz",
"o",
"bar",
Expand Down Expand Up @@ -503,6 +621,74 @@ public void testWithSchemaDiscovery() throws IOException
}
}

@Test
public void testWithSchemaDiscoveryTimestampFromHeader() throws IOException
{
KinesisRecordEntity inputEntity =
makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS);

final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("kinesis.newts.timestamp", "iso", null),
DimensionsSpec.builder()
.useSchemaDiscovery(true)
.build(),
ColumnsFilter.all()
),
newSettableByteEntity(inputEntity),
null
);

final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {

final InputRow row = iterator.next();
List<String> expectedDimensions = Arrays.asList(
"foo",
"timestamp",
"root_baz",
"o",
"bar",
"path_omg",
"jq_omg",
"jq_omg2",
"baz",
"root_baz2",
"path_omg2"
);
Collections.sort(expectedDimensions);
Collections.sort(row.getDimensions());
Assert.assertEquals(
expectedDimensions,
row.getDimensions()
);

// Payload verifications
Assert.assertEquals(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE), row.getTimestamp());
Assert.assertEquals(
String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()),
Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp"))
);
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));

Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());

numActualIterations++;
}

Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}

@Test
public void testValueInCsvFormat() throws IOException
{
Expand Down

0 comments on commit 709fd16

Please sign in to comment.