diff --git a/.gitignore b/.gitignore index 19fb6643dd8a6..375738f67093e 100644 --- a/.gitignore +++ b/.gitignore @@ -74,4 +74,7 @@ simulation-it-test.tar.zst # hummock-trace .trace +# spark binary +e2e_test/iceberg/spark-*-bin* + **/poetry.lock \ No newline at end of file diff --git a/ci/scripts/e2e-iceberg-cdc.sh b/ci/scripts/e2e-iceberg-cdc.sh new file mode 100755 index 0000000000000..081f5bbd2afcb --- /dev/null +++ b/ci/scripts/e2e-iceberg-cdc.sh @@ -0,0 +1,91 @@ +#!/usr/bin/env bash + +# Exits as soon as any line fails. +set -euo pipefail + +source ci/scripts/common.sh + +# prepare environment +export CONNECTOR_RPC_ENDPOINT="localhost:50051" +export CONNECTOR_LIBS_PATH="./connector-node/libs" + +while getopts 'p:' opt; do + case ${opt} in + p ) + profile=$OPTARG + ;; + \? ) + echo "Invalid Option: -$OPTARG" 1>&2 + exit 1 + ;; + : ) + echo "Invalid option: $OPTARG requires an argument" 1>&2 + ;; + esac +done +shift $((OPTIND -1)) + +download_and_prepare_rw "$profile" source + +echo "--- Download connector node package" +buildkite-agent artifact download risingwave-connector.tar.gz ./ +mkdir ./connector-node +tar xf ./risingwave-connector.tar.gz -C ./connector-node + +echo "--- e2e, ci-1cn-1fe, iceberg cdc" + +node_port=50051 +node_timeout=10 + +wait_for_connector_node_start() { + start_time=$(date +%s) + while : + do + if nc -z localhost $node_port; then + echo "Port $node_port is listened! Connector Node is up!" + break + fi + + current_time=$(date +%s) + elapsed_time=$((current_time - start_time)) + if [ $elapsed_time -ge $node_timeout ]; then + echo "Timeout waiting for port $node_port to be listened!" + exit 1 + fi + sleep 0.1 + done + sleep 2 +} + +echo "--- starting risingwave cluster with connector node" + +RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ +cargo make ci-start ci-1cn-1fe-with-recovery +./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 & +echo "waiting for connector node to start" +wait_for_connector_node_start + +# prepare minio iceberg sink +echo "--- preparing iceberg" +.risingwave/bin/mcli -C .risingwave/config/mcli mb hummock-minio/icebergdata + +cd e2e_test/iceberg +bash ./start_spark_connect_server.sh + +# Don't remove the `--quiet` option since poetry has a bug when printing output, see +# https://github.com/python-poetry/poetry/issues/3412 +"$HOME"/.local/bin/poetry update --quiet + +# 1. import data to mysql +mysql --host=mysql --port=3306 -u root -p123456 < ./test_case/cdc/mysql_cdc.sql + +# 2. create table and sink +"$HOME"/.local/bin/poetry run python main.py -t ./test_case/cdc/no_partition_cdc_init.toml + +# 3. insert new data to mysql +mysql --host=mysql --port=3306 -u root -p123456 < ./test_case/cdc/mysql_cdc_insert.sql + +sleep 20 + +# 4. check change +"$HOME"/.local/bin/poetry run python main.py -t ./test_case/cdc/no_partition_cdc.toml \ No newline at end of file diff --git a/ci/workflows/integration-tests.yml b/ci/workflows/integration-tests.yml index 4bd0ec1a000b1..455f29b210ec1 100644 --- a/ci/workflows/integration-tests.yml +++ b/ci/workflows/integration-tests.yml @@ -29,6 +29,7 @@ steps: - "postgres-cdc" - "mysql-sink" - "postgres-sink" + - "iceberg-cdc" # - "iceberg-sink" - "debezium-mysql" format: @@ -79,6 +80,10 @@ steps: # testcase: "iceberg-sink" # format: "protobuf" # skip: true + - with: + testcase: "iceberg-cdc" + format: "protobuf" + skip: true - with: testcase: "debezium-mysql" format: "protobuf" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 985bd0be4b822..3aaa09f0d7716 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -209,6 +209,21 @@ steps: timeout_in_minutes: 10 retry: *auto-retry + - label: "end-to-end iceberg cdc test" + if: build.pull_request.labels includes "ci/run-e2e-iceberg-sink-tests" + command: "ci/scripts/e2e-iceberg-cdc.sh -p ci-dev" + depends_on: + - "build" + - "build-other" + plugins: + - docker-compose#v4.9.0: + run: sink-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry + - label: "end-to-end pulsar sink test" if: build.pull_request.labels includes "ci/run-e2e-pulsar-sink-tests" command: "ci/scripts/e2e-pulsar-sink-test.sh -p ci-dev" diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index d25c94daf2670..4dbd5fe5bb28d 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -260,6 +260,7 @@ services: MINIO_PROMETHEUS_URL: "http://prometheus-0:9500" MINIO_ROOT_PASSWORD: hummockadmin MINIO_ROOT_USER: hummockadmin + MINIO_DOMAIN: "minio-0" container_name: minio-0 healthcheck: test: diff --git a/e2e_test/iceberg/main.py b/e2e_test/iceberg/main.py index fa07aa367a9b3..3f3120227e6e7 100644 --- a/e2e_test/iceberg/main.py +++ b/e2e_test/iceberg/main.py @@ -42,14 +42,16 @@ def init_iceberg_table(args,init_sqls): spark.sql(sql) -def init_risingwave_mv(args,slt): +def execute_slt(args,slt): + if slt is None or slt == "": + return rw_config = args['risingwave'] cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} {slt}" print(f"Command line is [{cmd}]") subprocess.run(cmd, shell=True, check=True) - time.sleep(10) + time.sleep(30) def verify_result(args,verify_sql,verify_schema,verify_data): @@ -110,6 +112,6 @@ def drop_table(args,drop_sqls): print({section: dict(config[section]) for section in config.sections()}) init_iceberg_table(config,init_sqls) - init_risingwave_mv(config,slt) + execute_slt(config,slt) verify_result(config,verify_sql,verify_schema,verify_data) drop_table(config,drop_sqls) diff --git a/e2e_test/iceberg/test_case/cdc/load.slt b/e2e_test/iceberg/test_case/cdc/load.slt new file mode 100644 index 0000000000000..caefd1326bbda --- /dev/null +++ b/e2e_test/iceberg/test_case/cdc/load.slt @@ -0,0 +1,46 @@ +# CDC source basic test + +# enable cdc backfill in ci +statement ok +set cdc_backfill='true'; + +statement ok +create table products ( id INT, + name STRING, + description STRING, + PRIMARY KEY (id) +) with ( + connector = 'mysql-cdc', + hostname = 'mysql', + port = '3306', + username = 'root', + password = '123456', + database.name = 'my@db', + table.name = 'products', + server.id = '5085' +); + + +statement ok +CREATE SINK s1 AS select * from products WITH ( + connector = 'iceberg', + type = 'upsert', + force_append_only = 'false', + database.name = 'demo', + table.name = 'demo_db.demo_table', + catalog.type = 'storage', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + primary_key = 'id' +); + +query I +select count(*) from products; +---- +8 + +statement ok +flush; diff --git a/e2e_test/iceberg/test_case/cdc/mysql_cdc.sql b/e2e_test/iceberg/test_case/cdc/mysql_cdc.sql new file mode 100644 index 0000000000000..b7b6f13af83cf --- /dev/null +++ b/e2e_test/iceberg/test_case/cdc/mysql_cdc.sql @@ -0,0 +1,21 @@ +DROP DATABASE IF EXISTS `my@db`; +CREATE DATABASE `my@db`; + +USE `my@db`; + +CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description VARCHAR(512) +); + +ALTER TABLE products AUTO_INCREMENT = 101; + +INSERT INTO products VALUES (default,"101","101"), +(default,"102","102"), +(default,"103","103"), +(default,"104","104"), +(default,"105","105"), +(default,"106","106"), +(default,"107","107"), +(default,"108","108") diff --git a/e2e_test/iceberg/test_case/cdc/mysql_cdc_insert.sql b/e2e_test/iceberg/test_case/cdc/mysql_cdc_insert.sql new file mode 100644 index 0000000000000..641d6220ea8dc --- /dev/null +++ b/e2e_test/iceberg/test_case/cdc/mysql_cdc_insert.sql @@ -0,0 +1,7 @@ +USE `my@db`; + +INSERT INTO products VALUES (default,"109","109"), +(default,"110","110"), +(default,"111","111"), +(default,"112","112"), +(default,"113","113"); diff --git a/e2e_test/iceberg/test_case/cdc/no_partition_cdc.toml b/e2e_test/iceberg/test_case/cdc/no_partition_cdc.toml new file mode 100644 index 0000000000000..5ab9647b12eb0 --- /dev/null +++ b/e2e_test/iceberg/test_case/cdc/no_partition_cdc.toml @@ -0,0 +1,25 @@ +init_sqls = [] + +slt = '' + +verify_schema = ['int','string','string'] + +verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC' + +verify_data = """ +101,101,101 +102,102,102 +103,103,103 +104,104,104 +105,105,105 +106,106,106 +107,107,107 +108,108,108 +109,109,109 +110,110,110 +111,111,111 +112,112,112 +113,113,113 +""" + +drop_sqls = [] diff --git a/e2e_test/iceberg/test_case/cdc/no_partition_cdc_init.toml b/e2e_test/iceberg/test_case/cdc/no_partition_cdc_init.toml new file mode 100644 index 0000000000000..17e5f7497aae5 --- /dev/null +++ b/e2e_test/iceberg/test_case/cdc/no_partition_cdc_init.toml @@ -0,0 +1,31 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.demo_table', + ''' + CREATE TABLE demo_db.demo_table ( + id int, + name string, + description string + ) USING iceberg + TBLPROPERTIES ('format-version'='2'); + ''' +] + +slt = 'test_case/cdc/load.slt' + +verify_schema = ['int','string','string'] + +verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC' + +verify_data = """ +101,101,101 +102,102,102 +103,103,103 +104,104,104 +105,105,105 +106,106,106 +107,107,107 +108,108,108 +""" + +drop_sqls = [] diff --git a/integration_tests/iceberg-cdc/README.md b/integration_tests/iceberg-cdc/README.md new file mode 100644 index 0000000000000..56f40172c3dfa --- /dev/null +++ b/integration_tests/iceberg-cdc/README.md @@ -0,0 +1,5 @@ +# Iceberg CDC Integration Tests +`mysql -> rw -> iceberg` + +# How to run +./run_test.sh \ No newline at end of file diff --git a/integration_tests/iceberg-cdc/docker-compose.yaml b/integration_tests/iceberg-cdc/docker-compose.yaml new file mode 100644 index 0000000000000..8e9ad1062ef38 --- /dev/null +++ b/integration_tests/iceberg-cdc/docker-compose.yaml @@ -0,0 +1,142 @@ +version: '3.8' + +services: + compactor-0: + extends: + file: ../../docker/docker-compose.yml + service: compactor-0 + compute-node-0: + extends: + file: ../../docker/docker-compose.yml + service: compute-node-0 + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + frontend-node-0: + extends: + file: ../../docker/docker-compose.yml + service: frontend-node-0 + meta-node-0: + extends: + file: ../../docker/docker-compose.yml + service: meta-node-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + mc: + depends_on: + - minio-0 + image: minio/mc + environment: + - AWS_ACCESS_KEY_ID=hummockadmin + - AWS_SECRET_ACCESS_KEY=hummockadmin + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " + until (/usr/bin/mc config host add minio http://minio-0:9301 hummockadmin hummockadmin) do echo '...waiting...' && sleep 1; done; + /usr/bin/mc rm -r --force minio/icebergdata; + /usr/bin/mc mb minio/icebergdata; + /usr/bin/mc anonymous set public minio/icebergdata; + tail -f /dev/null + " + + mysql: + image: mysql:8.0 + expose: + - 3306 + ports: + - "3306:3306" + environment: + - MYSQL_ROOT_PASSWORD=123456 + - MYSQL_USER=mysqluser + - MYSQL_PASSWORD=mysqlpw + - MYSQL_DATABASE=mydb + healthcheck: + test: [ "CMD-SHELL", "mysqladmin ping -h 127.0.0.1 -u root -p123456" ] + interval: 5s + timeout: 5s + retries: 5 + container_name: mysql + prepare_mysql: + image: mysql:8.0 + depends_on: + - mysql + command: + - /bin/sh + - -c + - "mysql -p123456 -h mysql mydb < mysql_prepare.sql" + volumes: + - "./mysql_prepare.sql:/mysql_prepare.sql" + container_name: prepare_mysql + restart: on-failure + + rest: + image: tabulario/iceberg-rest:0.6.0 + environment: + - AWS_ACCESS_KEY_ID=hummockadmin + - AWS_SECRET_ACCESS_KEY=hummockadmin + - AWS_REGION=us-east-1 + - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog + - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory + - CATALOG_WAREHOUSE=s3://icebergdata/demo + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio-0:9301 + depends_on: + - minio-0 + # let the rest access minio through: hummock001.minio-0 + links: + - minio-0:icebergdata.minio-0 + expose: + - 8181 + ports: + - "8181:8181" + + spark: + depends_on: + - minio-0 + - rest + image: ghcr.io/icelake-io/icelake-spark:latest + environment: + - AWS_ACCESS_KEY_ID=hummockadmin + - AWS_SECRET_ACCESS_KEY=hummockadmin + - AWS_REGION=us-east-1 + - SPARK_HOME=/opt/spark + - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin + user: root + links: + - minio-0:icebergdata.minio-0 + expose: + - 15002 + ports: + - "15002:15002" + healthcheck: + test: netstat -ltn | grep -c 15002 + interval: 1s + retries: 1200 + volumes: + - ./spark:/spark + command: [ "bash", "/spark/spark-connect-server.sh" ] + +volumes: + compute-node-0: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + spark: + external: false diff --git a/integration_tests/iceberg-cdc/mysql_prepare.sql b/integration_tests/iceberg-cdc/mysql_prepare.sql new file mode 100644 index 0000000000000..3e5a236a41205 --- /dev/null +++ b/integration_tests/iceberg-cdc/mysql_prepare.sql @@ -0,0 +1,15 @@ +-- mysql -p123456 -uroot -h 127.0.0.1 mydb < mysql_prepare.sql +-- +-- Mysql +USE mydb; + +CREATE TABLE user_behaviors ( + user_id VARCHAR(60), + target_id VARCHAR(60), + target_type VARCHAR(60), + event_timestamp VARCHAR(100), + behavior_type VARCHAR(60), + parent_target_type VARCHAR(60), + parent_target_id VARCHAR(60), + PRIMARY KEY(user_id, target_id, event_timestamp) +); diff --git a/integration_tests/iceberg-cdc/python/check.py b/integration_tests/iceberg-cdc/python/check.py new file mode 100644 index 0000000000000..699fa4df29c30 --- /dev/null +++ b/integration_tests/iceberg-cdc/python/check.py @@ -0,0 +1,25 @@ +from pyspark.sql import SparkSession +import configparser +import psycopg2 + +def check_spark_table(args): + expect_row_count = 0 + rw_config = args['risingwave'] + with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'], + port=rw_config['port']) as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT COUNT(*) FROM user_behaviors") + expect_row_count = cursor.fetchone()[0] + print(f"expect_row_count is {expect_row_count}") + spark_config = args['spark'] + spark = SparkSession.builder.remote(spark_config['url']).getOrCreate() + actual_row_count = spark.sql("SELECT COUNT(*) FROM s1.t1").collect()[0][0] + print(f"actual_row_count is {actual_row_count}") + assert actual_row_count==expect_row_count + + +if __name__ == "__main__": + config = configparser.ConfigParser() + config.read("config.ini") + print({section: dict(config[section]) for section in config.sections()}) + check_spark_table(config) diff --git a/integration_tests/iceberg-cdc/python/config.ini b/integration_tests/iceberg-cdc/python/config.ini new file mode 100644 index 0000000000000..bd95eddc5b80e --- /dev/null +++ b/integration_tests/iceberg-cdc/python/config.ini @@ -0,0 +1,8 @@ +[spark] +url=sc://localhost:15002 + +[risingwave] +db=dev +user=root +host=127.0.0.1 +port=4566 diff --git a/integration_tests/iceberg-cdc/python/init.py b/integration_tests/iceberg-cdc/python/init.py new file mode 100644 index 0000000000000..289fa2f161889 --- /dev/null +++ b/integration_tests/iceberg-cdc/python/init.py @@ -0,0 +1,103 @@ +from pyspark.sql import SparkSession +import configparser +import psycopg2 + + +def init_spark_table(args): + spark_config = args['spark'] + spark = SparkSession.builder.remote(spark_config['url']).getOrCreate() + + init_table_sqls = [ + "CREATE SCHEMA IF NOT EXISTS s1", + "DROP TABLE IF EXISTS s1.t1", + """ + CREATE TABLE s1.t1 + ( + user_id string, + target_id string, + target_type string, + event_timestamp string, + behavior_type string, + parent_target_type string, + parent_target_id string + ) USING iceberg + TBLPROPERTIES ('format-version'='2'); + """, + ] + + for sql in init_table_sqls: + print(f"Executing sql: {sql}") + spark.sql(sql) + + +def init_risingwave_mv(args): + rw_config = args['risingwave'] + sqls = [ + "set streaming_parallelism = 4", + """ + CREATE TABLE user_behaviors ( + user_id VARCHAR, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp VARCHAR, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id, target_id, event_timestamp) + ) with ( + connector = 'mysql-cdc', + hostname = 'mysql', + port = '3306', + username = 'root', + password = '123456', + database.name = 'mydb', + table.name = 'user_behaviors', + server.id = '1' + ); + """, + # f""" + # CREATE SINK s1 + # AS SELECT * FROM user_behaviors + # WITH ( + # connector='iceberg', + # type='upsert', + # primary_key = 'user_id, target_id, event_timestamp', + # catalog.type = 'storage', + # s3.endpoint = 'http://minio-0:9301', + # s3.access.key = 'hummockadmin', + # s3.secret.key = 'hummockadmin', + # database.name='demo', + # table.name='s1.t1',warehouse.path = 's3://hummock001/icebergdata/demo',s3.region = 'us-east-1' + # ); + # """ + f""" + CREATE SINK s1 + AS SELECT * FROM user_behaviors + WITH ( + connector='iceberg', + type='upsert', + primary_key = 'user_id, target_id, event_timestamp', + catalog.type = 'rest', + catalog.uri = 'http://rest:8181', + s3.endpoint = 'http://minio-0:9301', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + database.name='demo', + table.name='s1.t1',warehouse.path = 's3://icebergdata/demo/s1/t1',s3.region = 'us-east-1' + ); + """ + ] + with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'], + port=rw_config['port']) as conn: + with conn.cursor() as cursor: + for sql in sqls: + print(f"Executing sql {sql}") + cursor.execute(sql) + + +if __name__ == "__main__": + config = configparser.ConfigParser() + config.read("config.ini") + print({section: dict(config[section]) for section in config.sections()}) + init_spark_table(config) + init_risingwave_mv(config) diff --git a/integration_tests/iceberg-cdc/python/pyproject.toml b/integration_tests/iceberg-cdc/python/pyproject.toml new file mode 100644 index 0000000000000..4c7bce1165796 --- /dev/null +++ b/integration_tests/iceberg-cdc/python/pyproject.toml @@ -0,0 +1,16 @@ +[tool.poetry] +name = "icelake-integration-tests" +version = "0.0.9" +description = "" +authors = ["Renjie Liu "] +readme = "README.md" +packages = [{include = "icelake_integration_tests"}] + +[tool.poetry.dependencies] +python = "^3.11" +pyspark = { version = "3.4.1", extras = ["sql", "connect"] } +psycopg2-binary = "^2.9" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/integration_tests/iceberg-cdc/run_test.sh b/integration_tests/iceberg-cdc/run_test.sh new file mode 100755 index 0000000000000..2d8b691bc7284 --- /dev/null +++ b/integration_tests/iceberg-cdc/run_test.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +# Start test environment. +docker-compose up -d --wait + +# To avoid exiting by unhealth, set it after start environment. +set -ex + +# Generate data +docker build -t iceberg-cdc-datagen ../datagen +timeout 20 docker run --network=iceberg-cdc_default iceberg-cdc-datagen /datagen --mode clickstream --qps 1 mysql --user mysqluser --password mysqlpw --host mysql --port 3306 --db mydb & + +cd python +poetry update --quiet +# Init source, mv, and sink. +poetry run python init.py +# Wait for sink to be finished. +sleep 40; +poetry run python check.py diff --git a/integration_tests/iceberg-cdc/spark/.gitignore b/integration_tests/iceberg-cdc/spark/.gitignore new file mode 100644 index 0000000000000..51dcf07222856 --- /dev/null +++ b/integration_tests/iceberg-cdc/spark/.gitignore @@ -0,0 +1,3 @@ +derby.log +metastore_db +.ivy \ No newline at end of file diff --git a/integration_tests/iceberg-cdc/spark/spark-connect-server.sh b/integration_tests/iceberg-cdc/spark/spark-connect-server.sh new file mode 100755 index 0000000000000..7c1cd64f1a2f2 --- /dev/null +++ b/integration_tests/iceberg-cdc/spark/spark-connect-server.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +set -ex + +JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':') + +/opt/spark/sbin/start-connect-server.sh \ + --master local[3] \ + --driver-class-path $JARS \ + --conf spark.driver.bindAddress=0.0.0.0 \ + --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ + --conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.rest.RESTCatalog \ + --conf spark.sql.catalog.demo.uri=http://rest:8181 \ + --conf spark.sql.catalog.demo.s3.endpoint=http://minio-0:9301 \ + --conf spark.sql.catalog.demo.s3.path.style.access=true \ + --conf spark.sql.catalog.demo.s3.access.key=hummockadmin \ + --conf spark.sql.catalog.demo.s3.secret.key=hummockadmin \ + --conf spark.sql.defaultCatalog=demo + +tail -f /opt/spark/logs/spark*.out diff --git a/integration_tests/scripts/run_demos.py b/integration_tests/scripts/run_demos.py index 28623f7ddc4a7..da2519e18db44 100644 --- a/integration_tests/scripts/run_demos.py +++ b/integration_tests/scripts/run_demos.py @@ -42,6 +42,13 @@ def run_demo(demo: str, format: str, wait_time = 40): run_sql_file(sql_file, demo_dir) sleep(10) +def iceberg_cdc_demo(): + demo = "iceberg-cdc" + file_dir = dirname(abspath(__file__)) + project_dir = dirname(file_dir) + demo_dir = os.path.join(project_dir, demo) + print("Running demo: iceberg-cdc") + subprocess.run(["bash","./run_test.sh"], cwd=demo_dir, check=True) def run_iceberg_demo(): demo = "iceberg-sink" @@ -149,5 +156,7 @@ def run_clickhouse_demo(): run_iceberg_demo() elif args.case == "clickhouse-sink": run_clickhouse_demo() +elif args.case == "iceberg-cdc": + iceberg_cdc_demo() else: run_demo(args.case, args.format) diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 27c9f2ee82f83..1efc933a7d033 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -314,6 +314,10 @@ message GetTablesResponse { map tables = 1; } +message WaitRequest {} + +message WaitResponse {} + service DdlService { rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse); rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse); @@ -343,4 +347,5 @@ service DdlService { rpc ListConnections(ListConnectionsRequest) returns (ListConnectionsResponse); rpc DropConnection(DropConnectionRequest) returns (DropConnectionResponse); rpc GetTables(GetTablesRequest) returns (GetTablesResponse); + rpc Wait(WaitRequest) returns (WaitResponse); } diff --git a/src/connector/src/sink/blackhole.rs b/src/connector/src/sink/blackhole.rs index 1f1ace3b0d104..60b0506604c97 100644 --- a/src/connector/src/sink/blackhole.rs +++ b/src/connector/src/sink/blackhole.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_trait::async_trait; + use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; use crate::sink::{ DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkParam, SinkWriterParam, @@ -45,6 +47,7 @@ impl Sink for BlackHoleSink { } } +#[async_trait] impl LogSinker for BlackHoleSink { async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { log_reader.init().await?; diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 2bddf8026216f..f4fdf9b761f38 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -29,7 +29,11 @@ use serde_derive::Deserialize; use serde_with::serde_as; use super::{DummySinkCommitCoordinator, SinkWriterParam}; -use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; +use crate::sink::catalog::desc::SinkDesc; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; use crate::sink::{ Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; @@ -243,10 +247,14 @@ impl ClickHouseSink { } impl Sink for ClickHouseSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = CLICKHOUSE_SINK; + fn default_sink_decouple(desc: &SinkDesc) -> bool { + desc.sink_type.is_append_only() + } + async fn validate(&self) -> Result<()> { // For upsert clickhouse sink, the primary key must be defined. if !self.is_append_only && self.pk_indices.is_empty() { @@ -277,7 +285,7 @@ impl Sink for ClickHouseSink { Ok(()) } - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(ClickHouseSinkWriter::new( self.config.clone(), self.schema.clone(), @@ -285,7 +293,7 @@ impl Sink for ClickHouseSink { self.is_append_only, ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(usize::MAX)) } } pub struct ClickHouseSinkWriter { @@ -496,24 +504,18 @@ impl ClickHouseSinkWriter { } } -#[async_trait::async_trait] -impl SinkWriter for ClickHouseSinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { +impl AsyncTruncateSinkWriter for ClickHouseSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { if self.is_append_only { self.append_only(chunk).await } else { self.upsert(chunk).await } } - - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - // clickhouse no transactional guarantees, so we do nothing here. - Ok(()) - } - - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - Ok(()) - } } #[derive(ClickHouseRow, Deserialize, Clone)] diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 8525a751ba9fb..07709f182dc47 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -14,20 +14,18 @@ use std::collections::HashMap; use std::fmt::Debug; -use std::pin::pin; use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; -use futures::future::{select, Either}; use futures::{Future, FutureExt, TryFuture}; use rdkafka::error::KafkaError; use rdkafka::message::ToBytes; use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord}; use rdkafka::types::RDKafkaErrorCode; use rdkafka::ClientConfig; +use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_common::util::drop_either_future; use serde_derive::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use strum_macros::{Display, EnumString}; @@ -37,11 +35,11 @@ use super::{Sink, SinkError, SinkParam}; use crate::common::KafkaCommon; use crate::sink::catalog::desc::SinkDesc; use crate::sink::formatter::SinkFormatterImpl; -use crate::sink::log_store::{ - DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogReader, LogStoreReadItem, +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink, }; -use crate::sink::writer::FormattedSink; -use crate::sink::{DummySinkCommitCoordinator, LogSinker, Result, SinkWriterParam}; +use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam}; use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext}; use crate::source::{SourceEnumeratorContext, SplitEnumerator}; use crate::{ @@ -299,7 +297,7 @@ impl TryFrom for KafkaSink { impl Sink for KafkaSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = KafkaLogSinker; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = KAFKA_SINK; @@ -317,7 +315,18 @@ impl Sink for KafkaSink { &self.config.common.topic, ) .await?; - KafkaLogSinker::new(self.config.clone(), formatter).await + let max_delivery_buffer_size = (self + .config + .rdkafka_properties + .queue_buffering_max_messages + .as_ref() + .cloned() + .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32 + * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize; + + Ok(KafkaSinkWriter::new(self.config.clone(), formatter) + .await? + .into_log_sinker(max_delivery_buffer_size)) } async fn validate(&self) -> Result<()> { @@ -372,16 +381,15 @@ struct KafkaPayloadWriter<'a> { config: &'a KafkaConfig, } -type KafkaSinkDeliveryFuture = impl TryFuture + Unpin + 'static; +pub type KafkaSinkDeliveryFuture = impl TryFuture + Unpin + 'static; -pub struct KafkaLogSinker { +pub struct KafkaSinkWriter { formatter: SinkFormatterImpl, inner: FutureProducer, - future_manager: DeliveryFutureManager, config: KafkaConfig, } -impl KafkaLogSinker { +impl KafkaSinkWriter { async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result { let inner: FutureProducer = { let mut c = ClientConfig::new(); @@ -405,19 +413,29 @@ impl KafkaLogSinker { c.create_with_context(producer_ctx).await? }; - let max_delivery_buffer_size = (config - .rdkafka_properties - .queue_buffering_max_messages - .as_ref() - .cloned() - .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32 - * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize; - - Ok(KafkaLogSinker { + Ok(KafkaSinkWriter { formatter, inner, config: config.clone(), - future_manager: DeliveryFutureManager::new(max_delivery_buffer_size), + }) + } +} + +impl AsyncTruncateSinkWriter for KafkaSinkWriter { + type DeliveryFuture = KafkaSinkDeliveryFuture; + + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { + let mut payload_writer = KafkaPayloadWriter { + inner: &mut self.inner, + add_future, + config: &self.config, + }; + dispatch_sink_formatter_impl!(&self.formatter, formatter, { + payload_writer.write_chunk(chunk, formatter).await }) } } @@ -539,50 +557,6 @@ impl<'a> FormattedSink for KafkaPayloadWriter<'a> { } } -impl LogSinker for KafkaLogSinker { - async fn consume_log_and_sink(mut self, mut log_reader: impl LogReader) -> Result<()> { - log_reader.init().await?; - loop { - let select_result = drop_either_future( - select( - pin!(log_reader.next_item()), - pin!(self.future_manager.next_truncate_offset()), - ) - .await, - ); - match select_result { - Either::Left(item_result) => { - let (epoch, item) = item_result?; - match item { - LogStoreReadItem::StreamChunk { chunk_id, chunk } => { - dispatch_sink_formatter_impl!(&self.formatter, formatter, { - let mut writer = KafkaPayloadWriter { - inner: &self.inner, - add_future: self - .future_manager - .start_write_chunk(epoch, chunk_id), - config: &self.config, - }; - writer.write_chunk(chunk, formatter).await?; - }) - } - LogStoreReadItem::Barrier { - is_checkpoint: _is_checkpoint, - } => { - self.future_manager.add_barrier(epoch); - } - LogStoreReadItem::UpdateVnodeBitmap(_) => {} - } - } - Either::Right(offset_result) => { - let offset = offset_result?; - log_reader.truncate(offset).await?; - } - } - } - } -} - #[cfg(test)] mod test { use maplit::hashmap; @@ -750,7 +724,7 @@ mod test { let kafka_config = KafkaConfig::from_hashmap(properties)?; // Create the actual sink writer to Kafka - let mut sink = KafkaLogSinker::new( + let sink = KafkaSinkWriter::new( kafka_config.clone(), SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new( // We do not specify primary key for this schema @@ -761,12 +735,16 @@ mod test { .await .unwrap(); + use crate::sink::log_store::DeliveryFutureManager; + + let mut future_manager = DeliveryFutureManager::new(usize::MAX); + for i in 0..10 { println!("epoch: {}", i); for j in 0..100 { let mut writer = KafkaPayloadWriter { inner: &sink.inner, - add_future: sink.future_manager.start_write_chunk(i, j), + add_future: future_manager.start_write_chunk(i, j), config: &sink.config, }; match writer diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 764d6e618b44a..03e044ad37b91 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -30,8 +30,12 @@ use super::catalog::SinkFormatDesc; use super::SinkParam; use crate::common::KinesisCommon; use crate::dispatch_sink_formatter_str_key_impl; +use crate::sink::catalog::desc::SinkDesc; use crate::sink::formatter::SinkFormatterImpl; -use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriter, SinkWriterExt}; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink, +}; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam}; pub const KINESIS_SINK: &str = "kinesis"; @@ -67,10 +71,14 @@ impl TryFrom for KinesisSink { impl Sink for KinesisSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = KINESIS_SINK; + fn default_sink_decouple(desc: &SinkDesc) -> bool { + desc.sink_type.is_append_only() + } + async fn validate(&self) -> Result<()> { // Kinesis requires partition key. There is no builtin support for round-robin as in kafka/pulsar. // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey @@ -104,7 +112,7 @@ impl Sink for KinesisSink { Ok(()) } - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(KinesisSinkWriter::new( self.config.clone(), self.schema.clone(), @@ -114,7 +122,7 @@ impl Sink for KinesisSink { self.sink_from_name.clone(), ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(usize::MAX)) } } @@ -221,20 +229,16 @@ impl FormattedSink for KinesisSinkPayloadWriter { } } -#[async_trait::async_trait] -impl SinkWriter for KinesisSinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, { +impl AsyncTruncateSinkWriter for KinesisSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { + dispatch_sink_formatter_str_key_impl!( + &self.formatter, + formatter, self.payload_writer.write_chunk(chunk, formatter).await - }) - } - - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - // Kinesis offers no transactional guarantees, so we do nothing here. - Ok(()) - } - - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - Ok(()) + ) } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 7769a87f4e715..6afd08778cd96 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -34,7 +34,6 @@ pub mod utils; pub mod writer; use std::collections::HashMap; -use std::future::Future; use ::clickhouse::error::Error as ClickHouseError; use ::redis::RedisError; @@ -278,11 +277,9 @@ pub trait Sink: TryFrom { } } -pub trait LogSinker: Send + 'static { - fn consume_log_and_sink( - self, - log_reader: impl LogReader, - ) -> impl Future> + Send + 'static; +#[async_trait] +pub trait LogSinker: 'static { + async fn consume_log_and_sink(self, log_reader: impl LogReader) -> Result<()>; } #[async_trait] diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 8e3f3e2c18022..2f810eed786a9 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -25,10 +25,14 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; use super::utils::chunk_to_json; -use super::{DummySinkCommitCoordinator, SinkWriter, SinkWriterParam}; +use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::common::NatsCommon; +use crate::sink::catalog::desc::SinkDesc; use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode}; -use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; use crate::sink::{Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY}; pub const NATS_SINK: &str = "nats"; @@ -88,10 +92,14 @@ impl TryFrom for NatsSink { impl Sink for NatsSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = NATS_SINK; + fn default_sink_decouple(desc: &SinkDesc) -> bool { + desc.sink_type.is_append_only() + } + async fn validate(&self) -> Result<()> { if !self.is_append_only { return Err(SinkError::Nats(anyhow!( @@ -110,11 +118,11 @@ impl Sink for NatsSink { Ok(()) } - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok( NatsSinkWriter::new(self.config.clone(), self.schema.clone()) .await? - .into_log_sinker(writer_param.sink_metrics), + .into_log_sinker(usize::MAX), ) } } @@ -153,17 +161,12 @@ impl NatsSinkWriter { } } -#[async_trait::async_trait] -impl SinkWriter for NatsSinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { +impl AsyncTruncateSinkWriter for NatsSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { self.append_only(chunk).await } - - async fn begin_epoch(&mut self, _epoch_id: u64) -> Result<()> { - Ok(()) - } - - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - Ok(()) - } } diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index 5fd950852b5c1..04da204ef79e7 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -12,14 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::fmt::Debug; use std::time::Duration; use anyhow::anyhow; -use async_trait::async_trait; -use futures::future::try_join_all; -use futures::TryFutureExt; +use futures::{FutureExt, TryFuture, TryFutureExt}; use pulsar::producer::{Message, SendFuture}; use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor}; use risingwave_common::array::StreamChunk; @@ -28,10 +26,15 @@ use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use super::catalog::{SinkFormat, SinkFormatDesc}; -use super::{Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam}; +use super::{Sink, SinkError, SinkParam, SinkWriterParam}; use crate::common::PulsarCommon; -use crate::sink::formatter::SinkFormatterImpl; -use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriterExt}; +use crate::sink::catalog::desc::SinkDesc; +use crate::sink::encoder::SerTo; +use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl}; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink, +}; use crate::sink::{DummySinkCommitCoordinator, Result}; use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl}; @@ -155,11 +158,15 @@ impl TryFrom for PulsarSink { impl Sink for PulsarSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = PULSAR_SINK; - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + fn default_sink_decouple(desc: &SinkDesc) -> bool { + desc.sink_type.is_append_only() + } + + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(PulsarSinkWriter::new( self.config.clone(), self.schema.clone(), @@ -169,7 +176,7 @@ impl Sink for PulsarSink { self.sink_from_name.clone(), ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE)) } async fn validate(&self) -> Result<()> { @@ -200,15 +207,26 @@ impl Sink for PulsarSink { } pub struct PulsarSinkWriter { - payload_writer: PulsarPayloadWriter, formatter: SinkFormatterImpl, -} - -struct PulsarPayloadWriter { pulsar: Pulsar, producer: Producer, config: PulsarConfig, - send_future_buffer: VecDeque, +} + +struct PulsarPayloadWriter<'w> { + producer: &'w mut Producer, + config: &'w PulsarConfig, + add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>, +} + +pub type PulsarDeliveryFuture = impl TryFuture + Unpin + 'static; + +fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture { + future.map(|result| { + result + .map(|_| ()) + .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e))) + }) } impl PulsarSinkWriter { @@ -233,17 +251,14 @@ impl PulsarSinkWriter { let producer = build_pulsar_producer(&pulsar, &config).await?; Ok(Self { formatter, - payload_writer: PulsarPayloadWriter { - pulsar, - producer, - config, - send_future_buffer: VecDeque::new(), - }, + pulsar, + producer, + config, }) } } -impl PulsarPayloadWriter { +impl<'w> PulsarPayloadWriter<'w> { async fn send_message(&mut self, message: Message) -> Result<()> { let mut success_flag = false; let mut connection_err = None; @@ -254,17 +269,10 @@ impl PulsarPayloadWriter { // a SendFuture holding the message receipt // or error after sending is returned Ok(send_future) => { - // Check if send_future_buffer is greater than the preset limit - while self.send_future_buffer.len() >= PULSAR_SEND_FUTURE_BUFFER_MAX_SIZE { - self.send_future_buffer - .pop_front() - .expect("Expect the SendFuture not to be None") - .map_err(|e| SinkError::Pulsar(anyhow!(e))) - .await?; - } - + self.add_future + .add_future_may_await(may_delivery_future(send_future)) + .await?; success_flag = true; - self.send_future_buffer.push_back(send_future); break; } // error upon sending @@ -302,24 +310,9 @@ impl PulsarPayloadWriter { self.send_message(message).await?; Ok(()) } - - async fn commit_inner(&mut self) -> Result<()> { - self.producer - .send_batch() - .map_err(pulsar_to_sink_err) - .await?; - try_join_all( - self.send_future_buffer - .drain(..) - .map(|send_future| send_future.map_err(|e| SinkError::Pulsar(anyhow!(e)))), - ) - .await?; - - Ok(()) - } } -impl FormattedSink for PulsarPayloadWriter { +impl<'w> FormattedSink for PulsarPayloadWriter<'w> { type K = String; type V = Vec; @@ -328,23 +321,33 @@ impl FormattedSink for PulsarPayloadWriter { } } -#[async_trait] -impl SinkWriter for PulsarSinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { +impl AsyncTruncateSinkWriter for PulsarSinkWriter { + type DeliveryFuture = PulsarDeliveryFuture; + + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, { - self.payload_writer.write_chunk(chunk, formatter).await + let mut payload_writer = PulsarPayloadWriter { + producer: &mut self.producer, + add_future, + config: &self.config, + }; + // TODO: we can call `payload_writer.write_chunk(chunk, formatter)`, + // but for an unknown reason, this will greatly increase the compile time, + // by nearly 4x. May investigate it later. + for r in formatter.format_chunk(&chunk) { + let (key, value) = r?; + payload_writer + .write_inner( + key.map(SerTo::ser_to).transpose()?, + value.map(SerTo::ser_to).transpose()?, + ) + .await?; + } + Ok(()) }) } - - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - Ok(()) - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result { - if is_checkpoint { - self.payload_writer.commit_inner().await?; - } - - Ok(()) - } } diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index e218a584d69b8..910582b9662b7 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -29,8 +29,11 @@ use super::formatter::SinkFormatterImpl; use super::writer::FormattedSink; use super::{SinkError, SinkParam}; use crate::dispatch_sink_formatter_str_key_impl; -use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; -use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; +use crate::sink::log_store::DeliveryFutureManagerAddFuture; +use crate::sink::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; +use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriterParam}; pub const REDIS_SINK: &str = "redis"; pub const KEY_FORMAT: &str = "key_format"; @@ -99,11 +102,11 @@ impl TryFrom for RedisSink { impl Sink for RedisSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = AsyncTruncateLogSinkerOf; const SINK_NAME: &'static str = "redis"; - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(RedisSinkWriter::new( self.config.clone(), self.schema.clone(), @@ -113,7 +116,7 @@ impl Sink for RedisSink { self.sink_from_name.clone(), ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(usize::MAX)) } async fn validate(&self) -> Result<()> { @@ -259,25 +262,16 @@ impl RedisSinkWriter { } } -#[async_trait] -impl SinkWriter for RedisSinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { +impl AsyncTruncateSinkWriter for RedisSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, { self.payload_writer.write_chunk(chunk, formatter).await }) } - - async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { - self.epoch = epoch; - Ok(()) - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { - if is_checkpoint { - self.payload_writer.commit().await?; - } - Ok(()) - } } #[cfg(test)] @@ -292,6 +286,7 @@ mod test { use super::*; use crate::sink::catalog::{SinkEncode, SinkFormat}; + use crate::sink::log_store::DeliveryFutureManager; #[tokio::test] async fn test_write() { @@ -328,8 +323,10 @@ mod test { ], ); + let mut manager = DeliveryFutureManager::new(0); + redis_sink_writer - .write_batch(chunk_a) + .write_chunk(chunk_a, manager.start_write_chunk(0, 0)) .await .expect("failed to write batch"); let expected_a = @@ -385,6 +382,8 @@ mod test { .await .unwrap(); + let mut future_manager = DeliveryFutureManager::new(0); + let chunk_a = StreamChunk::new( vec![Op::Insert, Op::Insert, Op::Insert], vec![ @@ -394,7 +393,7 @@ mod test { ); redis_sink_writer - .write_batch(chunk_a) + .write_chunk(chunk_a, future_manager.start_write_chunk(0, 0)) .await .expect("failed to write batch"); let expected_a = vec![ diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 310213262b2ad..3c52cb720dbd4 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -227,6 +227,7 @@ async fn await_future_with_monitor_receiver_err> } } +#[async_trait] impl LogSinker for RemoteLogSinker { async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { // Note: this is a total copy of the implementation of LogSinkerOf, diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs index 37ad452831b2e..64261bb42ab48 100644 --- a/src/connector/src/sink/writer.rs +++ b/src/connector/src/sink/writer.rs @@ -12,17 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::future::{Future, Ready}; +use std::pin::pin; use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; +use futures::future::{select, Either}; +use futures::TryFuture; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; +use risingwave_common::util::drop_either_future; use crate::sink::encoder::SerTo; use crate::sink::formatter::SinkFormatter; -use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; -use crate::sink::{LogSinker, Result, SinkMetrics}; +use crate::sink::log_store::{ + DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogReader, LogStoreReadItem, + TruncateOffset, +}; +use crate::sink::{LogSinker, Result, SinkError, SinkMetrics}; #[async_trait] pub trait SinkWriter: Send + 'static { @@ -48,22 +56,17 @@ pub trait SinkWriter: Send + 'static { } } -// TODO: remove this trait after KafkaSinkWriter implements SinkWriter -#[async_trait] -// An old version of SinkWriter for backward compatibility -pub trait SinkWriterV1: Send + 'static { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>; - - // the following interface is for transactions, if not supported, return Ok(()) - // start a transaction with epoch number. Note that epoch number should be increasing. - async fn begin_epoch(&mut self, epoch: u64) -> Result<()>; +pub type DummyDeliveryFuture = Ready>; - // commits the current transaction and marks all messages in the transaction success. - async fn commit(&mut self) -> Result<()>; +pub trait AsyncTruncateSinkWriter: Send + 'static { + type DeliveryFuture: TryFuture + Unpin + Send + 'static = + DummyDeliveryFuture; - // aborts the current transaction because some error happens. we should rollback to the last - // commit point. - async fn abort(&mut self) -> Result<()>; + fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> impl Future> + Send + 'a; } /// A free-form sink that may output in multiple formats and encodings. Examples include kafka, @@ -104,12 +107,12 @@ pub trait FormattedSink { } } -pub struct LogSinkerOf> { +pub struct LogSinkerOf { writer: W, sink_metrics: SinkMetrics, } -impl> LogSinkerOf { +impl LogSinkerOf { pub fn new(writer: W, sink_metrics: SinkMetrics) -> Self { LogSinkerOf { writer, @@ -118,6 +121,7 @@ impl> LogSinkerOf { } } +#[async_trait] impl> LogSinker for LogSinkerOf { async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { let mut sink_writer = self.writer; @@ -222,3 +226,64 @@ where } } } + +pub struct AsyncTruncateLogSinkerOf { + writer: W, + future_manager: DeliveryFutureManager, +} + +impl AsyncTruncateLogSinkerOf { + pub fn new(writer: W, max_future_count: usize) -> Self { + AsyncTruncateLogSinkerOf { + writer, + future_manager: DeliveryFutureManager::new(max_future_count), + } + } +} + +#[async_trait] +impl LogSinker for AsyncTruncateLogSinkerOf { + async fn consume_log_and_sink(mut self, mut log_reader: impl LogReader) -> Result<()> { + log_reader.init().await?; + loop { + let select_result = drop_either_future( + select( + pin!(log_reader.next_item()), + pin!(self.future_manager.next_truncate_offset()), + ) + .await, + ); + match select_result { + Either::Left(item_result) => { + let (epoch, item) = item_result?; + match item { + LogStoreReadItem::StreamChunk { chunk_id, chunk } => { + let add_future = self.future_manager.start_write_chunk(epoch, chunk_id); + self.writer.write_chunk(chunk, add_future).await?; + } + LogStoreReadItem::Barrier { + is_checkpoint: _is_checkpoint, + } => { + self.future_manager.add_barrier(epoch); + } + LogStoreReadItem::UpdateVnodeBitmap(_) => {} + } + } + Either::Right(offset_result) => { + let offset = offset_result?; + log_reader.truncate(offset).await?; + } + } + } + } +} + +#[easy_ext::ext(AsyncTruncateSinkWriterExt)] +impl T +where + T: AsyncTruncateSinkWriter + Sized, +{ + pub fn into_log_sinker(self, max_future_count: usize) -> AsyncTruncateLogSinkerOf { + AsyncTruncateLogSinkerOf::new(self, max_future_count) + } +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 149f39bead330..174ed23e03ec5 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -73,6 +73,7 @@ mod show; mod transaction; pub mod util; pub mod variable; +mod wait; /// The [`PgResponseBuilder`] used by RisingWave. pub type RwPgResponseBuilder = PgResponseBuilder; @@ -419,6 +420,7 @@ pub async fn handle( } } Statement::Flush => flush::handle_flush(handler_args).await, + Statement::Wait => wait::handle_wait(handler_args).await, Statement::SetVariable { local: _, variable, diff --git a/src/frontend/src/handler/wait.rs b/src/frontend/src/handler/wait.rs new file mode 100644 index 0000000000000..83f2784ec8c17 --- /dev/null +++ b/src/frontend/src/handler/wait.rs @@ -0,0 +1,31 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::error::Result; + +use super::RwPgResponse; +use crate::handler::HandlerArgs; +use crate::session::SessionImpl; + +pub(super) async fn handle_wait(handler_args: HandlerArgs) -> Result { + do_wait(&handler_args.session).await?; + Ok(PgResponse::empty_result(StatementType::WAIT)) +} + +pub(crate) async fn do_wait(session: &SessionImpl) -> Result<()> { + let client = session.env().meta_client(); + client.wait().await?; + Ok(()) +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index ae90c2e345f9f..d37c5dec127f1 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -43,6 +43,8 @@ pub trait FrontendMetaClient: Send + Sync { async fn flush(&self, checkpoint: bool) -> Result; + async fn wait(&self) -> Result<()>; + async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result>; async fn list_table_fragments( @@ -111,6 +113,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.flush(checkpoint).await } + async fn wait(&self) -> Result<()> { + self.0.wait().await + } + async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result> { self.0.cancel_creating_jobs(infos).await } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 20eb252fc5053..cf915ae35713d 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -773,6 +773,10 @@ impl FrontendMetaClient for MockFrontendMetaClient { }) } + async fn wait(&self) -> RpcResult<()> { + Ok(()) + } + async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult> { Ok(vec![]) } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 2fa5f50e15666..061ff93589163 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -732,6 +732,11 @@ impl DdlService for DdlServiceImpl { } Ok(Response::new(GetTablesResponse { tables })) } + + async fn wait(&self, _request: Request) -> Result, Status> { + self.ddl_controller.wait().await; + Ok(Response::new(WaitResponse {})) + } } impl DdlServiceImpl { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 04b9729c5a5b8..36615bd93b757 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -15,6 +15,7 @@ use std::cmp::Ordering; use std::num::NonZeroUsize; use std::sync::Arc; +use std::time::Duration; use itertools::Itertools; use risingwave_common::config::DefaultParallelism; @@ -29,6 +30,7 @@ use risingwave_pb::ddl_service::alter_relation_name_request::Relation; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; use tokio::sync::Semaphore; +use tokio::time::sleep; use tracing::log::warn; use tracing::Instrument; @@ -1094,4 +1096,18 @@ impl DdlController { } } } + + pub async fn wait(&self) { + for _ in 0..30 * 60 { + if self + .catalog_manager + .list_creating_background_mvs() + .await + .is_empty() + { + break; + } + sleep(Duration::from_secs(1)).await; + } + } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 827860d1af7b3..95b746ea33e6c 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -698,6 +698,12 @@ impl MetaClient { Ok(resp.snapshot.unwrap()) } + pub async fn wait(&self) -> Result<()> { + let request = WaitRequest {}; + self.inner.wait(request).await?; + Ok(()) + } + pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result> { let request = CancelCreatingJobsRequest { jobs: Some(jobs) }; let resp = self.inner.cancel_creating_jobs(request).await?; @@ -1719,6 +1725,7 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse } ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse } ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse } + ,{ ddl_client, wait, WaitRequest, WaitResponse } ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse } ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse } ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index ecae5a9663a88..5d802bae99cdc 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1294,6 +1294,9 @@ pub enum Statement { /// /// Note: RisingWave specific statement. Flush, + /// WAIT for ALL running stream jobs to finish. + /// It will block the current session the condition is met. + Wait, } impl fmt::Display for Statement { @@ -1787,6 +1790,9 @@ impl fmt::Display for Statement { Statement::Flush => { write!(f, "FLUSH") } + Statement::Wait => { + write!(f, "WAIT") + } Statement::Begin { modes } => { write!(f, "BEGIN")?; if !modes.is_empty() { diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 5c2fedb0ea547..4188f06f76ae3 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -540,6 +540,7 @@ define_keywords!( VIEWS, VIRTUAL, VOLATILE, + WAIT, WATERMARK, WHEN, WHENEVER, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index ee054f7d17031..5cc094a204268 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -259,6 +259,7 @@ impl Parser { Keyword::PREPARE => Ok(self.parse_prepare()?), Keyword::COMMENT => Ok(self.parse_comment()?), Keyword::FLUSH => Ok(Statement::Flush), + Keyword::WAIT => Ok(Statement::Wait), _ => self.expected( "an SQL statement", Token::Word(w).with_location(token.location), diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index 29ea77f83b71b..eeec929732f50 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -92,6 +92,7 @@ pub enum StatementType { ROLLBACK, SET_TRANSACTION, CANCEL_COMMAND, + WAIT, } impl std::fmt::Display for StatementType { @@ -278,6 +279,7 @@ impl StatementType { }, Statement::Explain { .. } => Ok(StatementType::EXPLAIN), Statement::Flush => Ok(StatementType::FLUSH), + Statement::Wait => Ok(StatementType::WAIT), _ => Err("unsupported statement type".to_string()), } }