avoid unwrap; make new_streaming_uploader pure
xzhseh committed Apr 18, 2024
1 parent f9025c5 commit 2498746
Showing 1 changed file with 14 additions and 23 deletions.
37 changes: 14 additions & 23 deletions src/connector/src/sink/snowflake.rs
@@ -237,11 +237,12 @@ impl SnowflakeSinkWriter {
         })
     }
 
-    /// update the streaming uploader as well as the file suffix.
+    /// return a brand-new streaming uploader as well as the file suffix.
     /// note: should *only* be called after a new epoch begins
     /// and `streaming_upload` is called for the first time,
     /// i.e., lazy initialization of the internal `streaming_uploader`.
-    async fn new_streaming_uploader(&mut self) -> Result<()> {
+    /// plus, this function is *pure*; the `&mut self` is only there to make rustc (and tokio) happy.
+    async fn new_streaming_uploader(&mut self) -> Result<(Box<dyn StreamingUploader>, String)> {
         let file_suffix = self.file_suffix();
         let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix);
         let uploader = self
@@ -256,30 +257,20 @@ impl SnowflakeSinkWriter {
                 err
             ))
         })?;
-        self.streaming_uploader = Some((uploader, file_suffix));
-        Ok(())
-    }
-
-    /// the `Option` is the flag to determine if there is data to upload.
-    fn has_data(&self) -> bool {
-        self.streaming_uploader.is_some()
+        Ok((uploader, file_suffix))
     }
 
     /// write data to the current streaming uploader for this epoch.
     async fn streaming_upload(&mut self, data: Bytes) -> Result<()> {
-        if !self.has_data() {
-            // lazily initialization
-            self.new_streaming_uploader().await?;
-        }
-        let Some(uploader) = self.streaming_uploader.as_mut() else {
-            return Err(SinkError::Snowflake(format!(
-                "expect streaming uploader to be properly initialized when performing streaming upload for epoch {}",
-                self.epoch
-            ))
-            );
+        let (uploader, _) = match self.streaming_uploader.as_mut() {
+            Some(s) => s,
+            None => {
+                assert!(self.streaming_uploader.is_none(), "expect `streaming_uploader` to be None");
+                let uploader = self.new_streaming_uploader().await?;
+                self.streaming_uploader.insert(uploader)
+            }
         };
         uploader
-            .0
             .write_bytes(data)
             .await
             .map_err(|err| {
@@ -295,17 +286,17 @@ impl SnowflakeSinkWriter {
     /// ensure all the data has been properly uploaded to intermediate s3.
     async fn finish_streaming_upload(&mut self) -> Result<Option<String>> {
         let uploader = std::mem::take(&mut self.streaming_uploader);
-        let Some(uploader) = uploader else {
+        let Some((uploader, file_suffix)) = uploader else {
             // there is no data to be uploaded for this epoch
             return Ok(None);
         };
-        uploader.0.finish().await.map_err(|err| {
+        uploader.finish().await.map_err(|err| {
             SinkError::Snowflake(format!(
                 "failed to finish streaming upload to s3 for snowflake sink, error: {}",
                 err
             ))
         })?;
-        Ok(Some(uploader.1))
+        Ok(Some(file_suffix))
     }
 
     async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
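Note: the core of this change is the lazy-initialization pattern in `streaming_upload`: match on `Option::as_mut`, and on the `None` path build the value and store it with `Option::insert`, which returns a mutable reference to the freshly stored value, so neither path needs an `unwrap()`. Below is a minimal, self-contained sketch of that pattern; `MockUploader` and `Writer` are hypothetical stand-ins for the real `Box<dyn StreamingUploader>` and `SnowflakeSinkWriter`, and the sketch is synchronous where the real code is async.

// Stand-in for the real `Box<dyn StreamingUploader>` (hypothetical type).
struct MockUploader {
    bytes_written: usize,
}

// Stand-in for `SnowflakeSinkWriter` (hypothetical type).
struct Writer {
    // `None` doubles as the "no data yet this epoch" flag, which is why
    // the separate `has_data()` helper could be deleted in this commit.
    streaming_uploader: Option<(MockUploader, String)>,
}

impl Writer {
    // Pure: builds and returns the pair instead of mutating `self`.
    fn new_streaming_uploader(&mut self) -> (MockUploader, String) {
        (MockUploader { bytes_written: 0 }, "file_suffix_0".to_owned())
    }

    fn streaming_upload(&mut self, data: &[u8]) {
        // Reuse the existing uploader, or lazily create and store one.
        // `Option::insert` stores the value and returns `&mut` to it,
        // so no `unwrap()` is needed on either path.
        let (uploader, _) = match self.streaming_uploader.as_mut() {
            Some(s) => s,
            None => {
                let uploader = self.new_streaming_uploader();
                self.streaming_uploader.insert(uploader)
            }
        };
        uploader.bytes_written += data.len();
    }
}

fn main() {
    let mut w = Writer { streaming_uploader: None };
    w.streaming_upload(b"hello"); // first call initializes the uploader
    w.streaming_upload(b"world"); // subsequent calls reuse it
    let (uploader, suffix) = w.streaming_uploader.as_ref().expect("initialized above");
    println!("{} bytes buffered for suffix {}", uploader.bytes_written, suffix);
}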

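Note: `finish_streaming_upload` pairs with the above via `std::mem::take`, which swaps `None` into the field and hands back the old value, resetting the state for the next epoch in one move (`Option::take()` would do the same). A small, self-contained sketch of that drain-and-finish flow, again with hypothetical stand-in types and the async `finish()` call elided:

// Hypothetical stand-in: the first tuple element models the uploader
// handle, the second the generated file suffix.
struct Finisher {
    streaming_uploader: Option<(String, String)>,
}

impl Finisher {
    fn finish_streaming_upload(&mut self) -> Option<String> {
        // `mem::take` swaps `None` in and returns the old value, so the
        // field is reset for the next epoch; `self.streaming_uploader.take()`
        // is equivalent.
        let uploader = std::mem::take(&mut self.streaming_uploader);
        // Destructuring once up front replaces the old `.0` / `.1` accesses.
        let Some((uploader, file_suffix)) = uploader else {
            // there was no data to upload for this epoch
            return None;
        };
        drop(uploader); // the real code awaits `uploader.finish()` here
        Some(file_suffix)
    }
}

fn main() {
    let mut f = Finisher {
        streaming_uploader: Some(("s3-handle".to_owned(), "suffix_1".to_owned())),
    };
    assert_eq!(f.finish_streaming_upload(), Some("suffix_1".to_owned()));
    assert_eq!(f.finish_streaming_upload(), None); // already drained
}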