Skip to content

Commit

Permalink
qvalue: UUID array type (#2327)
Browse files Browse the repository at this point in the history
Co-authored-by: Luke Judd <[email protected]>
  • Loading branch information
serprex and lukejudd-lux authored Dec 6, 2024
1 parent 1917c81 commit 572aa1d
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 28 deletions.
8 changes: 7 additions & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription, nullab
// string related
case qvalue.QValueKindString:
bqField.Type = bigquery.StringFieldType
// json also is stored as string for now
// json related
case qvalue.QValueKindJSON, qvalue.QValueKindJSONB, qvalue.QValueKindHStore:
bqField.Type = bigquery.JSONFieldType
// time related
Expand Down Expand Up @@ -69,6 +69,12 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription, nullab
bqField.Repeated = true
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
bqField.Type = bigquery.GeographyFieldType
// UUID related - stored as strings for now
case qvalue.QValueKindUUID:
bqField.Type = bigquery.StringFieldType
case qvalue.QValueKindArrayUUID:
bqField.Type = bigquery.StringFieldType
bqField.Repeated = true
// rest will be strings
default:
bqField.Type = bigquery.StringFieldType
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestAllDataTypes(t *testing.T) {
"text", // col_text
[]byte("bytea"), // col_bytea
`{"key": "value"}`, // col_json
savedUUID.String(), // col_uuid
savedUUID, // col_uuid
savedTime, // col_timestamp
"123.456", // col_numeric
savedTime, // col_tz
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestAllDataTypes(t *testing.T) {
expectedJSON := `{"key":"value"}`
require.Equal(t, expectedJSON, record[7].Value(), "expected '{\"key\":\"value\"}'")

actualUUID := record[8].Value().([16]uint8)
actualUUID := record[8].Value().(uuid.UUID)
require.Equal(t, savedUUID[:], actualUUID[:], "expected savedUUID: %v", savedUUID)
actualTime := record[9].Value().(time.Time)
require.Equal(t, savedTime.Truncate(time.Second),
Expand Down
73 changes: 62 additions & 11 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu
return qvalue.QValueKindJSONB
case pgtype.UUIDOID:
return qvalue.QValueKindUUID
case pgtype.UUIDArrayOID:
return qvalue.QValueKindArrayUUID
case pgtype.TimeOID:
return qvalue.QValueKindTime
case pgtype.DateOID:
Expand Down Expand Up @@ -175,6 +177,8 @@ func qValueKindToPostgresType(colTypeStr string) string {
return "HSTORE"
case qvalue.QValueKindUUID:
return "UUID"
case qvalue.QValueKindArrayUUID:
return "UUID[]"
case qvalue.QValueKindTime:
return "TIME"
case qvalue.QValueKindTimeTZ:
Expand Down Expand Up @@ -236,6 +240,54 @@ func parseJSON(value interface{}, isArray bool) (qvalue.QValue, error) {
return qvalue.QValueJSON{Val: string(jsonVal), IsArray: isArray}, nil
}

func parseUUID(value interface{}) (uuid.UUID, error) {
switch v := value.(type) {
case string:
return uuid.Parse(v)
case [16]byte:
return uuid.UUID(v), nil
case uuid.UUID:
return v, nil
default:
return uuid.UUID{}, fmt.Errorf("unsupported type for UUID: %T", value)
}
}

func parseUUIDArray(value interface{}) (qvalue.QValue, error) {
switch v := value.(type) {
case []string:
uuids := make([]uuid.UUID, 0, len(v))
for _, s := range v {
id, err := uuid.Parse(s)
if err != nil {
return nil, fmt.Errorf("invalid UUID in array: %w", err)
}
uuids = append(uuids, id)
}
return qvalue.QValueArrayUUID{Val: uuids}, nil
case [][16]byte:
uuids := make([]uuid.UUID, 0, len(v))
for _, v := range v {
uuids = append(uuids, uuid.UUID(v))
}
return qvalue.QValueArrayUUID{Val: uuids}, nil
case []uuid.UUID:
return qvalue.QValueArrayUUID{Val: v}, nil
case []interface{}:
uuids := make([]uuid.UUID, 0, len(v))
for _, v := range v {
id, err := parseUUID(v)
if err != nil {
return nil, fmt.Errorf("invalid UUID interface{} value in array: %w", err)
}
uuids = append(uuids, id)
}
return qvalue.QValueArrayUUID{Val: uuids}, nil
default:
return nil, fmt.Errorf("unsupported type for UUID array: %T", value)
}
}

func convertToArray[T any](kind qvalue.QValueKind, value interface{}) ([]T, error) {
switch v := value.(type) {
case pgtype.Array[T]:
Expand Down Expand Up @@ -378,18 +430,17 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
// handling all unsupported types with strings as well for now.
return qvalue.QValueString{Val: fmt.Sprint(value)}, nil
case qvalue.QValueKindUUID:
switch v := value.(type) {
case string:
id, err := uuid.Parse(v)
if err != nil {
return nil, fmt.Errorf("failed to parse UUID: %w", err)
}
return qvalue.QValueUUID{Val: [16]byte(id)}, nil
case [16]byte:
return qvalue.QValueUUID{Val: v}, nil
default:
return nil, fmt.Errorf("failed to parse UUID: %v", value)
tmp, err := parseUUID(value)
if err != nil {
return nil, fmt.Errorf("failed to parse UUID: %w", err)
}
return qvalue.QValueUUID{Val: tmp}, nil
case qvalue.QValueKindArrayUUID:
tmp, err := parseUUIDArray(value)
if err != nil {
return nil, fmt.Errorf("failed to parse UUID array: %w", err)
}
return tmp, nil
case qvalue.QValueKindINET:
switch v := value.(type) {
case string:
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeholder int) qvalue.
case qvalue.QValueKindNumeric:
return qvalue.QValueNumeric{Val: decimal.New(int64(placeholder), 1)}
case qvalue.QValueKindUUID:
return qvalue.QValueUUID{Val: [16]byte(uuid.New())} // assuming you have the github.com/google/uuid package
return qvalue.QValueUUID{Val: uuid.New()} // assuming you have the github.com/google/uuid package
case qvalue.QValueKindQChar:
return qvalue.QValueQChar{Val: uint8(48 + placeholder%10)} // assuming you have the github.com/google/uuid package
// case qvalue.QValueKindArray:
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse uuid: %v", *v)
}
return qvalue.QValueUUID{Val: [16]byte(uuidVal)}, nil
return qvalue.QValueUUID{Val: uuidVal}, nil
}

case qvalue.QValueKindJSON:
Expand Down
5 changes: 3 additions & 2 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
c14 INET,c15 INTEGER,c21 MACADDR,
c29 SMALLINT,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,
c40 UUID, c42 INT[], c43 FLOAT[], c44 TEXT[],
c40 UUID, c42 INT[], c43 FLOAT[], c44 TEXT[], c45 UUID[],
c46 DATE[], c47 TIMESTAMPTZ[], c48 TIMESTAMP[], c49 BOOLEAN[], c50 SMALLINT[]);
`, srcTableName))
require.NoError(s.t, err)
Expand Down Expand Up @@ -129,6 +129,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
ARRAY[10299301,2579827],
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'],
ARRAY['66073c38-b8df-4bdb-bbca-1c97596b8940','cd76be3e-d20a-451b-8e60-015872d7f607']::uuid[],
'{2020-01-01, 2020-01-02}'::date[],
'{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[],
'{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[],
Expand All @@ -142,7 +143,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
"c1", "c2", "c4",
"c40", "id", "c9", "c11", "c12", "c13", "c14", "c15",
"c21", "c29", "c33", "c34", "c35", "c36",
"c7", "c8", "c32", "c42", "c43", "c44", "c46", "c47", "c48", "c49", "c50",
"c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49", "c50",
}, ",")
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize types", func() bool {
return s.comparePGTables(srcTableName, dstTableName, allCols) == nil
Expand Down
8 changes: 7 additions & 1 deletion flow/model/qrecord_copy_from_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,13 @@ func (src *QRecordCopyFromSource) Values() ([]interface{}, error) {
case qvalue.QValueTimestampTZ:
values[i] = pgtype.Timestamptz{Time: v.Val, Valid: true}
case qvalue.QValueUUID:
values[i] = uuid.UUID(v.Val)
values[i] = v.Val
case qvalue.QValueArrayUUID:
a, err := constructArray[uuid.UUID](qValue, "ArrayUUID")
if err != nil {
return nil, err
}
values[i] = a
case qvalue.QValueNumeric:
values[i] = v.Val
case qvalue.QValueBytes:
Expand Down
12 changes: 10 additions & 2 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func GetAvroSchemaFromQValueKind(
Type: "string",
LogicalType: "uuid",
}, nil
case QValueKindArrayUUID:
return AvroSchemaComplexArray{
Type: "array",
Items: AvroSchemaField{
Type: "string",
LogicalType: "uuid",
},
}, nil
case QValueKindGeometry, QValueKindGeography, QValueKindPoint:
return "string", nil
case QValueKindInt16, QValueKindInt32, QValueKindInt64:
Expand Down Expand Up @@ -598,8 +606,8 @@ func (c *QValueAvroConverter) processHStore(hstore string) (interface{}, error)
return jsonString, nil
}

func (c *QValueAvroConverter) processUUID(byteData [16]byte) interface{} {
uuidString := uuid.UUID(byteData).String()
func (c *QValueAvroConverter) processUUID(byteData uuid.UUID) interface{} {
uuidString := byteData.String()
if c.Nullable {
return goavro.Union("string", uuidString)
}
Expand Down
18 changes: 18 additions & 0 deletions flow/model/qvalue/equals.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func Equals(qv QValue, other QValue) bool {
return compareTimeArrays(qvValue, otherValue)
case QValueArrayBoolean:
return compareBoolArrays(q.Val, otherValue)
case QValueArrayUUID:
return compareUuidArrays(q.Val, otherValue)
case QValueArrayString:
return compareArrayString(q.Val, otherValue)
default:
Expand Down Expand Up @@ -321,6 +323,22 @@ func compareBoolArrays(value1, value2 interface{}) bool {
return true
}

func compareUuidArrays(value1, value2 interface{}) bool {
array1, ok1 := value1.([]uuid.UUID)
array2, ok2 := value2.([]uuid.UUID)

if !ok1 || !ok2 || len(array1) != len(array2) {
return false
}

for i := range array1 {
if array1[i] != array2[i] {
return false
}
}
return true
}

func compareArrayString(value1, value2 interface{}) bool {
array1, ok1 := value1.([]string)
array2, ok2 := value2.([]string)
Expand Down
3 changes: 3 additions & 0 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
QValueKindArrayBoolean QValueKind = "array_bool"
QValueKindArrayJSON QValueKind = "array_json"
QValueKindArrayJSONB QValueKind = "array_jsonb"
QValueKindArrayUUID QValueKind = "array_uuid"
)

func (kind QValueKind) IsArray() bool {
Expand Down Expand Up @@ -103,6 +104,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
QValueKindArrayBoolean: "VARIANT",
QValueKindArrayJSON: "VARIANT",
QValueKindArrayJSONB: "VARIANT",
QValueKindArrayUUID: "VARIANT",
}

var QValueKindToClickHouseTypeMap = map[QValueKind]string{
Expand Down Expand Up @@ -139,6 +141,7 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{
QValueKindArrayTimestampTZ: "Array(DateTime64(6))",
QValueKindArrayJSON: "String",
QValueKindArrayJSONB: "String",
QValueKindArrayUUID: "Array(UUID)",
}

func getClickHouseTypeForNumericColumn(ctx context.Context, env map[string]string, column *protos.FieldDescription) (string, error) {
Expand Down
22 changes: 20 additions & 2 deletions flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (v QValueBytes) LValue(ls *lua.LState) lua.LValue {
}

type QValueUUID struct {
Val [16]byte
Val uuid.UUID
}

func (QValueUUID) Kind() QValueKind {
Expand All @@ -355,7 +355,25 @@ func (v QValueUUID) Value() any {
}

func (v QValueUUID) LValue(ls *lua.LState) lua.LValue {
return shared.LuaUuid.New(ls, uuid.UUID(v.Val))
return shared.LuaUuid.New(ls, v.Val)
}

type QValueArrayUUID struct {
Val []uuid.UUID
}

func (QValueArrayUUID) Kind() QValueKind {
return QValueKindArrayUUID
}

func (v QValueArrayUUID) Value() any {
return v.Val
}

func (v QValueArrayUUID) LValue(ls *lua.LState) lua.LValue {
return shared.SliceToLTable(ls, v.Val, func(x uuid.UUID) lua.LValue {
return shared.LuaUuid.New(ls, x)
})
}

type QValueJSON struct {
Expand Down
4 changes: 1 addition & 3 deletions flow/model/record_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"math"

"github.com/google/uuid"

"github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)
Expand Down Expand Up @@ -89,7 +87,7 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {

switch v := qv.(type) {
case qvalue.QValueUUID:
jsonStruct[col] = uuid.UUID(v.Val)
jsonStruct[col] = v.Val
case qvalue.QValueQChar:
jsonStruct[col] = string(v.Val)
case qvalue.QValueString:
Expand Down
12 changes: 10 additions & 2 deletions flow/pua/peerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/shopspring/decimal"
"github.com/yuin/gopher-lua"
lua "github.com/yuin/gopher-lua"

"github.com/PeerDB-io/glua64"
"github.com/PeerDB-io/gluabit32"
Expand Down Expand Up @@ -241,7 +241,15 @@ func LuaRowNewIndex(ls *lua.LState) int {
case qvalue.QValueKindUUID:
if ud, ok := val.(*lua.LUserData); ok {
if id, ok := ud.Value.(uuid.UUID); ok {
newqv = qvalue.QValueUUID{Val: [16]byte(id)}
newqv = qvalue.QValueUUID{Val: id}
}
}
case qvalue.QValueKindArrayUUID:
if tbl, ok := val.(*lua.LTable); ok {
newqv = qvalue.QValueArrayUUID{
Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) uuid.UUID {
return uuid.MustParse(lua.LVAsString(v))
}),
}
}
case qvalue.QValueKindJSON:
Expand Down

0 comments on commit 572aa1d

Please sign in to comment.