From ace4c4b865f67ebdbb2d543d57e261b37fb2f827 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Fri, 8 Mar 2024 11:48:45 +0800 Subject: [PATCH] fix(sink): add cassandra batch size and fix bigquery array null (#15516) --- .../big-query-sink/create_sink.sql | 2 +- .../risingwave/connector/CassandraConfig.java | 32 +++++++++++++++ .../risingwave/connector/CassandraSink.java | 12 +++++- src/connector/src/common.rs | 2 +- src/connector/src/sink/big_query.rs | 40 +++++++++++-------- src/connector/src/sink/doris.rs | 9 +---- src/connector/src/sink/encoder/json.rs | 36 +++++++++++++---- src/connector/src/sink/encoder/mod.rs | 2 + src/connector/src/sink/starrocks.rs | 9 +---- src/connector/with_options_sink.yaml | 4 ++ 10 files changed, 106 insertions(+), 42 deletions(-) diff --git a/integration_tests/big-query-sink/create_sink.sql b/integration_tests/big-query-sink/create_sink.sql index d9f49fcb6c8ee..3b8ed3b3ef9d8 100644 --- a/integration_tests/big-query-sink/create_sink.sql +++ b/integration_tests/big-query-sink/create_sink.sql @@ -23,7 +23,7 @@ FROM -- bigquery.dataset= '${dataset_id}', -- bigquery.table= '${table_id}', -- access_key = '${aws_access_key}', - -- secret_access = '${aws_secret_access}', + -- secret_key = '${aws_secret_key}', -- region = '${aws_region}', -- force_append_only='true', -- ); diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java index 7c883335cfc23..bfc40111818a4 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java @@ -42,6 +42,12 @@ public class CassandraConfig extends CommonSinkConfig { @JsonProperty(value = "cassandra.password") private String password; + @JsonProperty(value = "cassandra.max_batch_rows") + private Integer maxBatchRows = 512; + + @JsonProperty(value = "cassandra.request_timeout_ms") + private Integer requestTimeoutMs = 2000; + @JsonCreator public CassandraConfig( @JsonProperty(value = "cassandra.url") String url, @@ -93,4 +99,30 @@ public CassandraConfig withPassword(String password) { this.password = password; return this; } + + public Integer getMaxBatchRows() { + return maxBatchRows; + } + + public CassandraConfig withMaxBatchRows(Integer maxBatchRows) { + if (maxBatchRows > 65536 || maxBatchRows < 1) { + throw new IllegalArgumentException( + "Cassandra sink option: maxBatchRows must be <= 65535 and >= 1"); + } + this.maxBatchRows = maxBatchRows; + return this; + } + + public Integer getRequestTimeoutMs() { + return requestTimeoutMs; + } + + public CassandraConfig withRequestTimeoutMs(Integer requestTimeoutMs) { + if (requestTimeoutMs < 1) { + throw new IllegalArgumentException( + "Cassandra sink option: requestTimeoutMs must be >= 1"); + } + this.requestTimeoutMs = requestTimeoutMs; + return this; + } } diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java index b0b7fb93c7b51..2f8a035911f24 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java +++ 
b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java @@ -18,6 +18,8 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.cql.*; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkRow; @@ -34,7 +36,6 @@ public class CassandraSink extends SinkWriterBase { private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class); - private static final Integer MAX_BATCH_SIZE = 1024 * 16; private final CqlSession session; private final List updateRowCache = new ArrayList<>(1); @@ -51,9 +52,16 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { throw new IllegalArgumentException( "Invalid cassandraURL: expected `host:port`, got " + url); } + + DriverConfigLoader loader = + DriverConfigLoader.programmaticBuilder() + .withInt(DefaultDriverOption.REQUEST_TIMEOUT, config.getRequestTimeoutMs()) + .build(); + // check connection CqlSessionBuilder sessionBuilder = CqlSession.builder() + .withConfigLoader(loader) .addContactPoint( new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1]))) .withKeyspace(config.getKeyspace()) @@ -163,7 +171,7 @@ private void write_upsert(Iterator rows) { } private void tryCommit() { - if (batchBuilder.getStatementsCount() >= MAX_BATCH_SIZE) { + if (batchBuilder.getStatementsCount() >= config.getMaxBatchRows()) { sync(); } } diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index d5944eb07fa3c..66afaf55f0cc1 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -100,7 +100,7 @@ impl AwsAuthProps { ), )) } else { - bail!("Both \"access_key\" and \"secret_access\" are required.") + bail!("Both \"access_key\" and \"secret_key\" are required.") } } diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 41cfe7783819f..42918a6b72dfe 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -28,15 +28,12 @@ use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use serde_derive::Deserialize; use serde_json::Value; -use serde_with::serde_as; +use serde_with::{serde_as, DisplayFromStr}; use url::Url; use with_options::WithOptions; use yup_oauth2::ServiceAccountKey; -use super::encoder::{ - DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, - TimestamptzHandlingMode, -}; +use super::encoder::{JsonEncoder, RowEncoder}; use super::writer::LogSinkerOf; use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::aws_utils::load_file_descriptor_from_s3; @@ -47,8 +44,8 @@ use crate::sink::{ }; pub const BIGQUERY_SINK: &str = "bigquery"; -const BIGQUERY_INSERT_MAX_NUMS: usize = 1024; +#[serde_as] #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct BigQueryCommon { #[serde(rename = "bigquery.local.path")] @@ -61,6 +58,13 @@ pub struct BigQueryCommon { pub dataset: String, #[serde(rename = "bigquery.table")] pub table: String, + #[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")] + #[serde_as(as = "DisplayFromStr")] + pub max_batch_rows: usize, +} + +fn default_max_batch_rows() -> usize { + 1024 } impl BigQueryCommon { @@ -312,14 +316,7 @@ impl BigQuerySinkWriter { client, 
is_append_only, insert_request: TableDataInsertAllRequest::new(), - row_encoder: JsonEncoder::new( - schema, - None, - DateHandlingMode::String, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - ), + row_encoder: JsonEncoder::new_with_bigquery(schema, None), }) } @@ -339,7 +336,11 @@ impl BigQuerySinkWriter { self.insert_request .add_rows(insert_vec) .map_err(|e| SinkError::BigQuery(e.into()))?; - if self.insert_request.len().ge(&BIGQUERY_INSERT_MAX_NUMS) { + if self + .insert_request + .len() + .ge(&self.config.common.max_batch_rows) + { self.insert_data().await?; } Ok(()) @@ -349,7 +350,8 @@ impl BigQuerySinkWriter { if !self.insert_request.is_empty() { let insert_request = mem::replace(&mut self.insert_request, TableDataInsertAllRequest::new()); - self.client + let request = self + .client .tabledata() .insert_all( &self.config.common.project, @@ -359,6 +361,12 @@ impl BigQuerySinkWriter { ) .await .map_err(|e| SinkError::BigQuery(e.into()))?; + if let Some(error) = request.insert_errors { + return Err(SinkError::BigQuery(anyhow::anyhow!( + "Insert error: {:?}", + error + ))); + } } Ok(()) } diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index 64ab8121aaaa7..ea69a03af25c9 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -39,7 +39,7 @@ use super::doris_starrocks_connector::{ POOL_IDLE_TIMEOUT, }; use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; -use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use crate::sink::encoder::{JsonEncoder, RowEncoder}; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam}; @@ -294,12 +294,7 @@ impl DorisSinkWriter { inserter_inner_builder: doris_insert_builder, is_append_only, client: None, - row_encoder: JsonEncoder::new_with_doris( - schema, - None, - TimestampHandlingMode::String, - decimal_map, - ), + row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map), }) } diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index eb5c7b129385d..64a06ff70770f 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -83,7 +83,6 @@ impl JsonEncoder { pub fn new_with_doris( schema: Schema, col_indices: Option>, - timestamp_handling_mode: TimestampHandlingMode, map: HashMap, ) -> Self { Self { @@ -91,7 +90,7 @@ impl JsonEncoder { col_indices, time_handling_mode: TimeHandlingMode::Milli, date_handling_mode: DateHandlingMode::String, - timestamp_handling_mode, + timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, custom_json_type: CustomJsonType::Doris(map), kafka_connect: None, @@ -101,7 +100,6 @@ impl JsonEncoder { pub fn new_with_starrocks( schema: Schema, col_indices: Option>, - timestamp_handling_mode: TimestampHandlingMode, map: HashMap, ) -> Self { Self { @@ -109,13 +107,26 @@ impl JsonEncoder { col_indices, time_handling_mode: TimeHandlingMode::Milli, date_handling_mode: DateHandlingMode::String, - timestamp_handling_mode, + timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, custom_json_type: CustomJsonType::StarRocks(map), kafka_connect: None, } } + pub fn new_with_bigquery(schema: Schema, col_indices: Option>) -> Self { + Self { + schema, + 
col_indices, + time_handling_mode: TimeHandlingMode::Milli, + date_handling_mode: DateHandlingMode::String, + timestamp_handling_mode: TimestampHandlingMode::String, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, + custom_json_type: CustomJsonType::BigQuery, + kafka_connect: None, + } + } + pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self { Self { kafka_connect: Some(Arc::new(kafka_connect)), @@ -192,7 +203,16 @@ fn datum_to_json_object( custom_json_type: &CustomJsonType, ) -> ArrayResult { let scalar_ref = match datum { - None => return Ok(Value::Null), + None => { + if let CustomJsonType::BigQuery = custom_json_type + && matches!(field.data_type(), DataType::List(_)) + { + // Bigquery need to convert null of array to empty array https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types + return Ok(Value::Array(vec![])); + } else { + return Ok(Value::Null); + } + } Some(datum) => datum, }; @@ -239,7 +259,7 @@ fn datum_to_json_object( } json!(v_string) } - CustomJsonType::Es | CustomJsonType::None => { + CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => { json!(v.to_text()) } }, @@ -291,7 +311,7 @@ fn datum_to_json_object( } (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type { CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(), - CustomJsonType::Doris(_) | CustomJsonType::None => { + CustomJsonType::Doris(_) | CustomJsonType::None | CustomJsonType::BigQuery => { json!(jsonb_ref.to_string()) } }, @@ -342,7 +362,7 @@ fn datum_to_json_object( "starrocks can't support struct".to_string(), )); } - CustomJsonType::Es | CustomJsonType::None => { + CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => { let mut map = Map::with_capacity(st.len()); for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug( st.iter() diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index 3c76803c8e0f1..3254447e27077 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -144,6 +144,8 @@ pub enum CustomJsonType { Es, // starrocks' need jsonb is struct StarRocks(HashMap), + // bigquery need null array -> [] + BigQuery, None, } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index edefaf7aa1201..2397c942746d7 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -35,7 +35,7 @@ use with_options::WithOptions; use super::doris_starrocks_connector::{ HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_SUCCESS_STATUS, STARROCKS_DELETE_SIGN, }; -use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use super::encoder::{JsonEncoder, RowEncoder}; use super::writer::LogSinkerOf; use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::sink::writer::SinkWriterExt; @@ -367,12 +367,7 @@ impl StarrocksSinkWriter { inserter_innet_builder: starrocks_insert_builder, is_append_only, client: None, - row_encoder: JsonEncoder::new_with_starrocks( - schema, - None, - TimestampHandlingMode::String, - decimal_map, - ), + row_encoder: JsonEncoder::new_with_starrocks(schema, None, decimal_map), }) } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 35b06ec33fb76..28b492f0c80ea 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -17,6 +17,10 @@ 
+17,10 @@ BigQueryConfig:
   - name: bigquery.table
     field_type: String
     required: true
+  - name: bigquery.max_batch_rows
+    field_type: usize
+    required: false
+    default: '1024'
   - name: region
     field_type: String
     required: false
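
Usage sketch for the new Cassandra sink options: `cassandra.max_batch_rows` (default 512, bounded by the validation in CassandraConfig) sets how many statements are buffered before the batch is flushed in `tryCommit()`, and `cassandra.request_timeout_ms` (default 2000) is applied to the driver's `REQUEST_TIMEOUT`. The statement below is a sketch only: everything other than the two new options (the connector name, connection options, and object names) is a placeholder and is not defined by this patch.

CREATE SINK cassandra_sink
FROM user_behaviors
WITH (
    connector = 'cassandra',
    type = 'append-only',
    force_append_only = 'true',
    cassandra.url = 'cassandra-server:9042',
    cassandra.keyspace = 'demo',
    cassandra.table = 'user_behaviors',
    -- new: flush the statement batch once it reaches this many rows
    cassandra.max_batch_rows = '512',
    -- new: per-request timeout passed to the Cassandra driver, in milliseconds
    cassandra.request_timeout_ms = '2000'
);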
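
Usage sketch for the new BigQuery sink option: `bigquery.max_batch_rows` (default 1024) replaces the hard-coded BIGQUERY_INSERT_MAX_NUMS and controls how many rows are buffered before an insert request is sent; it is parsed with `DisplayFromStr`, so it is written as a string in the WITH clause. The example mirrors the commented-out options in integration_tests/big-query-sink/create_sink.sql; the source relation, the path, and the `${...}` values are placeholders.

CREATE SINK bq_sink
FROM user_behaviors
WITH (
    connector = 'bigquery',
    type = 'append-only',
    force_append_only = 'true',
    bigquery.local.path = '${gcp_service_account_json_path}',
    bigquery.project = '${project_id}',
    bigquery.dataset = '${dataset_id}',
    bigquery.table = '${table_id}',
    -- new: number of buffered rows that triggers a tabledata().insert_all() call
    bigquery.max_batch_rows = '1024'
);

With the encoder change above, NULL values in array columns are encoded as [] (BigQuery does not accept NULL for REPEATED fields), and any insert_errors returned by insert_all are now surfaced as a SinkError instead of being ignored.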