diff --git a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java index 413edeb10df81..fa892991489b7 100644 --- a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java +++ b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java @@ -25,6 +25,7 @@ import io.delta.standalone.actions.AddFile; import io.delta.standalone.exceptions.DeltaConcurrentModificationException; import java.io.IOException; +import java.sql.Timestamp; import java.util.*; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -80,7 +81,13 @@ public void write(Iterator rows) { case INSERT: GenericRecord record = new GenericData.Record(this.sinkSchema); for (int i = 0; i < this.sinkSchema.getFields().size(); i++) { - record.put(i, row.get(i)); + Object values; + if (row.get(i) instanceof Timestamp) { + values = ((Timestamp) row.get(i)).getTime(); + } else { + values = row.get(i); + } + record.put(i, values); } try { this.parquetWriter.write(record); diff --git a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkUtil.java b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkUtil.java index 4548b816a131c..8fc8fe2a9aac7 100644 --- a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkUtil.java +++ b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkUtil.java @@ -73,7 +73,7 @@ public static void checkSchema(TableSchema tableSchema, StructType schema) { } } - private static DataType convertType(Data.DataType.TypeName typeName) { + public static DataType convertType(Data.DataType.TypeName typeName) { switch (typeName) { case INT16: return new ShortType(); @@ -115,7 +115,9 @@ private static DataType convertType(Data.DataType.TypeName typeName) { public static Schema convertSchema(DeltaLog log, TableSchema tableSchema) { StructType schema = log.snapshot().getMetadata().getSchema(); - MessageType parquetSchema = ParquetSchemaConverter.deltaToParquet(schema); + MessageType parquetSchema = + ParquetSchemaConverter.deltaToParquet( + schema, ParquetSchemaConverter.ParquetOutputTimestampType.TIMESTAMP_MILLIS); return new AvroSchemaConverter().convert(parquetSchema); } }