Skip to content

Commit

Permalink
use PostgresExternalTable to validate
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 24, 2024
1 parent c267845 commit 9193266
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 79 deletions.
4 changes: 2 additions & 2 deletions src/connector/src/connector_common/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl PostgresExternalTable {
username: &str,
password: &str,
host: &str,
port: &str,
port: u16,
database: &str,
schema: &str,
table: &str,
Expand All @@ -79,7 +79,7 @@ impl PostgresExternalTable {
.username(username)
.password(password)
.host(host)
.port(port.parse::<u16>().unwrap())
.port(port)
.database(database)
.ssl_mode(match ssl_mode {
SslMode::Disabled => PgSslMode::Disable,
Expand Down
153 changes: 79 additions & 74 deletions src/connector/src/sink/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;

use anyhow::{anyhow, Context};
Expand All @@ -23,6 +23,7 @@ use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use simd_json::prelude::ArrayTrait;
Expand All @@ -33,7 +34,7 @@ use with_options::WithOptions;
use super::{
SinkError, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
use crate::connector_common::{create_pg_client, SslMode};
use crate::connector_common::{create_pg_client, PostgresExternalTable, SslMode};
use crate::parser::scalar_adapter::ScalarAdapter;
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};
Expand Down Expand Up @@ -138,81 +139,85 @@ impl Sink for PostgresSink {
"Primary key not defined for upsert Postgres sink (please define in `primary_key` field)")));
}

for field in self.schema.fields() {
check_data_type_compatibility(field.data_type())?;
}

// Verify pg table schema matches rw table schema, and pk indices are valid
let table_name = &self.config.table;
let client = create_pg_client(
&self.config.user,
&self.config.password,
&self.config.host,
&self.config.port.to_string(),
&self.config.database,
&self.config.ssl_mode,
&self.config.ssl_root_cert,
)
.await?;

let result = client
.query(
"
SELECT
column_name,
EXISTS (
SELECT 1
FROM pg_index i
WHERE i.indrelid = c.table_name::regclass
AND i.indisprimary
AND column_name = ANY(
SELECT a.attname
FROM pg_attribute a
WHERE a.attrelid = i.indrelid
AND a.attnum = ANY(i.indkey)
)
) AS is_primary_key
FROM
information_schema.columns c
WHERE
table_name = $1
ORDER BY
ordinal_position;",
&[&table_name],
// Verify our sink schema is compatible with Postgres
{
let pg_table = PostgresExternalTable::connect(
&self.config.user,
&self.config.password,
&self.config.host,
self.config.port,
&self.config.database,
&self.config.schema,
&self.config.table,
&self.config.ssl_mode,
&self.config.ssl_root_cert,
)
.await
.context("Failed to query Postgres for Sinking")?;

let mut pg_schema = BTreeMap::new();
for row in result {
let col_name: String = row.get(0);
let is_pk: bool = row.get(1);
pg_schema.insert(col_name, is_pk);
}
.await?;

// Check that names and types match, order of columns doesn't matter.
{
let pg_columns = pg_table.column_descs();
let sink_columns = self.schema.fields();
if pg_columns.len() != sink_columns.len() {
return Err(SinkError::Config(anyhow!(
"Column count mismatch: Postgres table has {} columns, but sink schema has {} columns",
pg_columns.len(),
sink_columns.len()
)));
}

for (i, field) in self.schema.fields().iter().enumerate() {
let col_name = &field.name;
let is_pk = pg_schema.get(col_name);
match is_pk {
None => return Err(SinkError::Config(anyhow!(
"Column `{}` not found in Postgres table `{}`",
col_name,
table_name
))),
Some(is_pk) =>
match (*is_pk, self.pk_indices.contains(&i)) {
(false, false) | (true, true) => continue,
(false, true) => return Err(SinkError::Config(anyhow!(
"Column `{}` in Postgres table `{}` is not a primary key, but RW schema defines it as a primary key",
col_name,
table_name
))),
(true, false) => return Err(SinkError::Config(anyhow!(
"Column `{}` in Postgres table `{}` is a primary key, but RW schema does not define it as a primary key",
col_name,
table_name
))),
let pg_columns_lookup = pg_columns
.iter()
.map(|c| (c.name.clone(), c.data_type.clone()))
.collect::<BTreeMap<_, _>>();
for sink_column in sink_columns {
let pg_column = pg_columns_lookup.get(&sink_column.name);
match pg_column {
None => {
return Err(SinkError::Config(anyhow!(
"Column `{}` not found in Postgres table `{}`",
sink_column.name,
self.config.table
)))
}
Some(pg_column) => {
if pg_column != &sink_column.data_type() {
return Err(SinkError::Config(anyhow!(
"Column `{}` in Postgres table `{}` has type `{}`, but sink schema defines it as type `{}`",
sink_column.name,
self.config.table,
pg_column,
sink_column.data_type()
)));
}
}
}
}
}

// check that pk matches
{
let pg_pk_names = pg_table.pk_names();
let sink_pk_names = self
.pk_indices
.iter()
.map(|i| &self.schema.fields()[*i].name)
.collect::<HashSet<_>>();
if pg_pk_names.len() != sink_pk_names.len() {
return Err(SinkError::Config(anyhow!(
"Primary key mismatch: Postgres table has primary key on columns {:?}, but sink schema defines primary key on columns {:?}",
pg_pk_names,
sink_pk_names
)));
}
for name in pg_pk_names {
if !sink_pk_names.contains(name) {
return Err(SinkError::Config(anyhow!(
"Primary key mismatch: Postgres table has primary key on column `{}`, but sink schema does not define it as a primary key",
name
)));
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl ExternalTableImpl {
&config.username,
&config.password,
&config.host,
&config.port,
config.port.parse::<u16>().unwrap(),
&config.database,
&config.schema,
&config.table,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,10 @@ mod tests {
};

let table = PostgresExternalTable::connect(
&config.host,
&config.port,
&config.username,
&config.password,
&config.host,
(&config.port).parse::<u16>().unwrap(),
&config.database,
&config.schema,
&config.table,
Expand Down

0 comments on commit 9193266

Please sign in to comment.