diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index d99462042..42e16b4f4 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -565,6 +565,10 @@ func GetTableSchemaForTable(tableName string, columns []driver.ColumnType) (*pro qkind = qvalue.QValueKindTimestamp case "Date32", "Nullable(Date32)": qkind = qvalue.QValueKindDate + case "Float32", "Nullable(Float32)": + qkind = qvalue.QValueKindFloat32 + case "Float64", "Nullable(Float64)": + qkind = qvalue.QValueKindFloat64 default: if strings.Contains(column.DatabaseTypeName(), "Decimal") { qkind = qvalue.QValueKindNumeric diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index a36a5335b..916f0606f 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -166,6 +166,46 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch } case *decimal.Decimal: qrow = append(qrow, qvalue.QValueNumeric{Val: *v}) + case **bool: + if *v == nil { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindBoolean)) + } else { + qrow = append(qrow, qvalue.QValueBoolean{Val: **v}) + } + case *bool: + qrow = append(qrow, qvalue.QValueBoolean{Val: *v}) + case **float32: + if *v == nil { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindFloat32)) + } else { + qrow = append(qrow, qvalue.QValueFloat32{Val: **v}) + } + case *float32: + qrow = append(qrow, qvalue.QValueFloat32{Val: *v}) + case **float64: + if *v == nil { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindFloat64)) + } else { + qrow = append(qrow, qvalue.QValueFloat64{Val: **v}) + } + case *float64: + qrow = append(qrow, qvalue.QValueFloat64{Val: *v}) + case **int64: + if *v == nil { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt64)) + } else { + qrow = append(qrow, qvalue.QValueInt64{Val: **v}) + } + case *int64: + qrow = append(qrow, qvalue.QValueInt64{Val: *v}) + case **int16: + if *v == nil { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt16)) + } else { + qrow = append(qrow, qvalue.QValueInt16{Val: **v}) + } + case *int16: + qrow = append(qrow, qvalue.QValueInt16{Val: *v}) default: return nil, fmt.Errorf("cannot convert %T to qvalue", v) } diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index a19e69c8c..813d3d35c 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/jackc/pgerrcode" "github.com/shopspring/decimal" "github.com/stretchr/testify/require" @@ -664,3 +665,114 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_With_FF() { func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() { s.testNumericFF(false) } + +func (s ClickHouseSuite) Test_Types_CH() { + srcTableName := "test_types" + srcFullName := s.attachSchemaSuffix("test_types") + dstTableName := "test_types" + createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" + _, enumErr := s.Conn().Exec(context.Background(), createMoodEnum) + if enumErr != nil && + !shared.IsSQLStateError(enumErr, pgerrcode.DuplicateObject, pgerrcode.UniqueViolation) { + require.NoError(s.t, enumErr) + } + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %[1]s(id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, + c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, + c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, + c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, + c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME,c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, + c39 TXID_SNAPSHOT,c40 UUID,c42 INT[], c43 FLOAT[], c44 TEXT[], c45 mood, c46 HSTORE, + c47 DATE[], c48 TIMESTAMPTZ[], c49 TIMESTAMP[], c50 BOOLEAN[], c51 SMALLINT[]); + INSERT INTO %[1]s SELECT 2,2,b'1',b'101', + true,random_bytea(32),'s','test','1.1.10.2'::cidr, + CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1, + '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, + '{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, + 1.2,123456789012345678901234567890.123456789012345678901234567890, + 4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, + 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, + txid_current_snapshot(), + '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid, + ARRAY[10299301,2579827], + ARRAY[0.0003, 8902.0092], + ARRAY['hello','bye'],'happy', + 'key1=>value1, key2=>NULL'::hstore, + '{2020-01-01, 2020-01-02}'::date[], + '{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[], + '{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[], + '{true, false}'::boolean[], + '{1, 2}'::smallint[];`, srcFullName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("clickhouse_test_types"), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + flowConnConfig.DoInitialSnapshot = true + + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + e2e.EnvWaitForCount(env, s, "waiting for initial snapshot count", dstTableName, "id", 1) + e2e.EnvWaitForEqualTablesWithNames(env, s, "check comparable types 1", srcTableName, dstTableName, + "id,c1,c4,c7,c8,c11,c12,c13,c15,c23,c28,c29,c30,c31,c32,c33,c34,c35,c36") + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s SELECT 3,2,b'1',b'101', + true,random_bytea(32),'s','test','1.1.10.2'::cidr, + CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1, + '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, + '{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, + 1.2,123456789012345678901234567890.123456789012345678901234567890, + 4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, + 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, + txid_current_snapshot(), + '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid, + ARRAY[10299301,2579827], + ARRAY[0.0003, 8902.0092], + ARRAY['hello','bye'],'happy', + 'key1=>value1, key2=>NULL'::hstore, + '{2020-01-01, 2020-01-02}'::date[], + '{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[], + '{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[], + '{true, false}'::boolean[], + '{1, 2}'::smallint[];`, srcFullName)) + require.NoError(s.t, err) + e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id", 2) + e2e.EnvWaitForEqualTablesWithNames(env, s, "check comparable types 2", srcTableName, dstTableName, + "id,c1,c4,c7,c8,c11,c12,c13,c15,c23,c28,c29,c30,c31,c32,c33,c34,c35,c36") + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + UPDATE %[1]s SET c1=3,c32='testery' WHERE id=2; + UPDATE %[1]s SET c33=now(),c34=now(),c35=now()::TIME,c36=now()::TIMETZ WHERE id=3; + INSERT INTO %[1]s SELECT 4,2,b'1',b'101', + true,random_bytea(32),'s','test','1.1.10.2'::cidr, + CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1, + '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, + '{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, + 1.2,123456789012345678901234567890.123456789012345678901234567890, + 4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, + 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, + txid_current_snapshot(), + '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid, + ARRAY[10299301,2579827], + ARRAY[0.0003, 8902.0092], + ARRAY['hello','bye'],'happy', + 'key1=>value1, key2=>NULL'::hstore, + '{2020-01-01, 2020-01-02}'::date[], + '{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[], + '{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[], + '{true, false}'::boolean[], + '{1, 2}'::smallint[];`, srcFullName)) + require.NoError(s.t, err) + e2e.EnvWaitForCount(env, s, "waiting for CDC count again", dstTableName, "id", 3) + e2e.EnvWaitForEqualTablesWithNames(env, s, "check comparable types 3", srcTableName, dstTableName, + "id,c1,c4,c7,c8,c11,c12,c13,c15,c23,c28,c29,c30,c31,c32,c33,c34,c35,c36") + + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} diff --git a/flow/model/qvalue/equals.go b/flow/model/qvalue/equals.go index 13af407b4..744a4838a 100644 --- a/flow/model/qvalue/equals.go +++ b/flow/model/qvalue/equals.go @@ -65,8 +65,12 @@ func Equals(qv QValue, other QValue) bool { case QValueMacaddr: return compareString(q.Val, otherValue) // all internally represented as a Golang time.Time - case QValueDate, QValueTimestamp, QValueTimestampTZ, QValueTime, QValueTimeTZ: + case QValueTimestamp, QValueTimestampTZ: + return compareGoTimestamp(qvValue, otherValue) + case QValueTime, QValueTimeTZ: return compareGoTime(qvValue, otherValue) + case QValueDate: + return compareGoDate(qvValue, otherValue) case QValueNumeric: return compareNumeric(q.Val, otherValue) case QValueBytes: @@ -129,7 +133,7 @@ func compareString(s1 string, value2 interface{}) bool { return ok && s1 == s2 } -func compareGoTime(value1, value2 interface{}) bool { +func compareGoTimestamp(value1, value2 interface{}) bool { et1, ok1 := value1.(time.Time) et2, ok2 := value2.(time.Time) @@ -145,6 +149,32 @@ func compareGoTime(value1, value2 interface{}) bool { return t1 == t2 } +func compareGoTime(value1, value2 interface{}) bool { + t1, ok1 := value1.(time.Time) + t2, ok2 := value2.(time.Time) + + if !ok1 || !ok2 { + return false + } + + h1, m1, s1 := t1.Clock() + h2, m2, s2 := t2.Clock() + return h1 == h2 && m1 == m2 && s1 == s2 +} + +func compareGoDate(value1, value2 interface{}) bool { + t1, ok1 := value1.(time.Time) + t2, ok2 := value2.(time.Time) + + if !ok1 || !ok2 { + return false + } + + y1, m1, d1 := t1.Date() + y2, m2, d2 := t2.Date() + return y1 == y2 && m1 == m2 && d1 == d2 +} + func compareUUID(value1, value2 interface{}) bool { uuid1, ok1 := getUUID(value1) uuid2, ok2 := getUUID(value2)