Skip to content

Commit

Permalink
update validate to ensure append-only
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhseh committed Mar 18, 2024
1 parent 3fc56b9 commit 9ceab04
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions src/connector/src/sink/snowflake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ impl Sink for SnowflakeSink {
}

async fn validate(&self) -> Result<()> {
if !self.is_append_only {
return Err(SinkError::Config(
anyhow!("SnowflakeSink only supports append-only mode at present, please change the configuration accordingly to enable sinking.")
));
}
Ok(())
}
}
Expand Down Expand Up @@ -245,9 +250,7 @@ impl SnowflakeSinkWriter {

fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
for (op, row) in chunk.rows() {
if op != Op::Insert {
continue;
}
assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
self.payload.push_str(&row_json_string);
self.row_counter += 1;
Expand All @@ -259,7 +262,7 @@ impl SnowflakeSinkWriter {
self.epoch = epoch;
}

/// Construct the file suffix for current sink
/// Construct the *unique* file suffix for the sink
fn file_suffix(&self) -> String {
format!("{}_{}", self.epoch, self.sink_file_suffix)
}
Expand Down

0 comments on commit 9ceab04

Please sign in to comment.