Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
fix ci
  • Loading branch information
xxhZs committed Sep 2, 2024
1 parent e41b348 commit 5a08d80
Show file tree
Hide file tree
Showing 42 changed files with 150 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ CREATE SINK sink1 AS select * from mv1 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand All @@ -54,7 +53,6 @@ CREATE SINK sink2 AS select * from mv1 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

sleep 20s
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/iceberg/test_case/cdc/load.slt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# CDC source basic test
statement ok
set sink_decouple = false;

statement ok
create source mysql_mydb with (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
primary_key = 'v1',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
primary_key = 'v1',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
primary_key = 'v1',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
1 change: 0 additions & 1 deletion e2e_test/sink/clickhouse_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4,
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test',
commit_checkpoint_interval = 1,
);

statement ok
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/create_sink_as.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t4 (v1 int primary key, v2 int);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/deltalake_rust_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean, v10 decimal, v11 decimal[]);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/doris_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb);

Expand Down
1 change: 0 additions & 1 deletion e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
catalog.type = 'storage',
database.name='demo_db',
table.name='e2e_demo_table',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/mongodb_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
create table t1(
a smallint,
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/redis_cluster_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 int);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/redis_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/remote/types.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float, v5 double, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/sqlserver_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
create table t_many_data_type_rw (
k1 int, k2 int,
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/starrocks_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb, v11 decimal);

Expand Down
2 changes: 2 additions & 0 deletions integration_tests/big-query-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

-- create sink with local file
CREATE SINK bhv_big_query_sink
FROM
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/cassandra-and-scylladb-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK bhv_cassandra_sink
FROM
bhv_mv WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/clickhouse-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK bhv_clickhouse_sink
FROM
bhv_mv WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/deltalake-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

create sink delta_lake_sink from source
with (
connector = 'deltalake',
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/doris-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

create secret doris_secret with (backend = 'meta') as '123456';

CREATE SINK bhv_doris_sink
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/dynamodb/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK dyn_sink
FROM
movies
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/elasticsearch-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK bhv_es7_sink
FROM
bhv_mv WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/kafka-cdc-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK IF NOT EXISTS counts_sink
FROM counts
WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/mqtt/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK mqtt_sink
FROM
personnel
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/mysql-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK target_count_mysql_sink
FROM
target_count WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/nats/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE TABLE
personnel (id integer, name varchar);

Expand Down
2 changes: 2 additions & 0 deletions integration_tests/postgres-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK target_count_postgres_sink
FROM
target_count WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/redis-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK bhv_redis_sink_1
FROM
bhv_mv WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/starrocks-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

create secret starrocks_secret with (backend = 'meta') as '123456';

CREATE SINK bhv_starrocks_sink_primary
Expand Down
36 changes: 21 additions & 15 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, COMMIT_CHECKPOINT_INTERVAL,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
Expand Down Expand Up @@ -497,23 +497,29 @@ impl Sink for ClickHouseSink {

const SINK_NAME: &'static str = CLICKHOUSE_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let commit_checkpoint_interval =
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
} else {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
};
fn is_sink_decouple(desc: &mut SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let commit_checkpoint_interval = desc.properties.get(COMMIT_CHECKPOINT_INTERVAL);

match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => {
if commit_checkpoint_interval > 1 {
return Err(SinkError::Config(anyhow!(
"config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
if let Some(interval) = commit_checkpoint_interval {
let commit_checkpoint_interval = interval.parse::<u64>().map_err(|e| {
SinkError::Config(anyhow!(
"Convert `commit_checkpoint_interval` to u64 error: {:?}",
e
))
})?;
if commit_checkpoint_interval > 1 {
return Err(SinkError::Config(anyhow!(
"config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
} else {
desc.properties.insert(
COMMIT_CHECKPOINT_INTERVAL.to_string(),
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
);
}
Ok(false)
}
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 10;
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1;
pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval";

pub fn default_commit_checkpoint_interval() -> u64 {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
Expand Down
36 changes: 21 additions & 15 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use with_options::WithOptions;
use super::catalog::desc::SinkDesc;
use super::coordinate::CoordinatedSinkWriter;
use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, COMMIT_CHECKPOINT_INTERVAL,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use super::writer::SinkWriter;
use super::{
Expand Down Expand Up @@ -285,23 +285,29 @@ impl Sink for DeltaLakeSink {

const SINK_NAME: &'static str = DELTALAKE_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let commit_checkpoint_interval =
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
} else {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
};
fn is_sink_decouple(desc: &mut SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let commit_checkpoint_interval = desc.properties.get(COMMIT_CHECKPOINT_INTERVAL);

match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => {
if commit_checkpoint_interval > 1 {
return Err(SinkError::Config(anyhow!(
"config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
if let Some(interval) = commit_checkpoint_interval {
let commit_checkpoint_interval = interval.parse::<u64>().map_err(|e| {
SinkError::Config(anyhow!(
"Convert `commit_checkpoint_interval` to u64 error: {:?}",
e
))
})?;
if commit_checkpoint_interval > 1 {
return Err(SinkError::Config(anyhow!(
"config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
} else {
desc.properties.insert(
COMMIT_CHECKPOINT_INTERVAL.to_string(),
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
);
}
Ok(false)
}
Expand Down
Loading

0 comments on commit 5a08d80

Please sign in to comment.