From 0e8dc79f9f61970ca33f82c7d3065e711181d912 Mon Sep 17 00:00:00 2001 From: xfz Date: Thu, 28 Mar 2024 22:04:52 +0800 Subject: [PATCH 1/8] fix: support doris JSON type --- ci/scripts/gen-integration-test-yaml.py | 78 +++++++++---------- ci/scripts/integration-tests.sh | 7 -- integration_tests/doris-sink/create_sink.sql | 4 +- .../doris-sink/docker-compose.yml | 63 +-------------- integration_tests/doris-sink/prepare.sh | 2 +- src/connector/src/sink/doris.rs | 2 +- 6 files changed, 45 insertions(+), 111 deletions(-) diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py index 6332b98ecca9f..2948380b67002 100644 --- a/ci/scripts/gen-integration-test-yaml.py +++ b/ci/scripts/gen-integration-test-yaml.py @@ -3,46 +3,46 @@ import subprocess CASES_MAP = { - 'ad-click': ['json'], - 'ad-ctr': ['json'], - 'cdn-metrics': ['json'], - 'clickstream': ['json'], - 'livestream': ['json', 'protobuf'], - 'prometheus': ['json'], - 'schema-registry': ['json'], - 'mysql-cdc': ['json'], - 'postgres-cdc': ['json'], - 'mongodb-cdc': ['json'], - 'mysql-sink': ['json'], - 'postgres-sink': ['json'], - 'iceberg-cdc': ['json'], - 'iceberg-sink': ['none'], - 'iceberg-source': ['none'], - 'twitter': ['json', 'protobuf'], - 'twitter-pulsar': ['json'], - 'debezium-mysql': ['json'], - 'debezium-postgres': ['json'], - 'debezium-sqlserver': ['json'], - 'tidb-cdc-sink': ['json'], - 'citus-cdc': ['json'], - 'kinesis-s3-source': ['json'], - 'clickhouse-sink': ['json'], - 'cockroach-sink': ['json'], - 'kafka-cdc-sink': ['json'], - 'cassandra-and-scylladb-sink': ['json'], - 'elasticsearch-sink': ['json'], - 'redis-sink': ['json'], - 'big-query-sink': ['json'], - 'mindsdb': ['json'], - 'vector': ['json'], - 'nats': ['json', 'protobuf'], - 'mqtt': ['json'], + # 'ad-click': ['json'], + # 'ad-ctr': ['json'], + # 'cdn-metrics': ['json'], + # 'clickstream': ['json'], + # 'livestream': ['json', 'protobuf'], + # 'prometheus': ['json'], + # 'schema-registry': ['json'], + # 'mysql-cdc': ['json'], + # 'postgres-cdc': ['json'], + # 'mongodb-cdc': ['json'], + # 'mysql-sink': ['json'], + # 'postgres-sink': ['json'], + # 'iceberg-cdc': ['json'], + # 'iceberg-sink': ['none'], + # 'iceberg-source': ['none'], + # 'twitter': ['json', 'protobuf'], + # 'twitter-pulsar': ['json'], + # 'debezium-mysql': ['json'], + # 'debezium-postgres': ['json'], + # 'debezium-sqlserver': ['json'], + # 'tidb-cdc-sink': ['json'], + # 'citus-cdc': ['json'], + # 'kinesis-s3-source': ['json'], + # 'clickhouse-sink': ['json'], + # 'cockroach-sink': ['json'], + # 'kafka-cdc-sink': ['json'], + # 'cassandra-and-scylladb-sink': ['json'], + # 'elasticsearch-sink': ['json'], + # 'redis-sink': ['json'], + # 'big-query-sink': ['json'], + # 'mindsdb': ['json'], + # 'vector': ['json'], + # 'nats': ['json', 'protobuf'], + # 'mqtt': ['json'], 'doris-sink': ['json'], - 'starrocks-sink': ['json'], - 'deltalake-sink': ['json'], - 'pinot-sink': ['json'], - 'presto-trino': ['json'], - 'client-library': ['none'], + # 'starrocks-sink': ['json'], + # 'deltalake-sink': ['json'], + # 'pinot-sink': ['json'], + # 'presto-trino': ['json'], + # 'client-library': ['none'], } def gen_pipeline_steps(): diff --git a/ci/scripts/integration-tests.sh b/ci/scripts/integration-tests.sh index ef6c024ac9db0..ffb5cacc3e114 100755 --- a/ci/scripts/integration-tests.sh +++ b/ci/scripts/integration-tests.sh @@ -71,10 +71,6 @@ if [ "${format}" == "protobuf" ]; then python3 gen_pb_compose.py ${case} ${format} fi -echo "--- set vm.max_map_count=2000000 for doris" -max_map_count_original_value=$(sysctl -n vm.max_map_count) -sudo sysctl -w vm.max_map_count=2000000 - echo "--- run Demos" python3 run_demos.py --case ${case} --format ${format} @@ -88,6 +84,3 @@ python3 check_data.py ${case} ${upstream} echo "--- clean Demos" python3 clean_demos.py --case ${case} - -echo "--- reset vm.max_map_count={$max_map_count_original_value}" -sudo sysctl -w vm.max_map_count=$max_map_count_original_value diff --git a/integration_tests/doris-sink/create_sink.sql b/integration_tests/doris-sink/create_sink.sql index 7cd1ac24857e9..017a39573855f 100644 --- a/integration_tests/doris-sink/create_sink.sql +++ b/integration_tests/doris-sink/create_sink.sql @@ -3,7 +3,7 @@ FROM bhv_mv WITH ( connector = 'doris', type = 'append-only', - doris.url = 'http://fe:8030', + doris.url = 'http://doris:8030', doris.user = 'users', doris.password = '123456', doris.database = 'demo', @@ -16,7 +16,7 @@ FROM upsert_bhv_mv WITH ( connector = 'doris', type = 'upsert', - doris.url = 'http://fe:8030', + doris.url = 'http://doris:8030', doris.user = 'users', doris.password = '123456', doris.database = 'demo', diff --git a/integration_tests/doris-sink/docker-compose.yml b/integration_tests/doris-sink/docker-compose.yml index fc7cfd751e989..b03ec51e3ac2c 100644 --- a/integration_tests/doris-sink/docker-compose.yml +++ b/integration_tests/doris-sink/docker-compose.yml @@ -1,88 +1,35 @@ --- version: "3" services: - fe: - platform: linux/amd64 - image: apache/doris:2.0.0_alpha-fe-x86_64 - hostname: fe - environment: - - FE_SERVERS=fe1:172.21.0.2:9010 - - FE_ID=1 + doris: + image: apache/doris:doris-all-in-one-2.1.0 ports: - "8030:8030" - - "9030:9030" - networks: - mynetwork: - ipv4_address: 172.21.0.2 - be: - platform: linux/amd64 - image: apache/doris:2.0.0_alpha-be-x86_64 - hostname: be - environment: - - FE_SERVERS=fe1:172.21.0.2:9010 - - BE_ADDR=172.21.0.3:9050 - depends_on: - - fe - ports: - - "9050:9050" - networks: - mynetwork: - ipv4_address: 172.21.0.3 risingwave-standalone: extends: file: ../../docker/docker-compose.yml service: risingwave-standalone - networks: - mynetwork: - ipv4_address: 172.21.0.4 etcd-0: extends: file: ../../docker/docker-compose.yml service: etcd-0 - networks: - mynetwork: - ipv4_address: 172.21.0.5 grafana-0: extends: file: ../../docker/docker-compose.yml service: grafana-0 - networks: - mynetwork: - ipv4_address: 172.21.0.6 minio-0: extends: file: ../../docker/docker-compose.yml service: minio-0 - networks: - mynetwork: - ipv4_address: 172.21.0.7 prometheus-0: extends: file: ../../docker/docker-compose.yml service: prometheus-0 - networks: - mynetwork: - ipv4_address: 172.21.0.8 - mysql: - image: mysql:latest - ports: - - "3306:3306" - volumes: - - "./doris_prepare.sql:/doris_prepare.sql" - command: tail -f /dev/null - restart: on-failure - networks: - mynetwork: - ipv4_address: 172.21.0.9 postgres: image: postgres:latest command: tail -f /dev/null volumes: - "./update_delete.sql:/update_delete.sql" - restart: on-failure - networks: - mynetwork: - ipv4_address: 172.21.0.11 volumes: risingwave-standalone: external: false @@ -97,9 +44,3 @@ volumes: message_queue: external: false name: risingwave-compose -networks: - mynetwork: - ipam: - config: - - subnet: 172.21.80.0/16 - default: diff --git a/integration_tests/doris-sink/prepare.sh b/integration_tests/doris-sink/prepare.sh index 223d092de1f7e..fb49c68492944 100755 --- a/integration_tests/doris-sink/prepare.sh +++ b/integration_tests/doris-sink/prepare.sh @@ -3,4 +3,4 @@ set -euo pipefail # setup doris -docker compose exec mysql bash -c "mysql -uroot -P9030 -hfe < doris_prepare.sql" +docker compose exec doris bash -c "mysql -uroot -P9030 -h127.0.0.1 < doris_prepare.sql" diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index ea69a03af25c9..740b7885ce78e 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -181,7 +181,7 @@ impl DorisSink { risingwave_common::types::DataType::Bytea => { Err(SinkError::Doris("doris can not support Bytea".to_string())) } - risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSONB")), + risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSON")), risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")), risingwave_common::types::DataType::Int256 => { Err(SinkError::Doris("doris can not support Int256".to_string())) From 27b01d624b86e946309a3195f293a9075e599e60 Mon Sep 17 00:00:00 2001 From: xfz Date: Thu, 28 Mar 2024 22:23:28 +0800 Subject: [PATCH 2/8] fix: add volumes for doris --- integration_tests/doris-sink/docker-compose.yml | 2 ++ integration_tests/doris-sink/prepare.sh | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/integration_tests/doris-sink/docker-compose.yml b/integration_tests/doris-sink/docker-compose.yml index b03ec51e3ac2c..278356f933b13 100644 --- a/integration_tests/doris-sink/docker-compose.yml +++ b/integration_tests/doris-sink/docker-compose.yml @@ -5,6 +5,8 @@ services: image: apache/doris:doris-all-in-one-2.1.0 ports: - "8030:8030" + volumes: + - "./doris_prepare.sql:/doris_prepare.sql" risingwave-standalone: extends: file: ../../docker/docker-compose.yml diff --git a/integration_tests/doris-sink/prepare.sh b/integration_tests/doris-sink/prepare.sh index fb49c68492944..85a9eeb1faf9e 100755 --- a/integration_tests/doris-sink/prepare.sh +++ b/integration_tests/doris-sink/prepare.sh @@ -3,4 +3,4 @@ set -euo pipefail # setup doris -docker compose exec doris bash -c "mysql -uroot -P9030 -h127.0.0.1 < doris_prepare.sql" +docker compose exec doris bash -c "mysql -uroot -P9030 -h127.0.0.1 < /doris_prepare.sql" From 6f0e6b04eaeca6853d15fc20fe343ee29a7cf727 Mon Sep 17 00:00:00 2001 From: xfz Date: Fri, 29 Mar 2024 16:52:49 +0800 Subject: [PATCH 3/8] fix: use fe host instead when be host is localhost --- ci/docker-compose.yml | 12 ++++ ci/scripts/e2e-doris-sink-test.sh | 4 +- ci/workflows/main-cron.yml | 49 +++++--------- ci/workflows/pull-request.yml | 39 +++++------ e2e_test/sink/doris_sink.slt | 4 +- integration_tests/doris-sink/create_sink.sql | 4 +- .../doris-sink/docker-compose.yml | 67 +++++++++++++++++-- integration_tests/doris-sink/prepare.sh | 2 +- src/connector/src/sink/doris.rs | 2 +- .../src/sink/doris_starrocks_connector.rs | 45 +++++++++++-- src/connector/src/sink/starrocks.rs | 2 +- 11 files changed, 156 insertions(+), 74 deletions(-) diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 67920dc380bbf..959f2665a73b2 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -188,6 +188,18 @@ services: environment: - CASSANDRA_CLUSTER_NAME=cloudinfra + doris-server: + container_name: doris-server + image: apache/doris:doris-all-in-one-2.1.0 + ports: + - 8030:8030 + - 8040:8040 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9030"] + interval: 5s + timeout: 5s + retries: 30 + starrocks-fe-server: container_name: starrocks-fe-server image: starrocks/fe-ubuntu:3.1.7 diff --git a/ci/scripts/e2e-doris-sink-test.sh b/ci/scripts/e2e-doris-sink-test.sh index 30bfdaf129e26..15a014afb45c6 100755 --- a/ci/scripts/e2e-doris-sink-test.sh +++ b/ci/scripts/e2e-doris-sink-test.sh @@ -30,7 +30,7 @@ sleep 1 echo "--- create doris table" apt-get update -y && apt-get install -y mysql-client sleep 2 -mysql -uroot -P 9030 -h doris-fe-server -e "CREATE database demo;use demo; +mysql -uroot -P 9030 -h doris-server -e "CREATE database demo;use demo; CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 datev2,v8 datetime,v9 boolean) UNIQUE KEY(\`v1\`) DISTRIBUTED BY HASH(\`v1\`) BUCKETS 1 PROPERTIES ( @@ -56,4 +56,4 @@ else fi echo "--- Kill cluster" -cargo make ci-kill \ No newline at end of file +cargo make ci-kill diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index ce28986d50b08..09c627de2ffe7 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -896,37 +896,24 @@ steps: timeout_in_minutes: 10 retry: *auto-retry - # Causes ci error, close it first, fix it later - # - label: "set vm_max_map_count_2000000" - # key: "set-vm_max_map_count" - # if: | - # !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null - # || build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" - # || build.env("CI_STEPS") =~ /(^|,)e2e-doris-sink-tests?(,|$$)/ - # command: "sudo sysctl -w vm.max_map_count=2000000" - # depends_on: - # - "build" - # - "build-other" - - # - label: "end-to-end doris sink test" - # key: "e2e-doris-sink-tests" - # command: "ci/scripts/e2e-doris-sink-test.sh -p ci-release" - # if: | - # !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null - # || build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" - # || build.env("CI_STEPS") =~ /(^|,)e2e-doris-sink-tests?(,|$$)/ - # depends_on: - # - "build" - # - "build-other" - # - "set-vm_max_map_count" - # plugins: - # - docker-compose#v5.1.0: - # run: sink-doris-env - # config: ci/docker-compose.yml - # mount-buildkite-agent: true - # - ./ci/plugins/upload-failure-logs - # timeout_in_minutes: 10 - # retry: *auto-retry + - label: "end-to-end doris sink test" + key: "e2e-doris-sink-tests" + command: "ci/scripts/e2e-doris-sink-test.sh -p ci-release" + if: | + !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" + || build.env("CI_STEPS") =~ /(^|,)e2e-doris-sink-tests?(,|$$)/ + depends_on: + - "build" + - "build-other" + plugins: + - docker-compose#v5.1.0: + run: sink-doris-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry - label: "end-to-end starrocks sink test" key: "e2e-starrocks-sink-tests" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index f263bc18f4b57..51063fad9ac39 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -308,30 +308,21 @@ steps: timeout_in_minutes: 10 retry: *auto-retry - # Causes ci error, close it first, fix it later - # - label: "set vm_max_map_count_2000000" - # key: "set-vm_max_map_count" - # if: build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-doris-sink-tests?(,|$$)/ - # command: "sudo sysctl -w vm.max_map_count=2000000" - # depends_on: - # - "build" - # - "build-other" - - # - label: "end-to-end doris sink test" - # if: build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-doris-sink-tests?(,|$$)/ - # command: "ci/scripts/e2e-doris-sink-test.sh -p ci-dev" - # depends_on: - # - "build" - # - "build-other" - # - "set-vm_max_map_count" - # plugins: - # - docker-compose#v5.1.0: - # run: sink-doris-env - # config: ci/docker-compose.yml - # mount-buildkite-agent: true - # - ./ci/plugins/upload-failure-logs - # timeout_in_minutes: 10 - # retry: *auto-retry + - label: "end-to-end doris sink test" + if: build.pull_request.labels includes "ci/run-e2e-doris-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-doris-sink-tests?(,|$$)/ + command: "ci/scripts/e2e-doris-sink-test.sh -p ci-dev" + depends_on: + - "build" + - "build-other" + - "set-vm_max_map_count" + plugins: + - docker-compose#v5.1.0: + run: sink-doris-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry - label: "end-to-end starrocks sink test" if: build.pull_request.labels includes "ci/run-e2e-starrocks-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-starrocks-sink-tests?(,|$$)/ diff --git a/e2e_test/sink/doris_sink.slt b/e2e_test/sink/doris_sink.slt index 2c552bbb26143..3401e73a4784e 100644 --- a/e2e_test/sink/doris_sink.slt +++ b/e2e_test/sink/doris_sink.slt @@ -10,7 +10,7 @@ FROM mv6 WITH ( connector = 'doris', type = 'append-only', - doris.url = 'http://doris-fe-server:8030', + doris.url = 'http://doris-server:8030', doris.user = 'users', doris.password = '123456', doris.database = 'demo', @@ -31,4 +31,4 @@ statement ok DROP MATERIALIZED VIEW mv6; statement ok -DROP TABLE t6; \ No newline at end of file +DROP TABLE t6; diff --git a/integration_tests/doris-sink/create_sink.sql b/integration_tests/doris-sink/create_sink.sql index 017a39573855f..7cd1ac24857e9 100644 --- a/integration_tests/doris-sink/create_sink.sql +++ b/integration_tests/doris-sink/create_sink.sql @@ -3,7 +3,7 @@ FROM bhv_mv WITH ( connector = 'doris', type = 'append-only', - doris.url = 'http://doris:8030', + doris.url = 'http://fe:8030', doris.user = 'users', doris.password = '123456', doris.database = 'demo', @@ -16,7 +16,7 @@ FROM upsert_bhv_mv WITH ( connector = 'doris', type = 'upsert', - doris.url = 'http://doris:8030', + doris.url = 'http://fe:8030', doris.user = 'users', doris.password = '123456', doris.database = 'demo', diff --git a/integration_tests/doris-sink/docker-compose.yml b/integration_tests/doris-sink/docker-compose.yml index 278356f933b13..fce87c7f562b7 100644 --- a/integration_tests/doris-sink/docker-compose.yml +++ b/integration_tests/doris-sink/docker-compose.yml @@ -1,37 +1,88 @@ --- version: "3" services: - doris: - image: apache/doris:doris-all-in-one-2.1.0 + fe: + platform: linux/amd64 + image: apache/doris:2.0.0_alpha-fe-x86_64 + hostname: fe + environment: + - FE_SERVERS=fe1:172.21.0.2:9010 + - FE_ID=1 ports: - "8030:8030" - volumes: - - "./doris_prepare.sql:/doris_prepare.sql" + - "9030:9030" + networks: + mynetwork: + ipv4_address: 172.21.0.2 + be: + platform: linux/amd64 + image: apache/doris:2.0.0_alpha-be-x86_64 + hostname: be + environment: + - FE_SERVERS=fe1:172.21.0.2:9010 + - BE_ADDR=172.21.0.3:9050 + depends_on: + - fe + ports: + - "9050:9050" + networks: + mynetwork: + ipv4_address: 172.21.0.3 risingwave-standalone: extends: file: ../../docker/docker-compose.yml service: risingwave-standalone + networks: + mynetwork: + ipv4_address: 172.21.0.4 etcd-0: extends: file: ../../docker/docker-compose.yml service: etcd-0 - grafana-0: + networks: + mynetwork: + ipv4_address: 172.21.0.5 + grafana-0: extends: file: ../../docker/docker-compose.yml service: grafana-0 + networks: + mynetwork: + ipv4_address: 172.21.0.6 minio-0: extends: file: ../../docker/docker-compose.yml service: minio-0 + networks: + mynetwork: + ipv4_address: 172.21.0.7 prometheus-0: extends: file: ../../docker/docker-compose.yml service: prometheus-0 + networks: + mynetwork: + ipv4_address: 172.21.0.8 + mysql: + image: mysql:latest + ports: + - "3306:3306" + volumes: + - "./doris_prepare.sql:/doris_prepare.sql" + command: tail -f /dev/null + restart: on-failure + networks: + mynetwork: + ipv4_address: 172.21.0.9 postgres: image: postgres:latest command: tail -f /dev/null volumes: - "./update_delete.sql:/update_delete.sql" + restart: on-failure + networks: + mynetwork: + ipv4_address: 172.21.0.11 volumes: risingwave-standalone: external: false @@ -46,3 +97,9 @@ volumes: message_queue: external: false name: risingwave-compose +networks: + mynetwork: + ipam: + config: + - subnet: 172.21.80.0/16 + default: diff --git a/integration_tests/doris-sink/prepare.sh b/integration_tests/doris-sink/prepare.sh index 85a9eeb1faf9e..223d092de1f7e 100755 --- a/integration_tests/doris-sink/prepare.sh +++ b/integration_tests/doris-sink/prepare.sh @@ -3,4 +3,4 @@ set -euo pipefail # setup doris -docker compose exec doris bash -c "mysql -uroot -P9030 -h127.0.0.1 < /doris_prepare.sql" +docker compose exec mysql bash -c "mysql -uroot -P9030 -hfe < doris_prepare.sql" diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index 740b7885ce78e..4a5a1ded01f9e 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -286,7 +286,7 @@ impl DorisSinkWriter { config.common.database.clone(), config.common.table.clone(), header, - ); + )?; Ok(Self { config, schema: schema.clone(), diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 3880dad8d1952..3f175950f466e 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -26,6 +26,7 @@ use hyper::client::HttpConnector; use hyper::{body, Client, Request, StatusCode}; use hyper_tls::HttpsConnector; use tokio::task::JoinHandle; +use url::Url; use super::{Result, SinkError}; @@ -40,6 +41,8 @@ const WAIT_HANDDLE_TIMEOUT: Duration = Duration::from_secs(10); pub(crate) const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30); const DORIS: &str = "doris"; const STARROCKS: &str = "starrocks"; +const LOCALHOST: &str = "localhost"; +const LOCALHOST_IP: &str = "127.0.0.1"; pub struct HeaderBuilder { header: HashMap, } @@ -158,16 +161,30 @@ pub struct InserterInnerBuilder { url: String, header: HashMap, sender: Option, + fe_host: String, } impl InserterInnerBuilder { - pub fn new(url: String, db: String, table: String, header: HashMap) -> Self { + pub fn new( + url: String, + db: String, + table: String, + header: HashMap, + ) -> Result { + let fe_host = Url::parse(&url) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? + .host_str() + .ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get fe host from url")) + })? + .to_string(); let url = format!("{}/api/{}/{}/_stream_load", url, db, table); - Self { + Ok(Self { url, sender: None, header, - } + fe_host, + }) } fn build_request_and_client( @@ -196,7 +213,7 @@ impl InserterInnerBuilder { .request(request_get_url) .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { + let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { resp.headers() .get("location") .ok_or_else(|| { @@ -207,13 +224,31 @@ impl InserterInnerBuilder { .to_str() .context("Can't get doris BE url in header") .map_err(SinkError::DorisStarrocksConnect)? + .to_string() } else { return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( "Can't get doris BE url", ))); }; - let (builder, client) = self.build_request_and_client(be_url.to_string()); + if self.fe_host != LOCALHOST && self.fe_host != LOCALHOST_IP { + let mut parsed_be_url = + Url::parse(&be_url).map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + let be_host = parsed_be_url.host_str().ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get be host from url")) + })?; + + if be_host == LOCALHOST || be_host == LOCALHOST_IP { + // if be host is 127.0.0.1, we may can't connect to it directly, + // so replace it with fe host + parsed_be_url + .set_host(Some(self.fe_host.as_str())) + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + be_url = parsed_be_url.as_str().into(); + } + } + + let (builder, client) = self.build_request_and_client(be_url); let (sender, body) = Body::channel(); let request = builder .body(body) diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 192bf38fd839f..d6d2c6104d11f 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -359,7 +359,7 @@ impl StarrocksSinkWriter { config.common.database.clone(), config.common.table.clone(), header, - ); + )?; Ok(Self { config, schema: schema.clone(), From 4eebdfcf1103bfa1f4552f36b8caaab5e7c6bcd6 Mon Sep 17 00:00:00 2001 From: xfz Date: Fri, 29 Mar 2024 16:59:29 +0800 Subject: [PATCH 4/8] revert integration test --- ci/scripts/gen-integration-test-yaml.py | 78 +++++++++---------- ci/scripts/integration-tests.sh | 7 ++ .../doris-sink/docker-compose.yml | 2 +- 3 files changed, 47 insertions(+), 40 deletions(-) diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py index 2948380b67002..6332b98ecca9f 100644 --- a/ci/scripts/gen-integration-test-yaml.py +++ b/ci/scripts/gen-integration-test-yaml.py @@ -3,46 +3,46 @@ import subprocess CASES_MAP = { - # 'ad-click': ['json'], - # 'ad-ctr': ['json'], - # 'cdn-metrics': ['json'], - # 'clickstream': ['json'], - # 'livestream': ['json', 'protobuf'], - # 'prometheus': ['json'], - # 'schema-registry': ['json'], - # 'mysql-cdc': ['json'], - # 'postgres-cdc': ['json'], - # 'mongodb-cdc': ['json'], - # 'mysql-sink': ['json'], - # 'postgres-sink': ['json'], - # 'iceberg-cdc': ['json'], - # 'iceberg-sink': ['none'], - # 'iceberg-source': ['none'], - # 'twitter': ['json', 'protobuf'], - # 'twitter-pulsar': ['json'], - # 'debezium-mysql': ['json'], - # 'debezium-postgres': ['json'], - # 'debezium-sqlserver': ['json'], - # 'tidb-cdc-sink': ['json'], - # 'citus-cdc': ['json'], - # 'kinesis-s3-source': ['json'], - # 'clickhouse-sink': ['json'], - # 'cockroach-sink': ['json'], - # 'kafka-cdc-sink': ['json'], - # 'cassandra-and-scylladb-sink': ['json'], - # 'elasticsearch-sink': ['json'], - # 'redis-sink': ['json'], - # 'big-query-sink': ['json'], - # 'mindsdb': ['json'], - # 'vector': ['json'], - # 'nats': ['json', 'protobuf'], - # 'mqtt': ['json'], + 'ad-click': ['json'], + 'ad-ctr': ['json'], + 'cdn-metrics': ['json'], + 'clickstream': ['json'], + 'livestream': ['json', 'protobuf'], + 'prometheus': ['json'], + 'schema-registry': ['json'], + 'mysql-cdc': ['json'], + 'postgres-cdc': ['json'], + 'mongodb-cdc': ['json'], + 'mysql-sink': ['json'], + 'postgres-sink': ['json'], + 'iceberg-cdc': ['json'], + 'iceberg-sink': ['none'], + 'iceberg-source': ['none'], + 'twitter': ['json', 'protobuf'], + 'twitter-pulsar': ['json'], + 'debezium-mysql': ['json'], + 'debezium-postgres': ['json'], + 'debezium-sqlserver': ['json'], + 'tidb-cdc-sink': ['json'], + 'citus-cdc': ['json'], + 'kinesis-s3-source': ['json'], + 'clickhouse-sink': ['json'], + 'cockroach-sink': ['json'], + 'kafka-cdc-sink': ['json'], + 'cassandra-and-scylladb-sink': ['json'], + 'elasticsearch-sink': ['json'], + 'redis-sink': ['json'], + 'big-query-sink': ['json'], + 'mindsdb': ['json'], + 'vector': ['json'], + 'nats': ['json', 'protobuf'], + 'mqtt': ['json'], 'doris-sink': ['json'], - # 'starrocks-sink': ['json'], - # 'deltalake-sink': ['json'], - # 'pinot-sink': ['json'], - # 'presto-trino': ['json'], - # 'client-library': ['none'], + 'starrocks-sink': ['json'], + 'deltalake-sink': ['json'], + 'pinot-sink': ['json'], + 'presto-trino': ['json'], + 'client-library': ['none'], } def gen_pipeline_steps(): diff --git a/ci/scripts/integration-tests.sh b/ci/scripts/integration-tests.sh index ffb5cacc3e114..ef6c024ac9db0 100755 --- a/ci/scripts/integration-tests.sh +++ b/ci/scripts/integration-tests.sh @@ -71,6 +71,10 @@ if [ "${format}" == "protobuf" ]; then python3 gen_pb_compose.py ${case} ${format} fi +echo "--- set vm.max_map_count=2000000 for doris" +max_map_count_original_value=$(sysctl -n vm.max_map_count) +sudo sysctl -w vm.max_map_count=2000000 + echo "--- run Demos" python3 run_demos.py --case ${case} --format ${format} @@ -84,3 +88,6 @@ python3 check_data.py ${case} ${upstream} echo "--- clean Demos" python3 clean_demos.py --case ${case} + +echo "--- reset vm.max_map_count={$max_map_count_original_value}" +sudo sysctl -w vm.max_map_count=$max_map_count_original_value diff --git a/integration_tests/doris-sink/docker-compose.yml b/integration_tests/doris-sink/docker-compose.yml index fce87c7f562b7..fc7cfd751e989 100644 --- a/integration_tests/doris-sink/docker-compose.yml +++ b/integration_tests/doris-sink/docker-compose.yml @@ -42,7 +42,7 @@ services: networks: mynetwork: ipv4_address: 172.21.0.5 - grafana-0: + grafana-0: extends: file: ../../docker/docker-compose.yml service: grafana-0 From 621d298c775dbe378b9190da9f7135f907e5fbf5 Mon Sep 17 00:00:00 2001 From: xfz Date: Fri, 29 Mar 2024 17:09:04 +0800 Subject: [PATCH 5/8] fix --- ci/docker-compose.yml | 1 + ci/workflows/main-cron.yml | 2 +- ci/workflows/pull-request.yml | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 959f2665a73b2..498a0420763be 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -94,6 +94,7 @@ services: - redis-server - pulsar-server - cassandra-server + - doris-server - starrocks-fe-server - starrocks-be-server volumes: diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 09c627de2ffe7..c4469346690a8 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -908,7 +908,7 @@ steps: - "build-other" plugins: - docker-compose#v5.1.0: - run: sink-doris-env + run: sink-test-env config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 51063fad9ac39..e8e513be4fc7b 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -317,7 +317,7 @@ steps: - "set-vm_max_map_count" plugins: - docker-compose#v5.1.0: - run: sink-doris-env + run: sink-test-env config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs From 4711e2971e58ab8546ed3d22844740284747729c Mon Sep 17 00:00:00 2001 From: xfz Date: Fri, 29 Mar 2024 17:11:41 +0800 Subject: [PATCH 6/8] fix --- ci/workflows/pull-request.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index e8e513be4fc7b..3efe7d29f3768 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -314,7 +314,6 @@ steps: depends_on: - "build" - "build-other" - - "set-vm_max_map_count" plugins: - docker-compose#v5.1.0: run: sink-test-env From ad0848cf6f0e5b148aab49cb62e9c143b228a50d Mon Sep 17 00:00:00 2001 From: xfz Date: Fri, 29 Mar 2024 17:52:39 +0800 Subject: [PATCH 7/8] fix host --- ci/scripts/e2e-doris-sink-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/scripts/e2e-doris-sink-test.sh b/ci/scripts/e2e-doris-sink-test.sh index 15a014afb45c6..7c4c8ec1c7c41 100755 --- a/ci/scripts/e2e-doris-sink-test.sh +++ b/ci/scripts/e2e-doris-sink-test.sh @@ -43,7 +43,7 @@ sleep 2 echo "--- testing sinks" sqllogictest -p 4566 -d dev './e2e_test/sink/doris_sink.slt' sleep 1 -mysql -uroot -P 9030 -h doris-fe-server -e "select * from demo.demo_bhv_table" > ./query_result.csv +mysql -uroot -P 9030 -h doris-server -e "select * from demo.demo_bhv_table" > ./query_result.csv if cat ./query_result.csv | sed '1d; s/\t/,/g' | awk -F "," '{ From 031d6674a7f52759c5cc4f10f32db6843e843409 Mon Sep 17 00:00:00 2001 From: xfz Date: Fri, 29 Mar 2024 18:42:29 +0800 Subject: [PATCH 8/8] test: add jsonb column --- ci/scripts/e2e-doris-sink-test.sh | 4 ++-- e2e_test/sink/doris_sink.slt | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ci/scripts/e2e-doris-sink-test.sh b/ci/scripts/e2e-doris-sink-test.sh index 7c4c8ec1c7c41..859dd411469dd 100755 --- a/ci/scripts/e2e-doris-sink-test.sh +++ b/ci/scripts/e2e-doris-sink-test.sh @@ -31,7 +31,7 @@ echo "--- create doris table" apt-get update -y && apt-get install -y mysql-client sleep 2 mysql -uroot -P 9030 -h doris-server -e "CREATE database demo;use demo; -CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 datev2,v8 datetime,v9 boolean) UNIQUE KEY(\`v1\`) +CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 datev2,v8 datetime,v9 boolean,v10 json) UNIQUE KEY(\`v1\`) DISTRIBUTED BY HASH(\`v1\`) BUCKETS 1 PROPERTIES ( \"replication_allocation\" = \"tag.location.default: 1\" @@ -47,7 +47,7 @@ mysql -uroot -P 9030 -h doris-server -e "select * from demo.demo_bhv_table" > ./ if cat ./query_result.csv | sed '1d; s/\t/,/g' | awk -F "," '{ - exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0); }'; then + exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0 && $10 == "{\"a\":1}"); }'; then echo "Doris sink check passed" else cat ./query_result.csv diff --git a/e2e_test/sink/doris_sink.slt b/e2e_test/sink/doris_sink.slt index 3401e73a4784e..3242206badaea 100644 --- a/e2e_test/sink/doris_sink.slt +++ b/e2e_test/sink/doris_sink.slt @@ -1,5 +1,5 @@ statement ok -CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean); +CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb); statement ok CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; @@ -19,7 +19,7 @@ FROM ); statement ok -INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false); +INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false, '{"a":1}'); statement ok FLUSH;