diff --git a/src/connector/src/connector_common/postgres.rs b/src/connector/src/connector_common/postgres.rs index d450c65f5ea5..1da128cfb545 100644 --- a/src/connector/src/connector_common/postgres.rs +++ b/src/connector/src/connector_common/postgres.rs @@ -73,6 +73,7 @@ impl PostgresExternalTable { table: &str, ssl_mode: &SslMode, ssl_root_cert: &Option, + is_append_only: bool, ) -> ConnectorResult { tracing::debug!("connect to postgres external table"); let mut options = PgConnectOptions::new() @@ -140,8 +141,8 @@ impl PostgresExternalTable { column_descs.push(column_desc); } - if table_schema.primary_key_constraints.is_empty() { - return Err(anyhow!("Postgres table doesn't define the primary key").into()); + if !is_append_only && table_schema.primary_key_constraints.is_empty() { + return Err(anyhow!("Postgres table should define the primary key for non-append-only tables").into()); } let mut pk_names = vec![]; table_schema.primary_key_constraints.iter().for_each(|pk| { diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 2dce9bdd8038..f0f1aaf8fea2 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -166,6 +166,7 @@ impl Sink for PostgresSink { &self.config.table, &self.config.ssl_mode, &self.config.ssl_root_cert, + self.is_append_only, ) .await?; @@ -414,7 +415,6 @@ impl PostgresSinkWriter { .await?; } Op::Delete => { - unmatched_update_insert -= 1; self.client .execute_raw( self.delete_statement.as_ref().unwrap(), @@ -425,7 +425,9 @@ impl PostgresSinkWriter { ) .await?; } - Op::UpdateDelete => {} + Op::UpdateDelete => { + unmatched_update_insert -= 1; + } } } } diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 871ea8b0f080..874e90bc4fc1 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -356,6 +356,7 @@ impl ExternalTableImpl { &config.table, &config.ssl_mode, &config.ssl_root_cert, + false, ) .await?, )), diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index c79cd3ace277..091193f29c32 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -308,6 +308,7 @@ mod tests { &config.table, &config.ssl_mode, &config.ssl_root_cert, + false, ) .await .unwrap();