diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java index 80181fe26090c..bfc40111818a4 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java @@ -107,7 +107,7 @@ public Integer getMaxBatchRows() { public CassandraConfig withMaxBatchRows(Integer maxBatchRows) { if (maxBatchRows > 65536 || maxBatchRows < 1) { throw new IllegalArgumentException( - "cassandra.max_batch_rows must be <= 65535 and >= 1"); + "Cassandra sink option: maxBatchRows must be <= 65535 and >= 1"); } this.maxBatchRows = maxBatchRows; return this; @@ -119,7 +119,8 @@ public Integer getRequestTimeoutMs() { public CassandraConfig withRequestTimeoutMs(Integer requestTimeoutMs) { if (requestTimeoutMs < 1) { - throw new IllegalArgumentException("cassandra.request_timeout_ms must be >= 1"); + throw new IllegalArgumentException( + "Cassandra sink option: requestTimeoutMs must be >= 1"); } this.requestTimeoutMs = requestTimeoutMs; return this; diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index a51b9830ec6de..64a06ff70770f 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -207,6 +207,7 @@ fn datum_to_json_object( if let CustomJsonType::BigQuery = custom_json_type && matches!(field.data_type(), DataType::List(_)) { + // Bigquery need to convert null of array to empty array https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types return Ok(Value::Array(vec![])); } else { return Ok(Value::Null); diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 35b06ec33fb76..28b492f0c80ea 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -17,6 +17,10 @@ BigQueryConfig: - name: bigquery.table field_type: String required: true + - name: bigquery.max_batch_rows + field_type: usize + required: false + default: '1024' - name: region field_type: String required: false