From b34c394a5d975c10faab83e5f0a40299f6cb6a23 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 25 Jul 2024 12:53:37 +0800 Subject: [PATCH 1/3] move prepare stmt to snapshot read --- src/connector/src/source/cdc/external/mod.rs | 3 +- .../src/source/cdc/external/postgres.rs | 57 +++++++++---------- src/stream/src/from_proto/stream_cdc_scan.rs | 7 +-- 3 files changed, 29 insertions(+), 38 deletions(-) diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 528e14bd60229..5d126a7aa5bcc 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -71,14 +71,13 @@ impl CdcTableType { config: ExternalTableConfig, schema: Schema, pk_indices: Vec, - scan_limit: u32, ) -> ConnectorResult { match self { Self::MySql => Ok(ExternalTableReaderImpl::MySql( MySqlExternalTableReader::new(config, schema).await?, )), Self::Postgres => Ok(ExternalTableReaderImpl::Postgres( - PostgresExternalTableReader::new(config, schema, pk_indices, scan_limit).await?, + PostgresExternalTableReader::new(config, schema, pk_indices).await?, )), _ => bail!("invalid external table type: {:?}", *self), } diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 0bbf5dccbe6de..1794275bea08b 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -33,7 +33,7 @@ use sqlx::postgres::{PgConnectOptions, PgSslMode}; use sqlx::PgPool; use thiserror_ext::AsReport; use tokio_postgres::types::PgLsn; -use tokio_postgres::{NoTls, Statement}; +use tokio_postgres::NoTls; use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::postgres_row_to_owned_row; @@ -232,7 +232,7 @@ fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult { pub struct PostgresExternalTableReader { rw_schema: Schema, field_names: String, - prepared_scan_stmt: Statement, + pk_indices: Vec, client: tokio::sync::Mutex, } @@ -273,7 +273,6 @@ impl PostgresExternalTableReader { config: ExternalTableConfig, rw_schema: Schema, pk_indices: Vec, - scan_limit: u32, ) -> ConnectorResult { tracing::info!( ?rw_schema, @@ -334,32 +333,10 @@ impl PostgresExternalTableReader { .map(|f| Self::quote_column(&f.name)) .join(","); - // prepare once - let prepared_scan_stmt = { - let primary_keys = pk_indices - .iter() - .map(|i| rw_schema.fields[*i].name.clone()) - .collect_vec(); - - let table_name = SchemaTableName { - schema_name: config.schema.clone(), - table_name: config.table.clone(), - }; - let order_key = Self::get_order_key(&primary_keys); - let scan_sql = format!( - "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}", - field_names, - Self::get_normalized_table_name(&table_name), - Self::filter_expression(&primary_keys), - order_key, - ); - client.prepare(&scan_sql).await? - }; - Ok(Self { rw_schema, field_names, - prepared_scan_stmt, + pk_indices, client: tokio::sync::Mutex::new(client), }) } @@ -385,7 +362,7 @@ impl PostgresExternalTableReader { table_name: SchemaTableName, start_pk_row: Option, primary_keys: Vec, - limit: u32, + scan_limit: u32, ) { let order_key = Self::get_order_key(&primary_keys); let client = self.client.lock().await; @@ -393,9 +370,29 @@ impl PostgresExternalTableReader { let stream = match start_pk_row { Some(ref pk_row) => { + // prepare the scan statement, since we may need to conver the RW data type to postgres data type + // e.g. varchar to uuid + let prepared_scan_stmt = { + let primary_keys = self + .pk_indices + .iter() + .map(|i| self.rw_schema.fields[*i].name.clone()) + .collect_vec(); + + let order_key = Self::get_order_key(&primary_keys); + let scan_sql = format!( + "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}", + self.field_names, + Self::get_normalized_table_name(&table_name), + Self::filter_expression(&primary_keys), + order_key, + ); + client.prepare(&scan_sql).await? + }; + let params: Vec> = pk_row .iter() - .zip_eq_fast(self.prepared_scan_stmt.params()) + .zip_eq_fast(prepared_scan_stmt.params()) .map(|(datum, ty)| { datum .map(|scalar| ScalarAdapter::from_scalar(scalar, ty)) @@ -403,11 +400,11 @@ impl PostgresExternalTableReader { }) .try_collect()?; - client.query_raw(&self.prepared_scan_stmt, ¶ms).await? + client.query_raw(&prepared_scan_stmt, ¶ms).await? } None => { let sql = format!( - "SELECT {} FROM {} ORDER BY {} LIMIT {limit}", + "SELECT {} FROM {} ORDER BY {} LIMIT {scan_limit}", self.field_names, Self::get_normalized_table_name(&table_name), order_key, diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs index 150812c57a1c4..020c03af65ce5 100644 --- a/src/stream/src/from_proto/stream_cdc_scan.rs +++ b/src/stream/src/from_proto/stream_cdc_scan.rs @@ -98,12 +98,7 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { .map_err(|e| anyhow!("failed to parse external table config").context(e))?; let database_name = table_config.database.clone(); let table_reader = table_type - .create_table_reader( - table_config, - table_schema.clone(), - table_pk_indices.clone(), - scan_options.snapshot_batch_size, - ) + .create_table_reader(table_config, table_schema.clone(), table_pk_indices.clone()) .await?; let external_table = ExternalStorageTable::new( From 9998eaad29cbde4aaf14c66f6590e38e34a2d2a1 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 25 Jul 2024 15:40:50 +0800 Subject: [PATCH 2/3] minor --- src/connector/src/source/cdc/external/postgres.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 1794275bea08b..39ffda22faebf 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -370,7 +370,7 @@ impl PostgresExternalTableReader { let stream = match start_pk_row { Some(ref pk_row) => { - // prepare the scan statement, since we may need to conver the RW data type to postgres data type + // prepare the scan statement, since we may need to convert the RW data type to postgres data type // e.g. varchar to uuid let prepared_scan_stmt = { let primary_keys = self @@ -456,7 +456,6 @@ impl PostgresExternalTableReader { #[cfg(test)] mod tests { - use std::collections::HashMap; use futures::pin_mut; From 369b23b396d15d1491504e874f3d2cf805ebaeaa Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 25 Jul 2024 16:01:18 +0800 Subject: [PATCH 3/3] fix --- src/connector/src/source/cdc/external/postgres.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 39ffda22faebf..ca0caf46d6125 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -543,7 +543,7 @@ mod tests { let config = serde_json::from_value::(serde_json::to_value(props).unwrap()) .unwrap(); - let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1], 1000) + let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1]) .await .unwrap();