Skip to content

Commit

Permalink
[clickhouse] test types test (#2185)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 11, 2024
1 parent d08b0e9 commit 0a8fa90
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 2 deletions.
4 changes: 4 additions & 0 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,10 @@ func GetTableSchemaForTable(tableName string, columns []driver.ColumnType) (*pro
qkind = qvalue.QValueKindTimestamp
case "Date32", "Nullable(Date32)":
qkind = qvalue.QValueKindDate
case "Float32", "Nullable(Float32)":
qkind = qvalue.QValueKindFloat32
case "Float64", "Nullable(Float64)":
qkind = qvalue.QValueKindFloat64
default:
if strings.Contains(column.DatabaseTypeName(), "Decimal") {
qkind = qvalue.QValueKindNumeric
Expand Down
40 changes: 40 additions & 0 deletions flow/e2e/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,46 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
}
case *decimal.Decimal:
qrow = append(qrow, qvalue.QValueNumeric{Val: *v})
case **bool:
if *v == nil {
qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindBoolean))
} else {
qrow = append(qrow, qvalue.QValueBoolean{Val: **v})
}
case *bool:
qrow = append(qrow, qvalue.QValueBoolean{Val: *v})
case **float32:
if *v == nil {
qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindFloat32))
} else {
qrow = append(qrow, qvalue.QValueFloat32{Val: **v})
}
case *float32:
qrow = append(qrow, qvalue.QValueFloat32{Val: *v})
case **float64:
if *v == nil {
qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindFloat64))
} else {
qrow = append(qrow, qvalue.QValueFloat64{Val: **v})
}
case *float64:
qrow = append(qrow, qvalue.QValueFloat64{Val: *v})
case **int64:
if *v == nil {
qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt64))
} else {
qrow = append(qrow, qvalue.QValueInt64{Val: **v})
}
case *int64:
qrow = append(qrow, qvalue.QValueInt64{Val: *v})
case **int16:
if *v == nil {
qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt16))
} else {
qrow = append(qrow, qvalue.QValueInt16{Val: **v})
}
case *int16:
qrow = append(qrow, qvalue.QValueInt16{Val: *v})
default:
return nil, fmt.Errorf("cannot convert %T to qvalue", v)
}
Expand Down
112 changes: 112 additions & 0 deletions flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/jackc/pgerrcode"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -664,3 +665,114 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_With_FF() {
// Test_Unbounded_Numeric_Without_FF runs the shared testNumericFF scenario
// with its boolean argument set to false.
func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() {
	const flagValue = false
	s.testNumericFF(flagValue)
}

// Test_Types_CH is an end-to-end test that mirrors a Postgres table containing
// a wide range of column types into the suite's destination peer and checks a
// fixed list of columns for equality after each phase:
//  1. after the initial snapshot (one row),
//  2. after an additional insert (two rows),
//  3. after two updates plus another insert (three rows).
//
// Only the columns named in the comparison lists are checked; the remaining
// columns are created and populated but not compared by this test.
func (s ClickHouseSuite) Test_Types_CH() {
srcTableName := "test_types"
srcFullName := s.attachSchemaSuffix("test_types")
dstTableName := "test_types"
// Create the enum type used by column c45 of the source table.
createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
_, enumErr := s.Conn().Exec(context.Background(), createMoodEnum)
// An error classified as DuplicateObject or UniqueViolation is ignored;
// any other error from creating the type fails the test.
if enumErr != nil &&
!shared.IsSQLStateError(enumErr, pgerrcode.DuplicateObject, pgerrcode.UniqueViolation) {
require.NoError(s.t, enumErr)
}

// Create the source table and insert the first row (id 2) before the flow
// is started.
_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %[1]s(id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN,
c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION,
c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY,
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME,c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c42 INT[], c43 FLOAT[], c44 TEXT[], c45 mood, c46 HSTORE,
c47 DATE[], c48 TIMESTAMPTZ[], c49 TIMESTAMP[], c50 BOOLEAN[], c51 SMALLINT[]);
INSERT INTO %[1]s SELECT 2,2,b'1',b'101',
true,random_bytea(32),'s','test','1.1.10.2'::cidr,
CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1,
'5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval,
'{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr,
1.2,123456789012345678901234567890.123456789012345678901234567890,
4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector,
txid_current_snapshot(),
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,
ARRAY[10299301,2579827],
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'],'happy',
'key1=>value1, key2=>NULL'::hstore,
'{2020-01-01, 2020-01-02}'::date[],
'{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[],
'{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[],
'{true, false}'::boolean[],
'{1, 2}'::smallint[];`, srcFullName))
require.NoError(s.t, err)

// Build a flow configuration that maps the source table to the destination
// table on the suite's peer, with the initial snapshot enabled.
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("clickhouse_test_types"),
TableNameMapping: map[string]string{srcFullName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig.DoInitialSnapshot = true

// Start the CDC workflow, then wait for the pre-existing row to appear in the
// destination before checking the listed columns.
tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
e2e.EnvWaitForCount(env, s, "waiting for initial snapshot count", dstTableName, "id", 1)
e2e.EnvWaitForEqualTablesWithNames(env, s, "check comparable types 1", srcTableName, dstTableName,
"id,c1,c4,c7,c8,c11,c12,c13,c15,c23,c28,c29,c30,c31,c32,c33,c34,c35,c36")

// Insert a second row (id 3) while the flow is running.
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s SELECT 3,2,b'1',b'101',
true,random_bytea(32),'s','test','1.1.10.2'::cidr,
CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1,
'5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval,
'{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr,
1.2,123456789012345678901234567890.123456789012345678901234567890,
4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector,
txid_current_snapshot(),
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,
ARRAY[10299301,2579827],
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'],'happy',
'key1=>value1, key2=>NULL'::hstore,
'{2020-01-01, 2020-01-02}'::date[],
'{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[],
'{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[],
'{true, false}'::boolean[],
'{1, 2}'::smallint[];`, srcFullName))
require.NoError(s.t, err)
// Wait for the destination to hold two rows, then re-check the same columns.
e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id", 2)
e2e.EnvWaitForEqualTablesWithNames(env, s, "check comparable types 2", srcTableName, dstTableName,
"id,c1,c4,c7,c8,c11,c12,c13,c15,c23,c28,c29,c30,c31,c32,c33,c34,c35,c36")

// Update the two existing rows (a bigint/text change on id 2, the four
// time-related columns on id 3) and insert a third row (id 4) in one batch.
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
UPDATE %[1]s SET c1=3,c32='testery' WHERE id=2;
UPDATE %[1]s SET c33=now(),c34=now(),c35=now()::TIME,c36=now()::TIMETZ WHERE id=3;
INSERT INTO %[1]s SELECT 4,2,b'1',b'101',
true,random_bytea(32),'s','test','1.1.10.2'::cidr,
CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1,
'5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval,
'{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr,
1.2,123456789012345678901234567890.123456789012345678901234567890,
4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector,
txid_current_snapshot(),
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,
ARRAY[10299301,2579827],
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'],'happy',
'key1=>value1, key2=>NULL'::hstore,
'{2020-01-01, 2020-01-02}'::date[],
'{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[],
'{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[],
'{true, false}'::boolean[],
'{1, 2}'::smallint[];`, srcFullName))
require.NoError(s.t, err)
// Wait for the destination to hold three rows, then re-check the same columns.
e2e.EnvWaitForCount(env, s, "waiting for CDC count again", dstTableName, "id", 3)
e2e.EnvWaitForEqualTablesWithNames(env, s, "check comparable types 3", srcTableName, dstTableName,
"id,c1,c4,c7,c8,c11,c12,c13,c15,c23,c28,c29,c30,c31,c32,c33,c34,c35,c36")

// Cancel the workflow environment and require that it ends as canceled.
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}
34 changes: 32 additions & 2 deletions flow/model/qvalue/equals.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ func Equals(qv QValue, other QValue) bool {
case QValueMacaddr:
return compareString(q.Val, otherValue)
// all internally represented as a Golang time.Time
case QValueDate, QValueTimestamp, QValueTimestampTZ, QValueTime, QValueTimeTZ:
case QValueTimestamp, QValueTimestampTZ:
return compareGoTimestamp(qvValue, otherValue)
case QValueTime, QValueTimeTZ:
return compareGoTime(qvValue, otherValue)
case QValueDate:
return compareGoDate(qvValue, otherValue)
case QValueNumeric:
return compareNumeric(q.Val, otherValue)
case QValueBytes:
Expand Down Expand Up @@ -129,7 +133,7 @@ func compareString(s1 string, value2 interface{}) bool {
return ok && s1 == s2
}

func compareGoTime(value1, value2 interface{}) bool {
func compareGoTimestamp(value1, value2 interface{}) bool {
et1, ok1 := value1.(time.Time)
et2, ok2 := value2.(time.Time)

Expand All @@ -145,6 +149,32 @@ func compareGoTime(value1, value2 interface{}) bool {
return t1 == t2
}

// compareGoTime reports whether both arguments are time.Time values whose
// hour, minute and second match. The calendar date and any sub-second
// component are not part of the comparison, and each value's clock reading is
// taken in that value's own location. It returns false if either argument is
// not a time.Time.
func compareGoTime(value1, value2 interface{}) bool {
	left, isTime := value1.(time.Time)
	if !isTime {
		return false
	}
	right, isTime := value2.(time.Time)
	if !isTime {
		return false
	}

	leftHour, leftMin, leftSec := left.Clock()
	rightHour, rightMin, rightSec := right.Clock()
	if leftHour != rightHour || leftMin != rightMin {
		return false
	}
	return leftSec == rightSec
}

// compareGoDate reports whether both arguments are time.Time values that fall
// on the same year, month and day. The time of day is not part of the
// comparison, and each value's date is taken in that value's own location. It
// returns false if either argument is not a time.Time.
func compareGoDate(value1, value2 interface{}) bool {
	left, isTime := value1.(time.Time)
	if !isTime {
		return false
	}
	right, isTime := value2.(time.Time)
	if !isTime {
		return false
	}

	leftYear, leftMonth, leftDay := left.Date()
	rightYear, rightMonth, rightDay := right.Date()
	if leftYear != rightYear || leftMonth != rightMonth {
		return false
	}
	return leftDay == rightDay
}

func compareUUID(value1, value2 interface{}) bool {
uuid1, ok1 := getUUID(value1)
uuid2, ok2 := getUUID(value2)
Expand Down

0 comments on commit 0a8fa90

Please sign in to comment.