Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qvalue: UUID array type #2327

Merged
merged 6 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription, nullab
// string related
case qvalue.QValueKindString:
bqField.Type = bigquery.StringFieldType
// json also is stored as string for now
// json related
case qvalue.QValueKindJSON, qvalue.QValueKindJSONB, qvalue.QValueKindHStore:
bqField.Type = bigquery.JSONFieldType
// time related
Expand Down Expand Up @@ -69,6 +69,12 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription, nullab
bqField.Repeated = true
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
bqField.Type = bigquery.GeographyFieldType
// UUID related - stored as strings for now
case qvalue.QValueKindUUID:
bqField.Type = bigquery.StringFieldType
case qvalue.QValueKindArrayUUID:
bqField.Type = bigquery.StringFieldType
bqField.Repeated = true
// rest will be strings
default:
bqField.Type = bigquery.StringFieldType
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestAllDataTypes(t *testing.T) {
"text", // col_text
[]byte("bytea"), // col_bytea
`{"key": "value"}`, // col_json
savedUUID.String(), // col_uuid
savedUUID, // col_uuid
savedTime, // col_timestamp
"123.456", // col_numeric
savedTime, // col_tz
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestAllDataTypes(t *testing.T) {
expectedJSON := `{"key":"value"}`
require.Equal(t, expectedJSON, record[7].Value(), "expected '{\"key\":\"value\"}'")

actualUUID := record[8].Value().([16]uint8)
actualUUID := record[8].Value().(uuid.UUID)
require.Equal(t, savedUUID[:], actualUUID[:], "expected savedUUID: %v", savedUUID)
actualTime := record[9].Value().(time.Time)
require.Equal(t, savedTime.Truncate(time.Second),
Expand Down
73 changes: 62 additions & 11 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu
return qvalue.QValueKindJSONB
case pgtype.UUIDOID:
return qvalue.QValueKindUUID
case pgtype.UUIDArrayOID:
return qvalue.QValueKindArrayUUID
case pgtype.TimeOID:
return qvalue.QValueKindTime
case pgtype.DateOID:
Expand Down Expand Up @@ -175,6 +177,8 @@ func qValueKindToPostgresType(colTypeStr string) string {
return "HSTORE"
case qvalue.QValueKindUUID:
return "UUID"
case qvalue.QValueKindArrayUUID:
return "UUID[]"
case qvalue.QValueKindTime:
return "TIME"
case qvalue.QValueKindTimeTZ:
Expand Down Expand Up @@ -236,6 +240,54 @@ func parseJSON(value interface{}, isArray bool) (qvalue.QValue, error) {
return qvalue.QValueJSON{Val: string(jsonVal), IsArray: isArray}, nil
}

// parseUUID converts a raw value into a uuid.UUID.
//
// Accepted inputs are a uuid.UUID (returned as-is), a [16]byte (converted
// directly), and a string (parsed with uuid.Parse, whose error is returned
// unwrapped). Any other dynamic type yields an error naming that type.
func parseUUID(value interface{}) (uuid.UUID, error) {
	if id, ok := value.(uuid.UUID); ok {
		return id, nil
	}
	if raw, ok := value.([16]byte); ok {
		return uuid.UUID(raw), nil
	}
	if text, ok := value.(string); ok {
		return uuid.Parse(text)
	}
	return uuid.UUID{}, fmt.Errorf("unsupported type for UUID: %T", value)
}

// parseUUIDArray converts a raw array value into a qvalue.QValueArrayUUID.
//
// Accepted inputs are []string (each element parsed with uuid.Parse),
// [][16]byte (each element converted directly), []uuid.UUID (used as-is,
// without copying) and []interface{} (each element handed to parseUUID).
// The first element that fails to convert aborts the whole conversion with
// a wrapped error; any other dynamic type yields an error naming that type.
func parseUUIDArray(value interface{}) (qvalue.QValue, error) {
	switch items := value.(type) {
	case []uuid.UUID:
		return qvalue.QValueArrayUUID{Val: items}, nil
	case [][16]byte:
		out := make([]uuid.UUID, len(items))
		for i, raw := range items {
			out[i] = uuid.UUID(raw)
		}
		return qvalue.QValueArrayUUID{Val: out}, nil
	case []string:
		out := make([]uuid.UUID, len(items))
		for i, text := range items {
			parsed, err := uuid.Parse(text)
			if err != nil {
				return nil, fmt.Errorf("invalid UUID in array: %w", err)
			}
			out[i] = parsed
		}
		return qvalue.QValueArrayUUID{Val: out}, nil
	case []interface{}:
		out := make([]uuid.UUID, len(items))
		for i, elem := range items {
			parsed, err := parseUUID(elem)
			if err != nil {
				return nil, fmt.Errorf("invalid UUID interface{} value in array: %w", err)
			}
			out[i] = parsed
		}
		return qvalue.QValueArrayUUID{Val: out}, nil
	}
	return nil, fmt.Errorf("unsupported type for UUID array: %T", value)
}

func convertToArray[T any](kind qvalue.QValueKind, value interface{}) ([]T, error) {
switch v := value.(type) {
case pgtype.Array[T]:
Expand Down Expand Up @@ -378,18 +430,17 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
// handling all unsupported types with strings as well for now.
return qvalue.QValueString{Val: fmt.Sprint(value)}, nil
case qvalue.QValueKindUUID:
switch v := value.(type) {
case string:
id, err := uuid.Parse(v)
if err != nil {
return nil, fmt.Errorf("failed to parse UUID: %w", err)
}
return qvalue.QValueUUID{Val: [16]byte(id)}, nil
case [16]byte:
return qvalue.QValueUUID{Val: v}, nil
default:
return nil, fmt.Errorf("failed to parse UUID: %v", value)
tmp, err := parseUUID(value)
if err != nil {
return nil, fmt.Errorf("failed to parse UUID: %w", err)
}
return qvalue.QValueUUID{Val: tmp}, nil
case qvalue.QValueKindArrayUUID:
tmp, err := parseUUIDArray(value)
if err != nil {
return nil, fmt.Errorf("failed to parse UUID array: %w", err)
}
return tmp, nil
case qvalue.QValueKindINET:
switch v := value.(type) {
case string:
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeholder int) qvalue.
case qvalue.QValueKindNumeric:
return qvalue.QValueNumeric{Val: decimal.New(int64(placeholder), 1)}
case qvalue.QValueKindUUID:
return qvalue.QValueUUID{Val: [16]byte(uuid.New())} // assuming you have the github.com/google/uuid package
return qvalue.QValueUUID{Val: uuid.New()} // assuming you have the github.com/google/uuid package
case qvalue.QValueKindQChar:
return qvalue.QValueQChar{Val: uint8(48 + placeholder%10)} // assuming you have the github.com/google/uuid package
// case qvalue.QValueKindArray:
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse uuid: %v", *v)
}
return qvalue.QValueUUID{Val: [16]byte(uuidVal)}, nil
return qvalue.QValueUUID{Val: uuidVal}, nil
}

case qvalue.QValueKindJSON:
Expand Down
5 changes: 3 additions & 2 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
c14 INET,c15 INTEGER,c21 MACADDR,
c29 SMALLINT,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,
c40 UUID, c42 INT[], c43 FLOAT[], c44 TEXT[],
c40 UUID, c42 INT[], c43 FLOAT[], c44 TEXT[], c45 UUID[],
c46 DATE[], c47 TIMESTAMPTZ[], c48 TIMESTAMP[], c49 BOOLEAN[], c50 SMALLINT[]);
`, srcTableName))
require.NoError(s.t, err)
Expand Down Expand Up @@ -129,6 +129,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
ARRAY[10299301,2579827],
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'],
ARRAY['66073c38-b8df-4bdb-bbca-1c97596b8940','cd76be3e-d20a-451b-8e60-015872d7f607']::uuid[],
'{2020-01-01, 2020-01-02}'::date[],
'{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[],
'{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[],
Expand All @@ -142,7 +143,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
"c1", "c2", "c4",
"c40", "id", "c9", "c11", "c12", "c13", "c14", "c15",
"c21", "c29", "c33", "c34", "c35", "c36",
"c7", "c8", "c32", "c42", "c43", "c44", "c46", "c47", "c48", "c49", "c50",
"c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49", "c50",
}, ",")
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize types", func() bool {
return s.comparePGTables(srcTableName, dstTableName, allCols) == nil
Expand Down
8 changes: 7 additions & 1 deletion flow/model/qrecord_copy_from_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,13 @@ func (src *QRecordCopyFromSource) Values() ([]interface{}, error) {
case qvalue.QValueTimestampTZ:
values[i] = pgtype.Timestamptz{Time: v.Val, Valid: true}
case qvalue.QValueUUID:
values[i] = uuid.UUID(v.Val)
values[i] = v.Val
case qvalue.QValueArrayUUID:
a, err := constructArray[uuid.UUID](qValue, "ArrayUUID")
if err != nil {
return nil, err
}
values[i] = a
case qvalue.QValueNumeric:
values[i] = v.Val
case qvalue.QValueBytes:
Expand Down
12 changes: 10 additions & 2 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func GetAvroSchemaFromQValueKind(
Type: "string",
LogicalType: "uuid",
}, nil
case QValueKindArrayUUID:
return AvroSchemaComplexArray{
Type: "array",
Items: AvroSchemaField{
Type: "string",
LogicalType: "uuid",
},
}, nil
case QValueKindGeometry, QValueKindGeography, QValueKindPoint:
return "string", nil
case QValueKindInt16, QValueKindInt32, QValueKindInt64:
Expand Down Expand Up @@ -598,8 +606,8 @@ func (c *QValueAvroConverter) processHStore(hstore string) (interface{}, error)
return jsonString, nil
}

func (c *QValueAvroConverter) processUUID(byteData [16]byte) interface{} {
uuidString := uuid.UUID(byteData).String()
func (c *QValueAvroConverter) processUUID(byteData uuid.UUID) interface{} {
uuidString := byteData.String()
if c.Nullable {
return goavro.Union("string", uuidString)
}
Expand Down
18 changes: 18 additions & 0 deletions flow/model/qvalue/equals.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func Equals(qv QValue, other QValue) bool {
return compareTimeArrays(qvValue, otherValue)
case QValueArrayBoolean:
return compareBoolArrays(q.Val, otherValue)
case QValueArrayUUID:
return compareUuidArrays(q.Val, otherValue)
case QValueArrayString:
return compareArrayString(q.Val, otherValue)
default:
Expand Down Expand Up @@ -321,6 +323,22 @@ func compareBoolArrays(value1, value2 interface{}) bool {
return true
}

// compareUuidArrays reports whether value1 and value2 are both []uuid.UUID
// slices of the same length holding equal UUIDs at every index. It returns
// false if either argument has a different dynamic type.
func compareUuidArrays(value1, value2 interface{}) bool {
	left, leftOK := value1.([]uuid.UUID)
	if !leftOK {
		return false
	}
	right, rightOK := value2.([]uuid.UUID)
	if !rightOK {
		return false
	}
	if len(left) != len(right) {
		return false
	}

	for idx, id := range left {
		if id != right[idx] {
			return false
		}
	}
	return true
}

func compareArrayString(value1, value2 interface{}) bool {
array1, ok1 := value1.([]string)
array2, ok2 := value2.([]string)
Expand Down
3 changes: 3 additions & 0 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
QValueKindArrayBoolean QValueKind = "array_bool"
QValueKindArrayJSON QValueKind = "array_json"
QValueKindArrayJSONB QValueKind = "array_jsonb"
QValueKindArrayUUID QValueKind = "array_uuid"
)

func (kind QValueKind) IsArray() bool {
Expand Down Expand Up @@ -103,6 +104,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
QValueKindArrayBoolean: "VARIANT",
QValueKindArrayJSON: "VARIANT",
QValueKindArrayJSONB: "VARIANT",
QValueKindArrayUUID: "VARIANT",
}

var QValueKindToClickHouseTypeMap = map[QValueKind]string{
Expand Down Expand Up @@ -139,6 +141,7 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{
QValueKindArrayTimestampTZ: "Array(DateTime64(6))",
QValueKindArrayJSON: "String",
QValueKindArrayJSONB: "String",
QValueKindArrayUUID: "Array(UUID)",
}

func getClickHouseTypeForNumericColumn(ctx context.Context, env map[string]string, column *protos.FieldDescription) (string, error) {
Expand Down
22 changes: 20 additions & 2 deletions flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (v QValueBytes) LValue(ls *lua.LState) lua.LValue {
}

type QValueUUID struct {
Val [16]byte
Val uuid.UUID
}

func (QValueUUID) Kind() QValueKind {
Expand All @@ -355,7 +355,25 @@ func (v QValueUUID) Value() any {
}

func (v QValueUUID) LValue(ls *lua.LState) lua.LValue {
return shared.LuaUuid.New(ls, uuid.UUID(v.Val))
return shared.LuaUuid.New(ls, v.Val)
}

// QValueArrayUUID is the QValue variant that carries a slice of UUIDs.
type QValueArrayUUID struct {
	Val []uuid.UUID
}

// Kind reports QValueKindArrayUUID.
func (QValueArrayUUID) Kind() QValueKind {
	return QValueKindArrayUUID
}

// Value returns the underlying []uuid.UUID as an untyped value.
func (v QValueArrayUUID) Value() any {
	return v.Val
}

// LValue builds the Lua representation of the array by passing Val to
// shared.SliceToLTable, wrapping each element with shared.LuaUuid.New.
func (v QValueArrayUUID) LValue(ls *lua.LState) lua.LValue {
	toLua := func(id uuid.UUID) lua.LValue {
		return shared.LuaUuid.New(ls, id)
	}
	return shared.SliceToLTable(ls, v.Val, toLua)
}

type QValueJSON struct {
Expand Down
4 changes: 1 addition & 3 deletions flow/model/record_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"math"

"github.com/google/uuid"

"github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)
Expand Down Expand Up @@ -89,7 +87,7 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {

switch v := qv.(type) {
case qvalue.QValueUUID:
jsonStruct[col] = uuid.UUID(v.Val)
jsonStruct[col] = v.Val
case qvalue.QValueQChar:
jsonStruct[col] = string(v.Val)
case qvalue.QValueString:
Expand Down
12 changes: 10 additions & 2 deletions flow/pua/peerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/shopspring/decimal"
"github.com/yuin/gopher-lua"
lua "github.com/yuin/gopher-lua"

"github.com/PeerDB-io/glua64"
"github.com/PeerDB-io/gluabit32"
Expand Down Expand Up @@ -241,7 +241,15 @@ func LuaRowNewIndex(ls *lua.LState) int {
case qvalue.QValueKindUUID:
if ud, ok := val.(*lua.LUserData); ok {
if id, ok := ud.Value.(uuid.UUID); ok {
newqv = qvalue.QValueUUID{Val: [16]byte(id)}
newqv = qvalue.QValueUUID{Val: id}
}
}
case qvalue.QValueKindArrayUUID:
if tbl, ok := val.(*lua.LTable); ok {
newqv = qvalue.QValueArrayUUID{
Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) uuid.UUID {
return uuid.MustParse(lua.LVAsString(v))
}),
}
}
case qvalue.QValueKindJSON:
Expand Down
Loading