Skip to content

Commit

Permalink
fix(cdc): fix enum to varchar in postgres-cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Apr 23, 2024
1 parent d476283 commit 967c04c
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 156 deletions.
38 changes: 6 additions & 32 deletions src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use risingwave_common::types::{
use rust_decimal::Decimal as RustDecimal;
use thiserror_ext::AsReport;

use crate::parser::util::log_error;

static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

macro_rules! handle_data_type {
Expand All @@ -33,14 +35,7 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v)),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
log_error!($name, err, "parse column failed");
None
}
}
Expand All @@ -50,14 +45,7 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
log_error!($name, err, "parse column failed");
None
}
}
Expand Down Expand Up @@ -113,14 +101,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
ScalarImpl::from(Timestamptz::from_micros(v.timestamp_micros()))
}),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column = name,
error = %err.as_report(),
"parse column failed",
);
}
log_error!(name, err, "parse column failed");
None
}
}
Expand All @@ -132,14 +113,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column = name,
error = %err.as_report(),
"parse column failed",
);
}
log_error!(name, err, "parse column failed");
None
}
}
Expand Down
159 changes: 35 additions & 124 deletions src/connector/src/parser/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use rust_decimal::Decimal as RustDecimal;
use thiserror_ext::AsReport;
use tokio_postgres::types::{to_sql_checked, FromSql, IsNull, Kind, ToSql, Type};

use crate::parser::util::log_error;

static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

macro_rules! handle_list_data_type {
Expand All @@ -42,14 +44,7 @@ macro_rules! handle_list_data_type {
}
}
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
log_error!($name, err, "parse column failed");
}
}
};
Expand All @@ -64,14 +59,7 @@ macro_rules! handle_list_data_type {
}
}
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
log_error!($name, err, "parse column failed");
}
}
};
Expand All @@ -83,14 +71,7 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v)),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
log_error!($name, err, "parse column failed");
None
}
}
Expand All @@ -100,14 +81,7 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
log_error!($name, err, "parse column failed");
None
}
}
Expand Down Expand Up @@ -149,16 +123,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
// Note: It's only used to map the numeric type in upstream Postgres to RisingWave's rw_int256.
let res = row.try_get::<_, Option<PgNumeric>>(i);
match res {
Ok(val) => pg_numeric_to_rw_int256(val),
Ok(val) => pg_numeric_to_rw_int256(val, name),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = name,
error = %err.as_report(),
suppressed_count,
"parse numeric column as pg_numeric failed",
);
}
log_error!(name, err, "parse numeric column as pg_numeric failed");
None
}
}
Expand All @@ -170,14 +137,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.0)),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column = name,
error = %err.as_report(),
"parse enum column failed",
);
}
log_error!(name, err, "parse enum column failed");
None
}
}
Expand All @@ -189,14 +149,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column = name,
error = %err.as_report(),
"parse uuid column failed",
);
}
log_error!(name, err, "parse uuid column failed");
None
}
}
Expand All @@ -210,14 +163,11 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match res {
Ok(val) => pg_numeric_to_varchar(val),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = name,
error = %err.as_report(),
suppressed_count,
"parse numeric column as pg_numeric failed",
);
}
log_error!(
name,
err,
"parse numeric column as pg_numeric failed"
);
None
}
}
Expand Down Expand Up @@ -245,14 +195,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column = name,
error = %err.as_report(),
"parse column failed",
);
}
log_error!(name, err, "parse column failed");
None
}
}
Expand All @@ -279,14 +222,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
}
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column = name,
error = %err.as_report(),
"parse enum column failed",
);
}
log_error!(name, err, "parse enum column failed");
}
}
} else {
Expand Down Expand Up @@ -331,15 +267,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
}
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check()
{
tracing::error!(
suppressed_count,
column = name,
error = %err.as_report(),
"parse uuid column failed",
);
}
log_error!(name, err, "parse uuid column failed");
}
};
}
Expand All @@ -356,15 +284,11 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
}
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check()
{
tracing::error!(
suppressed_count,
column = name,
error = %err.as_report(),
"parse numeric list column as pg_numeric list failed",
);
}
log_error!(
name,
err,
"parse numeric list column as pg_numeric list failed"
);
}
};
}
Expand Down Expand Up @@ -429,14 +353,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
}
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column = name,
error = %err.as_report(),
"parse column failed",
);
}
log_error!(name, err, "parse column failed");
}
}
}
Expand All @@ -446,19 +363,19 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(pg_numeric_to_rw_int256(Some(val)))
builder.append(pg_numeric_to_rw_int256(
Some(val),
name,
))
});
}
}
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column = name,
error = %err.as_report(),
"parse numeric list column as pg_numeric list failed",
);
}
log_error!(
name,
err,
"parse numeric list column as pg_numeric list failed"
);
}
};
}
Expand All @@ -484,18 +401,12 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
OwnedRow::new(datums)
}

fn pg_numeric_to_rw_int256(val: Option<PgNumeric>) -> Option<ScalarImpl> {
fn pg_numeric_to_rw_int256(val: Option<PgNumeric>, name: &str) -> Option<ScalarImpl> {
let string = pg_numeric_to_string(val)?;
match Int256::from_str(string.as_str()) {
Ok(num) => Some(ScalarImpl::from(num)),
Err(err) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
error = %err.as_report(),
suppressed_count,
"parse numeric string as rw_int256 failed",
);
}
log_error!(name, err, "parse numeric string as rw_int256 failed");
None
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/connector/src/parser/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ use crate::connector_common::AwsAuthProps;
use crate::error::ConnectorResult;
use crate::source::SourceMeta;

macro_rules! log_error {
($name:expr, $err:expr, $message:expr) => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %$err.as_report(),
suppressed_count,
$message,
);
}
};
}
pub(crate) use log_error;

/// get kafka topic name
pub(super) fn get_kafka_topic(props: &HashMap<String, String>) -> ConnectorResult<&String> {
const KAFKA_TOPIC_KEY1: &str = "kafka.topic";
Expand Down

0 comments on commit 967c04c

Please sign in to comment.