From 192b678c658e074ed501abdda739efa06d7a0734 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Sun, 18 Feb 2024 17:55:27 +0800 Subject: [PATCH 1/5] support pg uuid --- Cargo.lock | 14 ++++++++++++++ src/connector/Cargo.toml | 3 ++- src/connector/src/parser/postgres.rs | 25 ++++++++++++++++++++++++- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 98467d29c0211..9335e0252631f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7518,6 +7518,7 @@ dependencies = [ "postgres-protocol", "serde", "serde_json", + "uuid", ] [[package]] @@ -9059,6 +9060,7 @@ dependencies = [ "tracing-test", "url", "urlencoding", + "uuid", "walkdir", "with_options", "workspace-hack", @@ -12661,6 +12663,18 @@ dependencies = [ "getrandom", "rand", "serde", + "uuid-macro-internal", +] + +[[package]] +name = "uuid-macro-internal" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7abb14ae1a50dad63eaa768a458ef43d298cd1bd44951677bd10b732a9ba2a2d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", ] [[package]] diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index f73bd5f51c3e0..e62182e1ce810 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -135,12 +135,13 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", "fs", ] } -tokio-postgres = "0.7" +tokio-postgres = {version = "0.7", features = ["with-uuid-1"]} tokio-retry = "0.3" tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec", "io"] } tonic = { workspace = true } tracing = "0.1" +uuid = {version = "1", features = ["v4", "fast-rng", "macro-diagnostics"]} url = "2" urlencoding = "2" with_options = { path = "./with_options" } diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index fe1906614698c..05cffdcf8ff82 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -24,6 +24,7 @@ use 
risingwave_common::types::{ }; use rust_decimal::Decimal as RustDecimal; use thiserror_ext::AsReport; +use tokio_postgres::types::Type; static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default); @@ -139,7 +140,29 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O handle_data_type!(row, i, name, RustDecimal, Decimal) } DataType::Varchar => { - handle_data_type!(row, i, name, String) + match row.columns()[i].type_() { + // Since we don't support UUID natively, adapt it to a VARCHAR column + &Type::UUID => { + let res = row.try_get::<_, Option<uuid::Uuid>>(i); + match res { + Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())), + Err(err) => { + if let Ok(sc) = LOG_SUPPERSSER.check() { + tracing::error!( + "parse uuid column \"{}\" fail: {} ({} suppressed)" , + name , + err , + sc + ); + } + None + } + } + } + _ => { + handle_data_type!(row, i, name, String) + } + } } DataType::Date => { handle_data_type!(row, i, name, NaiveDate, Date) From 0fc3bf2359a7f6d3a0589bc990ed3ca4269c2329 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Sun, 18 Feb 2024 18:33:27 +0800 Subject: [PATCH 2/5] add e2e test --- e2e_test/source/cdc/cdc.share_stream.slt | 5 +++-- e2e_test/source/cdc/postgres_cdc.sql | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index d4b50ed4db6d6..7739d3f1ad6ea 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -205,6 +205,7 @@ CREATE TABLE IF NOT EXISTS postgres_all_types( c_timestamptz_array timestamptz[], c_interval_array interval[], c_jsonb_array jsonb[], + c_uuid varchar, PRIMARY KEY (c_boolean,c_bigint,c_date) ) from pg_source table 'public.postgres_all_types'; @@ -234,9 +235,9 @@ CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_ne sleep 3s query TTTTTTT -SELECT 
c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array FROM postgres_all_types where c_bigint=-9223372036854775807 +SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array,c_uuid FROM postgres_all_types where c_bigint=-9223372036854775807 ---- -f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"} +f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"} bb488f9b-330d-4012-b849-12adeb49e57e # postgres streaming test diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 43dba14950b36..a4de0e447a0cc 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -67,7 +67,8 @@ CREATE TABLE IF NOT EXISTS postgres_all_types( c_timestamptz_array timestamptz[], c_interval_array interval[], c_jsonb_array jsonb[], + c_uuid uuid, PRIMARY KEY (c_boolean,c_bigint,c_date) ); -INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[]); -INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], 
array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[]); +INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], null); +INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], 'bb488f9b-330d-4012-b849-12adeb49e57e'); From 38d664ec6214f89e3c05a51eada3d08eaa8129a0 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Sun, 18 Feb 2024 18:48:50 +0800 Subject: [PATCH 3/5] fix format --- src/connector/Cargo.toml | 4 ++-- 
src/connector/src/parser/postgres.rs | 8 ++++---- src/workspace-hack/Cargo.toml | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index e62182e1ce810..94bc3a142f084 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -135,15 +135,15 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", "fs", ] } -tokio-postgres = {version = "0.7", features = ["with-uuid-1"]} +tokio-postgres = { version = "0.7", features = ["with-uuid-1"] } tokio-retry = "0.3" tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec", "io"] } tonic = { workspace = true } tracing = "0.1" -uuid = {version = "1", features = ["v4", "fast-rng", "macro-diagnostics"]} url = "2" urlencoding = "2" +uuid = { version = "1", features = ["v4", "fast-rng", "macro-diagnostics"] } with_options = { path = "./with_options" } yup-oauth2 = "8.3" diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index 05cffdcf8ff82..bc3ac950d4110 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -145,13 +145,13 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O &Type::UUID => { let res = row.try_get::<_, Option<uuid::Uuid>>(i); match res { - Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())), + Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())), Err(err) => { if let Ok(sc) = LOG_SUPPERSSER.check() { tracing::error!( - "parse uuid column \"{}\" fail: {} ({} suppressed)" , - name , - err , + "parse uuid column \"{}\" fail: {} ({} suppressed)", + name, + err, sc ); } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 0c7df07fa5b79..44aef82fbb0d3 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -98,7 +98,7 @@ parking_lot_core = { version = "0.9", default-features = false, features = ["dea petgraph = { version = "0.6" } phf = { 
version = "0.11", features = ["uncased"] } phf_shared = { version = "0.11", features = ["uncased"] } -postgres-types = { version = "0.2", default-features = false, features = ["derive", "with-chrono-0_4", "with-serde_json-1"] } +postgres-types = { version = "0.2", default-features = false, features = ["derive", "with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } proc-macro2 = { version = "1", features = ["span-locations"] } prometheus = { version = "0.13", features = ["process"] } prost = { version = "0.12", features = ["no-recursion-limit"] } @@ -137,7 +137,7 @@ syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-trai time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] } tinyvec = { version = "1", features = ["alloc", "grab_spare_slice", "rustc_1_55"] } tokio = { version = "1", features = ["full", "stats", "tracing"] } -tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4"] } +tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4", "with-uuid-1"] } tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e", features = ["fs", "net"] } tokio-util = { version = "0.7", features = ["codec", "io"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } @@ -150,7 +150,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "local unicode-bidi = { version = "0.3" } unicode-normalization = { version = "0.1" } url = { version = "2", features = ["serde"] } -uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } +uuid = { version = "1", features = ["fast-rng", "macro-diagnostics", "serde", "v4"] } whoami = { version = "1" } zeroize = { version = "1" } zstd = { version = "0.13" } From 4c644fa6f99ca82d0e003745b17c27ea5a10c8c7 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Sun, 18 Feb 2024 20:39:19 +0800 
Subject: [PATCH 4/5] format --- src/connector/src/parser/postgres.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index bc3ac950d4110..acfbe5e4ae435 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -149,10 +149,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O Err(err) => { if let Ok(sc) = LOG_SUPPERSSER.check() { tracing::error!( - "parse uuid column \"{}\" fail: {} ({} suppressed)", - name, - err, - sc + suppressed_count = sc, + column_name = name, + error = %err.as_report(), + "parse uuid column failed", ); } None From 4a7ebd9908ac4ba89fb51055fcca8e16cca5e09f Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 19 Feb 2024 16:38:59 +0800 Subject: [PATCH 5/5] fix deps --- Cargo.lock | 12 ------------ src/connector/Cargo.toml | 2 +- src/workspace-hack/Cargo.toml | 2 +- 3 files changed, 2 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9335e0252631f..712572afd83e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12663,18 +12663,6 @@ dependencies = [ "getrandom", "rand", "serde", - "uuid-macro-internal", -] - -[[package]] -name = "uuid-macro-internal" -version = "1.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7abb14ae1a50dad63eaa768a458ef43d298cd1bd44951677bd10b732a9ba2a2d" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.48", ] [[package]] diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 94bc3a142f084..e1f7ea97812c5 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -143,7 +143,7 @@ tonic = { workspace = true } tracing = "0.1" url = "2" urlencoding = "2" -uuid = { version = "1", features = ["v4", "fast-rng", "macro-diagnostics"] } +uuid = { version = "1", features = ["v4", "fast-rng"] } with_options = { path = "./with_options" } yup-oauth2 = "8.3" diff --git 
a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 44aef82fbb0d3..0e3ba06b2602f 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -150,7 +150,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "local unicode-bidi = { version = "0.3" } unicode-normalization = { version = "0.1" } url = { version = "2", features = ["serde"] } -uuid = { version = "1", features = ["fast-rng", "macro-diagnostics", "serde", "v4"] } +uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } whoami = { version = "1" } zeroize = { version = "1" } zstd = { version = "0.13" }