Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
fix comm
  • Loading branch information
xxhZs committed Sep 3, 2024
1 parent 5a08d80 commit 07cc7e4
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 150 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

Expand Down
34 changes: 1 addition & 33 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::ser::{SerializeSeq, SerializeStruct};
use serde::Serialize;
Expand All @@ -37,13 +36,11 @@ use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, COMMIT_CHECKPOINT_INTERVAL,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::error::ConnectorResult;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::{
Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
Expand Down Expand Up @@ -497,35 +494,6 @@ impl Sink for ClickHouseSink {

const SINK_NAME: &'static str = CLICKHOUSE_SINK;

/// Resolves the `sink_decouple` session setting for the ClickHouse sink.
///
/// `Default` and `Enable` both yield `Ok(true)` and leave `desc` untouched.
/// `Disable` yields `Ok(false)` after checking `commit_checkpoint_interval`:
/// a user-provided value must parse as `u64` and must not exceed 1, while a
/// missing value is filled in with
/// `DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE`.
///
/// # Errors
///
/// Returns `SinkError::Config` when the configured interval is not a valid
/// `u64`, or when it is larger than 1 while sink decouple is disabled.
fn is_sink_decouple(desc: &mut SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
    // Only the explicit `Disable` setting needs any validation.
    match user_specified {
        SinkDecouple::Default | SinkDecouple::Enable => return Ok(true),
        SinkDecouple::Disable => {}
    }

    match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
        Some(raw) => {
            let parsed = raw.parse::<u64>().map_err(|e| {
                SinkError::Config(anyhow!(
                    "Convert `commit_checkpoint_interval` to u64 error: {:?}",
                    e
                ))
            })?;
            if parsed > 1 {
                return Err(SinkError::Config(anyhow!(
                    "config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                )));
            }
        }
        None => {
            // Nothing configured: pin the interval to the non-decoupled default.
            desc.properties.insert(
                COMMIT_CHECKPOINT_INTERVAL.to_string(),
                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
            );
        }
    }
    Ok(false)
}

async fn validate(&self) -> Result<()> {
// For upsert clickhouse sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use async_trait::async_trait;
use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 10;
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10;
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1;
pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval";

pub fn default_commit_checkpoint_interval() -> u64 {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE
}

/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`).
Expand Down
34 changes: 1 addition & 33 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
Expand All @@ -41,11 +40,9 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use with_options::WithOptions;

use super::catalog::desc::SinkDesc;
use super::coordinate::CoordinatedSinkWriter;
use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, COMMIT_CHECKPOINT_INTERVAL,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
};
use super::writer::SinkWriter;
use super::{
Expand Down Expand Up @@ -285,35 +282,6 @@ impl Sink for DeltaLakeSink {

const SINK_NAME: &'static str = DELTALAKE_SINK;

/// Resolves the `sink_decouple` session setting for the DeltaLake sink.
///
/// `Default` and `Enable` both yield `Ok(true)` and leave `desc` untouched.
/// `Disable` yields `Ok(false)` after checking `commit_checkpoint_interval`:
/// a user-provided value must parse as `u64` and must not exceed 1, while a
/// missing value is filled in with
/// `DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE`.
///
/// # Errors
///
/// Returns `SinkError::Config` when the configured interval is not a valid
/// `u64`, or when it is larger than 1 while sink decouple is disabled.
fn is_sink_decouple(desc: &mut SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
    // Only the explicit `Disable` setting needs any validation.
    match user_specified {
        SinkDecouple::Default | SinkDecouple::Enable => return Ok(true),
        SinkDecouple::Disable => {}
    }

    match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
        Some(raw) => {
            let parsed = raw.parse::<u64>().map_err(|e| {
                SinkError::Config(anyhow!(
                    "Convert `commit_checkpoint_interval` to u64 error: {:?}",
                    e
                ))
            })?;
            if parsed > 1 {
                return Err(SinkError::Config(anyhow!(
                    "config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                )));
            }
        }
        None => {
            // Nothing configured: pin the interval to the non-decoupled default.
            desc.properties.insert(
                COMMIT_CHECKPOINT_INTERVAL.to_string(),
                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
            );
        }
    }
    Ok(false)
}

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let inner = DeltaLakeSinkWriter::new(
self.config.clone(),
Expand Down
39 changes: 4 additions & 35 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,16 @@ use with_options::WithOptions;
use self::mock_catalog::MockCatalog;
use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder;
use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
use super::catalog::desc::SinkDesc;
use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, COMMIT_CHECKPOINT_INTERVAL,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
};
use super::{
Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
use crate::error::ConnectorResult;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkCommitCoordinator, SinkDecouple, SinkParam};
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
use crate::{
deserialize_bool_from_string, deserialize_optional_bool_from_string,
deserialize_optional_string_seq_from_string,
Expand Down Expand Up @@ -762,35 +760,6 @@ impl Sink for IcebergSink {

const SINK_NAME: &'static str = ICEBERG_SINK;

/// Resolves the `sink_decouple` session setting for the Iceberg sink.
///
/// `Default` and `Enable` both yield `Ok(true)` and leave `desc` untouched.
/// `Disable` yields `Ok(false)` after checking `commit_checkpoint_interval`:
/// a user-provided value must parse as `u64` and must not exceed 1, while a
/// missing value is filled in with
/// `DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE`.
///
/// # Errors
///
/// Returns `SinkError::Config` when the configured interval is not a valid
/// `u64`, or when it is larger than 1 while sink decouple is disabled.
fn is_sink_decouple(desc: &mut SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
    // Only the explicit `Disable` setting needs any validation.
    match user_specified {
        SinkDecouple::Default | SinkDecouple::Enable => return Ok(true),
        SinkDecouple::Disable => {}
    }

    match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
        Some(raw) => {
            let parsed = raw.parse::<u64>().map_err(|e| {
                SinkError::Config(anyhow!(
                    "Convert `commit_checkpoint_interval` to u64 error: {:?}",
                    e
                ))
            })?;
            if parsed > 1 {
                return Err(SinkError::Config(anyhow!(
                    "config conflict: Iceberg config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                )));
            }
        }
        None => {
            // Nothing configured: pin the interval to the non-decoupled default.
            desc.properties.insert(
                COMMIT_CHECKPOINT_INTERVAL.to_string(),
                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
            );
        }
    }
    Ok(false)
}

async fn validate(&self) -> Result<()> {
if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
risingwave_common::license::Feature::IcebergSinkWithGlue
Expand Down Expand Up @@ -1314,7 +1283,7 @@ mod test {

use risingwave_common::catalog::Field;

use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
use crate::sink::iceberg::IcebergConfig;
use crate::source::DataType;

Expand Down Expand Up @@ -1397,7 +1366,7 @@ mod test {
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
};

assert_eq!(iceberg_config, expected_iceberg_config);
Expand Down
51 changes: 50 additions & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ use ::deltalake::DeltaTableError;
use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use clickhouse::CLICKHOUSE_SINK;
use decouple_checkpoint_log_sink::{
COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
};
use deltalake::DELTALAKE_SINK;
use iceberg::ICEBERG_SINK;
use opendal::Error as OpendalError;
use risingwave_common::array::ArrayError;
use risingwave_common::bitmap::Bitmap;
Expand All @@ -66,6 +73,7 @@ use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::MetaClient;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
pub use tracing;
Expand Down Expand Up @@ -366,13 +374,54 @@ impl SinkWriterParam {
}
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
matches!(
sink_name,
ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK
)
}
pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
const SINK_NAME: &'static str;
type LogSinker: LogSinker;
type Coordinator: SinkCommitCoordinator;

/// Validates, or fills in, the `commit_checkpoint_interval` property of `desc`.
///
/// Sinks for which `is_sink_support_commit_checkpoint_interval` returns
/// `false` are left untouched. For the others:
///
/// - If the property is already present it must parse as a `u64`. A value
///   larger than 1 is rejected when `user_specified` is
///   `SinkDecouple::Disable`.
/// - If the property is absent it is inserted, using
///   `DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE` for
///   `Default`/`Enable` and
///   `DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE` for `Disable`.
///
/// # Errors
///
/// Returns `SinkError::Config` when the configured value is not a valid
/// `u64`, or when it conflicts with a disabled `sink_decouple` setting.
fn set_default_commit_checkpoint_interval(
    desc: &mut SinkDesc,
    user_specified: &SinkDecouple,
) -> Result<()> {
    if !is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
        return Ok(());
    }

    match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
        Some(raw_interval) => {
            // Name the offending option and value in the error; a bare integer
            // parse error gives the user no hint about which setting is wrong.
            let commit_checkpoint_interval = raw_interval.parse::<u64>().map_err(|e| {
                SinkError::Config(anyhow!(e).context(format!(
                    "failed to parse `{}` value {:?} as u64",
                    COMMIT_CHECKPOINT_INTERVAL, raw_interval
                )))
            })?;
            if matches!(user_specified, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
                return Err(SinkError::Config(anyhow!("config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled")));
            }
        }
        None => {
            // Nothing configured: choose the default that matches the
            // requested decouple mode.
            let default_interval = match user_specified {
                SinkDecouple::Default | SinkDecouple::Enable => {
                    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE
                }
                SinkDecouple::Disable => DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
            };
            desc.properties.insert(
                COMMIT_CHECKPOINT_INTERVAL.to_string(),
                default_interval.to_string(),
            );
        }
    }
    Ok(())
}

/// `user_specified` is the value of `sink_decouple` config.
fn is_sink_decouple(_desc: &mut SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
Expand Down
7 changes: 3 additions & 4 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ use tracing::warn;

use super::elasticsearch::{is_es_sink, StreamChunkConverter, ES_OPTION_DELIMITER};
use crate::error::ConnectorResult;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
Expand Down Expand Up @@ -116,7 +115,7 @@ def_remote_sink!();

pub trait RemoteSinkTrait: Send + Sync + 'static {
const SINK_NAME: &'static str;
fn default_sink_decouple(_desc: &SinkDesc) -> bool {
fn default_sink_decouple() -> bool {
true
}
}
Expand Down Expand Up @@ -144,9 +143,9 @@ impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {

const SINK_NAME: &'static str = R::SINK_NAME;

fn is_sink_decouple(desc: &mut SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(R::default_sink_decouple(desc)),
SinkDecouple::Default => Ok(R::default_sink_decouple()),
SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
}
Expand Down
38 changes: 2 additions & 36 deletions src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use mysql_async::Opts;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::DataType;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
Expand All @@ -38,10 +37,7 @@ use tokio::task::JoinHandle;
use url::form_urlencoded;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use super::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
use super::doris_starrocks_connector::{
HeaderBuilder, InserterInner, StarrocksTxnRequestBuilder, STARROCKS_DELETE_SIGN,
STARROCKS_SUCCESS_STATUS,
Expand All @@ -51,7 +47,6 @@ use super::{
SinkCommitCoordinator, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
use crate::sink::{Result, Sink, SinkWriter, SinkWriterParam};
Expand Down Expand Up @@ -121,7 +116,7 @@ pub struct StarrocksConfig {
}

fn default_commit_checkpoint_interval() -> u64 {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE
}

impl StarrocksConfig {
Expand Down Expand Up @@ -267,35 +262,6 @@ impl Sink for StarrocksSink {

const SINK_NAME: &'static str = STARROCKS_SINK;

/// Resolves the `sink_decouple` session setting for the StarRocks sink.
///
/// `Default` and `Enable` both yield `Ok(true)` and leave `desc` untouched.
/// `Disable` yields `Ok(false)` after checking `commit_checkpoint_interval`:
/// a user-provided value must parse as `u64` and must not exceed 1, while a
/// missing value is filled in with
/// `DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE`.
///
/// # Errors
///
/// Returns `SinkError::Config` when the configured interval is not a valid
/// `u64`, or when it is larger than 1 while sink decouple is disabled.
fn is_sink_decouple(desc: &mut SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
    // Only the explicit `Disable` setting needs any validation.
    match user_specified {
        SinkDecouple::Default | SinkDecouple::Enable => return Ok(true),
        SinkDecouple::Disable => {}
    }

    match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
        Some(raw) => {
            let parsed = raw.parse::<u64>().map_err(|e| {
                SinkError::Config(anyhow!(
                    "Convert `commit_checkpoint_interval` to u64 error: {:?}",
                    e
                ))
            })?;
            if parsed > 1 {
                return Err(SinkError::Config(anyhow!(
                    "config conflict: Starrocks config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                )));
            }
        }
        None => {
            // Nothing configured: pin the interval to the non-decoupled default.
            desc.properties.insert(
                COMMIT_CHECKPOINT_INTERVAL.to_string(),
                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
            );
        }
    }
    Ok(false)
}

async fn validate(&self) -> Result<()> {
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
Expand Down
Loading

0 comments on commit 07cc7e4

Please sign in to comment.