diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java index 53634c5cc83c6d..51ff3a44b08a62 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java @@ -18,11 +18,14 @@ package org.apache.flink.formats.csv; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; -import org.apache.flink.formats.csv.CsvRowDeserializationSchema.RuntimeConverter; import org.apache.flink.types.Row; import org.apache.flink.util.jackson.JacksonMapperFactory; @@ -32,11 +35,19 @@ import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.Arrays; import java.util.NoSuchElementException; -import static org.apache.flink.formats.csv.CsvRowDeserializationSchema.createFieldRuntimeConverters; -import static org.apache.flink.formats.csv.CsvRowDeserializationSchema.validateArity; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -140,6 +151,177 @@ public Row nextRecord(Row record) throws IOException { return returnRecord; } + // -------------------------------------------------------------------------------------------- + + interface RuntimeConverter extends Serializable { + Object convert(JsonNode node); + } + + private static RuntimeConverter createRowRuntimeConverter( + RowTypeInfo rowTypeInfo, boolean ignoreParseErrors, boolean isTopLevel) { + final TypeInformation[] fieldTypes = rowTypeInfo.getFieldTypes(); + final String[] fieldNames = rowTypeInfo.getFieldNames(); + + final RuntimeConverter[] fieldConverters = + createFieldRuntimeConverters(ignoreParseErrors, fieldTypes); + + return assembleRowRuntimeConverter( + ignoreParseErrors, isTopLevel, fieldNames, fieldConverters); + } + + static RuntimeConverter[] createFieldRuntimeConverters( + boolean ignoreParseErrors, TypeInformation[] fieldTypes) { + final RuntimeConverter[] fieldConverters = new RuntimeConverter[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + fieldConverters[i] = createNullableRuntimeConverter(fieldTypes[i], ignoreParseErrors); + } + return fieldConverters; + } + + private static RuntimeConverter assembleRowRuntimeConverter( + boolean ignoreParseErrors, + boolean isTopLevel, + String[] fieldNames, + RuntimeConverter[] fieldConverters) { + final int rowArity = fieldNames.length; + + return (node) -> { + final int nodeSize = node.size(); + + if (nodeSize != 0) { + validateArity(rowArity, nodeSize, ignoreParseErrors); + } else { + return null; + } + + final Row row = new Row(rowArity); + for (int i = 0; i < Math.min(rowArity, nodeSize); i++) { + // Jackson only supports mapping by name in the first level + if (isTopLevel) { + row.setField(i, fieldConverters[i].convert(node.get(fieldNames[i]))); + } else { + row.setField(i, fieldConverters[i].convert(node.get(i))); + } + } + return row; + }; + } + + private static RuntimeConverter createNullableRuntimeConverter( + TypeInformation info, boolean ignoreParseErrors) { + final RuntimeConverter valueConverter = createRuntimeConverter(info, ignoreParseErrors); + return (node) -> { + if (node.isNull()) { + return null; + } + try { + return valueConverter.convert(node); + } catch (Throwable t) { + if (!ignoreParseErrors) { + throw t; + } + return null; + } + }; + } + + private static RuntimeConverter createRuntimeConverter( + TypeInformation info, boolean ignoreParseErrors) { + if (info.equals(Types.VOID)) { + return (node) -> null; + } else if (info.equals(Types.STRING)) { + return JsonNode::asText; + } else if (info.equals(Types.BOOLEAN)) { + return (node) -> Boolean.valueOf(node.asText().trim()); + } else if (info.equals(Types.BYTE)) { + return (node) -> Byte.valueOf(node.asText().trim()); + } else if (info.equals(Types.SHORT)) { + return (node) -> Short.valueOf(node.asText().trim()); + } else if (info.equals(Types.INT)) { + return (node) -> Integer.valueOf(node.asText().trim()); + } else if (info.equals(Types.LONG)) { + return (node) -> Long.valueOf(node.asText().trim()); + } else if (info.equals(Types.FLOAT)) { + return (node) -> Float.valueOf(node.asText().trim()); + } else if (info.equals(Types.DOUBLE)) { + return (node) -> Double.valueOf(node.asText().trim()); + } else if (info.equals(Types.BIG_DEC)) { + return (node) -> new BigDecimal(node.asText().trim()); + } else if (info.equals(Types.BIG_INT)) { + return (node) -> new BigInteger(node.asText().trim()); + } else if (info.equals(Types.SQL_DATE)) { + return (node) -> Date.valueOf(node.asText()); + } else if (info.equals(Types.SQL_TIME)) { + return (node) -> Time.valueOf(node.asText()); + } else if (info.equals(Types.SQL_TIMESTAMP)) { + return (node) -> Timestamp.valueOf(node.asText()); + } else if (info.equals(Types.LOCAL_DATE)) { + return (node) -> Date.valueOf(node.asText()).toLocalDate(); + } else if (info.equals(Types.LOCAL_TIME)) { + return (node) -> Time.valueOf(node.asText()).toLocalTime(); + } else if (info.equals(Types.LOCAL_DATE_TIME)) { + return (node) -> LocalDateTime.parse(node.asText().trim(), SQL_TIMESTAMP_FORMAT); + } else if (info.equals(Types.INSTANT)) { + return (node) -> + LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT) + .toInstant(ZoneOffset.UTC); + } else if (info instanceof RowTypeInfo) { + final RowTypeInfo rowTypeInfo = (RowTypeInfo) info; + return createRowRuntimeConverter(rowTypeInfo, ignoreParseErrors, false); + } else if (info instanceof BasicArrayTypeInfo) { + return createObjectArrayRuntimeConverter( + ((BasicArrayTypeInfo) info).getComponentInfo(), ignoreParseErrors); + } else if (info instanceof ObjectArrayTypeInfo) { + return createObjectArrayRuntimeConverter( + ((ObjectArrayTypeInfo) info).getComponentInfo(), ignoreParseErrors); + } else if (info instanceof PrimitiveArrayTypeInfo + && ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { + return createByteArrayRuntimeConverter(ignoreParseErrors); + } else { + throw new RuntimeException("Unsupported type information '" + info + "'."); + } + } + + private static RuntimeConverter createObjectArrayRuntimeConverter( + TypeInformation elementType, boolean ignoreParseErrors) { + final Class elementClass = elementType.getTypeClass(); + final RuntimeConverter elementConverter = + createNullableRuntimeConverter(elementType, ignoreParseErrors); + + return (node) -> { + final int nodeSize = node.size(); + final Object[] array = (Object[]) Array.newInstance(elementClass, nodeSize); + for (int i = 0; i < nodeSize; i++) { + array[i] = elementConverter.convert(node.get(i)); + } + return array; + }; + } + + private static RuntimeConverter createByteArrayRuntimeConverter(boolean ignoreParseErrors) { + return (node) -> { + try { + return node.binaryValue(); + } catch (IOException e) { + if (!ignoreParseErrors) { + throw new RuntimeException("Unable to deserialize byte array.", e); + } + return null; + } + }; + } + + static void validateArity(int expected, int actual, boolean ignoreParseErrors) { + if (expected != actual && !ignoreParseErrors) { + throw new RuntimeException( + "Row length mismatch. " + + expected + + " fields expected but was " + + actual + + "."); + } + } + /** Create a builder. */ public static Builder builder(TypeInformation typeInfo, Path... filePaths) { return new Builder(typeInfo, filePaths); diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java index 402168a3d8d4a2..87f109721e48dd 100644 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java @@ -474,7 +474,7 @@ private static byte[] serialize( CsvRowDataSerializationSchema schema = InstantiationUtil.deserializeObject( InstantiationUtil.serializeObject(serSchemaBuilder.build()), - CsvRowDeSerializationSchemaTest.class.getClassLoader()); + CsvRowDataSerDeSchemaTest.class.getClassLoader()); open(schema); return schema.serialize(row); } @@ -487,7 +487,7 @@ private static RowData deserialize( CsvRowDataDeserializationSchema schema = InstantiationUtil.deserializeObject( InstantiationUtil.serializeObject(deserSchemaBuilder.build()), - CsvRowDeSerializationSchemaTest.class.getClassLoader()); + CsvRowDataSerDeSchemaTest.class.getClassLoader()); open(schema); return schema.deserialize(csv != null ? csv.getBytes() : null); } diff --git a/flink-python/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-python/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java index 852ee54f850761..923783f6cef090 100644 --- a/flink-python/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java +++ b/flink-python/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; -import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-python/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java similarity index 97% rename from flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java rename to flink-python/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java index b2a91a9fb34416..1f1831303f4dd3 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java +++ b/flink-python/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java @@ -18,7 +18,7 @@ package org.apache.flink.formats.csv; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; @@ -57,13 +57,8 @@ * Row}. * *

Failure during deserialization are forwarded as wrapped {@link IOException}s. - * - * @deprecated The format was developed for the Table API users and will not be maintained for - * DataStream API users anymore. Either use Table API or switch to Data Stream, defining your - * own {@link DeserializationSchema}. */ -@PublicEvolving -@Deprecated +@Internal public final class CsvRowDeserializationSchema implements DeserializationSchema { private static final long serialVersionUID = 2135553495874539201L; @@ -98,7 +93,6 @@ public void open(InitializationContext context) throws Exception { } /** A builder for creating a {@link CsvRowDeserializationSchema}. */ - @PublicEvolving public static class Builder { private final RowTypeInfo typeInfo; diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-python/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java similarity index 98% rename from flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java rename to flink-python/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java index 53f4edf16e31ed..a3c8f4fd97219a 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java +++ b/flink-python/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java @@ -18,7 +18,7 @@ package org.apache.flink.formats.csv; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; @@ -57,13 +57,8 @@ * *

Result byte[] messages can be deserialized using {@link * CsvRowDeserializationSchema}. - * - * @deprecated The format was developed for the Table API users and will not be maintained for - * DataStream API users anymore. Either use Table API or switch to Data Stream, defining your - * own {@link SerializationSchema}. */ -@PublicEvolving -@Deprecated +@Internal public final class CsvRowSerializationSchema implements SerializationSchema { private static final long serialVersionUID = 2098447220136965L; diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java b/flink-python/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java similarity index 100% rename from flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java rename to flink-python/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java