diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index aaa41773cc36..89dba0e233d5 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -168,6 +168,8 @@ pub struct SnowflakeSinkWriter { payload: String, /// the threshold for sinking to s3 max_batch_row_num: u32, + /// The current epoch, used in naming the sink files + epoch: u64, sink_file_suffix: u32, } @@ -222,6 +224,7 @@ impl SnowflakeSinkWriter { row_counter: 0, payload: String::new(), max_batch_row_num, + epoch: 0, // Start from 0, i.e., `RW_SNOWFLAKE_S3_SINK_FILE_0` sink_file_suffix: 0, } @@ -252,15 +255,27 @@ impl SnowflakeSinkWriter { Ok(()) } + fn update_epoch(&mut self, epoch: u64) { + self.epoch = epoch; + } + + /// Construct the file suffix for current sink + fn file_suffix(&self) -> String { + format!("{}_{}", self.epoch, self.sink_file_suffix) + } + /// Sink `payload` to s3, then trigger corresponding `insertFiles` post request /// to snowflake, to finish the overall sinking pipeline. async fn sink_payload(&mut self) -> Result<()> { + if self.payload.is_empty() { + return Ok(()); + } // first sink to the external stage provided by user (i.e., s3) self.s3_client - .sink_to_s3(self.payload.clone().into(), self.sink_file_suffix) + .sink_to_s3(self.payload.clone().into(), self.file_suffix()) .await?; // then trigger `insertFiles` post request to snowflake - self.http_client.send_request(self.sink_file_suffix).await?; + self.http_client.send_request(self.file_suffix()).await?; // reset `payload` & `row_counter` self.reset(); // to ensure s3 sink file unique @@ -271,7 +286,8 @@ impl SnowflakeSinkWriter { #[async_trait] impl SinkWriter for SnowflakeSinkWriter { - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { + async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { + self.update_epoch(epoch); Ok(()) } diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 3a63efa86d4a..0ea32176971d 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -147,7 +147,7 @@ impl SnowflakeHttpClient { /// NOTE: this function should ONLY be called *after* /// uploading files to remote external staged storage, i.e., AWS S3 - pub async fn send_request(&self, file_num: u32) -> Result<()> { + pub async fn send_request(&self, file_suffix: String) -> Result<()> { let (builder, client) = self.build_request_and_client(); // Generate the jwt_token @@ -163,7 +163,7 @@ impl SnowflakeHttpClient { let request = builder .body(Body::from(format!( "{}_{}", - S3_INTERMEDIATE_FILE_NAME, file_num + S3_INTERMEDIATE_FILE_NAME, file_suffix ))) .map_err(|err| SinkError::Snowflake(err.to_string()))?; @@ -223,11 +223,11 @@ impl SnowflakeS3Client { } } - pub async fn sink_to_s3(&self, data: Bytes, file_num: u32) -> Result<()> { + pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> { self.s3_client .put_object() .bucket(self.s3_bucket.clone()) - .key(format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, file_num)) + .key(format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, file_suffix)) .body(ByteStream::from(data)) .send() .await