Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pg-cdc): support ingesting UUID columns as VARCHAR for shared pg source (cherry-pick #15114) #15132

Merged
merged 1 commit into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
c_uuid varchar,
PRIMARY KEY (c_boolean,c_bigint,c_date)
) from pg_source table 'public.postgres_all_types';

Expand Down Expand Up @@ -234,9 +235,9 @@ CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_ne
sleep 3s

query TTTTTTT
SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array FROM postgres_all_types where c_bigint=-9223372036854775807
SELECT c_boolean,c_date,c_time,c_timestamp,c_jsonb,c_smallint_array,c_timestamp_array,c_uuid FROM postgres_all_types where c_bigint=-9223372036854775807
----
f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"}
f 0001-01-01 00:00:00 0001-01-01 00:00:00 {} {-32767} {"0001-01-01 00:00:00"} bb488f9b-330d-4012-b849-12adeb49e57e


# postgres streaming test
Expand Down
5 changes: 3 additions & 2 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
c_timestamptz_array timestamptz[],
c_interval_array interval[],
c_jsonb_array jsonb[],
c_uuid uuid,
PRIMARY KEY (c_boolean,c_bigint,c_date)
);
INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[]);
INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[]);
INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], null);
INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], 'bb488f9b-330d-4012-b849-12adeb49e57e');
3 changes: 2 additions & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,15 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"signal",
"fs",
] }
tokio-postgres = "0.7"
tokio-postgres = { version = "0.7", features = ["with-uuid-1"] }
tokio-retry = "0.3"
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["codec", "io"] }
tonic = "0.10.2"
tracing = "0.1"
url = "2"
urlencoding = "2"
uuid = { version = "1", features = ["v4", "fast-rng"] }
with_options = { path = "./with_options" }
yup-oauth2 = "8.3"

Expand Down
42 changes: 33 additions & 9 deletions src/connector/src/parser/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use risingwave_common::types::{
Timestamptz,
};
use rust_decimal::Decimal as RustDecimal;
use thiserror_ext::AsReport;
use tokio_postgres::types::Type;

static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

Expand Down Expand Up @@ -138,7 +140,29 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
handle_data_type!(row, i, name, RustDecimal, Decimal)
}
DataType::Varchar => {
handle_data_type!(row, i, name, String)
match row.columns()[i].type_() {
// Since we don't support UUID natively, adapt it to a VARCHAR column
&Type::UUID => {
let res = row.try_get::<_, Option<uuid::Uuid>>(i);
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count = sc,
column_name = name,
error = %err.as_report(),
"parse uuid column failed",
);
}
None
}
}
}
_ => {
handle_data_type!(row, i, name, String)
}
}
}
DataType::Date => {
handle_data_type!(row, i, name, NaiveDate, Date)
Expand All @@ -159,10 +183,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
name,
err,
sc
suppressed_count = sc,
column_name = name,
error = %err.as_report(),
"parse column failed",
);
}
None
Expand Down Expand Up @@ -256,10 +280,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
name,
err,
sc
suppressed_count = sc,
column_name = name,
error = %err.as_report(),
"parse column failed",
);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ parking_lot_core = { version = "0.9", default-features = false, features = ["dea
petgraph = { version = "0.6" }
phf = { version = "0.11", features = ["uncased"] }
phf_shared = { version = "0.11", features = ["uncased"] }
postgres-types = { version = "0.2", default-features = false, features = ["derive", "with-chrono-0_4", "with-serde_json-1"] }
postgres-types = { version = "0.2", default-features = false, features = ["derive", "with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
proc-macro2 = { version = "1", features = ["span-locations"] }
prometheus = { version = "0.13", features = ["process"] }
prost = { version = "0.12", features = ["no-recursion-limit"] }
Expand Down Expand Up @@ -135,7 +135,7 @@ syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-trai
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tinyvec = { version = "1", features = ["alloc", "grab_spare_slice", "rustc_1_55"] }
tokio = { version = "1", features = ["full", "stats", "tracing"] }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4"] }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4", "with-uuid-1"] }
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e", features = ["fs", "net"] }
tokio-util = { version = "0.7", features = ["codec", "io"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
Expand Down
Loading