From 20f8aafeb016e22ca99af851f9ca63af95fb19e1 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 19 Feb 2024 17:21:29 +0800 Subject: [PATCH] fix(pg-cdc): support inject UUID column to VARCHAR for shared pg source (#15114) --- Cargo.lock | 2 ++ e2e_test/source/cdc/cdc.share_stream.slt | 5 +-- e2e_test/source/cdc/postgres_cdc.sql | 5 +-- src/connector/Cargo.toml | 3 +- src/connector/src/parser/postgres.rs | 42 +++++++++++++++++++----- src/workspace-hack/Cargo.toml | 4 +-- 6 files changed, 45 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f61886515a650..d18c726756bef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7517,6 +7517,7 @@ dependencies = [ "postgres-protocol", "serde", "serde_json", + "uuid", ] [[package]] @@ -9059,6 +9060,7 @@ dependencies = [ "tracing-test", "url", "urlencoding", + "uuid", "walkdir", "with_options", "workspace-hack", diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index d4b50ed4db6d6..7739d3f1ad6ea 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -205,6 +205,7 @@ CREATE TABLE IF NOT EXISTS postgres_all_types( c_timestamptz_array timestamptz[], c_interval_array interval[], c_jsonb_array jsonb[], + c_uuid varchar, PRIMARY KEY (c_boolean,c_bigint,c_date) ) from pg_source table 'public.postgres_all_types'; @@ -234,9 +235,9 @@ CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_ne sleep 3s query TTTTTTT -SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array FROM postgres_all_types where c_bigint=-9223372036854775807 +SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array,c_uuid FROM postgres_all_types where c_bigint=-9223372036854775807 ---- -f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"} +f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"} bb488f9b-330d-4012-b849-12adeb49e57e # postgres streaming test diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 43dba14950b36..a4de0e447a0cc 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -67,7 +67,8 @@ CREATE TABLE IF NOT EXISTS postgres_all_types( c_timestamptz_array timestamptz[], c_interval_array interval[], c_jsonb_array jsonb[], + c_uuid uuid, PRIMARY KEY (c_boolean,c_bigint,c_date) ); -INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[]); -INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[]); +INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], null); +INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], 'bb488f9b-330d-4012-b849-12adeb49e57e'); diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 83f359a12b206..a95e388fc00ea 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -135,7 +135,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", "fs", ] } -tokio-postgres = "0.7" +tokio-postgres = { version = "0.7", features = ["with-uuid-1"] } tokio-retry = "0.3" tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec", "io"] } @@ -143,6 +143,7 @@ tonic = "0.10.2" tracing = "0.1" url = "2" urlencoding = "2" +uuid = { version = "1", features = ["v4", "fast-rng"] } with_options = { path = "./with_options" } yup-oauth2 = "8.3" diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index 0823fa5579557..acfbe5e4ae435 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -23,6 +23,8 @@ use risingwave_common::types::{ Timestamptz, }; use rust_decimal::Decimal as RustDecimal; +use thiserror_ext::AsReport; +use tokio_postgres::types::Type; static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); @@ -138,7 +140,29 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O handle_data_type!(row, i, name, RustDecimal, Decimal) } DataType::Varchar => { - handle_data_type!(row, i, name, String) + match row.columns()[i].type_() { + // Since we don't support UUID natively, adapt it to a VARCHAR column + &Type::UUID => { + let res = row.try_get::<_, Option>(i); + match res { + Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())), + Err(err) => { + if let Ok(sc) = LOG_SUPPERSSER.check() { + tracing::error!( + suppressed_count = sc, + column_name = name, + error = %err.as_report(), + "parse uuid column failed", + ); + } + None + } + } + } + _ => { + handle_data_type!(row, i, name, String) + } + } } DataType::Date => { handle_data_type!(row, i, name, NaiveDate, Date) @@ -159,10 +183,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O Err(err) => { if let Ok(sc) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - name, - err, - sc + suppressed_count = sc, + column_name = name, + error = %err.as_report(), + "parse column failed", ); } None @@ -256,10 +280,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O Err(err) => { if let Ok(sc) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - name, - err, - sc + suppressed_count = sc, + column_name = name, + error = %err.as_report(), + "parse column failed", ); } } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 5a42ca82debf8..f822ddea5dcd0 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -96,7 +96,7 @@ parking_lot_core = { version = "0.9", default-features = false, features = ["dea petgraph = { version = "0.6" } phf = { version = "0.11", features = ["uncased"] } phf_shared = { version = "0.11", features = ["uncased"] } -postgres-types = { version = "0.2", default-features = false, features = ["derive", "with-chrono-0_4", "with-serde_json-1"] } +postgres-types = { version = "0.2", default-features = false, features = ["derive", "with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } proc-macro2 = { version = "1", features = ["span-locations"] } prometheus = { version = "0.13", features = ["process"] } prost = { version = "0.12", features = ["no-recursion-limit"] } @@ -135,7 +135,7 @@ syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-trai time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] } tinyvec = { version = "1", features = ["alloc", "grab_spare_slice", "rustc_1_55"] } tokio = { version = "1", features = ["full", "stats", "tracing"] } -tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4"] } +tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4", "with-uuid-1"] } tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e", features = ["fs", "net"] } tokio-util = { version = "0.7", features = ["codec", "io"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }