diff --git a/Cargo.lock b/Cargo.lock index 086d0a13dabb4..d400abdc80445 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9586,7 +9586,7 @@ dependencies = [ "itertools 0.12.1", "jni", "jsonschema-transpiler", - "jsonwebtoken 9.2.0", + "jsonwebtoken 9.3.0", "madsim-rdkafka", "madsim-tokio", "madsim-tonic", diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index bf8545d823007..ba0973a0b0145 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -23,6 +23,7 @@ use risingwave_common::catalog::Schema; use serde::Deserialize; use serde_json::Value; use serde_with::serde_as; +use uuid::Uuid; use with_options::WithOptions; use super::encoder::{ @@ -181,8 +182,8 @@ pub struct SnowflakeSinkWriter { /// the threshold for sinking to s3 max_batch_row_num: u32, /// The current epoch, used in naming the sink files + /// mainly used for debugging purpose epoch: u64, - sink_file_suffix: u32, } impl SnowflakeSinkWriter { @@ -238,16 +239,13 @@ impl SnowflakeSinkWriter { row_counter: 0, payload: String::new(), max_batch_row_num, + // initial value of `epoch` will start from 0 epoch: 0, - // Start from 0, i.e., `RW_SNOWFLAKE_S3_SINK_FILE_0` - sink_file_suffix: 0, } } - /// Note that we shall NOT reset the `sink_file_suffix` - /// since we need to incrementally keep the sink - /// file *unique*, otherwise snowflake will not - /// sink it from external stage (i.e., s3) + /// reset the `payload` and `row_counter`. + /// shall *only* be called after a successful sink. fn reset(&mut self) { self.payload.clear(); self.row_counter = 0; @@ -271,17 +269,26 @@ impl SnowflakeSinkWriter { self.epoch = epoch; } - /// Construct the *unique* file suffix for the sink + /// generate a *global unique* uuid, + /// which is the key to the uniqueness of file suffix. + fn gen_uuid() -> Uuid { + Uuid::new_v4() + } + + /// construct the *global unique* file suffix for the sink. + /// note: this is unique even across multiple parallel writer(s). fn file_suffix(&self) -> String { - format!("{}_{}", self.epoch, self.sink_file_suffix) + // the format of suffix will be _ + format!("{}_{}", self.epoch, Self::gen_uuid()) } - /// Sink `payload` to s3, then trigger corresponding `insertFiles` post request + /// sink `payload` to s3, then trigger corresponding `insertFiles` post request /// to snowflake, to finish the overall sinking pipeline. async fn sink_payload(&mut self) -> Result<()> { if self.payload.is_empty() { return Ok(()); } + // todo: change this to streaming upload // first sink to the external stage provided by user (i.e., s3) self.s3_client .sink_to_s3(self.payload.clone().into(), self.file_suffix()) @@ -290,8 +297,6 @@ impl SnowflakeSinkWriter { self.http_client.send_request(self.file_suffix()).await?; // reset `payload` & `row_counter` self.reset(); - // to ensure s3 sink file unique - self.sink_file_suffix += 1; Ok(()) } } diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 2cc180f3c4fcc..e5e37deb14652 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -38,7 +38,7 @@ const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com"; const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK"; const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE"; -/// The helper function to generate the s3 file name +/// The helper function to generate the *global unique* s3 file name. fn generate_s3_file_name(s3_path: Option, suffix: String) -> String { match s3_path { Some(path) => format!("{}/{}_{}", path, S3_INTERMEDIATE_FILE_NAME, suffix),