fix(postgres-cdc): only prepare statement in the backfill process #17813

Merged · 3 commits · Jul 26, 2024
Changes from all commits
3 changes: 1 addition & 2 deletions src/connector/src/source/cdc/external/mod.rs
@@ -71,14 +71,13 @@ impl CdcTableType {
         config: ExternalTableConfig,
         schema: Schema,
         pk_indices: Vec<usize>,
-        scan_limit: u32,
     ) -> ConnectorResult<ExternalTableReaderImpl> {
         match self {
             Self::MySql => Ok(ExternalTableReaderImpl::MySql(
                 MySqlExternalTableReader::new(config, schema).await?,
             )),
             Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
-                PostgresExternalTableReader::new(config, schema, pk_indices, scan_limit).await?,
+                PostgresExternalTableReader::new(config, schema, pk_indices).await?,
             )),
             _ => bail!("invalid external table type: {:?}", *self),
         }
60 changes: 28 additions & 32 deletions src/connector/src/source/cdc/external/postgres.rs
@@ -33,7 +33,7 @@ use sqlx::postgres::{PgConnectOptions, PgSslMode};
 use sqlx::PgPool;
 use thiserror_ext::AsReport;
 use tokio_postgres::types::PgLsn;
-use tokio_postgres::{NoTls, Statement};
+use tokio_postgres::NoTls;
 
 use crate::error::{ConnectorError, ConnectorResult};
 use crate::parser::postgres_row_to_owned_row;
@@ -232,7 +232,7 @@ fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
 pub struct PostgresExternalTableReader {
     rw_schema: Schema,
     field_names: String,
-    prepared_scan_stmt: Statement,
+    pk_indices: Vec<usize>,
     client: tokio::sync::Mutex<tokio_postgres::Client>,
 }
 
@@ -273,7 +273,6 @@ impl PostgresExternalTableReader {
         config: ExternalTableConfig,
         rw_schema: Schema,
         pk_indices: Vec<usize>,
-        scan_limit: u32,
     ) -> ConnectorResult<Self> {
         tracing::info!(
             ?rw_schema,
@@ -334,32 +333,10 @@ impl PostgresExternalTableReader {
             .map(|f| Self::quote_column(&f.name))
             .join(",");
 
-        // prepare once
-        let prepared_scan_stmt = {
-            let primary_keys = pk_indices
-                .iter()
-                .map(|i| rw_schema.fields[*i].name.clone())
-                .collect_vec();
-
-            let table_name = SchemaTableName {
-                schema_name: config.schema.clone(),
-                table_name: config.table.clone(),
-            };
-            let order_key = Self::get_order_key(&primary_keys);
-            let scan_sql = format!(
-                "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}",
-                field_names,
-                Self::get_normalized_table_name(&table_name),
-                Self::filter_expression(&primary_keys),
-                order_key,
-            );
-            client.prepare(&scan_sql).await?
-        };
-
         Ok(Self {
             rw_schema,
             field_names,
-            prepared_scan_stmt,
+            pk_indices,
             client: tokio::sync::Mutex::new(client),
         })
     }
@@ -385,29 +362,49 @@ impl PostgresExternalTableReader {
         table_name: SchemaTableName,
         start_pk_row: Option<OwnedRow>,
         primary_keys: Vec<String>,
-        limit: u32,
+        scan_limit: u32,
     ) {
         let order_key = Self::get_order_key(&primary_keys);
         let client = self.client.lock().await;
         client.execute("set time zone '+00:00'", &[]).await?;
 
         let stream = match start_pk_row {
             Some(ref pk_row) => {
+                // prepare the scan statement, since we may need to convert the RW data type to postgres data type
+                // e.g. varchar to uuid
+                let prepared_scan_stmt = {
+                    let primary_keys = self
+                        .pk_indices
+                        .iter()
+                        .map(|i| self.rw_schema.fields[*i].name.clone())
+                        .collect_vec();
+
+                    let order_key = Self::get_order_key(&primary_keys);
+                    let scan_sql = format!(
+                        "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}",
+                        self.field_names,
+                        Self::get_normalized_table_name(&table_name),
+                        Self::filter_expression(&primary_keys),
+                        order_key,
+                    );
+                    client.prepare(&scan_sql).await?
Review discussion on this line:

Member:
Every time it prepares once and executes once? Then let's just use simple statements, such as lines 406-413.

@StrikeW (Contributor, Author), Jul 26, 2024:
We prepare once for each scan_limit chunk; we need the data types from the upstream table to convert varchar to uuid (see line 398).

Contributor:
Can we make it prepare once, but only while the backfill has not finished yet?

@StrikeW (Contributor, Author):
It is possible, but it would require refactoring the code, for example to call an init method before doing the snapshot. We have set the default value of scan_limit to 1000, so the overhead is low.

+                };
+
                 let params: Vec<Option<ScalarAdapter>> = pk_row
                     .iter()
-                    .zip_eq_fast(self.prepared_scan_stmt.params())
+                    .zip_eq_fast(prepared_scan_stmt.params())
                     .map(|(datum, ty)| {
                         datum
                             .map(|scalar| ScalarAdapter::from_scalar(scalar, ty))
                             .transpose()
                     })
                     .try_collect()?;
 
-                client.query_raw(&self.prepared_scan_stmt, &params).await?
+                client.query_raw(&prepared_scan_stmt, &params).await?
             }
             None => {
                 let sql = format!(
-                    "SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
+                    "SELECT {} FROM {} ORDER BY {} LIMIT {scan_limit}",
                     self.field_names,
                     Self::get_normalized_table_name(&table_name),
                     order_key,
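
For context on the review thread above, here is a minimal, hypothetical sketch (not RisingWave code; the query, table, and function names are invented, and it assumes the `uuid` crate plus tokio-postgres's uuid feature) of why the prepared statement's parameter types matter: when the upstream key column is `uuid` but the value arrives as a string, `Statement::params()` tells us to parse it into a real uuid before binding.

```rust
// Hypothetical sketch only (not RisingWave code): bind a string-typed key
// against a Postgres column whose concrete type is reported by the prepared
// statement. Assumes the `uuid` crate and tokio-postgres's uuid feature.
use tokio_postgres::types::{ToSql, Type};
use tokio_postgres::Client;

async fn scan_next_chunk(
    client: &Client,
    last_pk: &str, // primary key kept as a string (varchar) on the stream side
    scan_limit: i64,
) -> Result<usize, Box<dyn std::error::Error>> {
    // Preparing the query (here, per chunk) yields its Postgres parameter types.
    let stmt = client
        .prepare("SELECT * FROM t WHERE id > $1 ORDER BY id LIMIT $2")
        .await?;

    // Convert the string according to the reported parameter type: a `uuid`
    // column needs a real Uuid value, anything else can stay a string.
    let key: Box<dyn ToSql + Sync> = if stmt.params()[0] == Type::UUID {
        Box::new(last_pk.parse::<uuid::Uuid>()?)
    } else {
        Box::new(last_pk.to_owned())
    };

    let rows = client.query(&stmt, &[&*key, &scan_limit]).await?;
    Ok(rows.len())
}
```

In the diff above, the analogous conversion is driven by `prepared_scan_stmt.params()` feeding `ScalarAdapter::from_scalar`.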
@@ -459,7 +456,6 @@ impl PostgresExternalTableReader {
 
 #[cfg(test)]
 mod tests {
-
     use std::collections::HashMap;
 
     use futures::pin_mut;
@@ -547,7 +543,7 @@ mod tests {
         let config =
             serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                 .unwrap();
-        let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1], 1000)
+        let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1])
            .await
            .unwrap();
 
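On the "prepare once, only while the backfill is unfinished" suggestion from the review thread, one hedged way to sketch that alternative (illustrative names only, not a proposed patch) is to prepare the statement lazily on first use and cache it in a `tokio::sync::OnceCell`:

```rust
// Illustrative sketch only: lazily prepare the scan statement once per reader
// and reuse it for later backfill chunks. Type and field names are invented.
use tokio::sync::OnceCell;
use tokio_postgres::{Client, Statement};

struct SnapshotReader {
    client: Client,
    scan_stmt: OnceCell<Statement>,
}

impl SnapshotReader {
    async fn scan_statement(&self, scan_sql: &str) -> Result<&Statement, tokio_postgres::Error> {
        // The first chunk prepares the statement; subsequent chunks reuse it.
        self.scan_stmt
            .get_or_try_init(|| self.client.prepare(scan_sql))
            .await
    }
}
```

This is roughly the init-style refactor the author mentions; with the default `scan_limit` of 1000, the per-chunk prepare overhead was judged low enough not to need it.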
7 changes: 1 addition & 6 deletions src/stream/src/from_proto/stream_cdc_scan.rs
@@ -98,12 +98,7 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
             .map_err(|e| anyhow!("failed to parse external table config").context(e))?;
         let database_name = table_config.database.clone();
         let table_reader = table_type
-            .create_table_reader(
-                table_config,
-                table_schema.clone(),
-                table_pk_indices.clone(),
-                scan_options.snapshot_batch_size,
-            )
+            .create_table_reader(table_config, table_schema.clone(), table_pk_indices.clone())
             .await?;
 
         let external_table = ExternalStorageTable::new(