From 99ed97d774c26cf041f5b5a20114c8b2f5319c40 Mon Sep 17 00:00:00 2001 From: libailin Date: Fri, 29 Nov 2024 09:52:41 +0800 Subject: [PATCH] [Feature-#1933][s3] Support more data type conversions --- .../s3/converter/S3SqlConverter.java | 60 ++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/converter/S3SqlConverter.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/converter/S3SqlConverter.java index 04566646ed..01fe4074d1 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/converter/S3SqlConverter.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/converter/S3SqlConverter.java @@ -23,16 +23,27 @@ import com.dtstack.chunjun.converter.IDeserializationConverter; import com.dtstack.chunjun.converter.ISerializationConverter; import com.dtstack.chunjun.throwable.UnsupportedTypeException; +import com.dtstack.chunjun.util.DateUtil; +import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.commons.lang3.StringUtils; + +import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; +import java.sql.Timestamp; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; public class S3SqlConverter extends AbstractRowConverter { @@ -82,12 +93,41 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { return val -> Float.valueOf((String) val); case DOUBLE: return val -> Double.valueOf((String) val); + case DECIMAL: + final int precision = ((DecimalType) type).getPrecision(); + final int scale = ((DecimalType) type).getScale(); + return val -> + val instanceof BigInteger + ? DecimalData.fromBigDecimal( + new BigDecimal((BigInteger) val, 0), precision, scale) + : DecimalData.fromBigDecimal( + StringUtils.isNotEmpty(String.valueOf(val)) + ? new BigDecimal(String.valueOf(val)) + : BigDecimal.ZERO, + precision, + scale); + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return val -> { + if (val instanceof String) { + return TimestampData.fromTimestamp(Timestamp.valueOf((String) val)); + } else if (val instanceof LocalDateTime) { + return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) val)); + } else { + return TimestampData.fromTimestamp(((Timestamp) val)); + } + }; case CHAR: case VARCHAR: return val -> StringData.fromString((String) val); case DATE: - return val -> - (int) ((Date.valueOf(String.valueOf(val))).toLocalDate().toEpochDay()); + return val -> { + if (StringUtils.isEmpty(String.valueOf(val))) { + return null; + } + Date date = new Date(DateUtil.stringToDate(String.valueOf(val)).getTime()); + return (int) date.toLocalDate().toEpochDay(); + }; case TIME_WITHOUT_TIME_ZONE: return val -> (int) @@ -124,6 +164,22 @@ protected ISerializationConverter createExternalConverter(LogicalType output[index] = Time.valueOf(LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L)) .toString(); + case DECIMAL: + return (rowData, index, data) -> + data[index] = + String.valueOf( + rowData.getDecimal( + index, + ((DecimalType) type).getPrecision(), + ((DecimalType) type).getScale())); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (rowData, index, data) -> + data[index] = + String.valueOf( + rowData.getTimestamp( + index, + ((TimestampType) type).getPrecision()) + .toTimestamp()); default: throw new UnsupportedTypeException(type.toString()); }