Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(postgres-cdc): only prepare statement in the backfill process (#17813) #17827

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,13 @@ impl CdcTableType {
config: ExternalTableConfig,
schema: Schema,
pk_indices: Vec<usize>,
scan_limit: u32,
) -> ConnectorResult<ExternalTableReaderImpl> {
match self {
Self::MySql => Ok(ExternalTableReaderImpl::MySql(
MySqlExternalTableReader::new(config, schema).await?,
)),
Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
PostgresExternalTableReader::new(config, schema, pk_indices, scan_limit).await?,
PostgresExternalTableReader::new(config, schema, pk_indices).await?,
)),
_ => bail!("invalid external table type: {:?}", *self),
}
Expand Down
60 changes: 28 additions & 32 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use sqlx::postgres::{PgConnectOptions, PgSslMode};
use sqlx::PgPool;
use thiserror_ext::AsReport;
use tokio_postgres::types::PgLsn;
use tokio_postgres::{NoTls, Statement};
use tokio_postgres::NoTls;

use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::postgres_row_to_owned_row;
Expand Down Expand Up @@ -232,7 +232,7 @@ fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
pub struct PostgresExternalTableReader {
rw_schema: Schema,
field_names: String,
prepared_scan_stmt: Statement,
pk_indices: Vec<usize>,
client: tokio::sync::Mutex<tokio_postgres::Client>,
}

Expand Down Expand Up @@ -273,7 +273,6 @@ impl PostgresExternalTableReader {
config: ExternalTableConfig,
rw_schema: Schema,
pk_indices: Vec<usize>,
scan_limit: u32,
) -> ConnectorResult<Self> {
tracing::info!(
?rw_schema,
Expand Down Expand Up @@ -334,32 +333,10 @@ impl PostgresExternalTableReader {
.map(|f| Self::quote_column(&f.name))
.join(",");

// prepare once
let prepared_scan_stmt = {
let primary_keys = pk_indices
.iter()
.map(|i| rw_schema.fields[*i].name.clone())
.collect_vec();

let table_name = SchemaTableName {
schema_name: config.schema.clone(),
table_name: config.table.clone(),
};
let order_key = Self::get_order_key(&primary_keys);
let scan_sql = format!(
"SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}",
field_names,
Self::get_normalized_table_name(&table_name),
Self::filter_expression(&primary_keys),
order_key,
);
client.prepare(&scan_sql).await?
};

Ok(Self {
rw_schema,
field_names,
prepared_scan_stmt,
pk_indices,
client: tokio::sync::Mutex::new(client),
})
}
Expand All @@ -385,29 +362,49 @@ impl PostgresExternalTableReader {
table_name: SchemaTableName,
start_pk_row: Option<OwnedRow>,
primary_keys: Vec<String>,
limit: u32,
scan_limit: u32,
) {
let order_key = Self::get_order_key(&primary_keys);
let client = self.client.lock().await;
client.execute("set time zone '+00:00'", &[]).await?;

let stream = match start_pk_row {
Some(ref pk_row) => {
// Prepare the scan statement so that we can read its parameter types and convert each
// RW primary-key datum to the matching Postgres data type, e.g. varchar to uuid.
let prepared_scan_stmt = {
let primary_keys = self
.pk_indices
.iter()
.map(|i| self.rw_schema.fields[*i].name.clone())
.collect_vec();

let order_key = Self::get_order_key(&primary_keys);
let scan_sql = format!(
"SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}",
self.field_names,
Self::get_normalized_table_name(&table_name),
Self::filter_expression(&primary_keys),
order_key,
);
client.prepare(&scan_sql).await?
};

let params: Vec<Option<ScalarAdapter>> = pk_row
.iter()
.zip_eq_fast(self.prepared_scan_stmt.params())
.zip_eq_fast(prepared_scan_stmt.params())
.map(|(datum, ty)| {
datum
.map(|scalar| ScalarAdapter::from_scalar(scalar, ty))
.transpose()
})
.try_collect()?;

client.query_raw(&self.prepared_scan_stmt, &params).await?
client.query_raw(&prepared_scan_stmt, &params).await?
}
None => {
let sql = format!(
"SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
"SELECT {} FROM {} ORDER BY {} LIMIT {scan_limit}",
self.field_names,
Self::get_normalized_table_name(&table_name),
order_key,
Expand Down Expand Up @@ -459,7 +456,6 @@ impl PostgresExternalTableReader {

#[cfg(test)]
mod tests {

use std::collections::HashMap;

use futures::pin_mut;
Expand Down Expand Up @@ -547,7 +543,7 @@ mod tests {
let config =
serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
.unwrap();
let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1], 1000)
let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1])
.await
.unwrap();

Expand Down
7 changes: 1 addition & 6 deletions src/stream/src/from_proto/stream_cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,7 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
.map_err(|e| anyhow!("failed to parse external table config").context(e))?;
let database_name = table_config.database.clone();
let table_reader = table_type
.create_table_reader(
table_config,
table_schema.clone(),
table_pk_indices.clone(),
scan_options.snapshot_batch_size,
)
.create_table_reader(table_config, table_schema.clone(), table_pk_indices.clone())
.await?;

let external_table = ExternalStorageTable::new(
Expand Down
Loading