diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java index 10c97497cd39c..cd3d15131c7b8 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -67,76 +68,107 @@ public class RawFormatSerDeSchemaTest { @Parameterized.Parameters(name = "{index}: {0}") public static List testData() { return Arrays.asList( - TestSpec.type(TINYINT()).value(Byte.MAX_VALUE).binary(new byte[] {Byte.MAX_VALUE}), - TestSpec.type(SMALLINT()).value(Short.MAX_VALUE).binary(hexStringToByte("7fff")), + TestSpec.type(TINYINT()) + .values(new Byte[] {Byte.MAX_VALUE}) + .binary(new byte[][] {{Byte.MAX_VALUE}}), TestSpec.type(SMALLINT()) - .value(Short.MAX_VALUE) + .values(new Short[] {Short.MAX_VALUE}) + .binary(new byte[][] {hexStringToByte("7fff")}), + TestSpec.type(SMALLINT()) + .values(new Short[] {Short.MAX_VALUE}) .withLittleEndian() - .binary(hexStringToByte("ff7f")), - TestSpec.type(INT()).value(Integer.MAX_VALUE).binary(hexStringToByte("7fffffff")), + .binary(new byte[][] {hexStringToByte("ff7f")}), + TestSpec.type(INT()) + .values(new Integer[] {Integer.MAX_VALUE}) + .binary(new byte[][] {hexStringToByte("7fffffff")}), TestSpec.type(INT()) - .value(Integer.MAX_VALUE) + .values(new Integer[] {Integer.MAX_VALUE}) .withLittleEndian() - .binary(hexStringToByte("ffffff7f")), + .binary(new byte[][] {hexStringToByte("ffffff7f")}), TestSpec.type(BIGINT()) - .value(Long.MAX_VALUE) - .binary(hexStringToByte("7fffffffffffffff")), + .values(new Long[] {Long.MAX_VALUE}) + .binary(new byte[][] {hexStringToByte("7fffffffffffffff")}), TestSpec.type(BIGINT()) - .value(Long.MAX_VALUE) + .values(new Long[] {Long.MAX_VALUE}) .withLittleEndian() - .binary(hexStringToByte("ffffffffffffff7f")), - TestSpec.type(FLOAT()).value(Float.MAX_VALUE).binary(hexStringToByte("7f7fffff")), + .binary(new byte[][] {hexStringToByte("ffffffffffffff7f")}), TestSpec.type(FLOAT()) - .value(Float.MAX_VALUE) + .values(new Float[] {Float.MAX_VALUE}) + .binary(new byte[][] {hexStringToByte("7f7fffff")}), + TestSpec.type(FLOAT()) + .values(new Float[] {Float.MAX_VALUE}) .withLittleEndian() - .binary(hexStringToByte("ffff7f7f")), + .binary(new byte[][] {hexStringToByte("ffff7f7f")}), TestSpec.type(DOUBLE()) - .value(Double.MAX_VALUE) - .binary(hexStringToByte("7fefffffffffffff")), + .values(new Double[] {Double.MAX_VALUE}) + .binary(new byte[][] {hexStringToByte("7fefffffffffffff")}), TestSpec.type(DOUBLE()) - .value(Double.MAX_VALUE) + .values(new Double[] {Double.MAX_VALUE}) .withLittleEndian() - .binary(hexStringToByte("ffffffffffffef7f")), - TestSpec.type(BOOLEAN()).value(true).binary(new byte[] {1}), - TestSpec.type(BOOLEAN()).value(false).binary(new byte[] {0}), - TestSpec.type(STRING()).value("Hello World").binary("Hello World".getBytes()), + .binary(new byte[][] {hexStringToByte("ffffffffffffef7f")}), + TestSpec.type(BOOLEAN()) + .values(new Boolean[] {true}) + .binary(new byte[][] {new byte[] {1}}), + TestSpec.type(BOOLEAN()) + .values(new Boolean[] {false}) + .binary(new byte[][] {new byte[] {0}}), + TestSpec.type(STRING()) + .values(new String[] {"Hello World"}) + .binary(new byte[][] {"Hello World".getBytes()}), TestSpec.type(STRING()) - .value("你好世界,Hello World") - .binary("你好世界,Hello World".getBytes()), + .values(new String[] {"你好世界,Hello World"}) + .binary(new byte[][] {"你好世界,Hello World".getBytes()}), TestSpec.type(STRING()) - .value("Flink Awesome!") + .values(new String[] {"Flink Awesome!"}) .withCharset("UTF-16") - .binary("Flink Awesome!".getBytes(StandardCharsets.UTF_16)), + .binary(new byte[][] {"Flink Awesome!".getBytes(StandardCharsets.UTF_16)}), TestSpec.type(STRING()) - .value("Flink 帅哭!") + .values(new String[] {"Flink 帅哭!"}) .withCharset("UTF-16") - .binary("Flink 帅哭!".getBytes(StandardCharsets.UTF_16)), - TestSpec.type(STRING()).value("").binary("".getBytes()), - TestSpec.type(VARCHAR(5)).value("HELLO").binary("HELLO".getBytes()), + .binary(new byte[][] {"Flink 帅哭!".getBytes(StandardCharsets.UTF_16)}), + TestSpec.type(STRING()) + .values(new String[] {""}) + .binary(new byte[][] {"".getBytes()}), + TestSpec.type(VARCHAR(5)) + .values(new String[] {"HELLO"}) + .binary(new byte[][] {"HELLO".getBytes()}), + TestSpec.type(STRING()) + .values(new String[] {"line 1", "line 2", "line 3"}) + .binary( + new byte[][] { + "line 1".getBytes(), "line 2".getBytes(), "line 3".getBytes() + }), TestSpec.type(BYTES()) - .value(new byte[] {1, 3, 5, 7, 9}) - .binary(new byte[] {1, 3, 5, 7, 9}), - TestSpec.type(BYTES()).value(new byte[] {}).binary(new byte[] {}), - TestSpec.type(BINARY(3)).value(new byte[] {1, 3, 5}).binary(new byte[] {1, 3, 5}), + .values(new byte[][] {{1, 3, 5, 7, 9}}) + .binary(new byte[][] {{1, 3, 5, 7, 9}}), + TestSpec.type(BYTES()).values(new byte[][] {}).binary(new byte[][] {{}}), + TestSpec.type(BINARY(3)) + .values(new byte[][] {{1, 3, 5}}) + .binary(new byte[][] {{1, 3, 5}}), TestSpec.type(RAW(LocalDateTime.class, new LocalDateTimeSerializer())) - .value(LocalDateTime.parse("2020-11-11T18:08:01.123")) + .values( + new LocalDateTime[] { + LocalDateTime.parse("2020-11-11T18:08:01.123") + }) .binary( - serializeLocalDateTime( - LocalDateTime.parse("2020-11-11T18:08:01.123"))), + new byte[][] { + serializeLocalDateTime( + LocalDateTime.parse("2020-11-11T18:08:01.123")) + }), // test nulls - TestSpec.type(TINYINT()).value(null).binary(null), - TestSpec.type(SMALLINT()).value(null).binary(null), - TestSpec.type(INT()).value(null).binary(null), - TestSpec.type(BIGINT()).value(null).binary(null), - TestSpec.type(FLOAT()).value(null).binary(null), - TestSpec.type(DOUBLE()).value(null).binary(null), - TestSpec.type(BOOLEAN()).value(null).binary(null), - TestSpec.type(STRING()).value(null).binary(null), - TestSpec.type(BYTES()).value(null).binary(null), + TestSpec.type(TINYINT()).values(new Byte[] {null}).binary(new byte[][] {null}), + TestSpec.type(SMALLINT()).values(new Short[] {null}).binary(new byte[][] {null}), + TestSpec.type(INT()).values(new Integer[] {null}).binary(new byte[][] {null}), + TestSpec.type(BIGINT()).values(new Long[] {null}).binary(new byte[][] {null}), + TestSpec.type(FLOAT()).values(new Float[] {null}).binary(new byte[][] {null}), + TestSpec.type(DOUBLE()).values(new Double[] {null}).binary(new byte[][] {null}), + TestSpec.type(BOOLEAN()).values(new Boolean[] {null}).binary(new byte[][] {null}), + TestSpec.type(STRING()).values(new String[] {null}).binary(new byte[][] {null}), + TestSpec.type(BYTES()).values(new byte[][] {null}).binary(new byte[][] {null}), TestSpec.type(RAW(LocalDateTime.class, new LocalDateTimeSerializer())) - .value(null) - .binary(null)); + .values(new LocalDateTime[] {null}) + .binary(new byte[][] {null})); } @Parameterized.Parameter public TestSpec testSpec; @@ -155,17 +187,35 @@ public void testSerializationAndDeserialization() throws Exception { deserializationSchema.open(mock(DeserializationSchema.InitializationContext.class)); serializationSchema.open(mock(SerializationSchema.InitializationContext.class)); - Row row = Row.of(testSpec.value); DataStructureConverter converter = DataStructureConverters.getConverter(ROW(FIELD("single", testSpec.type))); - RowData originalRowData = (RowData) converter.toInternal(row); - byte[] serializedBytes = serializationSchema.serialize(originalRowData); - assertThat(serializedBytes).isEqualTo(testSpec.binary); + byte[][] serializedBytesArr = new byte[testSpec.values.length][]; + RowData[] deserializedRowDataArr = new RowData[testSpec.values.length]; + + // The following loops are partitioned to ensure the serialized/deserialized + // values are not copied by reference. (see FLINK-35097) - RowData deserializeRowData = deserializationSchema.deserialize(serializedBytes); - Row actual = (Row) converter.toExternal(deserializeRowData); - assertThat(actual).isEqualTo(row); + // Process serialization + for (int i = 0; i < testSpec.values.length; i++) { + Row row = Row.of(testSpec.values[i]); + RowData originalRowData = (RowData) converter.toInternal(row); + serializedBytesArr[i] = serializationSchema.serialize(originalRowData); + } + + // Test serialization and process deserialization + for (int i = 0; i < testSpec.values.length; i++) { + assertThat(serializedBytesArr[i]).isEqualTo(testSpec.binary[i]); + + deserializedRowDataArr[i] = deserializationSchema.deserialize(serializedBytesArr[i]); + } + + // Test deserialization + for (int i = 0; i < testSpec.values.length; i++) { + Row row = Row.of(testSpec.values[i]); + Row actual = (Row) converter.toExternal(deserializedRowDataArr[i]); + assertThat(actual).isEqualTo(row); + } } private static byte[] serializeLocalDateTime(LocalDateTime localDateTime) { @@ -183,9 +233,9 @@ private static byte[] serializeLocalDateTime(LocalDateTime localDateTime) { private static class TestSpec { - private Object value; - private byte[] binary; - private DataType type; + private Object[] values; + private byte[][] binary; + private final DataType type; private String charsetName = "UTF-8"; private boolean isBigEndian = true; @@ -197,12 +247,12 @@ public static TestSpec type(DataType fieldType) { return new TestSpec(fieldType); } - public TestSpec value(Object value) { - this.value = value; + public TestSpec values(Object[] values) { + this.values = values; return this; } - public TestSpec binary(byte[] bytes) { + public TestSpec binary(byte[][] bytes) { this.binary = bytes; return this; } @@ -219,12 +269,16 @@ public TestSpec withLittleEndian() { @Override public String toString() { - String hex = binary == null ? "null" : "0x" + StringUtils.byteToHexString(binary); + ArrayList hexes = new ArrayList<>(); + for (byte[] b : binary) { + hexes.add(b == null ? "" : "0x" + StringUtils.byteToHexString(b)); + } + return "TestSpec{" - + "value=" - + value + + "values=" + + Arrays.toString(values) + ", binary=" - + hex + + hexes + ", type=" + type + ", charsetName='"