diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 9a7a4dcac2473..9b266506023be 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -73,14 +73,13 @@ impl CdcTableType { config: ExternalTableConfig, schema: Schema, pk_indices: Vec, - scan_limit: u32, ) -> ConnectorResult { match self { Self::MySql => Ok(ExternalTableReaderImpl::MySql( MySqlExternalTableReader::new(config, schema).await?, )), Self::Postgres => Ok(ExternalTableReaderImpl::Postgres( - PostgresExternalTableReader::new(config, schema, pk_indices, scan_limit).await?, + PostgresExternalTableReader::new(config, schema, pk_indices).await?, )), _ => bail!("invalid external table type: {:?}", *self), } diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 0bbf5dccbe6de..ca0caf46d6125 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -33,7 +33,7 @@ use sqlx::postgres::{PgConnectOptions, PgSslMode}; use sqlx::PgPool; use thiserror_ext::AsReport; use tokio_postgres::types::PgLsn; -use tokio_postgres::{NoTls, Statement}; +use tokio_postgres::NoTls; use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::postgres_row_to_owned_row; @@ -232,7 +232,7 @@ fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult { pub struct PostgresExternalTableReader { rw_schema: Schema, field_names: String, - prepared_scan_stmt: Statement, + pk_indices: Vec, client: tokio::sync::Mutex, } @@ -273,7 +273,6 @@ impl PostgresExternalTableReader { config: ExternalTableConfig, rw_schema: Schema, pk_indices: Vec, - scan_limit: u32, ) -> ConnectorResult { tracing::info!( ?rw_schema, @@ -334,32 +333,10 @@ impl PostgresExternalTableReader { .map(|f| Self::quote_column(&f.name)) .join(","); - // prepare once - let prepared_scan_stmt = { - let primary_keys = pk_indices - .iter() - .map(|i| rw_schema.fields[*i].name.clone()) - .collect_vec(); - - let table_name = SchemaTableName { - schema_name: config.schema.clone(), - table_name: config.table.clone(), - }; - let order_key = Self::get_order_key(&primary_keys); - let scan_sql = format!( - "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}", - field_names, - Self::get_normalized_table_name(&table_name), - Self::filter_expression(&primary_keys), - order_key, - ); - client.prepare(&scan_sql).await? - }; - Ok(Self { rw_schema, field_names, - prepared_scan_stmt, + pk_indices, client: tokio::sync::Mutex::new(client), }) } @@ -385,7 +362,7 @@ impl PostgresExternalTableReader { table_name: SchemaTableName, start_pk_row: Option, primary_keys: Vec, - limit: u32, + scan_limit: u32, ) { let order_key = Self::get_order_key(&primary_keys); let client = self.client.lock().await; @@ -393,9 +370,29 @@ impl PostgresExternalTableReader { let stream = match start_pk_row { Some(ref pk_row) => { + // prepare the scan statement, since we may need to convert the RW data type to postgres data type + // e.g. varchar to uuid + let prepared_scan_stmt = { + let primary_keys = self + .pk_indices + .iter() + .map(|i| self.rw_schema.fields[*i].name.clone()) + .collect_vec(); + + let order_key = Self::get_order_key(&primary_keys); + let scan_sql = format!( + "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}", + self.field_names, + Self::get_normalized_table_name(&table_name), + Self::filter_expression(&primary_keys), + order_key, + ); + client.prepare(&scan_sql).await? + }; + let params: Vec> = pk_row .iter() - .zip_eq_fast(self.prepared_scan_stmt.params()) + .zip_eq_fast(prepared_scan_stmt.params()) .map(|(datum, ty)| { datum .map(|scalar| ScalarAdapter::from_scalar(scalar, ty)) @@ -403,11 +400,11 @@ impl PostgresExternalTableReader { }) .try_collect()?; - client.query_raw(&self.prepared_scan_stmt, ¶ms).await? + client.query_raw(&prepared_scan_stmt, ¶ms).await? } None => { let sql = format!( - "SELECT {} FROM {} ORDER BY {} LIMIT {limit}", + "SELECT {} FROM {} ORDER BY {} LIMIT {scan_limit}", self.field_names, Self::get_normalized_table_name(&table_name), order_key, @@ -459,7 +456,6 @@ impl PostgresExternalTableReader { #[cfg(test)] mod tests { - use std::collections::HashMap; use futures::pin_mut; @@ -547,7 +543,7 @@ mod tests { let config = serde_json::from_value::(serde_json::to_value(props).unwrap()) .unwrap(); - let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1], 1000) + let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1]) .await .unwrap(); diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs index 26794bc6153bc..3c81ecb80e859 100644 --- a/src/stream/src/from_proto/stream_cdc_scan.rs +++ b/src/stream/src/from_proto/stream_cdc_scan.rs @@ -95,12 +95,7 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { let database_name = table_config.database.clone(); let table_reader = table_type - .create_table_reader( - table_config, - table_schema.clone(), - table_pk_indices.clone(), - scan_options.snapshot_batch_size, - ) + .create_table_reader(table_config, table_schema.clone(), table_pk_indices.clone()) .await?; let external_table = ExternalStorageTable::new(