diff --git a/Makefile.toml b/Makefile.toml index 89814dbe36ae5..d9379c9cd55b5 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -649,13 +649,23 @@ echo " $(tput setaf 4)source ${PREFIX_CONFIG}/psql-env$(tput sgr0)" [tasks.psql] category = "RiseDev - Start/Stop" -description = "Run local psql client with default connection parameters. You can pass extra arguments to psql." +description = "Run local psql client for RisingWave with default connection parameters. You can pass extra arguments to psql." dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env bash psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev "$@" ''' +[tasks.pgpsql] +category = "RiseDev - Start/Stop" +description = "Run local psql client for postgres with default connection parameters. You can pass extra arguments to psql." +dependencies = ["check-and-load-risedev-env-file"] +script = ''' +#!/usr/bin/env bash +source "${PREFIX_CONFIG}/risedev-env" +PGHOST=$PGHOST PGPORT=$PGPORT PGUSER=$PGUSER PGDATABASE=$PGDATABASE psql "$@" +''' + [tasks.ctl] category = "RiseDev - Start/Stop" description = "Start RiseCtl" diff --git a/e2e_test/sink/pg_native_vs_jdbc.slt b/e2e_test/sink/pg_native_vs_jdbc.slt new file mode 100644 index 0000000000000..288d3cdc41053 --- /dev/null +++ b/e2e_test/sink/pg_native_vs_jdbc.slt @@ -0,0 +1,84 @@ +control substitution on + +statement ok +DROP SINK IF EXISTS pg_sink; + +statement ok +DROP SINK IF EXISTS pg_sink_jdbc; + +statement ok +DROP TABLE IF EXISTS datagen_source; + +system ok +psql -c 'DROP TABLE IF EXISTS datagen_source;' + +system ok +psql -c 'DROP TABLE IF EXISTS datagen_source_jdbc;' + +system ok +psql -c 'CREATE TABLE datagen_source (id INT PRIMARY KEY, v1 varchar);' + +system ok +psql -c 'CREATE TABLE datagen_source_jdbc (id INT PRIMARY KEY, v1 varchar);' + +statement ok +CREATE TABLE datagen_source (id INT PRIMARY KEY, v1 varchar); + +statement ok +INSERT INTO datagen_source SELECT key, 'asjdkk2kbdk2uk2ubek2' FROM generate_series(1, 2000000) t(key); + +statement ok +flush; + +statement ok +CREATE SINK pg_sink FROM datagen_source WITH ( + connector='postgres', + host='$PGHOST', + port='$PGPORT', + user='$PGUSER', + password='$PGPASSWORD', + database='$PGDATABASE', + table='datagen_source', + type='upsert', + primary_key='id', +); + +sleep 240s + +system ok +psql --tuples-only -c 'select count(*) from datagen_source;' +---- + 2000000 + + +statement ok +CREATE SINK pg_sink_jdbc FROM datagen_source WITH ( + connector='jdbc', + jdbc.url='jdbc:postgresql://${PGHOST}:${PGPORT}/${PGDATABASE}?user=${PGUSER}&password=${PGPASSWORD}', + table.name='datagen_source_jdbc', + primary_key='id', + type='upsert' +); + +sleep 240s + +system ok +psql --tuples-only -c 'select count(*) from datagen_source_jdbc;' +---- + 2000000 + + +statement ok +DROP SINK IF EXISTS pg_sink; + +statement ok +DROP SINK IF EXISTS pg_sink_jdbc; + +statement ok +DROP TABLE IF EXISTS datagen_source; + +system ok +psql -c 'DROP TABLE IF EXISTS datagen_source;' + +system ok +psql -c 'DROP TABLE IF EXISTS datagen_source_jdbc;' \ No newline at end of file diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 02abe81f020e1..f077e9ac42d9d 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -13,48 +13,30 @@ // limitations under the License. 
 use std::collections::{BTreeMap, HashSet};
-use std::sync::Arc;
 
-use anyhow::{anyhow, Context};
+use anyhow::anyhow;
 use async_trait::async_trait;
 use itertools::Itertools;
 use risingwave_common::array::{Op, StreamChunk};
-use risingwave_common::bitmap::Bitmap;
+use risingwave_common::bail;
 use risingwave_common::catalog::Schema;
 use risingwave_common::row::{Row, RowExt};
 use serde_derive::Deserialize;
 use serde_with::{serde_as, DisplayFromStr};
 use simd_json::prelude::ArrayTrait;
 use thiserror_ext::AsReport;
-use tokio_postgres::Statement;
+use tokio_postgres::types::Type as PgType;
 
 use super::{
-    SinkError, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
+    LogSinker, SinkError, SinkLogReader, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
 };
 use crate::connector_common::{create_pg_client, PostgresExternalTable, SslMode};
 use crate::parser::scalar_adapter::{validate_pg_type_to_rw_type, ScalarAdapter};
-use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
+use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
 use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};
 
 pub const POSTGRES_SINK: &str = "postgres";
 
-macro_rules! rw_row_to_pg_values {
-    ($row:expr, $statement:expr) => {
-        $row.iter().enumerate().map(|(i, d)| {
-            d.and_then(|d| {
-                let ty = &$statement.params()[i];
-                match ScalarAdapter::from_scalar(d, ty) {
-                    Ok(scalar) => Some(scalar),
-                    Err(e) => {
-                        tracing::error!(error=%e.as_report(), scalar=?d, "Failed to convert scalar to pg value");
-                        None
-                    }
-                }
-            })
-        })
-    };
-}
-
 #[serde_as]
 #[derive(Clone, Debug, Deserialize)]
 pub struct PostgresConfig {
@@ -143,7 +125,7 @@ impl TryFrom<SinkParam> for PostgresSink {
 impl Sink for PostgresSink {
     type Coordinator = DummySinkCommitCoordinator;
 
-    type LogSinker = LogSinkerOf<PostgresSinkWriter>;
+    type LogSinker = PostgresSinkWriter;
 
     const SINK_NAME: &'static str = POSTGRES_SINK;
 
@@ -239,40 +221,78 @@ impl Sink for PostgresSink {
         Ok(())
     }
 
-    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
-        Ok(PostgresSinkWriter::new(
+    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
+        PostgresSinkWriter::new(
             self.config.clone(),
             self.schema.clone(),
             self.pk_indices.clone(),
            self.is_append_only,
         )
-        .await?
-        .into_log_sinker(SinkWriterMetrics::new(&writer_param)))
+        .await
     }
 }
 
-struct Buffer {
-    buffer: Vec<StreamChunk>,
-    size: usize,
+struct ParameterBuffer<'a> {
+    /// A set of parameters to be inserted/deleted.
+    /// Each set is a flattened 2d-array.
+    parameters: Vec<Vec<Option<ScalarAdapter>>>,
+    /// The column dimension (fixed).
+    column_length: usize,
+    /// Schema types to serialize into `ScalarAdapter`.
+    schema_types: &'a [PgType],
+    /// Estimated number of parameters that can be sent in a single query.
+    estimated_parameter_size: usize,
+    /// Current parameter buffer to be filled.
+    current_parameter_buffer: Vec<Option<ScalarAdapter>>,
 }
 
-impl Buffer {
-    fn new() -> Self {
+impl<'a> ParameterBuffer<'a> {
+    /// The maximum number of parameters that can be sent in a single query.
    /// See: <https://www.postgresql.org/docs/current/limits.html>
+    const MAX_PARAMETERS: usize = 32768;
+
+    /// `flattened_chunk_size` is the number of datums in a single chunk.
+    fn new(schema_types: &'a [PgType], flattened_chunk_size: usize) -> Self {
+        let estimated_parameter_size = usize::min(Self::MAX_PARAMETERS, flattened_chunk_size);
         Self {
-            buffer: Vec::new(),
-            size: 0,
+            parameters: vec![],
+            column_length: schema_types.len(),
+            schema_types,
+            estimated_parameter_size,
+            current_parameter_buffer: Vec::with_capacity(estimated_parameter_size),
         }
     }
 
-    fn push(&mut self, chunk: StreamChunk) -> usize {
-        self.size += chunk.cardinality();
-        self.buffer.push(chunk);
-        self.size
+    fn add_row(&mut self, row: impl Row) {
+        if self.current_parameter_buffer.len() + self.column_length >= Self::MAX_PARAMETERS {
+            self.new_buffer();
+        }
+        for (i, datum_ref) in row.iter().enumerate() {
+            let pg_datum = datum_ref.map(|s| {
+                let ty = &self.schema_types[i];
+                match ScalarAdapter::from_scalar(s, ty) {
+                    Ok(scalar) => Some(scalar),
+                    Err(e) => {
+                        tracing::error!(error=%e.as_report(), scalar=?s, "Failed to convert scalar to pg value");
+                        None
+                    }
+                }
+            });
+            self.current_parameter_buffer.push(pg_datum.flatten());
+        }
+    }
+
+    fn new_buffer(&mut self) {
+        let filled_buffer = std::mem::replace(
+            &mut self.current_parameter_buffer,
+            Vec::with_capacity(self.estimated_parameter_size),
+        );
+        self.parameters.push(filled_buffer);
     }
 
-    fn drain(&mut self) -> Vec<StreamChunk> {
-        self.size = 0;
-        std::mem::take(&mut self.buffer)
+    fn into_parts(self) -> (Vec<Vec<Option<ScalarAdapter>>>, Vec<Option<ScalarAdapter>>) {
+        (self.parameters, self.current_parameter_buffer)
     }
 }
 
@@ -281,10 +301,8 @@ pub struct PostgresSinkWriter {
     pk_indices: Vec<usize>,
     is_append_only: bool,
     client: tokio_postgres::Client,
-    buffer: Buffer,
-    insert_statement: Statement,
-    delete_statement: Option<Statement>,
-    upsert_statement: Option<Statement>,
+    schema_types: Vec<PgType>,
+    schema: Schema,
 }
 
 impl PostgresSinkWriter {
@@ -338,201 +356,227 @@ impl PostgresSinkWriter {
             schema_types
         };
 
-        let insert_statement = {
-            let insert_sql = create_insert_sql(&schema, &config.table);
-            client
-                .prepare_typed(&insert_sql, &schema_types)
-                .await
-                .context("Failed to prepare insert statement")?
-        };
-
-        let delete_statement = if is_append_only {
-            None
-        } else {
-            let delete_types = pk_indices
-                .iter()
-                .map(|i| schema_types[*i].clone())
-                .collect_vec();
-            let delete_sql = create_delete_sql(&schema, &config.table, &pk_indices);
-            Some(
-                client
-                    .prepare_typed(&delete_sql, &delete_types)
-                    .await
-                    .context("Failed to prepare delete statement")?,
-            )
-        };
-
-        let upsert_statement = if is_append_only {
-            None
-        } else {
-            let upsert_sql = create_upsert_sql(&schema, &config.table, &pk_indices);
-            Some(
-                client
-                    .prepare_typed(&upsert_sql, &schema_types)
-                    .await
-                    .context("Failed to prepare upsert statement")?,
-            )
-        };
-
         let writer = Self {
             config,
             pk_indices,
             is_append_only,
             client,
-            buffer: Buffer::new(),
-            insert_statement,
-            delete_statement,
-            upsert_statement,
+            schema_types,
+            schema,
         };
         Ok(writer)
     }
 
-    async fn flush(&mut self) -> Result<()> {
+    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        // https://www.postgresql.org/docs/current/limits.html
+        // We have a limit of 65,535 parameters in a single query, as restricted by the PostgreSQL protocol.
         if self.is_append_only {
-            for chunk in self.buffer.drain() {
-                for (op, row) in chunk.rows() {
-                    match op {
-                        Op::Insert => {
-                            self.client
-                                .execute_raw(
-                                    &self.insert_statement,
-                                    rw_row_to_pg_values!(row, self.insert_statement),
-                                )
-                                .await?;
-                        }
-                        Op::UpdateInsert | Op::Delete | Op::UpdateDelete => {
-                            debug_assert!(!self.is_append_only);
-                        }
-                    }
-                }
-            }
+            self.write_batch_append_only(chunk).await
         } else {
-            let mut unmatched_update_insert = 0;
-            for chunk in self.buffer.drain() {
-                for (op, row) in chunk.rows() {
-                    match op {
-                        Op::Insert => {
-                            self.client
-                                .execute_raw(
-                                    &self.insert_statement,
-                                    rw_row_to_pg_values!(row, self.insert_statement),
-                                )
-                                .await?;
-                        }
-                        Op::UpdateInsert => {
-                            unmatched_update_insert += 1;
-                            self.client
-                                .execute_raw(
-                                    self.upsert_statement.as_ref().unwrap(),
-                                    rw_row_to_pg_values!(
-                                        row,
-                                        self.upsert_statement.as_ref().unwrap()
-                                    ),
-                                )
-                                .await?;
-                        }
-                        Op::Delete => {
-                            self.client
-                                .execute_raw(
-                                    self.delete_statement.as_ref().unwrap(),
-                                    rw_row_to_pg_values!(
-                                        row.project(&self.pk_indices),
-                                        self.delete_statement.as_ref().unwrap()
-                                    ),
-                                )
-                                .await?;
-                        }
-                        Op::UpdateDelete => {
-                            unmatched_update_insert -= 1;
-                        }
-                    }
-                }
-            }
-            assert_eq!(unmatched_update_insert, 0);
+            self.write_batch_non_append_only(chunk).await
         }
+    }
+
+    async fn write_batch_append_only(&mut self, chunk: StreamChunk) -> Result<()> {
+        let mut transaction = self.client.transaction().await?;
+        // 1d flattened array of parameters to be inserted.
+        let mut parameter_buffer = ParameterBuffer::new(
+            &self.schema_types,
+            chunk.cardinality() * chunk.data_types().len(),
+        );
+        for (op, row) in chunk.rows() {
+            match op {
+                Op::Insert => {
+                    parameter_buffer.add_row(row);
+                }
+                Op::UpdateInsert | Op::Delete | Op::UpdateDelete => {
+                    bail!("append-only sink should not receive update insert, update delete and delete operations")
+                }
+            }
+        }
+        let (parameters, remaining) = parameter_buffer.into_parts();
+        Self::execute_parameter(
+            Op::Insert,
+            &mut transaction,
+            &self.schema,
+            &self.config.table,
+            &self.pk_indices,
+            parameters,
+            remaining,
+        )
+        .await?;
+        transaction.commit().await?;
 
         Ok(())
     }
-}
 
-#[async_trait]
-impl SinkWriter for PostgresSinkWriter {
-    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
-        Ok(())
-    }
-
-    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
-        let cardinality = self.buffer.push(chunk);
-        if cardinality >= self.config.max_batch_rows {
-            self.flush().await?;
+    async fn write_batch_non_append_only(&mut self, chunk: StreamChunk) -> Result<()> {
+        let mut transaction = self.client.transaction().await?;
+        // 1d flattened array of parameters to be inserted.
+        let mut insert_parameter_buffer = ParameterBuffer::new(
+            &self.schema_types,
+            chunk.cardinality() * chunk.data_types().len(),
+        );
+        // 1d flattened array of parameters to be deleted. Delete rows are
+        // projected to the primary key columns, so this buffer must be typed
+        // with the primary key's Postgres types, not the full schema's.
+        let pk_types = self
+            .pk_indices
+            .iter()
+            .map(|i| self.schema_types[*i].clone())
+            .collect_vec();
+        let mut delete_parameter_buffer = ParameterBuffer::new(
+            &pk_types,
+            chunk.cardinality() * self.pk_indices.len(),
+        );
+        for (op, row) in chunk.rows() {
+            match op {
+                Op::UpdateInsert | Op::UpdateDelete => {
+                    bail!("UpdateInsert and UpdateDelete should have been normalized by the sink executor")
+                }
+                Op::Insert => {
+                    insert_parameter_buffer.add_row(row);
+                }
+                Op::Delete => {
+                    delete_parameter_buffer.add_row(row.project(&self.pk_indices));
+                }
+            }
         }
-        Ok(())
-    }
-
-    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
-        if is_checkpoint {
-            self.flush().await?;
-        }
-        Ok(())
-    }
+
+        let (delete_parameters, delete_remaining_parameter) = delete_parameter_buffer.into_parts();
+        Self::execute_parameter(
+            Op::Delete,
+            &mut transaction,
+            &self.schema,
+            &self.config.table,
+            &self.pk_indices,
+            delete_parameters,
+            delete_remaining_parameter,
+        )
+        .await?;
+        let (insert_parameters, insert_remaining_parameter) = insert_parameter_buffer.into_parts();
+        Self::execute_parameter(
+            Op::Insert,
+            &mut transaction,
+            &self.schema,
+            &self.config.table,
+            &self.pk_indices,
+            insert_parameters,
+            insert_remaining_parameter,
+        )
+        .await?;
+        transaction.commit().await?;
 
-    async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
 
-    async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
+    async fn execute_parameter(
+        op: Op,
+        transaction: &mut tokio_postgres::Transaction<'_>,
+        schema: &Schema,
+        table_name: &str,
+        pk_indices: &[usize],
+        parameters: Vec<Vec<Option<ScalarAdapter>>>,
+        remaining_parameter: Vec<Option<ScalarAdapter>>,
+    ) -> Result<()> {
+        // Inserts carry one parameter per column; deletes carry only the primary key columns.
+        let column_length = match op {
+            Op::Insert => schema.fields().len(),
+            Op::Delete => pk_indices.len(),
+            _ => unreachable!(),
+        };
+        if !parameters.is_empty() {
+            let parameter_length = parameters[0].len();
+            let rows_length = parameter_length / column_length;
+            assert_eq!(
+                parameter_length % column_length,
+                0,
+                "flattened parameters are unaligned"
+            );
+            let statement = match op {
+                Op::Insert => create_insert_sql(schema, table_name, rows_length),
+                Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length),
+                _ => unreachable!(),
+            };
+            let statement = transaction.prepare(&statement).await?;
+            for parameter in parameters {
+                transaction.execute_raw(&statement, parameter).await?;
+            }
+        }
+        if !remaining_parameter.is_empty() {
+            let rows_length = remaining_parameter.len() / column_length;
+            assert_eq!(
+                remaining_parameter.len() % column_length,
+                0,
+                "flattened parameters are unaligned"
+            );
+            let statement = match op {
+                Op::Insert => create_insert_sql(schema, table_name, rows_length),
+                Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length),
+                _ => unreachable!(),
+            };
+            let statement = transaction.prepare(&statement).await?;
+            transaction
+                .execute_raw(&statement, remaining_parameter)
+                .await?;
+        }
         Ok(())
     }
 }
 
-fn create_insert_sql(schema: &Schema, table_name: &str) -> String {
-    let columns: String = schema
-        .fields()
-        .iter()
-        .map(|field| field.name.clone())
-        .collect_vec()
-        .join(", ");
-    let parameters: String = (0..schema.fields().len())
-        .map(|i| format!("${}", i + 1))
-        .collect_vec()
-        .join(", ");
-    format!("INSERT INTO {table_name} ({columns}) VALUES ({parameters})")
+#[async_trait]
+impl LogSinker for PostgresSinkWriter {
+    async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result<!> {
+        loop {
+            let (epoch, item) = log_reader.next_item().await?;
+            match item {
+                LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
+                    self.write_batch(chunk).await?;
+                    log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
+                }
+                LogStoreReadItem::Barrier { .. } => {
+                    log_reader.truncate(TruncateOffset::Barrier { epoch })?;
+                }
+                LogStoreReadItem::UpdateVnodeBitmap(_) => {}
+            }
+        }
+    }
 }
 
-fn create_upsert_sql(schema: &Schema, table_name: &str, pk_indices: &[usize]) -> String {
+fn create_insert_sql(schema: &Schema, table_name: &str, number_of_rows: usize) -> String {
+    let number_of_columns = schema.len();
     let columns: String = schema
         .fields()
         .iter()
         .map(|field| field.name.clone())
         .collect_vec()
         .join(", ");
-    let parameters: String = (0..schema.fields().len())
-        .map(|i| format!("${}", i + 1))
-        .collect_vec()
-        .join(", ");
-    let pk_columns = pk_indices
-        .iter()
-        .map(|i| schema.fields()[*i].name.clone())
-        .collect_vec()
-        .join(", ");
-    let update_parameters: String = (0..schema.fields().len())
-        .filter(|i| !pk_indices.contains(i))
+    let parameters: String = (0..number_of_rows)
        .map(|i| {
-            let column = schema.fields()[i].name.clone();
-            let param = format!("${}", i + 1);
-            format!("{column} = {param}")
+            let row_parameters = (0..number_of_columns)
+                .map(|j| format!("${}", i * number_of_columns + j + 1))
+                .collect_vec()
+                .join(", ");
+            format!("({row_parameters})")
         })
         .collect_vec()
         .join(", ");
-    format!("INSERT INTO {table_name} ({columns}) VALUES ({parameters}) on conflict ({pk_columns}) do update set {update_parameters}")
+    format!("INSERT INTO {table_name} ({columns}) VALUES {parameters}")
 }
 
-fn create_delete_sql(schema: &Schema, table_name: &str, pk_indices: &[usize]) -> String {
-    let parameters: String = pk_indices
-        .iter()
-        .map(|i| format!("{} = ${}", schema.fields()[*i].name, i + 1))
+fn create_delete_sql(
+    schema: &Schema,
+    table_name: &str,
+    pk_indices: &[usize],
+    number_of_rows: usize,
+) -> String {
+    let number_of_pk = pk_indices.len();
+    let parameters: String = (0..number_of_rows)
+        .map(|i| {
+            let row_parameters: String = pk_indices
+                .iter()
+                .enumerate()
+                .map(|(j, pk_index)| {
+                    format!(
+                        "{} = ${}",
+                        schema.fields()[*pk_index].name,
+                        i * number_of_pk + j + 1
+                    )
+                })
+                .collect_vec()
+                .join(" AND ");
+            format!("({row_parameters})")
+        })
        .collect_vec()
-        .join(" AND ");
+        .join(" OR ");
     format!("DELETE FROM {table_name} WHERE {parameters}")
 }
 
@@ -568,10 +612,10 @@ mod tests {
             },
         ]);
         let table_name = "test_table";
-        let sql = create_insert_sql(&schema, table_name);
+        let sql = create_insert_sql(&schema, table_name, 3);
         check(
             sql,
-            expect!["INSERT INTO test_table (a, b) VALUES ($1, $2)"],
+            expect!["INSERT INTO test_table (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)"],
         );
     }
 
@@ -592,31 +636,10 @@ mod tests {
             },
         ]);
         let table_name = "test_table";
-        let sql = create_delete_sql(&schema, table_name, &[1]);
-        check(sql, expect!["DELETE FROM test_table WHERE b = $2"]);
-    }
-
-    #[test]
-    fn test_create_upsert_sql() {
-        let schema = Schema::new(vec![
-            Field {
-                data_type: DataType::Int32,
-                name: "a".to_owned(),
-                sub_fields: vec![],
-                type_name: "".to_owned(),
-            },
-            Field {
-                data_type: DataType::Int32,
-                name: "b".to_owned(),
-                sub_fields: vec![],
-                type_name: "".to_owned(),
-            },
-        ]);
-        let table_name = "test_table";
-        let sql = create_upsert_sql(&schema, table_name, &[1]);
+        let sql = create_delete_sql(&schema, table_name, &[1], 3);
         check(
             sql,
-            expect!["INSERT INTO test_table (a, b) VALUES ($1, $2) on conflict (b) do update set a = $1"],
+            expect!["DELETE FROM test_table WHERE (b = $1) OR (b = $2) OR (b = $3)"],
        );
    }
 }
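A note on the batching model above: `ParameterBuffer` flattens rows column-by-column into pages of bind parameters, sealing the current page whenever the next row would reach `MAX_PARAMETERS`. A minimal standalone sketch of that paging logic, with plain `Option<String>` standing in for `Option<ScalarAdapter>` and the limit shrunk to 8 so the rollover is visible in a toy run:

```rust
// Simplified stand-in for `ParameterBuffer`: rows are flattened row-major
// into one parameter list; a page is sealed before it would hit the limit.
const MAX_PARAMETERS: usize = 8; // the real code uses 32768

struct PageBuffer {
    pages: Vec<Vec<Option<String>>>,
    row_width: usize,
    current: Vec<Option<String>>,
}

impl PageBuffer {
    fn new(row_width: usize) -> Self {
        Self { pages: vec![], row_width, current: vec![] }
    }

    fn add_row(&mut self, row: Vec<Option<String>>) {
        assert_eq!(row.len(), self.row_width);
        // Seal the current page if this row would reach the limit,
        // mirroring the `>= MAX_PARAMETERS` check in `add_row`.
        if self.current.len() + self.row_width >= MAX_PARAMETERS {
            self.pages.push(std::mem::take(&mut self.current));
        }
        self.current.extend(row);
    }

    fn into_parts(self) -> (Vec<Vec<Option<String>>>, Vec<Option<String>>) {
        (self.pages, self.current)
    }
}

fn main() {
    let mut buf = PageBuffer::new(2);
    for i in 0..5 {
        buf.add_row(vec![Some(i.to_string()), None]);
    }
    let (pages, rest) = buf.into_parts();
    // With a limit of 8 and a row width of 2, a sealed page holds 3 rows
    // (6 parameters), so 5 rows split into one full page plus a remainder.
    assert_eq!(pages.len(), 1);
    assert_eq!(pages[0].len(), 6);
    assert_eq!(rest.len(), 4);
    println!("{} full page(s), {} remaining parameters", pages.len(), rest.len());
}
```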
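The placeholder numbering in the rewritten `create_insert_sql` / `create_delete_sql` follows a single rule: row `i`, column `j` binds parameter `i * width + j + 1`. The same arithmetic restated over plain column-name slices (the PR versions take a `risingwave_common::catalog::Schema`; these helpers are illustrative only, and the assertions match the PR's own test expectations):

```rust
// Multi-row INSERT: one parenthesized placeholder group per row.
fn multi_row_insert(table: &str, columns: &[&str], rows: usize) -> String {
    let width = columns.len();
    let values = (0..rows)
        .map(|i| {
            let row = (0..width)
                .map(|j| format!("${}", i * width + j + 1))
                .collect::<Vec<_>>()
                .join(", ");
            format!("({row})")
        })
        .collect::<Vec<_>>()
        .join(", ");
    format!("INSERT INTO {table} ({}) VALUES {values}", columns.join(", "))
}

// Multi-row DELETE: per-row pk equality clauses AND-ed, rows OR-ed together.
fn multi_row_delete(table: &str, pk_columns: &[&str], rows: usize) -> String {
    let width = pk_columns.len();
    let clauses = (0..rows)
        .map(|i| {
            let row = pk_columns
                .iter()
                .enumerate()
                .map(|(j, col)| format!("{col} = ${}", i * width + j + 1))
                .collect::<Vec<_>>()
                .join(" AND ");
            format!("({row})")
        })
        .collect::<Vec<_>>()
        .join(" OR ");
    format!("DELETE FROM {table} WHERE {clauses}")
}

fn main() {
    assert_eq!(
        multi_row_insert("t", &["a", "b"], 3),
        "INSERT INTO t (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)"
    );
    assert_eq!(
        multi_row_delete("t", &["a", "b"], 2),
        "DELETE FROM t WHERE (a = $1 AND b = $2) OR (a = $3 AND b = $4)"
    );
}
```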
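How a flattened page drives `tokio-postgres` at runtime, in the spirit of `execute_parameter`: prepare one statement per page shape, then stream the row-major parameter list with `execute_raw` inside a transaction. This is a hedged sketch, not the PR's code: the connection string and the `demo (id text, v text)` table are placeholders, and `tokio` plus `tokio-postgres` are assumed as dependencies.

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Placeholder connection string; assumes a local Postgres with
    // `CREATE TABLE demo (id text, v text)` already applied.
    let (mut client, connection) =
        tokio_postgres::connect("host=localhost user=postgres dbname=dev", NoTls).await?;
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    let txn = client.transaction().await?;
    // One statement shape covers 2 rows x 2 columns = 4 parameters.
    let stmt = txn
        .prepare("INSERT INTO demo (id, v) VALUES ($1, $2), ($3, $4)")
        .await?;
    // Row-major flattened parameters, NULLs included, as `ParameterBuffer`
    // would produce them.
    let params: Vec<Option<String>> = vec![
        Some("1".into()),
        Some("a".into()),
        Some("2".into()),
        None,
    ];
    txn.execute_raw(&stmt, params).await?;
    txn.commit().await?;
    Ok(())
}
```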
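`consume_log_and_sink` replaces the old `SinkWriter` callbacks with a pull loop: read an item, write it, then truncate the log so the log store can reclaim space. A toy model of that contract, with simplified stand-ins for `LogStoreReadItem` and `TruncateOffset` (the real trait loops forever and returns `Result<!>`; this version consumes a finite item list so it can be asserted against):

```rust
#[derive(Debug)]
enum ReadItem {
    StreamChunk { chunk_id: u64, rows: usize },
    Barrier,
}

#[derive(Debug, PartialEq)]
enum Truncate {
    Chunk { epoch: u64, chunk_id: u64 },
    Barrier { epoch: u64 },
}

fn consume(items: Vec<(u64, ReadItem)>) -> Vec<Truncate> {
    let mut truncated = Vec::new();
    for (epoch, item) in items {
        match item {
            ReadItem::StreamChunk { chunk_id, rows } => {
                // A real sink writes the chunk here (cf. `write_batch`),
                // and only truncates after the write succeeds.
                let _ = rows;
                truncated.push(Truncate::Chunk { epoch, chunk_id });
            }
            // Barriers only advance the truncation point; the Postgres sink
            // commits a transaction per chunk, so nothing is left to flush.
            ReadItem::Barrier => truncated.push(Truncate::Barrier { epoch }),
        }
    }
    truncated
}

fn main() {
    let out = consume(vec![
        (1, ReadItem::StreamChunk { chunk_id: 0, rows: 100 }),
        (1, ReadItem::Barrier),
    ]);
    assert_eq!(
        out,
        vec![
            Truncate::Chunk { epoch: 1, chunk_id: 0 },
            Truncate::Barrier { epoch: 1 },
        ]
    );
    println!("{out:?}");
}
```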