Skip to content

Commit

Permalink
[FLINK-35097][table] Update RawFormatSerDeSchemaTest to validate multiple serializations/deserializations
Browse files Browse the repository at this point in the history
  • Loading branch information
kumar-mallikarjuna authored and twalthr committed Apr 22, 2024
1 parent 3a56c2f commit 8ffe6d6
Showing 1 changed file with 118 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -67,76 +68,107 @@ public class RawFormatSerDeSchemaTest {
/**
 * Supplies the parameterized test cases.
 *
 * <p>Each {@code TestSpec} pairs a logical type with the values to serialize and the byte
 * arrays the serializer is expected to produce for them; the test compares them
 * index-by-index. Little-endian and non-default charset variants are selected through
 * {@code withLittleEndian()} and {@code withCharset(...)}. The trailing group covers
 * {@code null} values, for which a {@code null} byte array is expected.
 *
 * <p>NOTE(review): as shown here, the listing interleaves the pre-change
 * {@code .value(...)} / {@code byte[]} lines with their {@code .values(...)} /
 * {@code byte[][]} replacements.
 */
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
TestSpec.type(TINYINT()).value(Byte.MAX_VALUE).binary(new byte[] {Byte.MAX_VALUE}),
TestSpec.type(SMALLINT()).value(Short.MAX_VALUE).binary(hexStringToByte("7fff")),
TestSpec.type(TINYINT())
.values(new Byte[] {Byte.MAX_VALUE})
.binary(new byte[][] {{Byte.MAX_VALUE}}),
TestSpec.type(SMALLINT())
.value(Short.MAX_VALUE)
.values(new Short[] {Short.MAX_VALUE})
.binary(new byte[][] {hexStringToByte("7fff")}),
TestSpec.type(SMALLINT())
.values(new Short[] {Short.MAX_VALUE})
.withLittleEndian()
.binary(hexStringToByte("ff7f")),
TestSpec.type(INT()).value(Integer.MAX_VALUE).binary(hexStringToByte("7fffffff")),
.binary(new byte[][] {hexStringToByte("ff7f")}),
TestSpec.type(INT())
.values(new Integer[] {Integer.MAX_VALUE})
.binary(new byte[][] {hexStringToByte("7fffffff")}),
TestSpec.type(INT())
.value(Integer.MAX_VALUE)
.values(new Integer[] {Integer.MAX_VALUE})
.withLittleEndian()
.binary(hexStringToByte("ffffff7f")),
.binary(new byte[][] {hexStringToByte("ffffff7f")}),
TestSpec.type(BIGINT())
.value(Long.MAX_VALUE)
.binary(hexStringToByte("7fffffffffffffff")),
.values(new Long[] {Long.MAX_VALUE})
.binary(new byte[][] {hexStringToByte("7fffffffffffffff")}),
TestSpec.type(BIGINT())
.value(Long.MAX_VALUE)
.values(new Long[] {Long.MAX_VALUE})
.withLittleEndian()
.binary(hexStringToByte("ffffffffffffff7f")),
TestSpec.type(FLOAT()).value(Float.MAX_VALUE).binary(hexStringToByte("7f7fffff")),
.binary(new byte[][] {hexStringToByte("ffffffffffffff7f")}),
TestSpec.type(FLOAT())
.value(Float.MAX_VALUE)
.values(new Float[] {Float.MAX_VALUE})
.binary(new byte[][] {hexStringToByte("7f7fffff")}),
TestSpec.type(FLOAT())
.values(new Float[] {Float.MAX_VALUE})
.withLittleEndian()
.binary(hexStringToByte("ffff7f7f")),
.binary(new byte[][] {hexStringToByte("ffff7f7f")}),
TestSpec.type(DOUBLE())
.value(Double.MAX_VALUE)
.binary(hexStringToByte("7fefffffffffffff")),
.values(new Double[] {Double.MAX_VALUE})
.binary(new byte[][] {hexStringToByte("7fefffffffffffff")}),
TestSpec.type(DOUBLE())
.value(Double.MAX_VALUE)
.values(new Double[] {Double.MAX_VALUE})
.withLittleEndian()
.binary(hexStringToByte("ffffffffffffef7f")),
TestSpec.type(BOOLEAN()).value(true).binary(new byte[] {1}),
TestSpec.type(BOOLEAN()).value(false).binary(new byte[] {0}),
TestSpec.type(STRING()).value("Hello World").binary("Hello World".getBytes()),
.binary(new byte[][] {hexStringToByte("ffffffffffffef7f")}),
TestSpec.type(BOOLEAN())
.values(new Boolean[] {true})
.binary(new byte[][] {new byte[] {1}}),
TestSpec.type(BOOLEAN())
.values(new Boolean[] {false})
.binary(new byte[][] {new byte[] {0}}),
TestSpec.type(STRING())
.values(new String[] {"Hello World"})
.binary(new byte[][] {"Hello World".getBytes()}),
TestSpec.type(STRING())
.value("你好世界,Hello World")
.binary("你好世界,Hello World".getBytes()),
.values(new String[] {"你好世界,Hello World"})
.binary(new byte[][] {"你好世界,Hello World".getBytes()}),
TestSpec.type(STRING())
.value("Flink Awesome!")
.values(new String[] {"Flink Awesome!"})
.withCharset("UTF-16")
.binary("Flink Awesome!".getBytes(StandardCharsets.UTF_16)),
.binary(new byte[][] {"Flink Awesome!".getBytes(StandardCharsets.UTF_16)}),
TestSpec.type(STRING())
.value("Flink 帅哭!")
.values(new String[] {"Flink 帅哭!"})
.withCharset("UTF-16")
.binary("Flink 帅哭!".getBytes(StandardCharsets.UTF_16)),
TestSpec.type(STRING()).value("").binary("".getBytes()),
TestSpec.type(VARCHAR(5)).value("HELLO").binary("HELLO".getBytes()),
.binary(new byte[][] {"Flink 帅哭!".getBytes(StandardCharsets.UTF_16)}),
TestSpec.type(STRING())
.values(new String[] {""})
.binary(new byte[][] {"".getBytes()}),
TestSpec.type(VARCHAR(5))
.values(new String[] {"HELLO"})
.binary(new byte[][] {"HELLO".getBytes()}),
// Multi-value case: three values are serialized and deserialized by the same schema
// instances within a single test run.
TestSpec.type(STRING())
.values(new String[] {"line 1", "line 2", "line 3"})
.binary(
new byte[][] {
"line 1".getBytes(), "line 2".getBytes(), "line 3".getBytes()
}),
TestSpec.type(BYTES())
.value(new byte[] {1, 3, 5, 7, 9})
.binary(new byte[] {1, 3, 5, 7, 9}),
TestSpec.type(BYTES()).value(new byte[] {}).binary(new byte[] {}),
TestSpec.type(BINARY(3)).value(new byte[] {1, 3, 5}).binary(new byte[] {1, 3, 5}),
.values(new byte[][] {{1, 3, 5, 7, 9}})
.binary(new byte[][] {{1, 3, 5, 7, 9}}),
// NOTE(review): `values` is a zero-length array here while `binary` holds one empty
// byte[]. The test loops are bounded by values.length, so this spec runs zero
// iterations and the empty-bytes case is never exercised. Probably meant
// `.values(new byte[][] {{}})` - confirm.
TestSpec.type(BYTES()).values(new byte[][] {}).binary(new byte[][] {{}}),
TestSpec.type(BINARY(3))
.values(new byte[][] {{1, 3, 5}})
.binary(new byte[][] {{1, 3, 5}}),
TestSpec.type(RAW(LocalDateTime.class, new LocalDateTimeSerializer()))
.value(LocalDateTime.parse("2020-11-11T18:08:01.123"))
.values(
new LocalDateTime[] {
LocalDateTime.parse("2020-11-11T18:08:01.123")
})
.binary(
serializeLocalDateTime(
LocalDateTime.parse("2020-11-11T18:08:01.123"))),
new byte[][] {
serializeLocalDateTime(
LocalDateTime.parse("2020-11-11T18:08:01.123"))
}),

// test nulls
TestSpec.type(TINYINT()).value(null).binary(null),
TestSpec.type(SMALLINT()).value(null).binary(null),
TestSpec.type(INT()).value(null).binary(null),
TestSpec.type(BIGINT()).value(null).binary(null),
TestSpec.type(FLOAT()).value(null).binary(null),
TestSpec.type(DOUBLE()).value(null).binary(null),
TestSpec.type(BOOLEAN()).value(null).binary(null),
TestSpec.type(STRING()).value(null).binary(null),
TestSpec.type(BYTES()).value(null).binary(null),
TestSpec.type(TINYINT()).values(new Byte[] {null}).binary(new byte[][] {null}),
TestSpec.type(SMALLINT()).values(new Short[] {null}).binary(new byte[][] {null}),
TestSpec.type(INT()).values(new Integer[] {null}).binary(new byte[][] {null}),
TestSpec.type(BIGINT()).values(new Long[] {null}).binary(new byte[][] {null}),
TestSpec.type(FLOAT()).values(new Float[] {null}).binary(new byte[][] {null}),
TestSpec.type(DOUBLE()).values(new Double[] {null}).binary(new byte[][] {null}),
TestSpec.type(BOOLEAN()).values(new Boolean[] {null}).binary(new byte[][] {null}),
TestSpec.type(STRING()).values(new String[] {null}).binary(new byte[][] {null}),
TestSpec.type(BYTES()).values(new byte[][] {null}).binary(new byte[][] {null}),
TestSpec.type(RAW(LocalDateTime.class, new LocalDateTimeSerializer()))
.value(null)
.binary(null));
.values(new LocalDateTime[] {null})
.binary(new byte[][] {null}));
}

@Parameterized.Parameter public TestSpec testSpec;
Expand All @@ -155,17 +187,35 @@ public void testSerializationAndDeserialization() throws Exception {
deserializationSchema.open(mock(DeserializationSchema.InitializationContext.class));
serializationSchema.open(mock(SerializationSchema.InitializationContext.class));

Row row = Row.of(testSpec.value);
DataStructureConverter<Object, Object> converter =
DataStructureConverters.getConverter(ROW(FIELD("single", testSpec.type)));
RowData originalRowData = (RowData) converter.toInternal(row);

byte[] serializedBytes = serializationSchema.serialize(originalRowData);
assertThat(serializedBytes).isEqualTo(testSpec.binary);
byte[][] serializedBytesArr = new byte[testSpec.values.length][];
RowData[] deserializedRowDataArr = new RowData[testSpec.values.length];

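// Serialization, deserialization and verification deliberately run as separate loops:
// all values are serialized before any bytes are checked, and all are deserialized
// before any row is checked. A result that merely references shared, reused state
// (instead of being an independent copy) would be overwritten by a later call and
// fail the assertions. (see FLINK-35097)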

RowData deserializeRowData = deserializationSchema.deserialize(serializedBytes);
Row actual = (Row) converter.toExternal(deserializeRowData);
assertThat(actual).isEqualTo(row);
// Process serialization
for (int i = 0; i < testSpec.values.length; i++) {
Row row = Row.of(testSpec.values[i]);
RowData originalRowData = (RowData) converter.toInternal(row);
serializedBytesArr[i] = serializationSchema.serialize(originalRowData);
}

// Test serialization and process deserialization
for (int i = 0; i < testSpec.values.length; i++) {
assertThat(serializedBytesArr[i]).isEqualTo(testSpec.binary[i]);

deserializedRowDataArr[i] = deserializationSchema.deserialize(serializedBytesArr[i]);
}

// Test deserialization
for (int i = 0; i < testSpec.values.length; i++) {
Row row = Row.of(testSpec.values[i]);
Row actual = (Row) converter.toExternal(deserializedRowDataArr[i]);
assertThat(actual).isEqualTo(row);
}
}

private static byte[] serializeLocalDateTime(LocalDateTime localDateTime) {
Expand All @@ -183,9 +233,9 @@ private static byte[] serializeLocalDateTime(LocalDateTime localDateTime) {

private static class TestSpec {

private Object value;
private byte[] binary;
private DataType type;
private Object[] values;
private byte[][] binary;
private final DataType type;
private String charsetName = "UTF-8";
private boolean isBigEndian = true;

Expand All @@ -197,12 +247,12 @@ public static TestSpec type(DataType fieldType) {
return new TestSpec(fieldType);
}

/**
 * Sets the external values this spec feeds through the serializer, one element per
 * serialize/deserialize round; the test wraps each element in a single-field row.
 *
 * @param values the values to serialize; elements may be {@code null}
 * @return this spec, for call chaining
 */
public TestSpec value(Object value) {
this.value = value;
public TestSpec values(Object[] values) {
this.values = values;
return this;
}

/**
 * Sets the expected serialized form, one {@code byte[]} per value; the test compares
 * entry {@code i} with the serializer output for value {@code i}.
 *
 * @param bytes the expected serialized bytes; an entry may be {@code null}
 * @return this spec, for call chaining
 */
public TestSpec binary(byte[] bytes) {
public TestSpec binary(byte[][] bytes) {
this.binary = bytes;
return this;
}
Expand All @@ -219,12 +269,16 @@ public TestSpec withLittleEndian() {

@Override
public String toString() {
String hex = binary == null ? "null" : "0x" + StringUtils.byteToHexString(binary);
ArrayList<String> hexes = new ArrayList<>();
for (byte[] b : binary) {
hexes.add(b == null ? "" : "0x" + StringUtils.byteToHexString(b));
}

return "TestSpec{"
+ "value="
+ value
+ "values="
+ Arrays.toString(values)
+ ", binary="
+ hex
+ hexes
+ ", type="
+ type
+ ", charsetName='"
Expand Down

0 comments on commit 8ffe6d6

Please sign in to comment.