feat(snowflake-sink): add example use case & detailed spec; fix a subtle problem regarding file_suffix
#16241
Merged
@@ -0,0 +1,45 @@
# Example Use Case: Sinking to Snowflake

This tutorial (and the corresponding examples) showcases how to sink data from RisingWave to Snowflake.

## 1. Preparation

Due to the SaaS nature of Snowflake, sinking data to Snowflake via RisingWave typically requires some preparation.

For the detailed data-pipelining and sinking logic, please refer to the official documentation, e.g., [Data load with Snowpipe overview](https://docs.snowflake.com/user-guide/data-load-snowpipe-rest-overview).

### 1.1 S3 setup

You will first need to set up an **external** S3 bucket, and make sure you have the corresponding credentials, which are required **both** when creating the Snowflake stage and when creating the RisingWave sink.

Note: the required credentials include the following (see the sketch after this list for where each one is used):
- `snowflake.s3_bucket` (a.k.a. the `URL` in the Snowflake stage)
- `snowflake.aws_access_key_id` (a.k.a. the `AWS_KEY_ID` in the Snowflake stage)
- `snowflake.aws_secret_access_key` (a.k.a. the `AWS_SECRET_KEY` in the Snowflake stage)
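
As a rough sketch of where each credential goes (placeholder values only; the full commands are in `snowflake_prep.sql` and `create_sink.sql` below):

```sql
-- Snowflake side: the stage takes the bucket URL and the AWS key pair.
CREATE STAGE example_snowflake_stage
    URL = 's3://EXAMPLE_S3_BUCKET'
    CREDENTIALS = ( AWS_KEY_ID = 'EXAMPLE_AWS_ID' AWS_SECRET_KEY = 'EXAMPLE_SECRET_KEY' )
    FILE_FORMAT = ( TYPE = JSON );

-- RisingWave side: the same three values are passed to the sink, e.g.,
--   snowflake.s3_bucket             = 'EXAMPLE_S3_BUCKET',
--   snowflake.aws_access_key_id     = 'EXAMPLE_AWS_ID',
--   snowflake.aws_secret_access_key = 'EXAMPLE_SECRET_KEY',
```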

### 1.2 Snowflake setup

You will then need to set up Snowflake, which includes:
- generating the key pair for later authentication
- creating a `role` and granting it the appropriate permissions
- setting up the credential for the user (e.g., `RSA_PUBLIC_KEY`), and retrieving the `snowflake.rsa_public_key_fp`, which will later be used in RisingWave
- creating a `table` to store the data sunk from RisingWave
- creating a `stage` referring to the external S3 bucket, which Snowflake uses internally to load the corresponding data
- creating a `pipe` to actually receive the loaded data from the pre-defined stage and copy it into the Snowflake table

P.S.
1. This assumes that you have already created your account and the corresponding database in Snowflake.
2. For the detailed authentication process, refer to the [official authentication guide](https://docs.snowflake.com/en/developer-guide/sql-api/authenticating).
3. For detailed commands, refer to the [official reference](https://docs.snowflake.com/en/reference).

An example of the Snowflake setup commands can be found in `snowflake_prep.sql`, which also corresponds to the example sinking use case below.

## 2. Begin to sink data

Launch your RisingWave cluster and execute the following SQL files in order:

- `create_source.sql`
- `create_mv.sql`
- `create_sink.sql`

Note: the column name(s) in your materialized view must be exactly the same as the ones in your pre-defined Snowflake table, because of the `MATCH_BY_COLUMN_NAME` option specified for the Snowflake pipe in `snowflake_prep.sql`.

create_mv.sql
@@ -0,0 +1,10 @@
-- please note that the column name(s) of your mv should be *exactly*
-- the same as the column name(s) in your snowflake table, since columns are matched by name.

CREATE MATERIALIZED VIEW ss_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' AS event_timestamp
FROM
    user_behaviors;
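
Since the pipe matches columns by name, it can help to double-check that the MV schema lines up with the Snowflake table before creating the sink. A minimal sketch (run the first statement in RisingWave and the second in a Snowflake worksheet):

```sql
-- In RisingWave: inspect the MV's column names and types.
DESCRIBE ss_mv;

-- In Snowflake: compare against the target table defined in snowflake_prep.sql.
DESC TABLE example_snowflake_sink_table;
```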

create_sink.sql
@@ -0,0 +1,20 @@
CREATE SINK snowflake_sink FROM ss_mv WITH (
    connector = 'snowflake',
    type = 'append-only',
    snowflake.database = 'EXAMPLE_DB',
    snowflake.schema = 'EXAMPLE_SCHEMA',
    snowflake.pipe = 'EXAMPLE_SNOWFLAKE_PIPE',
    snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>',
    snowflake.user = 'XZHSEH',
    snowflake.rsa_public_key_fp = 'EXAMPLE_FP',
    snowflake.private_key = 'EXAMPLE_PK',
    snowflake.s3_bucket = 'EXAMPLE_S3_BUCKET',
    snowflake.aws_access_key_id = 'EXAMPLE_AWS_ID',
    snowflake.aws_secret_access_key = 'EXAMPLE_SECRET_KEY',
    snowflake.aws_region = 'EXAMPLE_REGION',
    snowflake.max_batch_row_num = '1030',
    snowflake.s3_path = 'EXAMPLE_S3_PATH',
    -- depends on your mv setup; note that the snowflake sink only supports
    -- append-only at present.
    force_append_only = 'true'
);
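
Once the sink is created, a quick way to confirm data is flowing (assuming the datagen source from `create_source.sql` is running; the Snowflake-side count only grows after a batch has been flushed to S3 and copied by the pipe):

```sql
-- In RisingWave: the sink should be listed, and the MV should be accumulating rows.
SHOW SINKS;
SELECT COUNT(*) FROM ss_mv;

-- In Snowflake: rows appear once Snowpipe has copied the staged files.
SELECT COUNT(*) FROM example_snowflake_sink_table;
```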

create_source.sql
@@ -0,0 +1,16 @@
-- synthetic source: the datagen connector generates user_id as a sequence from 1 to 1000
-- at 100 rows per second; the remaining columns are filled with random values.
CREATE TABLE user_behaviors (
    user_id INT,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMPTZ,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY (user_id)
) WITH (
    connector = 'datagen',
    fields.user_id.kind = 'sequence',
    fields.user_id.start = '1',
    fields.user_id.end = '1000',
    datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;
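
To sanity-check the generated data before creating the MV and the sink, a quick ad-hoc peek (not part of the example files):

```sql
-- datagen keeps producing rows, so limit the preview.
SELECT * FROM user_behaviors LIMIT 5;
```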

snowflake_prep.sql
@@ -0,0 +1,29 @@
USE EXAMPLE_DB;

ALTER USER xzhseh SET RSA_PUBLIC_KEY = 'your local rsa public key';

-- set user permission to account admin level
GRANT ROLE ACCOUNTADMIN TO USER xzhseh;

-- you could either retrieve the fingerprint from the `DESC USER` info panel,
DESC USER xzhseh;
-- or from the following SELECT statement (also fine; see the documentation for details).
SELECT TRIM(
    (SELECT "value" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
        WHERE "property" = 'RSA_PUBLIC_KEY_FP'),
    'SHA256:'
);

-- snowflake table; note to keep the same column name(s) as the mv in risingwave.
CREATE OR REPLACE TABLE example_snowflake_sink_table (user_id INT, target_id VARCHAR, event_timestamp TIMESTAMP_TZ);

-- snowflake stage; only JSON is supported as the sink format at present.
CREATE OR REPLACE STAGE example_snowflake_stage
    URL = '<S3_PATH>'
    CREDENTIALS = ( AWS_KEY_ID = '<S3_CREDENTIALS>' AWS_SECRET_KEY = '<S3_CREDENTIALS>' )
    FILE_FORMAT = ( TYPE = JSON );

-- snowflake pipe
CREATE OR REPLACE PIPE example_snowflake_pipe AS
    COPY INTO example_snowflake_sink_table
    FROM @example_snowflake_stage
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- select from the table after sinking from risingwave
SELECT * FROM example_snowflake_sink_table WHERE event_timestamp IS NOT NULL;
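
If rows never show up in the Snowflake table, the pipe status and the copy history are useful first things to check. A hedged troubleshooting sketch using Snowflake's built-in functions:

```sql
-- Returns a JSON blob with the pipe's execution state and any recent errors.
SELECT SYSTEM$PIPE_STATUS('example_snowflake_pipe');

-- Copy history for the target table over the last hour can reveal load errors.
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'example_snowflake_sink_table',
    START_TIME => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())
));
```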
We should have replaced the current implementation with streaming upload in the original PR. Any plan to do it? It shouldn't involve too much work.
I'll come up with the refactor tomorrow.