feat(snowflake-sink): add example use case & detailed spec; fix a subtle problem regarding file_suffix #16241

Merged
merged 4 commits into from
Apr 11, 2024
45 changes: 45 additions & 0 deletions integration_tests/snowflake-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Example Use Case: Sinking to Snowflake

This tutorial and the accompanying examples show how to sink data from RisingWave to Snowflake.

## 1. Preparation

Because Snowflake is a SaaS product, sinking data to it from RisingWave has a few prerequisites.

For the detailed data pipeline and sinking logic, refer to the official documentation, e.g., [data load with Snowpipe overview](https://docs.snowflake.com/user-guide/data-load-snowpipe-rest-overview).

### 1.1 S3 setup

First, set up an **external** S3 bucket and make sure you have the corresponding credentials. They are required **both** when creating the Snowflake stage and when creating the RisingWave sink.

Note: the required credentials are:
- `snowflake.s3_bucket` (a.k.a. the `URL` in the Snowflake stage)
- `snowflake.aws_access_key_id` (a.k.a. the `AWS_KEY_ID` in the Snowflake stage)
- `snowflake.aws_secret_access_key` (a.k.a. the `AWS_SECRET_KEY` in the Snowflake stage)

### 1.2 Snowflake setup

Next, set up Snowflake, which includes the following steps:
- generate the key pair for later authentication
- create a `role` and grant the appropriate permissions
- set up the credential for the user (e.g., `RSA_PUBLIC_KEY`), and retrieve the `snowflake.rsa_public_key_fp`, which will later be used in RisingWave
- create a `table` to store the sink data from RisingWave
- create a `stage` that refers to the external S3 bucket, which Snowflake uses internally to load the corresponding data
- create a `pipe` to actually receive the loaded data from the pre-defined stage and copy it to the Snowflake table

Notes:
1. This assumes that you have already created your account and the corresponding databases in Snowflake.
2. For the detailed authentication process, refer to the [official authentication guide](https://docs.snowflake.com/en/developer-guide/sql-api/authenticating).
3. For detailed commands, refer to the [official reference](https://docs.snowflake.com/en/reference).

An example of the Snowflake setup commands can be found in `snowflake_prep.sql`, which also corresponds to the example sink use case below.

## 2. Begin to sink data

Launch your RisingWave cluster, and execute the following SQL files in order:

- `create_source.sql`
- `create_mv.sql`
- `create_sink.sql`

Note: the column name(s) in your materialized view must be exactly the same as those in your pre-defined Snowflake table, because the pipe defined in `snowflake_prep.sql` matches columns by name.
10 changes: 10 additions & 0 deletions integration_tests/snowflake-sink/create_mv.sql
@@ -0,0 +1,10 @@
-- please note that the column name(s) for your mv should be *exactly*
-- the same as the column name(s) in your snowflake table, since we are matching column by name.

CREATE MATERIALIZED VIEW ss_mv AS
SELECT
user_id,
target_id,
event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' as event_timestamp
FROM
user_behaviors;
20 changes: 20 additions & 0 deletions integration_tests/snowflake-sink/create_sink.sql
@@ -0,0 +1,20 @@
CREATE SINK snowflake_sink FROM ss_mv WITH (
connector = 'snowflake',
type = 'append-only',
snowflake.database = 'EXAMPLE_DB',
snowflake.schema = 'EXAMPLE_SCHEMA',
snowflake.pipe = 'EXAMPLE_SNOWFLAKE_PIPE',
snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>',
snowflake.user = 'XZHSEH',
snowflake.rsa_public_key_fp = 'EXAMPLE_FP',
snowflake.private_key = 'EXAMPLE_PK',
snowflake.s3_bucket = 'EXAMPLE_S3_BUCKET',
snowflake.aws_access_key_id = 'EXAMPLE_AWS_ID',
snowflake.aws_secret_access_key = 'EXAMPLE_SECRET_KEY',
snowflake.aws_region = 'EXAMPLE_REGION',
snowflake.max_batch_row_num = '1030',
snowflake.s3_path = 'EXAMPLE_S3_PATH',
-- depends on your mv setup, note that snowflake sink only supports
-- append-only at present.
force_append_only = 'true'
);
16 changes: 16 additions & 0 deletions integration_tests/snowflake-sink/create_source.sql
@@ -0,0 +1,16 @@
CREATE TABLE user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMPTZ,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;
29 changes: 29 additions & 0 deletions integration_tests/snowflake-sink/snowflake_prep.sql
@@ -0,0 +1,29 @@
USE EXAMPLE_DB;

ALTER USER xzhseh SET RSA_PUBLIC_KEY='your local rsa public key';

-- set user permission to account admin level
GRANT ROLE ACCOUNTADMIN TO USER xzhseh;

-- you could either retrieve the fp from desc user's info panel,
-- or from the following select stmt.
DESC USER xzhseh;
-- also fine, see the documentation for details.
SELECT TRIM(
(SELECT "value" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "property" = 'RSA_PUBLIC_KEY_FP'),
'SHA256:'
);

-- snowflake table, note to keep the same column name(s).
CREATE OR REPLACE TABLE example_snowflake_sink_table (user_id INT, target_id VARCHAR, event_timestamp TIMESTAMP_TZ);

-- snowflake stage, only json is supported as the sink format at present
CREATE OR REPLACE STAGE example_snowflake_stage URL = '<S3_PATH>'
credentials = ( AWS_KEY_ID = '<S3_CREDENTIALS>' AWS_SECRET_KEY = '<S3_CREDENTIALS>' ) file_format = ( type = JSON );

-- snowflake pipe
CREATE OR REPLACE PIPE example_snowflake_pipe AS COPY INTO example_snowflake_sink_table FROM @example_snowflake_stage MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- select from table after sinking from rw
SELECT * FROM example_snowflake_sink_table WHERE event_timestamp IS NOT NULL;
7 changes: 4 additions & 3 deletions src/connector/src/sink/snowflake.rs
@@ -245,7 +245,7 @@ impl SnowflakeSinkWriter {
     }

     /// reset the `payload` and `row_counter`.
-    /// shall *only* be called after a successful sink.
+    /// shall *only* be called after a successful sink to s3.
     fn reset(&mut self) {
         self.payload.clear();
         self.row_counter = 0;
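
The doc comment on `reset` in the hunk above can be illustrated with a minimal, self-contained sketch. `BatchWriter`, `write_row`, and the closure-based `flush` are hypothetical stand-ins and not the actual `SnowflakeSinkWriter` API; only the `payload` and `row_counter` fields and the body of `reset` come from the diff. The point is that the buffer is cleared only after the upload succeeds, so a failed upload does not silently drop rows.

```rust
/// Hypothetical, simplified writer; field names mirror the diff,
/// but the types and the other methods are assumptions.
struct BatchWriter {
    payload: String,
    row_counter: u32,
}

impl BatchWriter {
    /// Buffers one JSON-encoded row.
    fn write_row(&mut self, json_row: &str) {
        self.payload.push_str(json_row);
        self.payload.push('\n');
        self.row_counter += 1;
    }

    /// Same body as `reset` in the diff.
    fn reset(&mut self) {
        self.payload.clear();
        self.row_counter = 0;
    }

    /// Hands the buffered payload to `upload` and resets only on success.
    fn flush<F>(&mut self, upload: F) -> Result<(), String>
    where
        F: FnOnce(&str) -> Result<(), String>,
    {
        if self.payload.is_empty() {
            return Ok(());
        }
        // `?` returns early on failure, so the buffer stays intact.
        upload(&self.payload)?;
        self.reset();
        Ok(())
    }
}

fn main() {
    let mut writer = BatchWriter { payload: String::new(), row_counter: 0 };
    writer.write_row(r#"{"user_id":1}"#);

    // A failed upload leaves the buffered row in place.
    assert!(writer.flush(|_| Err("s3 unavailable".to_string())).is_err());
    assert_eq!(writer.row_counter, 1);

    // A successful upload clears the buffer.
    assert!(writer.flush(|_| Ok(())).is_ok());
    assert_eq!(writer.row_counter, 0);
    assert!(writer.payload.is_empty());
}
```

Whether the real writer retries after a failed upload is not shown in this diff; the sketch only demonstrates the ordering that the doc comment requires.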
@@ -288,13 +288,14 @@ impl SnowflakeSinkWriter {
         if self.payload.is_empty() {
             return Ok(());
         }
+        let file_suffix = self.file_suffix();
         // todo: change this to streaming upload

Review comment (Contributor): We should have replaced the current implementation with streaming upload in the original PR. Any plan to do it? It shouldn't involve too much work.

Reply (Contributor Author): I'll come with the refactor tomorrow.

         // first sink to the external stage provided by user (i.e., s3)
         self.s3_client
-            .sink_to_s3(self.payload.clone().into(), self.file_suffix())
+            .sink_to_s3(self.payload.clone().into(), file_suffix.clone())
             .await?;
         // then trigger `insertFiles` post request to snowflake
-        self.http_client.send_request(self.file_suffix()).await?;
+        self.http_client.send_request(file_suffix).await?;
         // reset `payload` & `row_counter`
         self.reset();
         Ok(())
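
The `file_suffix` change in the hunk above matters if `file_suffix()` can return a different value on each call. The diff does not show its body, so the sketch below assumes a per-call counter as a stand-in for any non-deterministic component. `Writer`, `names_called_twice`, and `names_computed_once` are hypothetical names used only for illustration. Under that assumption, calling the method twice would upload the file to S3 under one name and ask Snowflake to ingest another, whereas computing it once keeps both steps consistent.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for the sink writer; not the actual RisingWave type.
struct Writer {
    epoch: u64,
    /// Bumped on every `file_suffix` call; an assumed source of per-call variation.
    counter: Cell<u64>,
}

impl Writer {
    /// Returns a suffix that differs between calls (assumption, see lead-in).
    fn file_suffix(&self) -> String {
        let n = self.counter.get();
        self.counter.set(n + 1);
        format!("{}_{}", self.epoch, n)
    }

    /// Shape of the old code: the upload name and the notify name come from two calls.
    fn names_called_twice(&self) -> (String, String) {
        (self.file_suffix(), self.file_suffix())
    }

    /// Shape of the new code: one call, reused for both steps.
    fn names_computed_once(&self) -> (String, String) {
        let file_suffix = self.file_suffix();
        (file_suffix.clone(), file_suffix)
    }
}

fn main() {
    let writer = Writer { epoch: 42, counter: Cell::new(0) };

    // Two calls: the uploaded file and the file reported to Snowflake disagree.
    let (uploaded, notified) = writer.names_called_twice();
    assert_ne!(uploaded, notified);

    // One call: both steps refer to the same file.
    let (uploaded, notified) = writer.names_computed_once();
    assert_eq!(uploaded, notified);
}
```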