Skip to content

Commit

Permalink
use uuid to ensure the global uniqueness of file suffix
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhseh committed Apr 10, 2024
1 parent 583964a commit 9e52dc2
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 17 additions & 12 deletions src/connector/src/sink/snowflake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_json::Value;
use serde_with::serde_as;
use uuid::Uuid;
use with_options::WithOptions;

use super::encoder::{
Expand Down Expand Up @@ -181,8 +182,8 @@ pub struct SnowflakeSinkWriter {
/// the threshold for sinking to s3
max_batch_row_num: u32,
/// The current epoch, used in naming the sink files
/// mainly used for debugging purpose
epoch: u64,
sink_file_suffix: u32,
}

impl SnowflakeSinkWriter {
Expand Down Expand Up @@ -238,16 +239,13 @@ impl SnowflakeSinkWriter {
row_counter: 0,
payload: String::new(),
max_batch_row_num,
// initial value of `epoch` will start from 0
epoch: 0,
// Start from 0, i.e., `RW_SNOWFLAKE_S3_SINK_FILE_0`
sink_file_suffix: 0,
}
}

/// Note that we shall NOT reset the `sink_file_suffix`
/// since we need to incrementally keep the sink
/// file *unique*, otherwise snowflake will not
/// sink it from external stage (i.e., s3)
/// reset the `payload` and `row_counter`.
/// shall *only* be called after a successful sink.
fn reset(&mut self) {
self.payload.clear();
self.row_counter = 0;
Expand All @@ -271,17 +269,26 @@ impl SnowflakeSinkWriter {
self.epoch = epoch;
}

/// Construct the *unique* file suffix for the sink
/// generate a *global unique* uuid,
/// which is the key to the uniqueness of file suffix.
fn gen_uuid() -> Uuid {
Uuid::new_v4()
}

/// construct the *global unique* file suffix for the sink.
/// note: this is unique even across multiple parallel writer(s).
fn file_suffix(&self) -> String {
format!("{}_{}", self.epoch, self.sink_file_suffix)
// the format of suffix will be <epoch>_<uuid>
format!("{}_{}", self.epoch, Self::gen_uuid())
}

/// Sink `payload` to s3, then trigger corresponding `insertFiles` post request
/// sink `payload` to s3, then trigger corresponding `insertFiles` post request
/// to snowflake, to finish the overall sinking pipeline.
async fn sink_payload(&mut self) -> Result<()> {
if self.payload.is_empty() {
return Ok(());
}
// todo: change this to streaming upload
// first sink to the external stage provided by user (i.e., s3)
self.s3_client
.sink_to_s3(self.payload.clone().into(), self.file_suffix())
Expand All @@ -290,8 +297,6 @@ impl SnowflakeSinkWriter {
self.http_client.send_request(self.file_suffix()).await?;
// reset `payload` & `row_counter`
self.reset();
// to ensure s3 sink file unique
self.sink_file_suffix += 1;
Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/snowflake_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com";
const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK";
const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE";

/// The helper function to generate the s3 file name
/// The helper function to generate the *global unique* s3 file name.
fn generate_s3_file_name(s3_path: Option<String>, suffix: String) -> String {
match s3_path {
Some(path) => format!("{}/{}_{}", path, S3_INTERMEDIATE_FILE_NAME, suffix),
Expand Down

0 comments on commit 9e52dc2

Please sign in to comment.