[FLINK-36311][format/csv] Remove deprecated APIs in Flink Csv format
dianfu committed Sep 19, 2024
1 parent ec2c32d commit 215cc04
Showing 6 changed files with 191 additions and 20 deletions.
@@ -18,11 +18,14 @@

package org.apache.flink.formats.csv;

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema.RuntimeConverter;
import org.apache.flink.types.Row;
import org.apache.flink.util.jackson.JacksonMapperFactory;

@@ -32,11 +35,19 @@

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.NoSuchElementException;

import static org.apache.flink.formats.csv.CsvRowDeserializationSchema.createFieldRuntimeConverters;
import static org.apache.flink.formats.csv.CsvRowDeserializationSchema.validateArity;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -140,6 +151,177 @@ public Row nextRecord(Row record) throws IOException {
        return returnRecord;
    }

    // --------------------------------------------------------------------------------------------

    interface RuntimeConverter extends Serializable {
        Object convert(JsonNode node);
    }

    private static RuntimeConverter createRowRuntimeConverter(
            RowTypeInfo rowTypeInfo, boolean ignoreParseErrors, boolean isTopLevel) {
        final TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
        final String[] fieldNames = rowTypeInfo.getFieldNames();

        final RuntimeConverter[] fieldConverters =
                createFieldRuntimeConverters(ignoreParseErrors, fieldTypes);

        return assembleRowRuntimeConverter(
                ignoreParseErrors, isTopLevel, fieldNames, fieldConverters);
    }

    static RuntimeConverter[] createFieldRuntimeConverters(
            boolean ignoreParseErrors, TypeInformation<?>[] fieldTypes) {
        final RuntimeConverter[] fieldConverters = new RuntimeConverter[fieldTypes.length];
        for (int i = 0; i < fieldTypes.length; i++) {
            fieldConverters[i] = createNullableRuntimeConverter(fieldTypes[i], ignoreParseErrors);
        }
        return fieldConverters;
    }

    private static RuntimeConverter assembleRowRuntimeConverter(
            boolean ignoreParseErrors,
            boolean isTopLevel,
            String[] fieldNames,
            RuntimeConverter[] fieldConverters) {
        final int rowArity = fieldNames.length;

        return (node) -> {
            final int nodeSize = node.size();

            if (nodeSize != 0) {
                validateArity(rowArity, nodeSize, ignoreParseErrors);
            } else {
                return null;
            }

            final Row row = new Row(rowArity);
            for (int i = 0; i < Math.min(rowArity, nodeSize); i++) {
                // Jackson only supports mapping by name in the first level
                if (isTopLevel) {
                    row.setField(i, fieldConverters[i].convert(node.get(fieldNames[i])));
                } else {
                    row.setField(i, fieldConverters[i].convert(node.get(i)));
                }
            }
            return row;
        };
    }

    private static RuntimeConverter createNullableRuntimeConverter(
            TypeInformation<?> info, boolean ignoreParseErrors) {
        final RuntimeConverter valueConverter = createRuntimeConverter(info, ignoreParseErrors);
        return (node) -> {
            if (node.isNull()) {
                return null;
            }
            try {
                return valueConverter.convert(node);
            } catch (Throwable t) {
                if (!ignoreParseErrors) {
                    throw t;
                }
                return null;
            }
        };
    }

    private static RuntimeConverter createRuntimeConverter(
            TypeInformation<?> info, boolean ignoreParseErrors) {
        if (info.equals(Types.VOID)) {
            return (node) -> null;
        } else if (info.equals(Types.STRING)) {
            return JsonNode::asText;
        } else if (info.equals(Types.BOOLEAN)) {
            return (node) -> Boolean.valueOf(node.asText().trim());
        } else if (info.equals(Types.BYTE)) {
            return (node) -> Byte.valueOf(node.asText().trim());
        } else if (info.equals(Types.SHORT)) {
            return (node) -> Short.valueOf(node.asText().trim());
        } else if (info.equals(Types.INT)) {
            return (node) -> Integer.valueOf(node.asText().trim());
        } else if (info.equals(Types.LONG)) {
            return (node) -> Long.valueOf(node.asText().trim());
        } else if (info.equals(Types.FLOAT)) {
            return (node) -> Float.valueOf(node.asText().trim());
        } else if (info.equals(Types.DOUBLE)) {
            return (node) -> Double.valueOf(node.asText().trim());
        } else if (info.equals(Types.BIG_DEC)) {
            return (node) -> new BigDecimal(node.asText().trim());
        } else if (info.equals(Types.BIG_INT)) {
            return (node) -> new BigInteger(node.asText().trim());
        } else if (info.equals(Types.SQL_DATE)) {
            return (node) -> Date.valueOf(node.asText());
        } else if (info.equals(Types.SQL_TIME)) {
            return (node) -> Time.valueOf(node.asText());
        } else if (info.equals(Types.SQL_TIMESTAMP)) {
            return (node) -> Timestamp.valueOf(node.asText());
        } else if (info.equals(Types.LOCAL_DATE)) {
            return (node) -> Date.valueOf(node.asText()).toLocalDate();
        } else if (info.equals(Types.LOCAL_TIME)) {
            return (node) -> Time.valueOf(node.asText()).toLocalTime();
        } else if (info.equals(Types.LOCAL_DATE_TIME)) {
            return (node) -> LocalDateTime.parse(node.asText().trim(), SQL_TIMESTAMP_FORMAT);
        } else if (info.equals(Types.INSTANT)) {
            return (node) ->
                    LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT)
                            .toInstant(ZoneOffset.UTC);
        } else if (info instanceof RowTypeInfo) {
            final RowTypeInfo rowTypeInfo = (RowTypeInfo) info;
            return createRowRuntimeConverter(rowTypeInfo, ignoreParseErrors, false);
        } else if (info instanceof BasicArrayTypeInfo) {
            return createObjectArrayRuntimeConverter(
                    ((BasicArrayTypeInfo<?, ?>) info).getComponentInfo(), ignoreParseErrors);
        } else if (info instanceof ObjectArrayTypeInfo) {
            return createObjectArrayRuntimeConverter(
                    ((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo(), ignoreParseErrors);
        } else if (info instanceof PrimitiveArrayTypeInfo
                && ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
            return createByteArrayRuntimeConverter(ignoreParseErrors);
        } else {
            throw new RuntimeException("Unsupported type information '" + info + "'.");
        }
    }

    private static RuntimeConverter createObjectArrayRuntimeConverter(
            TypeInformation<?> elementType, boolean ignoreParseErrors) {
        final Class<?> elementClass = elementType.getTypeClass();
        final RuntimeConverter elementConverter =
                createNullableRuntimeConverter(elementType, ignoreParseErrors);

        return (node) -> {
            final int nodeSize = node.size();
            final Object[] array = (Object[]) Array.newInstance(elementClass, nodeSize);
            for (int i = 0; i < nodeSize; i++) {
                array[i] = elementConverter.convert(node.get(i));
            }
            return array;
        };
    }

    private static RuntimeConverter createByteArrayRuntimeConverter(boolean ignoreParseErrors) {
        return (node) -> {
            try {
                return node.binaryValue();
            } catch (IOException e) {
                if (!ignoreParseErrors) {
                    throw new RuntimeException("Unable to deserialize byte array.", e);
                }
                return null;
            }
        };
    }

    static void validateArity(int expected, int actual, boolean ignoreParseErrors) {
        if (expected != actual && !ignoreParseErrors) {
            throw new RuntimeException(
                    "Row length mismatch. "
                            + expected
                            + " fields expected but was "
                            + actual
                            + ".");
        }
    }

    /** Create a builder. */
    public static Builder builder(TypeInformation<Row> typeInfo, Path... filePaths) {
        return new Builder(typeInfo, filePaths);
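For orientation, a minimal usage sketch, not part of the diff: the builder above stays the public entry point for reading CSV files as Row records. It assumes the file changed in this hunk is RowCsvInputFormat and that its Builder exposes a build() method; the field types and path are illustrative.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.types.Row;

public class RowCsvInputFormatExample {
    public static void main(String[] args) {
        // Row type matching the CSV columns; types and path are illustrative.
        TypeInformation<Row> typeInfo = Types.ROW(Types.INT, Types.STRING);
        // Builder signature as shown in the hunk above; build() is assumed.
        RowCsvInputFormat format =
                RowCsvInputFormat.builder(typeInfo, new Path("/tmp/orders.csv")).build();
        System.out.println("configured: " + format);
    }
}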
@@ -474,7 +474,7 @@ private static byte[] serialize(
        CsvRowDataSerializationSchema schema =
                InstantiationUtil.deserializeObject(
                        InstantiationUtil.serializeObject(serSchemaBuilder.build()),
                        CsvRowDeSerializationSchemaTest.class.getClassLoader());
                        CsvRowDataSerDeSchemaTest.class.getClassLoader());
        open(schema);
        return schema.serialize(row);
    }
@@ -487,7 +487,7 @@ private static RowData deserialize(
        CsvRowDataDeserializationSchema schema =
                InstantiationUtil.deserializeObject(
                        InstantiationUtil.serializeObject(deserSchemaBuilder.build()),
                        CsvRowDeSerializationSchemaTest.class.getClassLoader());
                        CsvRowDataSerDeSchemaTest.class.getClassLoader());
        open(schema);
        return schema.deserialize(csv != null ? csv.getBytes() : null);
    }
@@ -20,7 +20,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
@@ -18,7 +18,7 @@

package org.apache.flink.formats.csv;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -57,13 +57,8 @@
 * Row}.
 *
 * <p>Failures during deserialization are forwarded as wrapped {@link IOException}s.
 *
 * @deprecated The format was developed for the Table API users and will not be maintained for
 *     DataStream API users anymore. Either use Table API or switch to Data Stream, defining your
 *     own {@link DeserializationSchema}.
 */
@PublicEvolving
@Deprecated
@Internal
public final class CsvRowDeserializationSchema implements DeserializationSchema<Row> {

    private static final long serialVersionUID = 2135553495874539201L;

@@ -98,7 +93,6 @@ public void open(InitializationContext context) throws Exception {
    }

    /** A builder for creating a {@link CsvRowDeserializationSchema}. */
    @PublicEvolving
    public static class Builder {

        private final RowTypeInfo typeInfo;
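The @deprecated note removed above pointed DataStream users toward writing their own DeserializationSchema. A minimal sketch of that migration path, not part of this commit: the class name is hypothetical, the comma splitting is deliberately naive, and the two-field row layout is only an example.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical user-defined replacement of the kind the removed note suggested.
public class MyCsvRowDeserializer implements DeserializationSchema<Row> {

    private static final long serialVersionUID = 1L;

    @Override
    public Row deserialize(byte[] message) throws IOException {
        // Naive split; a real implementation must handle quoting and escapes.
        String[] fields = new String(message, StandardCharsets.UTF_8).split(",", -1);
        Row row = new Row(2);
        row.setField(0, Integer.valueOf(fields[0].trim()));
        row.setField(1, fields[1]);
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        // Must match the arity and types produced in deserialize().
        return Types.ROW(Types.INT, Types.STRING);
    }
}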
@@ -18,6 +18,7 @@

package org.apache.flink.formats.csv;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -57,13 +58,8 @@
 *
 * <p>Result <code>byte[]</code> messages can be deserialized using {@link
 * CsvRowDeserializationSchema}.
 *
 * @deprecated The format was developed for the Table API users and will not be maintained for
 *     DataStream API users anymore. Either use Table API or switch to Data Stream, defining your
 *     own {@link SerializationSchema}.
 */
@PublicEvolving
@Deprecated
@Internal
public final class CsvRowSerializationSchema implements SerializationSchema<Row> {

    private static final long serialVersionUID = 2098447220136965L;
