diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index a19e69c8c7..4d5b9f313f 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/jackc/pgerrcode" "github.com/shopspring/decimal" "github.com/stretchr/testify/require" @@ -664,3 +665,83 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_With_FF() { func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() { s.testNumericFF(false) } + +func (s ClickHouseSuite) Test_Types_CH() { + srcFullName := s.attachSchemaSuffix("test_types") + dstTableName := "test_types" + createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" + _, enumErr := s.Conn().Exec(context.Background(), createMoodEnum) + if enumErr != nil && + !shared.IsSQLStateError(enumErr, pgerrcode.DuplicateObject, pgerrcode.UniqueViolation) { + require.NoError(s.t, enumErr) + } + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s(id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, + c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, + c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, + c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, + c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME,c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, + c39 TXID_SNAPSHOT,c40 UUID,c42 INT[], c43 FLOAT[], c44 TEXT[], c45 mood, c46 HSTORE, + c47 DATE[], c48 TIMESTAMPTZ[], c49 TIMESTAMP[], c50 BOOLEAN[], c51 SMALLINT[], c52 NUMERIC); + INSERT INTO %s SELECT 2,2,b'1',b'101', + true,random_bytea(32),'s','test','1.1.10.2'::cidr, + CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1, + '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, + '{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, + 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, + 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, + txid_current_snapshot(), + '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid, + ARRAY[10299301,2579827], + ARRAY[0.0003, 8902.0092], + ARRAY['hello','bye'],'happy', + 'key1=>value1, key2=>NULL'::hstore, + '{2020-01-01, 2020-01-02}'::date[], + '{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[], + '{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[], + '{true, false}'::boolean[], + '{1, 2}'::smallint[];`, srcFullName, srcFullName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("clickhouse_test_types"), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s SELECT 3,2,b'1',b'101', + true,random_bytea(32),'s','test','1.1.10.2'::cidr, + CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1, + '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, + '{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, + 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, + 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, + txid_current_snapshot(), + '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid, + ARRAY[10299301,2579827], + ARRAY[0.0003, 8902.0092], + ARRAY['hello','bye'],'happy', + 'key1=>value1, key2=>NULL'::hstore, + '{2020-01-01, 2020-01-02}'::date[], + '{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[], + '{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[], + '{true, false}'::boolean[], + '{1, 2}'::smallint[];`, srcFullName)) + require.NoError(s.t, err) + + e2e.EnvWaitFor(s.t, env, 2*time.Minute, "waiting on cdc", func() bool { + rows, err := s.GetRows(dstTableName, "id") + require.NoError(s.t, err) + return len(rows.Records) == 2 + }) + + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +}