
Commit

feat(batch): support basic postgres tvf (#18811)
Co-authored-by: xiangjinwu <[email protected]>
kwannoel and xiangjinwu authored Oct 11, 2024
1 parent aaad651 commit da977e9
Showing 20 changed files with 804 additions and 8 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock


3 changes: 3 additions & 0 deletions ci/scripts/e2e-source-test.sh
@@ -141,6 +141,9 @@ risedev slt './e2e_test/source/cdc/cdc.check_new_rows.slt'
# drop relations
risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt'

echo "--- postgres_query tvf test"
risedev slt './e2e_test/source/tvf/postgres_query.slt'

echo "--- Kill cluster"
risedev ci-kill
export RISINGWAVE_CI=true
40 changes: 40 additions & 0 deletions e2e_test/source/tvf/postgres_query.slt
@@ -0,0 +1,40 @@
system ok
PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test psql -c "
CREATE TABLE test (
id bigint primary key,
v1 bool,
v2 smallint,
v3 integer,
v4 bigint,
v5 real,
v6 double precision,
v7 numeric,
v8 date,
v9 time,
v10 timestamp,
v11 timestamptz,
v12 text,
v13 varchar,
v14 interval,
v15 jsonb,
v16 bytea
);"

system ok
PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test psql -c "
INSERT INTO test SELECT generate_series(1, 100), true, 1, 1, 1, 1.0, 1.0, 1.0, '2021-01-01', '00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00 pst', 'text', 'varchar', '1 day', '{}', '\x01';
"

query II
select * from postgres_query('db', '5432', 'postgres', 'postgres', 'cdc_test', 'select * from test where id > 90;');
----
91 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
92 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
93 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
94 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
95 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
96 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
97 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
98 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
99 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
100 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
12 changes: 12 additions & 0 deletions proto/batch_plan.proto
@@ -93,6 +93,17 @@ message FileScanNode {
repeated string file_location = 7;
}

// NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed.
message PostgresQueryNode {
repeated plan_common.ColumnDesc columns = 1;
string hostname = 2;
string port = 3;
string username = 4;
string password = 5;
string database = 6;
string query = 7;
}

message ProjectNode {
repeated expr.ExprNode select_list = 1;
}
@@ -373,6 +384,7 @@ message PlanNode {
LogRowSeqScanNode log_row_seq_scan = 37;
FileScanNode file_scan = 38;
IcebergScanNode iceberg_scan = 39;
PostgresQueryNode postgres_query = 40;
// The following nodes are used for testing.
bool block_executor = 100;
bool busy_loop_executor = 101;
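As a reading aid, here is a minimal sketch of how the prost-generated struct for this message might be populated when the frontend assembles a batch plan. The connection values mirror the e2e test above; the empty `columns` vector and the surrounding plan-construction context are assumptions, not part of this commit.

use risingwave_pb::batch_plan::PostgresQueryNode;

// Illustrative only: in a real plan, `columns` carries one
// plan_common.ColumnDesc per output column of the remote query.
let node = PostgresQueryNode {
    columns: vec![],
    hostname: "db".to_string(),
    port: "5432".to_string(),
    username: "postgres".to_string(),
    password: "postgres".to_string(),
    database: "cdc_test".to_string(),
    query: "select * from test where id > 90".to_string(),
};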
2 changes: 2 additions & 0 deletions proto/expr.proto
@@ -363,6 +363,8 @@ message TableFunction {
JSONB_TO_RECORDSET = 17;
// file scan
FILE_SCAN = 19;
// postgres query
POSTGRES_QUERY = 20;
// User defined table function
USER_DEFINED = 100;
}
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
@@ -60,6 +60,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
] }
tokio-metrics = "0.3.0"
tokio-postgres = "0.7"
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
8 changes: 8 additions & 0 deletions src/batch/src/error.rs
@@ -29,6 +29,7 @@ use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
use risingwave_storage::error::StorageError;
use thiserror::Error;
use thiserror_ext::Construct;
use tokio_postgres;
use tonic::Status;

use crate::worker_manager::worker_node_manager::FragmentId;
@@ -127,6 +128,13 @@ pub enum BatchError {
ParquetError,
),

#[error(transparent)]
Postgres(
#[from]
#[backtrace]
tokio_postgres::Error,
),

// Make the ref-counted type to be a variant for easier code structuring.
// TODO(error-handling): replace with `thiserror_ext::Arc`
#[error(transparent)]
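Because of the `#[from]` attribute, the `?` operator lifts driver errors into `BatchError` with no explicit mapping. A minimal sketch, assuming an already-connected tokio_postgres `client` and a `test` table:

use crate::error::BatchError;

// `?` converts tokio_postgres::Error into BatchError::Postgres through the
// derived From impl, so no manual map_err is needed.
async fn count_rows(client: &tokio_postgres::Client) -> Result<i64, BatchError> {
    let row = client.query_one("SELECT count(*) FROM test", &[]).await?;
    Ok(row.try_get::<_, i64>(0)?)
}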
3 changes: 3 additions & 0 deletions src/batch/src/executor/mod.rs
@@ -30,6 +30,7 @@ mod max_one_row;
mod merge_sort;
mod merge_sort_exchange;
mod order_by;
mod postgres_query;
mod project;
mod project_set;
mod row_seq_scan;
@@ -65,6 +66,7 @@ pub use max_one_row::*;
pub use merge_sort::*;
pub use merge_sort_exchange::*;
pub use order_by::*;
pub use postgres_query::*;
pub use project::*;
pub use project_set::*;
use risingwave_common::array::DataChunk;
@@ -244,6 +246,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
NodeBody::MaxOneRow => MaxOneRowExecutor,
NodeBody::FileScan => FileScanExecutorBuilder,
NodeBody::IcebergScan => IcebergScanExecutorBuilder,
NodeBody::PostgresQuery => PostgresQueryExecutorBuilder,
            // The following NodeBody variants are only used in tests
NodeBody::BlockExecutor => BlockExecutorBuilder,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
194 changes: 194 additions & 0 deletions src/batch/src/executor/postgres_query.rs
@@ -0,0 +1,194 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use thiserror_ext::AsReport;
use tokio_postgres;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, DataChunk, Executor, ExecutorBuilder};
use crate::task::BatchTaskContext;

/// `PostgresQuery` executor. Runs a query against a Postgres database.
pub struct PostgresQueryExecutor {
schema: Schema,
host: String,
port: String,
username: String,
password: String,
database: String,
query: String,
identity: String,
}

impl Executor for PostgresQueryExecutor {
fn schema(&self) -> &risingwave_common::catalog::Schema {
&self.schema
}

fn identity(&self) -> &str {
&self.identity
}

fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
self.do_execute().boxed()
}
}

pub fn postgres_row_to_owned_row(
row: tokio_postgres::Row,
schema: &Schema,
) -> Result<OwnedRow, BatchError> {
let mut datums = vec![];
for i in 0..schema.fields.len() {
let rw_field = &schema.fields[i];
let name = rw_field.name.as_str();
let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name)?;
datums.push(datum);
}
Ok(OwnedRow::new(datums))
}

// TODO(kwannoel): Support more types, see postgres connector's ScalarAdapter.
fn postgres_cell_to_scalar_impl(
row: &tokio_postgres::Row,
data_type: &DataType,
i: usize,
name: &str,
) -> Result<Datum, BatchError> {
let datum = match data_type {
DataType::Boolean
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Float32
| DataType::Float64
| DataType::Date
| DataType::Time
| DataType::Timestamp
| DataType::Timestamptz
| DataType::Jsonb
| DataType::Interval
| DataType::Varchar
| DataType::Bytea => {
// ScalarAdapter is also fine. But ScalarImpl is more efficient
row.try_get::<_, Option<ScalarImpl>>(i)?
}
DataType::Decimal => {
// Decimal is more efficient than PgNumeric in ScalarAdapter
let val = row.try_get::<_, Option<Decimal>>(i)?;
val.map(ScalarImpl::from)
}
_ => {
tracing::warn!(name, ?data_type, "unsupported data type, set to null");
None
}
};
Ok(datum)
}

impl PostgresQueryExecutor {
pub fn new(
schema: Schema,
host: String,
port: String,
username: String,
password: String,
database: String,
query: String,
identity: String,
) -> Self {
Self {
schema,
host,
port,
username,
password,
database,
query,
identity,
}
}

#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
tracing::debug!("postgres_query_executor: started");
let conn_str = format!(
"host={} port={} user={} password={} dbname={}",
self.host, self.port, self.username, self.password, self.database
);
let (client, conn) = tokio_postgres::connect(&conn_str, tokio_postgres::NoTls).await?;

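        // tokio_postgres hands back a client plus a connection future; the
        // connection must be polled concurrently (here, on a spawned task),
        // or queries issued through `client` would never make progress.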
tokio::spawn(async move {
if let Err(e) = conn.await {
tracing::error!(
"postgres_query_executor: connection error: {:?}",
e.as_report()
);
}
});

        // TODO(kwannoel): Paginate the result set with a CURSOR.
let rows = client
.query(&self.query, &[])
.await
.context("postgres_query received error from remote server")?;
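        // Batch the rows into chunks of up to 1024; `append_one_row` returns a
        // completed chunk whenever that capacity is reached.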
let mut builder = DataChunkBuilder::new(self.schema.data_types(), 1024);
tracing::debug!("postgres_query_executor: query executed, start deserializing rows");
// deserialize the rows
for row in rows {
let owned_row = postgres_row_to_owned_row(row, &self.schema)?;
if let Some(chunk) = builder.append_one_row(owned_row) {
yield chunk;
}
}
if let Some(chunk) = builder.consume_all() {
yield chunk;
}
return Ok(());
}
}

pub struct PostgresQueryExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for PostgresQueryExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
_inputs: Vec<BoxedExecutor>,
) -> crate::error::Result<BoxedExecutor> {
let postgres_query_node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::PostgresQuery
)?;

Ok(Box::new(PostgresQueryExecutor::new(
Schema::from_iter(postgres_query_node.columns.iter().map(Field::from)),
postgres_query_node.hostname.clone(),
postgres_query_node.port.clone(),
postgres_query_node.username.clone(),
postgres_query_node.password.clone(),
postgres_query_node.database.clone(),
postgres_query_node.query.clone(),
source.plan_node().get_identity().clone(),
)))
}
}
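For orientation, a hedged sketch of driving this executor by hand, outside the builder. The async context and a `schema` matching the remote table are assumed; the connection values mirror the e2e test above.

use futures_util::stream::StreamExt;

// Assumed: we are inside an async context and `schema` describes the remote
// table. Each stream item is a Result<DataChunk, BatchError>.
let executor = Box::new(PostgresQueryExecutor::new(
    schema,
    "db".to_string(),
    "5432".to_string(),
    "postgres".to_string(),
    "postgres".to_string(),
    "cdc_test".to_string(),
    "select * from test where id > 90".to_string(),
    "PostgresQueryExecutor".to_string(),
));
let mut stream = executor.execute();
while let Some(chunk) = stream.next().await {
    let chunk = chunk?;
    println!("fetched {} rows", chunk.cardinality());
}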
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
@@ -91,6 +91,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"signal",
"fs",
] }
tokio-postgres = "0.7"
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing = "0.1"
12 changes: 12 additions & 0 deletions src/frontend/src/binder/expr/function/mod.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::Context;
use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME};
@@ -321,6 +322,17 @@ impl Binder {
self.ensure_table_function_allowed()?;
return Ok(TableFunction::new_file_scan(args)?.into());
}
// `postgres_query` table function
if func_name.eq("postgres_query") {
reject_syntax!(
arg_list.variadic,
"`VARIADIC` is not allowed in table function call"
);
self.ensure_table_function_allowed()?;
return Ok(TableFunction::new_postgres_query(args)
.context("postgres_query error")?
.into());
}
// UDTF
if let Some(ref udf) = udf
&& udf.kind.is_table()
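The `TableFunction::new_postgres_query` constructor lands in a part of this diff that is not expanded here. Purely as a hypothetical sketch of the argument shape it would accept — the helper name and signature below are assumptions; only the six-argument order is grounded in the e2e test above:

// Hypothetical helper, not from this commit: postgres_query takes six
// constant string arguments, in the order used by the e2e test.
fn check_postgres_query_args(args: &[String]) -> anyhow::Result<()> {
    let [_host, _port, _user, _password, _database, _query] = args else {
        anyhow::bail!(
            "postgres_query expects 6 arguments: host, port, username, password, database, query"
        );
    };
    Ok(())
}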