fix(sink): add cassandra batch size and fix bigquery array null #15516

Merged
merged 5 commits on Mar 8, 2024
Changes from 4 commits
2 changes: 1 addition & 1 deletion integration_tests/big-query-sink/create_sink.sql
@@ -23,7 +23,7 @@ FROM
-- bigquery.dataset= '${dataset_id}',
-- bigquery.table= '${table_id}',
-- access_key = '${aws_access_key}',
-- secret_access = '${aws_secret_access}',
-- secret_key = '${aws_secret_key}',
-- region = '${aws_region}',
-- force_append_only='true',
-- );
CassandraConfig.java
@@ -42,6 +42,12 @@ public class CassandraConfig extends CommonSinkConfig {
@JsonProperty(value = "cassandra.password")
private String password;

@JsonProperty(value = "cassandra.max_batch_rows")
private Integer maxBatchRows = 512;

@JsonProperty(value = "cassandra.request_timeout_ms")
private Integer requestTimeoutMs = 2000;

@JsonCreator
public CassandraConfig(
@JsonProperty(value = "cassandra.url") String url,
@@ -93,4 +99,29 @@ public CassandraConfig withPassword(String password) {
this.password = password;
return this;
}

public Integer getMaxBatchRows() {
return maxBatchRows;
}

public CassandraConfig withMaxBatchRows(Integer maxBatchRows) {
if (maxBatchRows > 65536 || maxBatchRows < 1) {
throw new IllegalArgumentException(
"cassandra.max_batch_rows must be <= 65535 and >= 1");
Collaborator:
nits: better to include the maxBatchRows in the error message.
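A sketch of what the suggested message could look like (illustrative only, not part of this diff):

throw new IllegalArgumentException(
        "cassandra.max_batch_rows must be <= 65535 and >= 1, got: " + maxBatchRows);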

}
this.maxBatchRows = maxBatchRows;
return this;
}

public Integer getRequestTimeoutMs() {
return requestTimeoutMs;
}

public CassandraConfig withRequestTimeoutMs(Integer requestTimeoutMs) {
if (requestTimeoutMs < 1) {
throw new IllegalArgumentException("cassandra.request_timeout_ms must be >= 1");
Collaborator:
nits: better to include requestTimeoutMs in the error message.
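Likewise, a sketch for this message (illustrative only):

throw new IllegalArgumentException(
        "cassandra.request_timeout_ms must be >= 1, got: " + requestTimeoutMs);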

}
this.requestTimeoutMs = requestTimeoutMs;
return this;
}
}
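Since both new options are exposed as builder-style setters that return the config, a caller can tune them fluently; a minimal usage sketch (values illustrative, assuming an existing CassandraConfig instance named config):

config = config.withMaxBatchRows(4096).withRequestTimeoutMs(5000);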
CassandraSink.java
@@ -18,6 +18,8 @@

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.*;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
@@ -34,7 +36,6 @@

public class CassandraSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
private static final Integer MAX_BATCH_SIZE = 1024 * 16;

private final CqlSession session;
private final List<SinkRow> updateRowCache = new ArrayList<>(1);
@@ -51,9 +52,16 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
throw new IllegalArgumentException(
"Invalid cassandraURL: expected `host:port`, got " + url);
}

DriverConfigLoader loader =
DriverConfigLoader.programmaticBuilder()
.withInt(DefaultDriverOption.REQUEST_TIMEOUT, config.getRequestTimeoutMs())
.build();

// check connection
CqlSessionBuilder sessionBuilder =
CqlSession.builder()
.withConfigLoader(loader)
.addContactPoint(
new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
.withKeyspace(config.getKeyspace())
@@ -163,7 +171,7 @@ private void write_upsert(Iterator<SinkRow> rows) {
}

private void tryCommit() {
if (batchBuilder.getStatementsCount() >= MAX_BATCH_SIZE) {
if (batchBuilder.getStatementsCount() >= config.getMaxBatchRows()) {
sync();
}
}
2 changes: 1 addition & 1 deletion src/connector/src/common.rs
@@ -100,7 +100,7 @@ impl AwsAuthProps {
),
))
} else {
bail!("Both \"access_key\" and \"secret_access\" are required.")
bail!("Both \"access_key\" and \"secret_key\" are required.")
}
}

40 changes: 24 additions & 16 deletions src/connector/src/sink/big_query.rs
@@ -28,15 +28,12 @@ use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_json::Value;
use serde_with::serde_as;
use serde_with::{serde_as, DisplayFromStr};
use url::Url;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{
DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
TimestamptzHandlingMode,
};
use super::encoder::{JsonEncoder, RowEncoder};
use super::writer::LogSinkerOf;
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::aws_utils::load_file_descriptor_from_s3;
@@ -47,8 +44,8 @@ use crate::sink::{
};

pub const BIGQUERY_SINK: &str = "bigquery";
const BIGQUERY_INSERT_MAX_NUMS: usize = 1024;

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
#[serde(rename = "bigquery.local.path")]
@@ -61,6 +58,13 @@ pub struct BigQueryCommon {
pub dataset: String,
#[serde(rename = "bigquery.table")]
pub table: String,
#[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
#[serde_as(as = "DisplayFromStr")]
pub max_batch_rows: usize,
}

fn default_max_batch_rows() -> usize {
1024
}

impl BigQueryCommon {
@@ -312,14 +316,7 @@ impl BigQuerySinkWriter {
client,
is_append_only,
insert_request: TableDataInsertAllRequest::new(),
row_encoder: JsonEncoder::new(
schema,
None,
DateHandlingMode::String,
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::Milli,
),
row_encoder: JsonEncoder::new_with_bigquery(schema, None),
})
}

@@ -339,7 +336,11 @@ impl BigQuerySinkWriter {
self.insert_request
.add_rows(insert_vec)
.map_err(|e| SinkError::BigQuery(e.into()))?;
if self.insert_request.len().ge(&BIGQUERY_INSERT_MAX_NUMS) {
if self
.insert_request
.len()
.ge(&self.config.common.max_batch_rows)
{
self.insert_data().await?;
}
Ok(())
@@ -349,7 +350,8 @@ impl BigQuerySinkWriter {
if !self.insert_request.is_empty() {
let insert_request =
mem::replace(&mut self.insert_request, TableDataInsertAllRequest::new());
self.client
let request = self
.client
.tabledata()
.insert_all(
&self.config.common.project,
@@ -359,6 +361,12 @@
)
.await
.map_err(|e| SinkError::BigQuery(e.into()))?;
if let Some(error) = request.insert_errors {
return Err(SinkError::BigQuery(anyhow::anyhow!(
"Insert error: {:?}",
error
)));
}
}
Ok(())
}
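A note on the new insert_errors check (hedged, based on the legacy streaming API's documented behavior): tabledata.insertAll can report per-row failures in its response body even when the HTTP call itself succeeds, so a comment like the following could make the intent explicit:

// insertAll may return per-row failures in `insert_errors` even on a successful
// HTTP response, so surface them as a sink error instead of silently dropping rows.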
9 changes: 2 additions & 7 deletions src/connector/src/sink/doris.rs
@@ -39,7 +39,7 @@ use super::doris_starrocks_connector::{
POOL_IDLE_TIMEOUT,
};
use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
use crate::sink::encoder::{JsonEncoder, RowEncoder};
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};

@@ -294,12 +294,7 @@ impl DorisSinkWriter {
inserter_inner_builder: doris_insert_builder,
is_append_only,
client: None,
row_encoder: JsonEncoder::new_with_doris(
schema,
None,
TimestampHandlingMode::String,
decimal_map,
),
row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
})
}

35 changes: 27 additions & 8 deletions src/connector/src/sink/encoder/json.rs
@@ -83,15 +83,14 @@ impl JsonEncoder {
pub fn new_with_doris(
schema: Schema,
col_indices: Option<Vec<usize>>,
timestamp_handling_mode: TimestampHandlingMode,
map: HashMap<String, (u8, u8)>,
) -> Self {
Self {
schema,
col_indices,
time_handling_mode: TimeHandlingMode::Milli,
date_handling_mode: DateHandlingMode::String,
timestamp_handling_mode,
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::Doris(map),
kafka_connect: None,
@@ -101,21 +100,33 @@
pub fn new_with_starrocks(
schema: Schema,
col_indices: Option<Vec<usize>>,
timestamp_handling_mode: TimestampHandlingMode,
map: HashMap<String, (u8, u8)>,
) -> Self {
Self {
schema,
col_indices,
time_handling_mode: TimeHandlingMode::Milli,
date_handling_mode: DateHandlingMode::String,
timestamp_handling_mode,
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::StarRocks(map),
kafka_connect: None,
}
}

pub fn new_with_bigquery(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
Self {
schema,
col_indices,
time_handling_mode: TimeHandlingMode::Milli,
date_handling_mode: DateHandlingMode::String,
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
custom_json_type: CustomJsonType::BigQuery,
kafka_connect: None,
}
}

pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
Self {
kafka_connect: Some(Arc::new(kafka_connect)),
@@ -192,7 +203,15 @@ fn datum_to_json_object(
custom_json_type: &CustomJsonType,
) -> ArrayResult<Value> {
let scalar_ref = match datum {
None => return Ok(Value::Null),
None => {
if let CustomJsonType::BigQuery = custom_json_type
&& matches!(field.data_type(), DataType::List(_))
{
return Ok(Value::Array(vec![]));
Collaborator:
Can you include the BigQuery doc link as a comment here to explain why we need to do this?
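A comment along these lines could address this (a sketch, assuming the motivation is that BigQuery REPEATED columns cannot hold an explicit NULL):

// BigQuery REPEATED (array) columns cannot be NULL; encode a NULL list as an
// empty array so the tabledata.insertAll payload is accepted.
// See the BigQuery documentation on ARRAY / REPEATED column modes.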

} else {
return Ok(Value::Null);
}
}
Some(datum) => datum,
};

@@ -239,7 +258,7 @@
}
json!(v_string)
}
CustomJsonType::Es | CustomJsonType::None => {
CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
json!(v.to_text())
}
},
@@ -291,7 +310,7 @@
}
(DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
CustomJsonType::Doris(_) | CustomJsonType::None => {
CustomJsonType::Doris(_) | CustomJsonType::None | CustomJsonType::BigQuery => {
json!(jsonb_ref.to_string())
}
},
@@ -342,7 +361,7 @@
"starrocks can't support struct".to_string(),
));
}
CustomJsonType::Es | CustomJsonType::None => {
CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
let mut map = Map::with_capacity(st.len());
for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
st.iter()
2 changes: 2 additions & 0 deletions src/connector/src/sink/encoder/mod.rs
@@ -144,6 +144,8 @@ pub enum CustomJsonType {
Es,
// starrocks' need jsonb is struct
StarRocks(HashMap<String, (u8, u8)>),
// bigquery need null array -> []
BigQuery,
None,
}

9 changes: 2 additions & 7 deletions src/connector/src/sink/starrocks.rs
@@ -35,7 +35,7 @@ use with_options::WithOptions;
use super::doris_starrocks_connector::{
HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_SUCCESS_STATUS, STARROCKS_DELETE_SIGN,
};
use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
use super::encoder::{JsonEncoder, RowEncoder};
use super::writer::LogSinkerOf;
use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::sink::writer::SinkWriterExt;
@@ -367,12 +367,7 @@ impl StarrocksSinkWriter {
inserter_innet_builder: starrocks_insert_builder,
is_append_only,
client: None,
row_encoder: JsonEncoder::new_with_starrocks(
schema,
None,
TimestampHandlingMode::String,
decimal_map,
),
row_encoder: JsonEncoder::new_with_starrocks(schema, None, decimal_map),
})
}
