Skip to content

Commit

Permalink
fix(pg-cdc): support inject UUID column to VARCHAR for shared pg sour…
Browse files Browse the repository at this point in the history
…ce (#15114)
  • Loading branch information
StrikeW committed Feb 19, 2024
1 parent 02e9a44 commit 20f8aaf
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 16 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
c_uuid varchar,
PRIMARY KEY (c_boolean,c_bigint,c_date)
) from pg_source table 'public.postgres_all_types';

Expand Down Expand Up @@ -234,9 +235,9 @@ CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_ne
sleep 3s

query TTTTTTT
SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array FROM postgres_all_types where c_bigint=-9223372036854775807
SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array,c_uuid FROM postgres_all_types where c_bigint=-9223372036854775807
----
f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"}
f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"} bb488f9b-330d-4012-b849-12adeb49e57e


# postgres streaming test
Expand Down
5 changes: 3 additions & 2 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
c_uuid uuid,
PRIMARY KEY (c_boolean,c_bigint,c_date)
);
INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[]);
INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[]);
INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], null);
INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], 'bb488f9b-330d-4012-b849-12adeb49e57e');
3 changes: 2 additions & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,15 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"signal",
"fs",
] }
tokio-postgres = "0.7"
tokio-postgres = { version = "0.7", features = ["with-uuid-1"] }
tokio-retry = "0.3"
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["codec", "io"] }
tonic = "0.10.2"
tracing = "0.1"
url = "2"
urlencoding = "2"
uuid = { version = "1", features = ["v4", "fast-rng"] }
with_options = { path = "./with_options" }
yup-oauth2 = "8.3"

Expand Down
42 changes: 33 additions & 9 deletions src/connector/src/parser/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use risingwave_common::types::{
Timestamptz,
};
use rust_decimal::Decimal as RustDecimal;
use thiserror_ext::AsReport;
use tokio_postgres::types::Type;

static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

Expand Down Expand Up @@ -138,7 +140,29 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
handle_data_type!(row, i, name, RustDecimal, Decimal)
}
DataType::Varchar => {
handle_data_type!(row, i, name, String)
match row.columns()[i].type_() {
// Since we don't support UUID natively, adapt it to a VARCHAR column
&Type::UUID => {
let res = row.try_get::<_, Option<uuid::Uuid>>(i);
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count = sc,
column_name = name,
error = %err.as_report(),
"parse uuid column failed",
);
}
None
}
}
}
_ => {
handle_data_type!(row, i, name, String)
}
}
}
DataType::Date => {
handle_data_type!(row, i, name, NaiveDate, Date)
Expand All @@ -159,10 +183,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
name,
err,
sc
suppressed_count = sc,
column_name = name,
error = %err.as_report(),
"parse column failed",
);
}
None
Expand Down Expand Up @@ -256,10 +280,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
name,
err,
sc
suppressed_count = sc,
column_name = name,
error = %err.as_report(),
"parse column failed",
);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ parking_lot_core = { version = "0.9", default-features = false, features = ["dea
petgraph = { version = "0.6" }
phf = { version = "0.11", features = ["uncased"] }
phf_shared = { version = "0.11", features = ["uncased"] }
postgres-types = { version = "0.2", default-features = false, features = ["derive", "with-chrono-0_4", "with-serde_json-1"] }
postgres-types = { version = "0.2", default-features = false, features = ["derive", "with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
proc-macro2 = { version = "1", features = ["span-locations"] }
prometheus = { version = "0.13", features = ["process"] }
prost = { version = "0.12", features = ["no-recursion-limit"] }
Expand Down Expand Up @@ -135,7 +135,7 @@ syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-trai
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tinyvec = { version = "1", features = ["alloc", "grab_spare_slice", "rustc_1_55"] }
tokio = { version = "1", features = ["full", "stats", "tracing"] }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4"] }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4", "with-uuid-1"] }
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e", features = ["fs", "net"] }
tokio-util = { version = "0.7", features = ["codec", "io"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
Expand Down

0 comments on commit 20f8aaf

Please sign in to comment.