Commit

add demo
fix

xxhZs committed Jul 1, 2024
1 parent bc60958 commit 37c05a6
Showing 5 changed files with 76 additions and 1 deletion.
24 changes: 24 additions & 0 deletions integration_tests/snowflake-sink/README.md
@@ -43,3 +43,27 @@ launch your risingwave cluster, and execute the following sql commands respectively
- `create_sink.sql`

note: the column name(s) in your materialized view must exactly match those in your pre-defined snowflake table, because of how the snowflake pipe was defined earlier in `snowflake_prep.sql`.
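
For reference, the column-by-name matching comes from the pipe definition in `snowflake_prep.sql`; below is a minimal sketch of such a pipe, where the table, stage, and pipe names are illustrative rather than the demo's actual ones:

```sql
-- illustrative sketch only; see `snowflake_prep.sql` for the real definition.
CREATE OR REPLACE PIPE example_snowflake_pipe AS
COPY INTO example_table
FROM @example_stage
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE
FILE_FORMAT = (TYPE = JSON);
```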

## 3. Sink data into Snowflake with UPSERT

1. To sink data into Snowflake with upsert semantics, we first need to set up the necessary components, following the same procedure outlined in the previous sections.

2. Then execute the following sql files in order.
- `upsert/create_source.sql`
- `upsert/create_mv.sql`
- `upsert/create_sink.sql`

After execution, RisingWave's data change log will be continuously imported into the Snowflake table.
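
Once rows start flowing, each record in the landing table carries the extra `__op` and `__row_id` columns emitted by the mv below. A quick illustrative query you can run in Snowflake (`t3` is the landing table this demo loads into):

```sql
-- illustrative only: inspect the raw change log landed in Snowflake.
SELECT user_id, target_id, event_timestamp, __op, __row_id
FROM t3
ORDER BY __row_id DESC
LIMIT 10;
```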

3. We then use the following sql statement to create the resulting dynamic table:
```sql
CREATE OR REPLACE DYNAMIC TABLE user_behaviors
TARGET_LAG = '1 minute'
WAREHOUSE = test_warehouse
AS SELECT *
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY {primary_key} ORDER BY __row_id DESC) AS dedupe_id
FROM t3
) AS subquery
WHERE dedupe_id = 1 AND (__op = 1 OR __op = 3);
```
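
Here `{primary_key}` is a placeholder for the upsert key (`user_id` in this demo). The `ROW_NUMBER()` window keeps only the latest change-log entry per key, and the `__op = 1 OR __op = 3` filter retains inserts and update-inserts (op codes 1 and 3 in RisingWave's changelog encoding) while dropping deletes, which together yield upsert semantics on top of the append-only landing table. An illustrative sanity check after the first refresh:

```sql
-- illustrative only: the dynamic table should hold at most one row per key.
SELECT user_id, COUNT(*) AS n
FROM user_behaviors
GROUP BY user_id
HAVING COUNT(*) > 1; -- expect zero rows
```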
13 changes: 13 additions & 0 deletions integration_tests/snowflake-sink/upsert/create_mv.sql
@@ -0,0 +1,13 @@
-- please note that the column name(s) for your mv should be *exactly*
-- the same as the column name(s) in your snowflake table, since we are matching column by name.
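-- `changelog_op` / `_changelog_row_id` in the query below are the metadata
-- columns exposed by the `changelog` clause: the op code marks the kind of
-- change (1 = insert, 2 = delete, 3 = update_insert, 4 = update_delete), and
-- the row id orders changes so downstream consumers can deduplicate.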

CREATE MATERIALIZED VIEW ss_mv AS
WITH sub AS changelog FROM user_behaviors
SELECT
user_id,
target_id,
event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' AS event_timestamp,
changelog_op AS __op,
_changelog_row_id::bigint AS __row_id
FROM
sub;
19 changes: 19 additions & 0 deletions integration_tests/snowflake-sink/upsert/create_sink.sql
@@ -0,0 +1,19 @@
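-- note: all `EXAMPLE_*` values (and the account / user fields) below are
-- placeholders; replace them with your own snowflake & s3 settings before running.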
CREATE SINK snowflake_sink FROM ss_mv WITH (
connector = 'snowflake',
type = 'append-only',
snowflake.database = 'EXAMPLE_DB',
snowflake.schema = 'EXAMPLE_SCHEMA',
snowflake.pipe = 'EXAMPLE_SNOWFLAKE_PIPE',
snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>',
snowflake.user = 'XZHSEH',
snowflake.rsa_public_key_fp = 'EXAMPLE_FP',
snowflake.private_key = 'EXAMPLE_PK',
snowflake.s3_bucket = 'EXAMPLE_S3_BUCKET',
snowflake.aws_access_key_id = 'EXAMPLE_AWS_ID',
snowflake.aws_secret_access_key = 'EXAMPLE_SECRET_KEY',
snowflake.aws_region = 'EXAMPLE_REGION',
snowflake.s3_path = 'EXAMPLE_S3_PATH',
-- note that the snowflake sink *only* supports `append-only` mode at present;
-- whether `force_append_only` is needed depends on your mv setup.
force_append_only = 'true'
);
19 changes: 19 additions & 0 deletions integration_tests/snowflake-sink/upsert/create_source.sql
@@ -0,0 +1,19 @@
-- please note that this will create a table whose datagen connector generates 1,000 rows over 10 seconds
-- you may want to change the configuration for better testing / demo purposes

CREATE TABLE user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMPTZ,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;
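
Since `user_id` is the table's primary key, you can also exercise the upsert path by hand with ordinary DML against the table; an illustrative example (run in RisingWave):

```sql
-- illustrative only: produce update / delete change-log entries for the sink.
UPDATE user_behaviors SET target_id = 'new_target' WHERE user_id = 1;
DELETE FROM user_behaviors WHERE user_id = 2;
```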
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_changelog.rs
@@ -80,7 +80,7 @@ impl PlanTreeNodeUnary for LogicalChangeLog {
     let len = out_col_change.to_parts().1;
     let out_col_change = if self.base.stream_key().is_none() {
         ColIndexMapping::new(output_vec, len + 1)
-    }else{
+    } else {
         output_vec.push(Some(len));
         ColIndexMapping::new(output_vec, len + 1)
     };