Commit 80be523: add option

Author: xxhZs
Committed: Mar 7, 2024
Parent: 93fb9d4

Showing 10 changed files with 78 additions and 40 deletions.
2 changes: 1 addition & 1 deletion integration_tests/big-query-sink/create_sink.sql
@@ -23,7 +23,7 @@ FROM
-- bigquery.dataset= '${dataset_id}',
-- bigquery.table= '${table_id}',
-- access_key = '${aws_access_key}',
- -- secret_access = '${aws_secret_access}',
+ -- secret_key = '${aws_secret_key}',
-- region = '${aws_region}',
-- force_append_only='true',
-- );
@@ -11,10 +11,10 @@ CREATE table user_behaviors (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
- fields.user_id.end = '10000000',
+ fields.user_id.end = '1000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
- datagen.rows.per.second = '1000000'
+ datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;

CREATE TABLE cassandra_types (
@@ -45,6 +45,9 @@ public class CassandraConfig extends CommonSinkConfig {
@JsonProperty(value = "cassandra.max_batch_rows")
private Integer maxBatchRows = 512;

+ @JsonProperty(value = "cassandra.request_timeout_ms")
+ private Integer requestTimeoutMs = 2000;

@JsonCreator
public CassandraConfig(
@JsonProperty(value = "cassandra.url") String url,
@@ -102,7 +105,23 @@ public Integer getMaxBatchRows() {
}

public CassandraConfig withMaxBatchRows(Integer maxBatchRows) {
+ if (maxBatchRows > 65536 || maxBatchRows < 1) {
+ throw new IllegalArgumentException(
+ "cassandra.max_batch_rows must be <= 65536 and >= 1");
+ }
this.maxBatchRows = maxBatchRows;
return this;
}

+ public Integer getRequestTimeoutMs() {
+ return requestTimeoutMs;
+ }

+ public CassandraConfig withRequestTimeoutMs(Integer requestTimeoutMs) {
+ if (requestTimeoutMs < 1) {
+ throw new IllegalArgumentException("cassandra.request_timeout_ms must be >= 1");
+ }
+ this.requestTimeoutMs = requestTimeoutMs;
+ return this;
+ }
}
@@ -18,6 +18,8 @@

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+ import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+ import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.*;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
@@ -50,9 +52,16 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
throw new IllegalArgumentException(
"Invalid cassandraURL: expected `host:port`, got " + url);
}

+ DriverConfigLoader loader =
+ DriverConfigLoader.programmaticBuilder()
+ .withInt(DefaultDriverOption.REQUEST_TIMEOUT, config.getRequestTimeoutMs())
+ .build();

// check connection
CqlSessionBuilder sessionBuilder =
CqlSession.builder()
+ .withConfigLoader(loader)
.addContactPoint(
new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
.withKeyspace(config.getKeyspace())
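Taken together, the two Cassandra changes above add a new cassandra.request_timeout_ms option (default 2000) next to the existing cassandra.max_batch_rows, and apply it to the driver session through a programmatic DriverConfigLoader. A minimal usage sketch follows; connector = 'cassandra', cassandra.url, cassandra.max_batch_rows and cassandra.request_timeout_ms come from this diff, while the keyspace and table option names, the source table, and the append-only settings are assumptions added for illustration, not taken from this commit.

-- Hypothetical usage sketch (not part of this commit); options marked "assumed" are illustrative only.
CREATE SINK cassandra_sink
FROM user_behaviors
WITH (
    connector = 'cassandra',
    type = 'append-only',                      -- assumed
    force_append_only = 'true',                -- assumed
    cassandra.url = '127.0.0.1:9042',          -- parsed as host:port by CassandraSink
    cassandra.keyspace = 'demo',               -- assumed option name
    cassandra.table = 'user_behaviors',        -- assumed option name
    cassandra.max_batch_rows = '512',          -- existing option, default 512
    cassandra.request_timeout_ms = '5000'      -- new option from this commit, default 2000
);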
2 changes: 1 addition & 1 deletion src/connector/src/common.rs
@@ -100,7 +100,7 @@ impl AwsAuthProps {
),
))
} else {
- bail!("Both \"access_key\" and \"secret_access\" are required.")
+ bail!("Both \"access_key\" and \"secret_key\" are required.")
}
}

27 changes: 13 additions & 14 deletions src/connector/src/sink/big_query.rs
@@ -33,10 +33,7 @@ use url::Url;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

- use super::encoder::{
- DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
- TimestamptzHandlingMode,
- };
+ use super::encoder::{JsonEncoder, RowEncoder};
use super::writer::LogSinkerOf;
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::aws_utils::load_file_descriptor_from_s3;
@@ -47,7 +44,6 @@ use crate::sink::{
};

pub const BIGQUERY_SINK: &str = "bigquery";
- const BIGQUERY_INSERT_MAX_NUMS: usize = 1024;

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
@@ -61,6 +57,12 @@ pub struct BigQueryCommon {
pub dataset: String,
#[serde(rename = "bigquery.table")]
pub table: String,
+ #[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
+ pub max_batch_rows: usize,
}

+ fn default_max_batch_rows() -> usize {
+ 1024
+ }

impl BigQueryCommon {
@@ -312,14 +314,7 @@ impl BigQuerySinkWriter {
client,
is_append_only,
insert_request: TableDataInsertAllRequest::new(),
- row_encoder: JsonEncoder::new(
- schema,
- None,
- DateHandlingMode::String,
- TimestampHandlingMode::String,
- TimestamptzHandlingMode::UtcString,
- TimeHandlingMode::Milli,
- ),
+ row_encoder: JsonEncoder::new_with_bigquery(schema, None),
})
}

@@ -339,7 +334,11 @@
self.insert_request
.add_rows(insert_vec)
.map_err(|e| SinkError::BigQuery(e.into()))?;
- if self.insert_request.len().ge(&BIGQUERY_INSERT_MAX_NUMS) {
+ if self
+ .insert_request
+ .len()
+ .ge(&self.config.common.max_batch_rows)
+ {
self.insert_data().await?;
}
Ok(())
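With this change the BigQuery writer flushes on a configurable row count instead of the hard-coded BIGQUERY_INSERT_MAX_NUMS: bigquery.max_batch_rows is read through serde and defaults to 1024 via default_max_batch_rows. A minimal usage sketch follows, reusing only option names that appear in this diff; a real sink also needs credential or key-file settings that are omitted here, so treat it as illustrative rather than a complete statement.

-- Hypothetical usage sketch (not part of this commit); intentionally incomplete.
CREATE SINK bq_sink
FROM user_behaviors
WITH (
    connector = 'bigquery',
    force_append_only = 'true',
    bigquery.dataset = '${dataset_id}',
    bigquery.table = '${table_id}',
    access_key = '${aws_access_key}',
    secret_key = '${aws_secret_key}',
    region = '${aws_region}',
    bigquery.max_batch_rows = '2048'   -- new option from this commit, default 1024
);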
9 changes: 2 additions & 7 deletions src/connector/src/sink/doris.rs
@@ -39,7 +39,7 @@ use super::doris_starrocks_connector::{
POOL_IDLE_TIMEOUT,
};
use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
- use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
+ use crate::sink::encoder::{JsonEncoder, RowEncoder};
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};

@@ -294,12 +294,7 @@ impl DorisSinkWriter {
inserter_inner_builder: doris_insert_builder,
is_append_only,
client: None,
- row_encoder: JsonEncoder::new_with_doris(
- schema,
- None,
- TimestampHandlingMode::String,
- decimal_map,
- ),
+ row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
})
}

35 changes: 27 additions & 8 deletions src/connector/src/sink/encoder/json.rs
@@ -83,15 +83,14 @@ impl JsonEncoder {
pub fn new_with_doris(
schema: Schema,
col_indices: Option<Vec<usize>>,
- timestamp_handling_mode: TimestampHandlingMode,
map: HashMap<String, (u8, u8)>,
) -> Self {
Self {
schema,
col_indices,
time_handling_mode: TimeHandlingMode::Milli,
date_handling_mode: DateHandlingMode::String,
- timestamp_handling_mode,
+ timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::Doris(map),
kafka_connect: None,
@@ -101,21 +100,33 @@
pub fn new_with_starrocks(
schema: Schema,
col_indices: Option<Vec<usize>>,
- timestamp_handling_mode: TimestampHandlingMode,
map: HashMap<String, (u8, u8)>,
) -> Self {
Self {
schema,
col_indices,
time_handling_mode: TimeHandlingMode::Milli,
date_handling_mode: DateHandlingMode::String,
- timestamp_handling_mode,
+ timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::StarRocks(map),
kafka_connect: None,
}
}

+ pub fn new_with_bigquery(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
+ Self {
+ schema,
+ col_indices,
+ time_handling_mode: TimeHandlingMode::Milli,
+ date_handling_mode: DateHandlingMode::String,
+ timestamp_handling_mode: TimestampHandlingMode::String,
+ timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
+ custom_json_type: CustomJsonType::BigQuery,
+ kafka_connect: None,
+ }
+ }

pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
Self {
kafka_connect: Some(Arc::new(kafka_connect)),
@@ -192,7 +203,15 @@ fn datum_to_json_object(
custom_json_type: &CustomJsonType,
) -> ArrayResult<Value> {
let scalar_ref = match datum {
- None => return Ok(Value::Null),
+ None => {
+ if let CustomJsonType::BigQuery = custom_json_type
+ && matches!(field.data_type(), DataType::List(_))
+ {
+ return Ok(Value::Array(vec![]));
+ } else {
+ return Ok(Value::Null);
+ }
+ }
Some(datum) => datum,
};

@@ -239,7 +258,7 @@
}
json!(v_string)
}
- CustomJsonType::Es | CustomJsonType::None => {
+ CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
json!(v.to_text())
}
},
@@ -291,7 +310,7 @@
}
(DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
- CustomJsonType::Doris(_) | CustomJsonType::None => {
+ CustomJsonType::Doris(_) | CustomJsonType::None | CustomJsonType::BigQuery => {
json!(jsonb_ref.to_string())
}
},
@@ -342,7 +361,7 @@ fn datum_to_json_object(
"starrocks can't support struct".to_string(),
));
}
- CustomJsonType::Es | CustomJsonType::None => {
+ CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
let mut map = Map::with_capacity(st.len());
for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
st.iter()
2 changes: 2 additions & 0 deletions src/connector/src/sink/encoder/mod.rs
@@ -144,6 +144,8 @@ pub enum CustomJsonType {
Es,
// starrocks' need jsonb is struct
StarRocks(HashMap<String, (u8, u8)>),
+ // bigquery need null array -> []
+ BigQuery,
None,
}

9 changes: 2 additions & 7 deletions src/connector/src/sink/starrocks.rs
@@ -35,7 +35,7 @@ use with_options::WithOptions;
use super::doris_starrocks_connector::{
HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_SUCCESS_STATUS, STARROCKS_DELETE_SIGN,
};
- use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
+ use super::encoder::{JsonEncoder, RowEncoder};
use super::writer::LogSinkerOf;
use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::sink::writer::SinkWriterExt;
@@ -367,12 +362,7 @@ impl StarrocksSinkWriter {
inserter_innet_builder: starrocks_insert_builder,
is_append_only,
client: None,
- row_encoder: JsonEncoder::new_with_starrocks(
- schema,
- None,
- TimestampHandlingMode::String,
- decimal_map,
- ),
+ row_encoder: JsonEncoder::new_with_starrocks(schema, None, decimal_map),
})
}

