Skip to content

Commit

Permalink
sink payload when checkpoint barrier comes
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhseh committed Mar 18, 2024
1 parent 4c10695 commit ccfd638
Showing 1 changed file with 24 additions and 11 deletions.
35 changes: 24 additions & 11 deletions src/connector/src/sink/snowflake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub struct SnowflakeCommon {
#[serde(rename = "snowflake.aws_region")]
pub aws_region: String,

/// The configurable max row(s) to batch,
/// which should be explicitly specified by user(s)
#[serde(rename = "snowflake.max_batch_row_num")]
pub max_batch_row_num: String,
}
Expand Down Expand Up @@ -249,6 +251,22 @@ impl SnowflakeSinkWriter {
}
Ok(())
}

/// Sink `payload` to s3, then trigger corresponding `insertFiles` post request
/// to snowflake, to finish the overall sinking pipeline.
async fn sink_payload(&mut self) -> Result<()> {
// first sink to the external stage provided by user (i.e., s3)
self.s3_client
.sink_to_s3(self.payload.clone().into(), self.sink_file_suffix)
.await?;
// then trigger `insertFiles` post request to snowflake
self.http_client.send_request(self.sink_file_suffix).await?;
// reset `payload` & `row_counter`
self.reset();
// to ensure s3 sink file unique
self.sink_file_suffix += 1;
Ok(())
}
}

#[async_trait]
Expand All @@ -265,7 +283,11 @@ impl SinkWriter for SnowflakeSinkWriter {
Ok(())
}

async fn barrier(&mut self, _is_checkpoint: bool) -> Result<Self::CommitMetadata> {
async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
if is_checkpoint {
// sink all the row(s) currently batched in `self.payload`
self.sink_payload().await?;
}
Ok(())
}

Expand All @@ -274,16 +296,7 @@ impl SinkWriter for SnowflakeSinkWriter {

// When the number of row exceeds `MAX_BATCH_ROW_NUM`
if self.at_sink_threshold() {
// first sink to the external stage provided by user (i.e., s3)
self.s3_client
.sink_to_s3(self.payload.clone().into(), self.sink_file_suffix)
.await?;
// then trigger `insertFiles` post request to snowflake
self.http_client.send_request(self.sink_file_suffix).await?;
// reset `payload` & `row_counter`
self.reset();
// to ensure s3 sink file unique
self.sink_file_suffix += 1;
self.sink_payload().await?;
}

Ok(())
Expand Down

0 comments on commit ccfd638

Please sign in to comment.