diff --git a/e2e_test/sink/remote/jdbc.check.pg.slt b/e2e_test/sink/remote/jdbc.check.pg.slt index 6293c44a5a444..7cf6faa7b0fb6 100644 --- a/e2e_test/sink/remote/jdbc.check.pg.slt +++ b/e2e_test/sink/remote/jdbc.check.pg.slt @@ -22,13 +22,13 @@ select * from t_remote_1 order by id; query III select * from biz.t_types order by id; ---- -1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"Value 1","Value 2"} {12.345,56.789} -2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} {"Value 3","Value 4"} {43.21,65.432} -3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"How're you?","\"hello\\ \\world\""} {12.345,56.789} +1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"Value 1","Value 2"} {12.345,56.789} {1,2,3} {1,2,3} {1,2,3} {12.3,56.7} +2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} {"Value 3","Value 4"} {43.21,65.432} {4,5,6} {4,5,6} {4,5,6} {43.2,65.4} +3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"How're you?","\"hello\\ \\world\""} {12.345,56.789} {1,2,3} {1,2,3} {1,2,3} {43.2,65.4} query IT -select * from t_append_only order by v1, v2; +select * from t_append_only order by v1,v2; ---- 1 aaa 1 bbb diff --git a/e2e_test/sink/remote/jdbc.load.slt b/e2e_test/sink/remote/jdbc.load.slt index 70ad3f9a3a42b..a3bc63e48f7de 100644 --- a/e2e_test/sink/remote/jdbc.load.slt +++ b/e2e_test/sink/remote/jdbc.load.slt @@ -50,7 +50,11 @@ CREATE TABLE rw_typed_data ( interval_column INTERVAL, jsonb_column JSONB, array_column VARCHAR[], - array_column2 FLOAT[] + array_column2 FLOAT[], + array_column3 SMALLINT[], + array_column4 INTEGER[], + array_column5 BIGINT[], + array_column6 DOUBLE PRECISION[], ); statement ok @@ -196,10 +200,10 @@ INSERT INTO t_remote_1 VALUES (6, 'Varchar value 6', 'Text value 6', 789, 123, 456, 67.89, 34.56, 78.91, FALSE, '2023-05-27', '23:45:01', '2023-05-27 23:45:01', '2023-05-27 23:45:01', '2 years 3 months 4 days 5 hours 6 minutes 7 seconds', '{"key": "value6"}', E'\\xDEADBABE'); statement ok -INSERT INTO rw_typed_data (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, interval_column, jsonb_column, array_column, array_column2) VALUES - (1, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['Value 1', 'Value 2'], '{12.345,56.789}'), - (2, 'Varcharvalue2', 'Textvalue2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '2 days', '{"key": "value2"}', ARRAY['Value 3', 'Value 4'], '{43.21,65.432}'), - (3, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['How''re you?', '"hello\ \world"'], ARRAY[12.345,56.789]); +INSERT INTO rw_typed_data (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, interval_column, jsonb_column, array_column, array_column2, array_column3, array_column4, array_column5, array_column6) VALUES + (1, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['Value 1', 'Value 2'], '{12.345,56.789}', '{1, 2, 3}', '{1, 2, 3}', '{1, 2, 3}', '{12.3,56.7}'), + (2, 'Varcharvalue2', 'Textvalue2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '2 days', '{"key": "value2"}', ARRAY['Value 3', 'Value 4'], '{43.21,65.432}', '{4, 5, 6}', '{4, 5, 6}', '{4, 5, 6}', '{43.2,65.4}'), + (3, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['How''re you?', '"hello\ \world"'], ARRAY[12.345,56.789], ARRAY[1, 2, 3], ARRAY[1, 2, 3], ARRAY[1, 2, 3], ARRAY[43.2,65.4]); statement ok FLUSH; diff --git a/e2e_test/sink/remote/mysql_create_table.sql b/e2e_test/sink/remote/mysql_create_table.sql index 0cbe15f7dcb8e..9eab3d0f5e13a 100644 --- a/e2e_test/sink/remote/mysql_create_table.sql +++ b/e2e_test/sink/remote/mysql_create_table.sql @@ -47,5 +47,9 @@ CREATE TABLE t_types ( interval_column VARCHAR(100), jsonb_column JSON, array_column LONGTEXT, - array_column2 LONGTEXT + array_column2 LONGTEXT, + array_column3 LONGTEXT, + array_column4 LONGTEXT, + array_column5 LONGTEXT, + array_column6 LONGTEXT ); diff --git a/e2e_test/sink/remote/mysql_expected_result_2.tsv b/e2e_test/sink/remote/mysql_expected_result_2.tsv index 87ac3cb3bd123..061ee02d39d17 100644 --- a/e2e_test/sink/remote/mysql_expected_result_2.tsv +++ b/e2e_test/sink/remote/mysql_expected_result_2.tsv @@ -1,3 +1,3 @@ -1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} Value 1,Value 2 12.345,56.789 -2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 0 2023-05-23 23:45:01 2023-05-23 23:45:01 P0Y0M2DT0H0M0S {"key": "value2"} Value 3,Value 4 43.21,65.432 -3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} How're you?,"hello\ \world" 12.345,56.789 +1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} Value 1,Value 2 12.345,56.789 1,2,3 1,2,3 1,2,3 12.3,56.7 +2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 0 2023-05-23 23:45:01 2023-05-23 23:45:01 P0Y0M2DT0H0M0S {"key": "value2"} Value 3,Value 4 43.21,65.432 4,5,6 4,5,6 4,5,6 43.2,65.4 +3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} How're you?,"hello\ \world" 12.345,56.789 1,2,3 1,2,3 1,2,3 43.2,65.4 diff --git a/e2e_test/sink/remote/pg_create_table.sql b/e2e_test/sink/remote/pg_create_table.sql index fd06aca93ce7b..3677dbb067131 100644 --- a/e2e_test/sink/remote/pg_create_table.sql +++ b/e2e_test/sink/remote/pg_create_table.sql @@ -73,7 +73,11 @@ CREATE TABLE biz.t_types ( interval_column INTERVAL, jsonb_column JSONB, array_column VARCHAR[], - array_column2 DECIMAL[] + array_column2 FLOAT[], + array_column3 SMALLINT[], + array_column4 INTEGER[], + array_column5 BIGINT[], + array_column6 DOUBLE PRECISION[] ); CREATE TABLE biz.t2 ( diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java index 570e2beaf5a67..5bbdf1116f2c1 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresDialect.java @@ -16,9 +16,11 @@ import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkRow; +import com.risingwave.proto.Data.DataType.TypeName; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.HashMap; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -33,6 +35,26 @@ public PostgresDialect(int[] columnSqlTypes) { this.columnSqlTypes = columnSqlTypes; } + private static final HashMap RW_TYPE_TO_JDBC_TYPE_NAME; + + static { + RW_TYPE_TO_JDBC_TYPE_NAME = new HashMap(); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.INT16, "int2"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.INT32, "int4"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.INT64, "int8"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.FLOAT, "float4"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.DOUBLE, "float8"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.BOOLEAN, "bool"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.VARCHAR, "varchar"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.DECIMAL, "numeric"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.TIME, "time"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.TIMESTAMP, "timestamp"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.INTERVAL, "varchar"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.DATE, "date"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.TIMESTAMPTZ, "timestamptz"); + RW_TYPE_TO_JDBC_TYPE_NAME.put(TypeName.JSONB, "varchar"); + } + @Override public SchemaTableName createSchemaTableName(String schemaName, String tableName) { if (schemaName == null || schemaName.isBlank()) { @@ -115,9 +137,11 @@ public void bindInsertIntoStatement( Object[] objArray = (Object[]) val; assert (column.getDataType().getFieldTypeCount() == 1); var fieldType = column.getDataType().getFieldType(0); - stmt.setArray( - placeholderIdx++, - conn.createArrayOf(fieldType.getTypeName().name(), objArray)); + var typeName = RW_TYPE_TO_JDBC_TYPE_NAME.get(fieldType.getTypeName()); + if (typeName == null) { + typeName = fieldType.getTypeName().name(); + } + stmt.setArray(placeholderIdx++, conn.createArrayOf(typeName, objArray)); break; case VARCHAR: // since VARCHAR column may sink to a UUID column, we get the target type diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 4815cd7368370..c92ad2f146e6c 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -782,7 +782,7 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetArrayValu let obj = env.call_static_method( &class, "valueOf", - "(S)Ljava.lang.Short;", + "(S)Ljava/lang/Short;", &[JValue::from(v as jshort)], )?; if let JValueOwned::Object(o) = obj { @@ -793,7 +793,7 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetArrayValu let obj = env.call_static_method( &class, "valueOf", - "(I)Ljava.lang.Integer;", + "(I)Ljava/lang/Integer;", &[JValue::from(v as jint)], )?; if let JValueOwned::Object(o) = obj { @@ -804,7 +804,7 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetArrayValu let obj = env.call_static_method( &class, "valueOf", - "(J)Ljava.lang.Long;", + "(J)Ljava/lang/Long;", &[JValue::from(v as jlong)], )?; if let JValueOwned::Object(o) = obj {