diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 67920dc380bbf..498a0420763be 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -94,6 +94,7 @@ services: - redis-server - pulsar-server - cassandra-server + - doris-server - starrocks-fe-server - starrocks-be-server volumes: @@ -188,6 +189,18 @@ services: environment: - CASSANDRA_CLUSTER_NAME=cloudinfra + doris-server: + container_name: doris-server + image: apache/doris:doris-all-in-one-2.1.0 + ports: + - 8030:8030 + - 8040:8040 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9030"] + interval: 5s + timeout: 5s + retries: 30 + starrocks-fe-server: container_name: starrocks-fe-server image: starrocks/fe-ubuntu:3.1.7 diff --git a/ci/scripts/e2e-doris-sink-test.sh b/ci/scripts/e2e-doris-sink-test.sh index 30bfdaf129e26..859dd411469dd 100755 --- a/ci/scripts/e2e-doris-sink-test.sh +++ b/ci/scripts/e2e-doris-sink-test.sh @@ -30,8 +30,8 @@ sleep 1 echo "--- create doris table" apt-get update -y && apt-get install -y mysql-client sleep 2 -mysql -uroot -P 9030 -h doris-fe-server -e "CREATE database demo;use demo; -CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 datev2,v8 datetime,v9 boolean) UNIQUE KEY(\`v1\`) +mysql -uroot -P 9030 -h doris-server -e "CREATE database demo;use demo; +CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 datev2,v8 datetime,v9 boolean,v10 json) UNIQUE KEY(\`v1\`) DISTRIBUTED BY HASH(\`v1\`) BUCKETS 1 PROPERTIES ( \"replication_allocation\" = \"tag.location.default: 1\" @@ -43,11 +43,11 @@ sleep 2 echo "--- testing sinks" sqllogictest -p 4566 -d dev './e2e_test/sink/doris_sink.slt' sleep 1 -mysql -uroot -P 9030 -h doris-fe-server -e "select * from demo.demo_bhv_table" > ./query_result.csv +mysql -uroot -P 9030 -h doris-server -e "select * from demo.demo_bhv_table" > ./query_result.csv if cat ./query_result.csv | sed '1d; s/\t/,/g' | awk -F "," '{ - exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0); }'; then + exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0 && $10 == "{\"a\":1}"); }'; then echo "Doris sink check passed" else cat ./query_result.csv @@ -56,4 +56,4 @@ else fi echo "--- Kill cluster" -cargo make ci-kill \ No newline at end of file +cargo make ci-kill diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index ce28986d50b08..c4469346690a8 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -896,37 +896,24 @@ steps: timeout_in_minutes: 10 retry: *auto-retry - # Causes ci error, close it first, fix it later - # - label: "set vm_max_map_count_2000000" - # key: "set-vm_max_map_count" - # if: | - # !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null - # || build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" - # || build.env("CI_STEPS") =~ /(^|,)e2e-doris-sink-tests?(,|$$)/ - # command: "sudo sysctl -w vm.max_map_count=2000000" - # depends_on: - # - "build" - # - "build-other" - - # - label: "end-to-end doris sink test" - # key: "e2e-doris-sink-tests" - # command: "ci/scripts/e2e-doris-sink-test.sh -p ci-release" - # if: | - # !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null - # || build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" - # || build.env("CI_STEPS") =~ /(^|,)e2e-doris-sink-tests?(,|$$)/ - # depends_on: - # - "build" - # - "build-other" - # - "set-vm_max_map_count" - # plugins: - # - docker-compose#v5.1.0: - # run: sink-doris-env - # config: ci/docker-compose.yml - # mount-buildkite-agent: true - # - ./ci/plugins/upload-failure-logs - # timeout_in_minutes: 10 - # retry: *auto-retry + - label: "end-to-end doris sink test" + key: "e2e-doris-sink-tests" + command: "ci/scripts/e2e-doris-sink-test.sh -p ci-release" + if: | + !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" + || build.env("CI_STEPS") =~ /(^|,)e2e-doris-sink-tests?(,|$$)/ + depends_on: + - "build" + - "build-other" + plugins: + - docker-compose#v5.1.0: + run: sink-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry - label: "end-to-end starrocks sink test" key: "e2e-starrocks-sink-tests" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index f263bc18f4b57..3efe7d29f3768 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -308,30 +308,20 @@ steps: timeout_in_minutes: 10 retry: *auto-retry - # Causes ci error, close it first, fix it later - # - label: "set vm_max_map_count_2000000" - # key: "set-vm_max_map_count" - # if: build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-doris-sink-tests?(,|$$)/ - # command: "sudo sysctl -w vm.max_map_count=2000000" - # depends_on: - # - "build" - # - "build-other" - - # - label: "end-to-end doris sink test" - # if: build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-doris-sink-tests?(,|$$)/ - # command: "ci/scripts/e2e-doris-sink-test.sh -p ci-dev" - # depends_on: - # - "build" - # - "build-other" - # - "set-vm_max_map_count" - # plugins: - # - docker-compose#v5.1.0: - # run: sink-doris-env - # config: ci/docker-compose.yml - # mount-buildkite-agent: true - # - ./ci/plugins/upload-failure-logs - # timeout_in_minutes: 10 - # retry: *auto-retry + - label: "end-to-end doris sink test" + if: build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-doris-sink-tests?(,|$$)/ + command: "ci/scripts/e2e-doris-sink-test.sh -p ci-dev" + depends_on: + - "build" + - "build-other" + plugins: + - docker-compose#v5.1.0: + run: sink-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry - label: "end-to-end starrocks sink test" if: build.pull_request.labels includes "ci/run-e2e-starrocks-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-starrocks-sink-tests?(,|$$)/ diff --git a/e2e_test/sink/doris_sink.slt b/e2e_test/sink/doris_sink.slt index 2c552bbb26143..3242206badaea 100644 --- a/e2e_test/sink/doris_sink.slt +++ b/e2e_test/sink/doris_sink.slt @@ -1,5 +1,5 @@ statement ok -CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean); +CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb); statement ok CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; @@ -10,7 +10,7 @@ FROM mv6 WITH ( connector = 'doris', type = 'append-only', - doris.url = 'http://doris-fe-server:8030', + doris.url = 'http://doris-server:8030', doris.user = 'users', doris.password = '123456', doris.database = 'demo', @@ -19,7 +19,7 @@ FROM ); statement ok -INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false); +INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false, '{"a":1}'); statement ok FLUSH; @@ -31,4 +31,4 @@ statement ok DROP MATERIALIZED VIEW mv6; statement ok -DROP TABLE t6; \ No newline at end of file +DROP TABLE t6; diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index ea69a03af25c9..4a5a1ded01f9e 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -181,7 +181,7 @@ impl DorisSink { risingwave_common::types::DataType::Bytea => { Err(SinkError::Doris("doris can not support Bytea".to_string())) } - risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSONB")), + risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSON")), risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")), risingwave_common::types::DataType::Int256 => { Err(SinkError::Doris("doris can not support Int256".to_string())) @@ -286,7 +286,7 @@ impl DorisSinkWriter { config.common.database.clone(), config.common.table.clone(), header, - ); + )?; Ok(Self { config, schema: schema.clone(), diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 3880dad8d1952..3f175950f466e 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -26,6 +26,7 @@ use hyper::client::HttpConnector; use hyper::{body, Client, Request, StatusCode}; use hyper_tls::HttpsConnector; use tokio::task::JoinHandle; +use url::Url; use super::{Result, SinkError}; @@ -40,6 +41,8 @@ const WAIT_HANDDLE_TIMEOUT: Duration = Duration::from_secs(10); pub(crate) const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30); const DORIS: &str = "doris"; const STARROCKS: &str = "starrocks"; +const LOCALHOST: &str = "localhost"; +const LOCALHOST_IP: &str = "127.0.0.1"; pub struct HeaderBuilder { header: HashMap, } @@ -158,16 +161,30 @@ pub struct InserterInnerBuilder { url: String, header: HashMap, sender: Option, + fe_host: String, } impl InserterInnerBuilder { - pub fn new(url: String, db: String, table: String, header: HashMap) -> Self { + pub fn new( + url: String, + db: String, + table: String, + header: HashMap, + ) -> Result { + let fe_host = Url::parse(&url) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .host_str() + .ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get fe host from url")) + })? + .to_string(); let url = format!("{}/api/{}/{}/_stream_load", url, db, table); - Self { + Ok(Self { url, sender: None, header, - } + fe_host, + }) } fn build_request_and_client( @@ -196,7 +213,7 @@ impl InserterInnerBuilder { .request(request_get_url) .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { + let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { resp.headers() .get("location") .ok_or_else(|| { @@ -207,13 +224,31 @@ impl InserterInnerBuilder { .to_str() .context("Can't get doris BE url in header") .map_err(SinkError::DorisStarrocksConnect)? + .to_string() } else { return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( "Can't get doris BE url", ))); }; - let (builder, client) = self.build_request_and_client(be_url.to_string()); + if self.fe_host != LOCALHOST && self.fe_host != LOCALHOST_IP { + let mut parsed_be_url = + Url::parse(&be_url).map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let be_host = parsed_be_url.host_str().ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get be host from url")) + })?; + + if be_host == LOCALHOST || be_host == LOCALHOST_IP { + // if be host is 127.0.0.1, we may can't connect to it directly, + // so replace it with fe host + parsed_be_url + .set_host(Some(self.fe_host.as_str())) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + be_url = parsed_be_url.as_str().into(); + } + } + + let (builder, client) = self.build_request_and_client(be_url); let (sender, body) = Body::channel(); let request = builder .body(body) diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 192bf38fd839f..d6d2c6104d11f 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -359,7 +359,7 @@ impl StarrocksSinkWriter { config.common.database.clone(), config.common.table.clone(), header, - ); + )?; Ok(Self { config, schema: schema.clone(),