diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index d994620427..42e16b4f4d 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -565,6 +565,10 @@ func GetTableSchemaForTable(tableName string, columns []driver.ColumnType) (*pro qkind = qvalue.QValueKindTimestamp case "Date32", "Nullable(Date32)": qkind = qvalue.QValueKindDate + case "Float32", "Nullable(Float32)": + qkind = qvalue.QValueKindFloat32 + case "Float64", "Nullable(Float64)": + qkind = qvalue.QValueKindFloat64 default: if strings.Contains(column.DatabaseTypeName(), "Decimal") { qkind = qvalue.QValueKindNumeric diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index a36a5335b9..ecb42bdd1b 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -166,6 +166,30 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch } case *decimal.Decimal: qrow = append(qrow, qvalue.QValueNumeric{Val: *v}) + case **bool: + if *v == nil { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindBoolean)) + } else { + qrow = append(qrow, qvalue.QValueBoolean{Val: **v}) + } + case *bool: + qrow = append(qrow, qvalue.QValueBoolean{Val: *v}) + case **float32: + if *v == nil { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindFloat32)) + } else { + qrow = append(qrow, qvalue.QValueFloat32{Val: **v}) + } + case *float32: + qrow = append(qrow, qvalue.QValueFloat32{Val: *v}) + case **float64: + if *v == nil { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindFloat64)) + } else { + qrow = append(qrow, qvalue.QValueFloat64{Val: **v}) + } + case *float64: + qrow = append(qrow, qvalue.QValueFloat64{Val: *v}) default: return nil, fmt.Errorf("cannot convert %T to qvalue", v) } diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 0f7ce5f55b..568a8db206 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -667,6 +667,7 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() { } func (s ClickHouseSuite) Test_Types_CH() { + srcTableName := "test_types" srcFullName := s.attachSchemaSuffix("test_types") dstTableName := "test_types" createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" @@ -683,13 +684,14 @@ func (s ClickHouseSuite) Test_Types_CH() { c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME,c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c42 INT[], c43 FLOAT[], c44 TEXT[], c45 mood, c46 HSTORE, - c47 DATE[], c48 TIMESTAMPTZ[], c49 TIMESTAMP[], c50 BOOLEAN[], c51 SMALLINT[], c52 NUMERIC); + c47 DATE[], c48 TIMESTAMPTZ[], c49 TIMESTAMP[], c50 BOOLEAN[], c51 SMALLINT[]); INSERT INTO %[1]s SELECT 2,2,b'1',b'101', true,random_bytea(32),'s','test','1.1.10.2'::cidr, CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1, '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, '{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, - 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, + 1.2,1234567890123456789012345678901234567890.1234567890123456789012345678901234567890, + 4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, txid_current_snapshot(), '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid, @@ -723,7 +725,8 @@ func (s ClickHouseSuite) Test_Types_CH() { CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1, '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, '{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, - 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, + 1.2,1234567890123456789012345678901234567890.1234567890123456789012345678901234567890, + 4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, txid_current_snapshot(), '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid, @@ -738,6 +741,8 @@ func (s ClickHouseSuite) Test_Types_CH() { '{1, 2}'::smallint[];`, srcFullName)) require.NoError(s.t, err) e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id", 2) + e2e.EnvWaitForEqualTablesWithNames(env, s, "check comparable types", srcTableName, dstTableName, + "id,c1,c4,c7,c8,c11,c12,c13,c15,c23,c28,c29,c30,c31,c32,c33,c34,c35,c36") env.Cancel() e2e.RequireEnvCanceled(s.t, env)