Skip to content

Commit

Permalink
fix docs
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 26, 2024
1 parent dc7f788 commit e57cbd5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
3 changes: 3 additions & 0 deletions src/connector/src/connector_common/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ impl PostgresExternalTable {
&self.column_name_to_pg_type
}

// We use `sea-schema` for table schema discovery.
// So we have to map `sea-schema` pg types
// to `tokio-postgres` pg types (which we use for query binding).
fn discovered_type_to_pg_type(
discovered_type: &SeaType,
) -> anyhow::Result<tokio_postgres::types::Type> {
Expand Down
20 changes: 8 additions & 12 deletions src/connector/src/sink/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ pub struct PostgresSinkWriter {
buffer: Buffer,
insert_statement: Statement,
delete_statement: Option<Statement>,
merge_statement: Option<Statement>,
upsert_statement: Option<Statement>,
}

impl PostgresSinkWriter {
Expand Down Expand Up @@ -362,15 +362,15 @@ impl PostgresSinkWriter {
)
};

let merge_statement = if is_append_only {
let upsert_statement = if is_append_only {
None
} else {
let merge_sql = create_upsert_sql(&schema, &config.table, &pk_indices);
let upsert_sql = create_upsert_sql(&schema, &config.table, &pk_indices);
Some(
client
.prepare_typed(&merge_sql, &schema_types)
.prepare_typed(&upsert_sql, &schema_types)
.await
.context("Failed to prepare merge statement")?,
.context("Failed to prepare upsert statement")?,
)
};

Expand All @@ -382,7 +382,7 @@ impl PostgresSinkWriter {
buffer: Buffer::new(),
insert_statement,
delete_statement,
merge_statement,
upsert_statement,
};
Ok(writer)
}
Expand Down Expand Up @@ -421,16 +421,12 @@ impl PostgresSinkWriter {
}
Op::UpdateInsert => {
unmatched_update_insert += 1;
// NOTE(kwannoel): Here we use `MERGE` rather than `UPDATE/INSERT` directly.
// This is because the downstream db could have cleaned the old record,
// in that case it needs to be `INSERTED` rather than UPDATED.
// On the other hand, if the record is there, it should be `UPDATED`.
self.client
.execute_raw(
self.merge_statement.as_ref().unwrap(),
self.upsert_statement.as_ref().unwrap(),
rw_row_to_pg_values!(
row,
self.merge_statement.as_ref().unwrap()
self.upsert_statement.as_ref().unwrap()
),
)
.await?;
Expand Down

0 comments on commit e57cbd5

Please sign in to comment.