Skip to content

Commit

Permalink
add test_types for CH
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 9, 2024
1 parent 2b96245 commit 399eb82
Showing 1 changed file with 81 additions and 0 deletions.
81 changes: 81 additions & 0 deletions flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/jackc/pgerrcode"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -664,3 +665,83 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_With_FF() {
func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() {
s.testNumericFF(false)
}

func (s ClickHouseSuite) Test_Types_CH() {
srcFullName := s.attachSchemaSuffix("test_types")
dstTableName := "test_types"
createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
_, enumErr := s.Conn().Exec(context.Background(), createMoodEnum)
if enumErr != nil &&
!shared.IsSQLStateError(enumErr, pgerrcode.DuplicateObject, pgerrcode.UniqueViolation) {
require.NoError(s.t, enumErr)
}

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s(id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN,
c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION,
c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY,
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME,c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c42 INT[], c43 FLOAT[], c44 TEXT[], c45 mood, c46 HSTORE,
c47 DATE[], c48 TIMESTAMPTZ[], c49 TIMESTAMP[], c50 BOOLEAN[], c51 SMALLINT[], c52 NUMERIC);
INSERT INTO %s SELECT 2,2,b'1',b'101',
true,random_bytea(32),'s','test','1.1.10.2'::cidr,
CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1,
'5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval,
'{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr,
1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector,
txid_current_snapshot(),
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,
ARRAY[10299301,2579827],
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'],'happy',
'key1=>value1, key2=>NULL'::hstore,
'{2020-01-01, 2020-01-02}'::date[],
'{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[],
'{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[],
'{true, false}'::boolean[],
'{1, 2}'::smallint[];`, srcFullName, srcFullName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("clickhouse_test_types"),
TableNameMapping: map[string]string{srcFullName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)

tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s SELECT 3,2,b'1',b'101',
true,random_bytea(32),'s','test','1.1.10.2'::cidr,
CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1,
'5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval,
'{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr,
1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector,
txid_current_snapshot(),
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,
ARRAY[10299301,2579827],
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'],'happy',
'key1=>value1, key2=>NULL'::hstore,
'{2020-01-01, 2020-01-02}'::date[],
'{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[],
'{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[],
'{true, false}'::boolean[],
'{1, 2}'::smallint[];`, srcFullName))
require.NoError(s.t, err)

e2e.EnvWaitFor(s.t, env, 2*time.Minute, "waiting on cdc", func() bool {
rows, err := s.GetRows(dstTableName, "id")
require.NoError(s.t, err)
return len(rows.Records) == 2
})

env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

0 comments on commit 399eb82

Please sign in to comment.