From 37c05a62b7339f0154c89844151989edea70d5cd Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Mon, 1 Jul 2024 15:38:08 +0800
Subject: [PATCH] add snowflake sink upsert demo and fix changelog mapping

---
 integration_tests/snowflake-sink/README.md   | 24 ++++++++++++++++++++
 .../snowflake-sink/upsert/create_mv.sql      | 16 ++++++++++++
 .../snowflake-sink/upsert/create_sink.sql    | 19 +++++++++++++++
 .../snowflake-sink/upsert/create_source.sql  | 19 +++++++++++++++
 .../optimizer/plan_node/logical_changelog.rs |  2 +-
 5 files changed, 79 insertions(+), 1 deletion(-)
 create mode 100644 integration_tests/snowflake-sink/upsert/create_mv.sql
 create mode 100644 integration_tests/snowflake-sink/upsert/create_sink.sql
 create mode 100644 integration_tests/snowflake-sink/upsert/create_source.sql

diff --git a/integration_tests/snowflake-sink/README.md b/integration_tests/snowflake-sink/README.md
index 98bcf73bcb265..26eb2c86111ae 100644
--- a/integration_tests/snowflake-sink/README.md
+++ b/integration_tests/snowflake-sink/README.md
@@ -43,3 +43,27 @@ launch your risingwave cluster, and execute the following sql commands respectiv
  - `create_sink.sql`
 
 note: the column name(s) in your materialized view should be exactly the same as the ones in your pre-defined snowflake table, due to what we specified for snowflake pipe previously in `snowflake_prep.sql`.
+
+## 3. Sink data into Snowflake with UPSERT
+
+1. To sink data into Snowflake with upsert semantics, first set up the necessary components, following the same procedure as in the previous section.
+
+2. Execute the following SQL commands in order:
+   - `upsert/create_source.sql`
+   - `upsert/create_mv.sql`
+   - `upsert/create_sink.sql`
+
+   After execution, RisingWave's data change log is continuously imported into the Snowflake table.
+
+3. Then create the resulting dynamic table with the following SQL statement, where `t3` refers to the Snowflake table that receives the change log and `{primary_key}` should be replaced with the primary key column(s) of your table:
+   ```
+   CREATE OR REPLACE DYNAMIC TABLE user_behaviors
+   TARGET_LAG = '1 minute'
+   WAREHOUSE = test_warehouse
+   AS SELECT *
+      FROM (
+          SELECT *, ROW_NUMBER() OVER (PARTITION BY {primary_key} ORDER BY __row_id DESC) AS dedupe_id
+          FROM t3
+      ) AS subquery
+      WHERE dedupe_id = 1 AND (__op = 1 OR __op = 3)
+   ```
\ No newline at end of file
diff --git a/integration_tests/snowflake-sink/upsert/create_mv.sql b/integration_tests/snowflake-sink/upsert/create_mv.sql
new file mode 100644
index 0000000000000..c40ebe4caa252
--- /dev/null
+++ b/integration_tests/snowflake-sink/upsert/create_mv.sql
@@ -0,0 +1,16 @@
+-- please note that the column name(s) for your mv should be *exactly*
+-- the same as the column name(s) in your snowflake table, since we are matching column by name.
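+-- the `changelog_op` and `_changelog_row_id` columns selected below are produced by
+-- RisingWave's `AS changelog` clause; the mv renames them to `__op` / `__row_id` so the
+-- Snowflake dynamic table described in the README can keep only the latest row per key.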
+
+CREATE MATERIALIZED VIEW ss_mv AS
+WITH sub AS changelog FROM user_behaviors
+SELECT
+    user_id,
+    target_id,
+    event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' AS event_timestamp,
+    changelog_op AS __op,
+    _changelog_row_id::bigint AS __row_id
+FROM
+    sub;
\ No newline at end of file
diff --git a/integration_tests/snowflake-sink/upsert/create_sink.sql b/integration_tests/snowflake-sink/upsert/create_sink.sql
new file mode 100644
index 0000000000000..0986005d8c717
--- /dev/null
+++ b/integration_tests/snowflake-sink/upsert/create_sink.sql
@@ -0,0 +1,19 @@
+CREATE SINK snowflake_sink FROM ss_mv WITH (
+    connector = 'snowflake',
+    type = 'append-only',
+    snowflake.database = 'EXAMPLE_DB',
+    snowflake.schema = 'EXAMPLE_SCHEMA',
+    snowflake.pipe = 'EXAMPLE_SNOWFLAKE_PIPE',
+    snowflake.account_identifier = '-',
+    snowflake.user = 'XZHSEH',
+    snowflake.rsa_public_key_fp = 'EXAMPLE_FP',
+    snowflake.private_key = 'EXAMPLE_PK',
+    snowflake.s3_bucket = 'EXAMPLE_S3_BUCKET',
+    snowflake.aws_access_key_id = 'EXAMPLE_AWS_ID',
+    snowflake.aws_secret_access_key = 'EXAMPLE_SECRET_KEY',
+    snowflake.aws_region = 'EXAMPLE_REGION',
+    snowflake.s3_path = 'EXAMPLE_S3_PATH',
+    -- this depends on your mv setup; note that the snowflake sink *only* supports
+    -- `append-only` mode at present.
+    force_append_only = 'true'
+);
\ No newline at end of file
diff --git a/integration_tests/snowflake-sink/upsert/create_source.sql b/integration_tests/snowflake-sink/upsert/create_source.sql
new file mode 100644
index 0000000000000..0a5bc60f49922
--- /dev/null
+++ b/integration_tests/snowflake-sink/upsert/create_source.sql
@@ -0,0 +1,19 @@
+-- please note that this will create a datagen table that generates 1,000 rows over 10 seconds
+-- you may want to change the configuration for better testing / demo purposes
+
+CREATE TABLE user_behaviors (
+    user_id INT,
+    target_id VARCHAR,
+    target_type VARCHAR,
+    event_timestamp TIMESTAMPTZ,
+    behavior_type VARCHAR,
+    parent_target_type VARCHAR,
+    parent_target_id VARCHAR,
+    PRIMARY KEY(user_id)
+) WITH (
+    connector = 'datagen',
+    fields.user_id.kind = 'sequence',
+    fields.user_id.start = '1',
+    fields.user_id.end = '1000',
+    datagen.rows.per.second = '100'
+) FORMAT PLAIN ENCODE JSON;
diff --git a/src/frontend/src/optimizer/plan_node/logical_changelog.rs b/src/frontend/src/optimizer/plan_node/logical_changelog.rs
index 00b800ce2c59e..c4747d826adbb 100644
--- a/src/frontend/src/optimizer/plan_node/logical_changelog.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_changelog.rs
@@ -80,7 +80,7 @@ impl PlanTreeNodeUnary for LogicalChangeLog {
         let len = out_col_change.to_parts().1;
         let out_col_change = if self.base.stream_key().is_none() {
             ColIndexMapping::new(output_vec, len + 1)
-        }else{
+        } else {
             output_vec.push(Some(len));
             ColIndexMapping::new(output_vec, len + 1)
         };
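
A quick end-to-end sanity check for the upsert demo (a sketch, assuming the dynamic table `user_behaviors` created in step 3 of the README): once `snowflake_sink` has flushed and the dynamic table has refreshed, each primary key should appear at most once.

    -- run in Snowflake against the dynamic table; an empty result means the
    -- deduplication performed by the dynamic table worked as intended
    SELECT user_id, COUNT(*) AS row_count
    FROM user_behaviors
    GROUP BY user_id
    HAVING COUNT(*) > 1;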