Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(cdc): refactor parsing of non-builtin Postgres data types #16589

Merged
merged 28 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use self::avro::AvroAccessBuilder;
use self::bytes_parser::BytesAccessBuilder;
pub use self::mysql::mysql_row_to_owned_row;
use self::plain_parser::PlainParser;
pub use self::postgres::{postgres_row_to_owned_row, EnumString};
pub use self::postgres::postgres_row_to_owned_row;
use self::simd_json_parser::DebeziumJsonAccessBuilder;
pub use self::unified::json::TimestamptzHandling;
use self::unified::AccessImpl;
Expand Down Expand Up @@ -75,7 +75,9 @@ mod maxwell;
mod mysql;
pub mod plain_parser;
mod postgres;

mod protobuf;
pub mod scalar_adapter;
mod unified;
mod upsert_parser;
mod util;
Expand Down
163 changes: 46 additions & 117 deletions src/connector/src/parser/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;
use std::sync::LazyLock;

use bytes::BytesMut;
use chrono::{NaiveDate, Utc};
use pg_bigdecimal::PgNumeric;
use risingwave_common::catalog::Schema;
use risingwave_common::log::LogSuppresser;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{
DataType, Date, Decimal, Int256, Interval, JsonbVal, ListValue, ScalarImpl, Time, Timestamp,
DataType, Date, Decimal, Interval, JsonbVal, ListValue, ScalarImpl, Time, Timestamp,
Timestamptz,
};
use rust_decimal::Decimal as RustDecimal;
use thiserror_ext::AsReport;
use tokio_postgres::types::{to_sql_checked, FromSql, IsNull, Kind, ToSql, Type};
use tokio_postgres::types::{Kind, Type};

use crate::parser::scalar_adapter::ScalarAdapter;
use crate::parser::util::log_error;

static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
Expand Down Expand Up @@ -121,9 +119,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
// we use the PgNumeric type to convert the decimal to a string.
// Then we convert the string to Int256.
// Note: It's only used to map the numeric type in upstream Postgres to RisingWave's rw_int256.
let res = row.try_get::<_, Option<PgNumeric>>(i);
let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
match res {
Ok(val) => pg_numeric_to_rw_int256(val, name),
Ok(val) => val.and_then(|v| v.into_scalar(DataType::Int256)),
Err(err) => {
log_error!(name, err, "parse numeric column as pg_numeric failed");
None
Expand All @@ -133,9 +131,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
DataType::Varchar => {
if let Kind::Enum(_) = row.columns()[i].type_().kind() {
// enum type needs to be handled separately
let res = row.try_get::<_, Option<EnumString>>(i);
let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.0)),
Ok(val) => val.and_then(|v| v.into_scalar(DataType::Varchar)),
Err(err) => {
log_error!(name, err, "parse enum column failed");
None
Expand All @@ -145,9 +143,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match *row.columns()[i].type_() {
// Since we don't support UUID natively, adapt it to a VARCHAR column
Type::UUID => {
let res = row.try_get::<_, Option<uuid::Uuid>>(i);
let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())),
Ok(val) => val.and_then(|v| v.into_scalar(DataType::Varchar)),
Err(err) => {
log_error!(name, err, "parse uuid column failed");
None
Expand All @@ -159,9 +157,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
// Currently in order to handle the decimal beyond RustDecimal,
// we use the PgNumeric type to convert the decimal to a string.
// Note: It's only used to map the numeric type in upstream Postgres to RisingWave's varchar.
let res = row.try_get::<_, Option<PgNumeric>>(i);
let res = row.try_get::<_, Option<ScalarAdapter<'_>>>(i);
match res {
Ok(val) => pg_numeric_to_varchar(val),
Ok(val) => val.and_then(|v| v.into_scalar(DataType::Varchar)),
Err(err) => {
log_error!(
name,
Expand Down Expand Up @@ -216,13 +214,13 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
// As `NULL` in enum list is not supported in Debezium, we use `EnumString`
// instead of `Option<EnumString>` to handle enum to keep the behaviors aligned.
// An enum list contains `NULL` will be converted to `NULL`.
let res = row.try_get::<_, Option<Vec<EnumString>>>(i);
let res = row.try_get::<_, Option<Vec<ScalarAdapter<'_>>>>(i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(Some(ScalarImpl::from(val.0)));
});
if let Some(vec) = val {
for val in vec {
builder.append(val.into_scalar(DataType::Varchar))
}
}
Some(ScalarImpl::from(ListValue::new(builder.finish())))
}
Expand Down Expand Up @@ -261,16 +259,18 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match *row.columns()[i].type_() {
// Since we don't support UUID natively, adapt it to a VARCHAR column
Type::UUID_ARRAY => {
let res =
row.try_get::<_, Option<Vec<Option<uuid::Uuid>>>>(i);
let res = row
.try_get::<_, Option<Vec<Option<ScalarAdapter<'_>>>>>(
i,
);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(val.map(|v| {
ScalarImpl::from(v.to_string())
if let Some(vec) = val {
for val in vec {
builder.append(val.and_then(|v| {
v.into_scalar(DataType::Varchar)
}))
});
}
}
}
Err(err) => {
Expand All @@ -279,14 +279,18 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
};
}
Type::NUMERIC_ARRAY => {
let res =
row.try_get::<_, Option<Vec<Option<PgNumeric>>>>(i);
let res = row
.try_get::<_, Option<Vec<Option<ScalarAdapter<'_>>>>>(
i,
);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(pg_numeric_to_varchar(val))
});
if let Some(vec) = val {
for val in vec {
builder.append(val.and_then(|v| {
v.into_scalar(DataType::Varchar)
}))
}
}
}
Err(err) => {
Expand Down Expand Up @@ -364,13 +368,18 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
}
DataType::Int256 => {
let res = row.try_get::<_, Option<Vec<Option<PgNumeric>>>>(i);
let res =
row.try_get::<_, Option<Vec<Option<ScalarAdapter<'_>>>>>(i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(pg_numeric_to_rw_int256(val, name))
});
if let Some(vec) = val {
for val in vec {
builder.append(
val.and_then(|v| {
v.into_scalar(DataType::Int256)
}),
)
}
}
}
Err(err) => {
Expand Down Expand Up @@ -404,91 +413,11 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
OwnedRow::new(datums)
}

fn pg_numeric_to_rw_int256(val: Option<PgNumeric>, name: &str) -> Option<ScalarImpl> {
let string = pg_numeric_to_string(val)?;
match Int256::from_str(string.as_str()) {
Ok(num) => Some(ScalarImpl::from(num)),
Err(err) => {
log_error!(name, err, "parse numeric string as rw_int256 failed");
None
}
}
}

fn pg_numeric_to_varchar(val: Option<PgNumeric>) -> Option<ScalarImpl> {
pg_numeric_to_string(val).map(ScalarImpl::from)
}

fn pg_numeric_to_string(val: Option<PgNumeric>) -> Option<String> {
if let Some(pg_numeric) = val {
// TODO(kexiang): NEGATIVE_INFINITY -> -Infinity, POSITIVE_INFINITY -> Infinity, NAN -> NaN
// The current implementation is to ensure consistency with the behavior of the cdc event parser.
match pg_numeric {
PgNumeric::NegativeInf => Some(String::from("NEGATIVE_INFINITY")),
PgNumeric::Normalized(big_decimal) => Some(big_decimal.to_string()),
PgNumeric::PositiveInf => Some(String::from("POSITIVE_INFINITY")),
PgNumeric::NaN => Some(String::from("NAN")),
}
} else {
// NULL
None
}
}

#[derive(Clone, Debug)]
pub struct EnumString(pub String);

impl<'a> FromSql<'a> for EnumString {
fn from_sql(
_ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + 'static + Sync + Send>> {
Ok(EnumString(String::from_utf8_lossy(raw).into_owned()))
}

fn accepts(ty: &Type) -> bool {
matches!(ty.kind(), Kind::Enum(_))
}
}

impl ToSql for EnumString {
to_sql_checked!();

fn accepts(ty: &Type) -> bool {
matches!(ty.kind(), Kind::Enum(_))
}

fn to_sql(
&self,
ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
where
Self: Sized,
{
match ty.kind() {
Kind::Enum(e) => {
if e.contains(&self.0) {
out.extend_from_slice(self.0.as_bytes());
Ok(IsNull::No)
} else {
Err(format!(
"EnumString value {} is not in the enum type {:?}",
self.0, e
)
.into())
}
}
_ => Err("EnumString can only be used with ENUM types".into()),
}
}
}

#[cfg(test)]
mod tests {
use tokio_postgres::NoTls;

use crate::parser::postgres::EnumString;
use crate::parser::scalar_adapter::EnumString;
const DB: &str = "postgres";
const USER: &str = "kexiang";

Expand Down
Loading
Loading