Skip to content

Commit

Permalink
fix(postgres-cdc): only prepare statement in the backfill process (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Jul 26, 2024
1 parent 517cde2 commit 16b22a9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 40 deletions.
3 changes: 1 addition & 2 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,13 @@ impl CdcTableType {
config: ExternalTableConfig,
schema: Schema,
pk_indices: Vec<usize>,
scan_limit: u32,
) -> ConnectorResult<ExternalTableReaderImpl> {
match self {
Self::MySql => Ok(ExternalTableReaderImpl::MySql(
MySqlExternalTableReader::new(config, schema).await?,
)),
Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
PostgresExternalTableReader::new(config, schema, pk_indices, scan_limit).await?,
PostgresExternalTableReader::new(config, schema, pk_indices).await?,
)),
_ => bail!("invalid external table type: {:?}", *self),
}
Expand Down
60 changes: 28 additions & 32 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use sqlx::postgres::{PgConnectOptions, PgSslMode};
use sqlx::PgPool;
use thiserror_ext::AsReport;
use tokio_postgres::types::PgLsn;
use tokio_postgres::{NoTls, Statement};
use tokio_postgres::NoTls;

use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::postgres_row_to_owned_row;
Expand Down Expand Up @@ -232,7 +232,7 @@ fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
pub struct PostgresExternalTableReader {
rw_schema: Schema,
field_names: String,
prepared_scan_stmt: Statement,
pk_indices: Vec<usize>,
client: tokio::sync::Mutex<tokio_postgres::Client>,
}

Expand Down Expand Up @@ -273,7 +273,6 @@ impl PostgresExternalTableReader {
config: ExternalTableConfig,
rw_schema: Schema,
pk_indices: Vec<usize>,
scan_limit: u32,
) -> ConnectorResult<Self> {
tracing::info!(
?rw_schema,
Expand Down Expand Up @@ -334,32 +333,10 @@ impl PostgresExternalTableReader {
.map(|f| Self::quote_column(&f.name))
.join(",");

// prepare once
let prepared_scan_stmt = {
let primary_keys = pk_indices
.iter()
.map(|i| rw_schema.fields[*i].name.clone())
.collect_vec();

let table_name = SchemaTableName {
schema_name: config.schema.clone(),
table_name: config.table.clone(),
};
let order_key = Self::get_order_key(&primary_keys);
let scan_sql = format!(
"SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}",
field_names,
Self::get_normalized_table_name(&table_name),
Self::filter_expression(&primary_keys),
order_key,
);
client.prepare(&scan_sql).await?
};

Ok(Self {
rw_schema,
field_names,
prepared_scan_stmt,
pk_indices,
client: tokio::sync::Mutex::new(client),
})
}
Expand All @@ -385,29 +362,49 @@ impl PostgresExternalTableReader {
table_name: SchemaTableName,
start_pk_row: Option<OwnedRow>,
primary_keys: Vec<String>,
limit: u32,
scan_limit: u32,
) {
let order_key = Self::get_order_key(&primary_keys);
let client = self.client.lock().await;
client.execute("set time zone '+00:00'", &[]).await?;

let stream = match start_pk_row {
Some(ref pk_row) => {
// prepare the scan statement, since we may need to convert the RW data type to postgres data type
// e.g. varchar to uuid
let prepared_scan_stmt = {
let primary_keys = self
.pk_indices
.iter()
.map(|i| self.rw_schema.fields[*i].name.clone())
.collect_vec();

let order_key = Self::get_order_key(&primary_keys);
let scan_sql = format!(
"SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}",
self.field_names,
Self::get_normalized_table_name(&table_name),
Self::filter_expression(&primary_keys),
order_key,
);
client.prepare(&scan_sql).await?
};

let params: Vec<Option<ScalarAdapter>> = pk_row
.iter()
.zip_eq_fast(self.prepared_scan_stmt.params())
.zip_eq_fast(prepared_scan_stmt.params())
.map(|(datum, ty)| {
datum
.map(|scalar| ScalarAdapter::from_scalar(scalar, ty))
.transpose()
})
.try_collect()?;

client.query_raw(&self.prepared_scan_stmt, &params).await?
client.query_raw(&prepared_scan_stmt, &params).await?
}
None => {
let sql = format!(
"SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
"SELECT {} FROM {} ORDER BY {} LIMIT {scan_limit}",
self.field_names,
Self::get_normalized_table_name(&table_name),
order_key,
Expand Down Expand Up @@ -459,7 +456,6 @@ impl PostgresExternalTableReader {

#[cfg(test)]
mod tests {

use std::collections::HashMap;

use futures::pin_mut;
Expand Down Expand Up @@ -547,7 +543,7 @@ mod tests {
let config =
serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
.unwrap();
let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1], 1000)
let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1])
.await
.unwrap();

Expand Down
7 changes: 1 addition & 6 deletions src/stream/src/from_proto/stream_cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,7 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {

let database_name = table_config.database.clone();
let table_reader = table_type
.create_table_reader(
table_config,
table_schema.clone(),
table_pk_indices.clone(),
scan_options.snapshot_batch_size,
)
.create_table_reader(table_config, table_schema.clone(), table_pk_indices.clone())
.await?;

let external_table = ExternalStorageTable::new(
Expand Down

0 comments on commit 16b22a9

Please sign in to comment.