From e57cbd55410f69b5b318b225cc49e29d8e6f4036 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 25 Nov 2024 23:02:57 +0800 Subject: [PATCH] fix docs --- .../src/connector_common/postgres.rs | 3 +++ src/connector/src/sink/postgres.rs | 20 ++++++++----------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/connector/src/connector_common/postgres.rs b/src/connector/src/connector_common/postgres.rs index 73c8241aae83e..fc79bf9b961e6 100644 --- a/src/connector/src/connector_common/postgres.rs +++ b/src/connector/src/connector_common/postgres.rs @@ -179,6 +179,9 @@ impl PostgresExternalTable { &self.column_name_to_pg_type } + // We use `sea-schema` for table schema discovery. + // So we have to map `sea-schema` pg types + // to `tokio-postgres` pg types (which we use for query binding). fn discovered_type_to_pg_type( discovered_type: &SeaType, ) -> anyhow::Result { diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index b7568a44ea822..3f722e1016ef5 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -284,7 +284,7 @@ pub struct PostgresSinkWriter { buffer: Buffer, insert_statement: Statement, delete_statement: Option, - merge_statement: Option, + upsert_statement: Option, } impl PostgresSinkWriter { @@ -362,15 +362,15 @@ impl PostgresSinkWriter { ) }; - let merge_statement = if is_append_only { + let upsert_statement = if is_append_only { None } else { - let merge_sql = create_upsert_sql(&schema, &config.table, &pk_indices); + let upsert_sql = create_upsert_sql(&schema, &config.table, &pk_indices); Some( client - .prepare_typed(&merge_sql, &schema_types) + .prepare_typed(&upsert_sql, &schema_types) .await - .context("Failed to prepare merge statement")?, + .context("Failed to prepare upsert statement")?, ) }; @@ -382,7 +382,7 @@ impl PostgresSinkWriter { buffer: Buffer::new(), insert_statement, delete_statement, - merge_statement, + upsert_statement, }; Ok(writer) } @@ -421,16 +421,12 @@ impl PostgresSinkWriter { } Op::UpdateInsert => { unmatched_update_insert += 1; - // NOTE(kwannoel): Here we use `MERGE` rather than `UPDATE/INSERT` directly. - // This is because the downstream db could have cleaned the old record, - // in that case it needs to be `INSERTED` rather than UPDATED. - // On the other hand, if the record is there, it should be `UPDATED`. self.client .execute_raw( - self.merge_statement.as_ref().unwrap(), + self.upsert_statement.as_ref().unwrap(), rw_row_to_pg_values!( row, - self.merge_statement.as_ref().unwrap() + self.upsert_statement.as_ref().unwrap() ), ) .await?;