Skip to content

Commit

Permalink
fix update sanity check + fix non-append-only pk check
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 24, 2024
1 parent a14b177 commit c9a164e
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 4 deletions.
5 changes: 3 additions & 2 deletions src/connector/src/connector_common/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl PostgresExternalTable {
table: &str,
ssl_mode: &SslMode,
ssl_root_cert: &Option<String>,
is_append_only: bool,
) -> ConnectorResult<Self> {
tracing::debug!("connect to postgres external table");
let mut options = PgConnectOptions::new()
Expand Down Expand Up @@ -140,8 +141,8 @@ impl PostgresExternalTable {
column_descs.push(column_desc);
}

if table_schema.primary_key_constraints.is_empty() {
return Err(anyhow!("Postgres table doesn't define the primary key").into());
if !is_append_only && table_schema.primary_key_constraints.is_empty() {
return Err(anyhow!("Postgres table should define the primary key for non-append-only tables").into());
}
let mut pk_names = vec![];
table_schema.primary_key_constraints.iter().for_each(|pk| {
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/sink/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl Sink for PostgresSink {
&self.config.table,
&self.config.ssl_mode,
&self.config.ssl_root_cert,
self.is_append_only,
)
.await?;

Expand Down Expand Up @@ -414,7 +415,6 @@ impl PostgresSinkWriter {
.await?;
}
Op::Delete => {
unmatched_update_insert -= 1;
self.client
.execute_raw(
self.delete_statement.as_ref().unwrap(),
Expand All @@ -425,7 +425,9 @@ impl PostgresSinkWriter {
)
.await?;
}
Op::UpdateDelete => {}
Op::UpdateDelete => {
unmatched_update_insert -= 1;
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ impl ExternalTableImpl {
&config.table,
&config.ssl_mode,
&config.ssl_root_cert,
false,
)
.await?,
)),
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ mod tests {
&config.table,
&config.ssl_mode,
&config.ssl_root_cert,
false,
)
.await
.unwrap();
Expand Down

0 comments on commit c9a164e

Please sign in to comment.