Skip to content

Commit

Permalink
use epoch with file_suffix as the unique identifier for sink file to s3
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhseh committed Mar 18, 2024
1 parent 7ddbdf8 commit 3fc56b9
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
22 changes: 19 additions & 3 deletions src/connector/src/sink/snowflake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ pub struct SnowflakeSinkWriter {
payload: String,
/// the threshold for sinking to s3
max_batch_row_num: u32,
/// The current epoch, used in naming the sink files
epoch: u64,
sink_file_suffix: u32,
}

Expand Down Expand Up @@ -222,6 +224,7 @@ impl SnowflakeSinkWriter {
row_counter: 0,
payload: String::new(),
max_batch_row_num,
epoch: 0,
// Start from 0, i.e., `RW_SNOWFLAKE_S3_SINK_FILE_0`
sink_file_suffix: 0,
}
Expand Down Expand Up @@ -252,15 +255,27 @@ impl SnowflakeSinkWriter {
Ok(())
}

fn update_epoch(&mut self, epoch: u64) {
self.epoch = epoch;
}

/// Construct the file suffix for current sink
fn file_suffix(&self) -> String {
format!("{}_{}", self.epoch, self.sink_file_suffix)
}

/// Sink `payload` to s3, then trigger corresponding `insertFiles` post request
/// to snowflake, to finish the overall sinking pipeline.
async fn sink_payload(&mut self) -> Result<()> {
if self.payload.is_empty() {
return Ok(());
}
// first sink to the external stage provided by user (i.e., s3)
self.s3_client
.sink_to_s3(self.payload.clone().into(), self.sink_file_suffix)
.sink_to_s3(self.payload.clone().into(), self.file_suffix())
.await?;
// then trigger `insertFiles` post request to snowflake
self.http_client.send_request(self.sink_file_suffix).await?;
self.http_client.send_request(self.file_suffix()).await?;
// reset `payload` & `row_counter`
self.reset();
// to ensure s3 sink file unique
Expand All @@ -271,7 +286,8 @@ impl SnowflakeSinkWriter {

#[async_trait]
impl SinkWriter for SnowflakeSinkWriter {
async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
self.update_epoch(epoch);
Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/sink/snowflake_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl SnowflakeHttpClient {

/// NOTE: this function should ONLY be called *after*
/// uploading files to remote external staged storage, i.e., AWS S3
pub async fn send_request(&self, file_num: u32) -> Result<()> {
pub async fn send_request(&self, file_suffix: String) -> Result<()> {
let (builder, client) = self.build_request_and_client();

// Generate the jwt_token
Expand All @@ -163,7 +163,7 @@ impl SnowflakeHttpClient {
let request = builder
.body(Body::from(format!(
"{}_{}",
S3_INTERMEDIATE_FILE_NAME, file_num
S3_INTERMEDIATE_FILE_NAME, file_suffix
)))
.map_err(|err| SinkError::Snowflake(err.to_string()))?;

Expand Down Expand Up @@ -223,11 +223,11 @@ impl SnowflakeS3Client {
}
}

pub async fn sink_to_s3(&self, data: Bytes, file_num: u32) -> Result<()> {
pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> {
self.s3_client
.put_object()
.bucket(self.s3_bucket.clone())
.key(format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, file_num))
.key(format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, file_suffix))
.body(ByteStream::from(data))
.send()
.await
Expand Down

0 comments on commit 3fc56b9

Please sign in to comment.