From 436b8409128082e8c9ab5f35fbb11ed390b84ecd Mon Sep 17 00:00:00 2001 From: Luke Judd Date: Fri, 6 Dec 2024 13:32:33 +1100 Subject: [PATCH 1/6] Add UUID array type --- flow/connectors/bigquery/qvalue_convert.go | 8 ++- flow/connectors/postgres/qvalue_convert.go | 59 ++++++++++++++++++---- flow/model/qrecord_copy_from_source.go | 6 +++ flow/model/qvalue/avro_converter.go | 8 +++ flow/model/qvalue/kind.go | 3 ++ flow/model/qvalue/qvalue.go | 20 +++++++- flow/pua/peerdb.go | 10 +++- 7 files changed, 100 insertions(+), 14 deletions(-) diff --git a/flow/connectors/bigquery/qvalue_convert.go b/flow/connectors/bigquery/qvalue_convert.go index aa798641ac..2ce0f6333e 100644 --- a/flow/connectors/bigquery/qvalue_convert.go +++ b/flow/connectors/bigquery/qvalue_convert.go @@ -33,7 +33,7 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription, nullab // string related case qvalue.QValueKindString: bqField.Type = bigquery.StringFieldType - // json also is stored as string for now + // json related case qvalue.QValueKindJSON, qvalue.QValueKindJSONB, qvalue.QValueKindHStore: bqField.Type = bigquery.JSONFieldType // time related @@ -69,6 +69,12 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription, nullab bqField.Repeated = true case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint: bqField.Type = bigquery.GeographyFieldType + // UUID related - stored as strings for now + case qvalue.QValueKindUUID: + bqField.Type = bigquery.StringFieldType + case qvalue.QValueKindArrayUUID: + bqField.Type = bigquery.StringFieldType + bqField.Repeated = true // rest will be strings default: bqField.Type = bigquery.StringFieldType diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index fe2489ed30..184006f5ff 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -68,6 +68,8 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu return qvalue.QValueKindJSONB case pgtype.UUIDOID: return qvalue.QValueKindUUID + case pgtype.UUIDArrayOID: + return qvalue.QValueKindArrayUUID case pgtype.TimeOID: return qvalue.QValueKindTime case pgtype.DateOID: @@ -175,6 +177,8 @@ func qValueKindToPostgresType(colTypeStr string) string { return "HSTORE" case qvalue.QValueKindUUID: return "UUID" + case qvalue.QValueKindArrayUUID: + return "UUID[]" case qvalue.QValueKindTime: return "TIME" case qvalue.QValueKindTimeTZ: @@ -236,6 +240,40 @@ func parseJSON(value interface{}, isArray bool) (qvalue.QValue, error) { return qvalue.QValueJSON{Val: string(jsonVal), IsArray: isArray}, nil } +func parseUUID(value interface{}) (qvalue.QValue, error) { + switch v := value.(type) { + case string: + id, err := uuid.Parse(v) + if err != nil { + return nil, fmt.Errorf("invalid UUID string: %w", err) + } + return qvalue.QValueUUID{Val: id}, nil + case uuid.UUID: + return qvalue.QValueUUID{Val: v}, nil + default: + return nil, fmt.Errorf("unsupported type for UUID: %T", value) + } +} + +func parseUUIDArray(value interface{}) (qvalue.QValue, error) { + switch v := value.(type) { + case []string: + uuids := make([]uuid.UUID, 0, len(v)) + for _, s := range v { + id, err := uuid.Parse(s) + if err != nil { + return nil, fmt.Errorf("invalid UUID in array: %w", err) + } + uuids = append(uuids, id) + } + return qvalue.QValueArrayUUID{Val: uuids}, nil + case []uuid.UUID: + return qvalue.QValueArrayUUID{Val: v}, nil + default: + return nil, fmt.Errorf("unsupported type for UUID array: %T", value) + } +} + func convertToArray[T any](kind qvalue.QValueKind, value interface{}) ([]T, error) { switch v := value.(type) { case pgtype.Array[T]: @@ -378,18 +416,17 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( // handling all unsupported types with strings as well for now. return qvalue.QValueString{Val: fmt.Sprint(value)}, nil case qvalue.QValueKindUUID: - switch v := value.(type) { - case string: - id, err := uuid.Parse(v) - if err != nil { - return nil, fmt.Errorf("failed to parse UUID: %w", err) - } - return qvalue.QValueUUID{Val: [16]byte(id)}, nil - case [16]byte: - return qvalue.QValueUUID{Val: v}, nil - default: - return nil, fmt.Errorf("failed to parse UUID: %v", value) + tmp, err := parseUUID(value) + if err != nil { + return nil, fmt.Errorf("failed to parse UUID: %w", err) + } + return tmp, nil + case qvalue.QValueKindArrayUUID: + tmp, err := parseUUIDArray(value) + if err != nil { + return nil, fmt.Errorf("failed to parse UUID array: %w", err) } + return tmp, nil case qvalue.QValueKindINET: switch v := value.(type) { case string: diff --git a/flow/model/qrecord_copy_from_source.go b/flow/model/qrecord_copy_from_source.go index d633fda999..057d4b1ffc 100644 --- a/flow/model/qrecord_copy_from_source.go +++ b/flow/model/qrecord_copy_from_source.go @@ -91,6 +91,12 @@ func (src *QRecordCopyFromSource) Values() ([]interface{}, error) { values[i] = pgtype.Timestamptz{Time: v.Val, Valid: true} case qvalue.QValueUUID: values[i] = uuid.UUID(v.Val) + case qvalue.QValueArrayUUID: + a, err := constructArray[uuid.UUID](qValue, "ArrayUUID") + if err != nil { + return nil, err + } + values[i] = a case qvalue.QValueNumeric: values[i] = v.Val case qvalue.QValueBytes: diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index db5bf4e2af..6cf5b01856 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -96,6 +96,14 @@ func GetAvroSchemaFromQValueKind( Type: "string", LogicalType: "uuid", }, nil + case QValueKindArrayUUID: + return AvroSchemaComplexArray{ + Type: "array", + Items: AvroSchemaField{ + Type: "string", + LogicalType: "uuid", + }, + }, nil case QValueKindGeometry, QValueKindGeography, QValueKindPoint: return "string", nil case QValueKindInt16, QValueKindInt32, QValueKindInt64: diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 3cffcc274a..29fb469c24 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -58,6 +58,7 @@ const ( QValueKindArrayBoolean QValueKind = "array_bool" QValueKindArrayJSON QValueKind = "array_json" QValueKindArrayJSONB QValueKind = "array_jsonb" + QValueKindArrayUUID QValueKind = "array_uuid" ) func (kind QValueKind) IsArray() bool { @@ -103,6 +104,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindArrayBoolean: "VARIANT", QValueKindArrayJSON: "VARIANT", QValueKindArrayJSONB: "VARIANT", + QValueKindArrayUUID: "VARIANT", } var QValueKindToClickHouseTypeMap = map[QValueKind]string{ @@ -139,6 +141,7 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{ QValueKindArrayTimestampTZ: "Array(DateTime64(6))", QValueKindArrayJSON: "String", QValueKindArrayJSONB: "String", + QValueKindArrayUUID: "Array(String)", } func getClickHouseTypeForNumericColumn(ctx context.Context, env map[string]string, column *protos.FieldDescription) (string, error) { diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 1277881a3d..de703a112a 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -343,7 +343,7 @@ func (v QValueBytes) LValue(ls *lua.LState) lua.LValue { } type QValueUUID struct { - Val [16]byte + Val uuid.UUID } func (QValueUUID) Kind() QValueKind { @@ -358,6 +358,24 @@ func (v QValueUUID) LValue(ls *lua.LState) lua.LValue { return shared.LuaUuid.New(ls, uuid.UUID(v.Val)) } +type QValueArrayUUID struct { + Val []uuid.UUID +} + +func (QValueArrayUUID) Kind() QValueKind { + return QValueKindArrayUUID +} + +func (v QValueArrayUUID) Value() any { + return v.Val +} + +func (v QValueArrayUUID) LValue(ls *lua.LState) lua.LValue { + return shared.SliceToLTable(ls, v.Val, func(x uuid.UUID) lua.LValue { + return shared.LuaUuid.New(ls, x) + }) +} + type QValueJSON struct { Val string IsArray bool diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index 441daa24d5..6279cb960a 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -10,7 +10,7 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/shopspring/decimal" - "github.com/yuin/gopher-lua" + lua "github.com/yuin/gopher-lua" "github.com/PeerDB-io/glua64" "github.com/PeerDB-io/gluabit32" @@ -244,6 +244,14 @@ func LuaRowNewIndex(ls *lua.LState) int { newqv = qvalue.QValueUUID{Val: [16]byte(id)} } } + case qvalue.QValueKindArrayUUID: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayUUID{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) uuid.UUID { + return uuid.MustParse(lua.LVAsString(v)) + }), + } + } case qvalue.QValueKindJSON: newqv = qvalue.QValueJSON{Val: lua.LVAsString(val)} case qvalue.QValueKindArrayFloat32: From af607e9df522fc812f1f09050cd978af438a9dc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 6 Dec 2024 14:37:33 +0000 Subject: [PATCH 2/6] feedback --- flow/connectors/postgres/qvalue_convert.go | 8 ++++++++ flow/model/qvalue/equals.go | 18 ++++++++++++++++++ flow/model/qvalue/kind.go | 2 +- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 184006f5ff..df88a89653 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -248,6 +248,8 @@ func parseUUID(value interface{}) (qvalue.QValue, error) { return nil, fmt.Errorf("invalid UUID string: %w", err) } return qvalue.QValueUUID{Val: id}, nil + case [16]byte: + return qvalue.QValueUUID{Val: uuid.UUID(v)}, nil case uuid.UUID: return qvalue.QValueUUID{Val: v}, nil default: @@ -267,6 +269,12 @@ func parseUUIDArray(value interface{}) (qvalue.QValue, error) { uuids = append(uuids, id) } return qvalue.QValueArrayUUID{Val: uuids}, nil + case [][16]byte: + uuids := make([]uuid.UUID, 0, len(v)) + for _, v := range v { + uuids = append(uuids, uuid.UUID(v)) + } + return qvalue.QValueArrayUUID{Val: uuids}, nil case []uuid.UUID: return qvalue.QValueArrayUUID{Val: v}, nil default: diff --git a/flow/model/qvalue/equals.go b/flow/model/qvalue/equals.go index a609c6df6a..13af407b4b 100644 --- a/flow/model/qvalue/equals.go +++ b/flow/model/qvalue/equals.go @@ -90,6 +90,8 @@ func Equals(qv QValue, other QValue) bool { return compareTimeArrays(qvValue, otherValue) case QValueArrayBoolean: return compareBoolArrays(q.Val, otherValue) + case QValueArrayUUID: + return compareUuidArrays(q.Val, otherValue) case QValueArrayString: return compareArrayString(q.Val, otherValue) default: @@ -321,6 +323,22 @@ func compareBoolArrays(value1, value2 interface{}) bool { return true } +func compareUuidArrays(value1, value2 interface{}) bool { + array1, ok1 := value1.([]uuid.UUID) + array2, ok2 := value2.([]uuid.UUID) + + if !ok1 || !ok2 || len(array1) != len(array2) { + return false + } + + for i := range array1 { + if array1[i] != array2[i] { + return false + } + } + return true +} + func compareArrayString(value1, value2 interface{}) bool { array1, ok1 := value1.([]string) array2, ok2 := value2.([]string) diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 29fb469c24..10b7db4629 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -141,7 +141,7 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{ QValueKindArrayTimestampTZ: "Array(DateTime64(6))", QValueKindArrayJSON: "String", QValueKindArrayJSONB: "String", - QValueKindArrayUUID: "Array(String)", + QValueKindArrayUUID: "Array(UUID)", } func getClickHouseTypeForNumericColumn(ctx context.Context, env map[string]string, column *protos.FieldDescription) (string, error) { From 2fd1e5d6f2179a4fa3417615d32f5f88bf58d70f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 6 Dec 2024 14:39:21 +0000 Subject: [PATCH 3/6] lints --- flow/model/qrecord_copy_from_source.go | 2 +- flow/model/qvalue/qvalue.go | 2 +- flow/model/record_items.go | 4 +--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/flow/model/qrecord_copy_from_source.go b/flow/model/qrecord_copy_from_source.go index 057d4b1ffc..e98adea90f 100644 --- a/flow/model/qrecord_copy_from_source.go +++ b/flow/model/qrecord_copy_from_source.go @@ -90,7 +90,7 @@ func (src *QRecordCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueTimestampTZ: values[i] = pgtype.Timestamptz{Time: v.Val, Valid: true} case qvalue.QValueUUID: - values[i] = uuid.UUID(v.Val) + values[i] = v.Val case qvalue.QValueArrayUUID: a, err := constructArray[uuid.UUID](qValue, "ArrayUUID") if err != nil { diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index de703a112a..9e81c080d6 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -355,7 +355,7 @@ func (v QValueUUID) Value() any { } func (v QValueUUID) LValue(ls *lua.LState) lua.LValue { - return shared.LuaUuid.New(ls, uuid.UUID(v.Val)) + return shared.LuaUuid.New(ls, v.Val) } type QValueArrayUUID struct { diff --git a/flow/model/record_items.go b/flow/model/record_items.go index 7c242974b3..13b6dfef5a 100644 --- a/flow/model/record_items.go +++ b/flow/model/record_items.go @@ -5,8 +5,6 @@ import ( "fmt" "math" - "github.com/google/uuid" - "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -89,7 +87,7 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) { switch v := qv.(type) { case qvalue.QValueUUID: - jsonStruct[col] = uuid.UUID(v.Val) + jsonStruct[col] = v.Val case qvalue.QValueQChar: jsonStruct[col] = string(v.Val) case qvalue.QValueString: From 47cd0eec568356a10341344b5b4a3000be1e77a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 6 Dec 2024 15:09:13 +0000 Subject: [PATCH 4/6] test --- flow/e2e/postgres/peer_flow_pg_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index f2ca26443a..bd816e79e1 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -101,7 +101,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { c14 INET,c15 INTEGER,c21 MACADDR, c29 SMALLINT,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ, - c40 UUID, c42 INT[], c43 FLOAT[], c44 TEXT[], + c40 UUID, c42 INT[], c43 FLOAT[], c44 TEXT[], c45 UUID[], c46 DATE[], c47 TIMESTAMPTZ[], c48 TIMESTAMP[], c49 BOOLEAN[], c50 SMALLINT[]); `, srcTableName)) require.NoError(s.t, err) @@ -129,6 +129,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { ARRAY[10299301,2579827], ARRAY[0.0003, 8902.0092], ARRAY['hello','bye'], + ARRAY['66073c38-b8df-4bdb-bbca-1c97596b8940','cd76be3e-d20a-451b-8e60-015872d7f607']::uuid[], '{2020-01-01, 2020-01-02}'::date[], '{"2020-01-01 01:01:01+00", "2020-01-02 01:01:01+00"}'::timestamptz[], '{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[], @@ -142,7 +143,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { "c1", "c2", "c4", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c21", "c29", "c33", "c34", "c35", "c36", - "c7", "c8", "c32", "c42", "c43", "c44", "c46", "c47", "c48", "c49", "c50", + "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49", "c50", }, ",") e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize types", func() bool { return s.comparePGTables(srcTableName, dstTableName, allCols) == nil From 3fa8eed0699f4ff5135519ec40521a30f7dfc8e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 6 Dec 2024 15:38:56 +0000 Subject: [PATCH 5/6] more [16]byte to uuid.UUID conversion --- flow/connectors/postgres/qrep_query_executor_test.go | 4 ++-- flow/connectors/snowflake/avro_file_writer_test.go | 2 +- flow/connectors/sql/query_executor.go | 2 +- flow/model/qvalue/avro_converter.go | 4 ++-- flow/pua/peerdb.go | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flow/connectors/postgres/qrep_query_executor_test.go b/flow/connectors/postgres/qrep_query_executor_test.go index f8f686c42f..eaa015c1cf 100644 --- a/flow/connectors/postgres/qrep_query_executor_test.go +++ b/flow/connectors/postgres/qrep_query_executor_test.go @@ -132,7 +132,7 @@ func TestAllDataTypes(t *testing.T) { "text", // col_text []byte("bytea"), // col_bytea `{"key": "value"}`, // col_json - savedUUID.String(), // col_uuid + savedUUID, // col_uuid savedTime, // col_timestamp "123.456", // col_numeric savedTime, // col_tz @@ -189,7 +189,7 @@ func TestAllDataTypes(t *testing.T) { expectedJSON := `{"key":"value"}` require.Equal(t, expectedJSON, record[7].Value(), "expected '{\"key\":\"value\"}'") - actualUUID := record[8].Value().([16]uint8) + actualUUID := record[8].Value().(uuid.UUID) require.Equal(t, savedUUID[:], actualUUID[:], "expected savedUUID: %v", savedUUID) actualTime := record[9].Value().(time.Time) require.Equal(t, savedTime.Truncate(time.Second), diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 4a76fccd01..fbf86fc4c7 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -49,7 +49,7 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeholder int) qvalue. case qvalue.QValueKindNumeric: return qvalue.QValueNumeric{Val: decimal.New(int64(placeholder), 1)} case qvalue.QValueKindUUID: - return qvalue.QValueUUID{Val: [16]byte(uuid.New())} // assuming you have the github.com/google/uuid package + return qvalue.QValueUUID{Val: uuid.New()} // assuming you have the github.com/google/uuid package case qvalue.QValueKindQChar: return qvalue.QValueQChar{Val: uint8(48 + placeholder%10)} // assuming you have the github.com/google/uuid package // case qvalue.QValueKindArray: diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index fb94280c38..4be60f4a51 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -443,7 +443,7 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { if err != nil { return nil, fmt.Errorf("failed to parse uuid: %v", *v) } - return qvalue.QValueUUID{Val: [16]byte(uuidVal)}, nil + return qvalue.QValueUUID{Val: uuidVal}, nil } case qvalue.QValueKindJSON: diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 6cf5b01856..80c8d9b822 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -606,8 +606,8 @@ func (c *QValueAvroConverter) processHStore(hstore string) (interface{}, error) return jsonString, nil } -func (c *QValueAvroConverter) processUUID(byteData [16]byte) interface{} { - uuidString := uuid.UUID(byteData).String() +func (c *QValueAvroConverter) processUUID(byteData uuid.UUID) interface{} { + uuidString := byteData.String() if c.Nullable { return goavro.Union("string", uuidString) } diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index 6279cb960a..b437232b97 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -241,7 +241,7 @@ func LuaRowNewIndex(ls *lua.LState) int { case qvalue.QValueKindUUID: if ud, ok := val.(*lua.LUserData); ok { if id, ok := ud.Value.(uuid.UUID); ok { - newqv = qvalue.QValueUUID{Val: [16]byte(id)} + newqv = qvalue.QValueUUID{Val: id} } } case qvalue.QValueKindArrayUUID: From da9c3683f2a94f6b9e1fe13a2fa02ffd0c312d15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 6 Dec 2024 15:54:20 +0000 Subject: [PATCH 6/6] need to handle []any --- flow/connectors/postgres/qvalue_convert.go | 26 +++++++++++++--------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index df88a89653..e3052a08f4 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -240,20 +240,16 @@ func parseJSON(value interface{}, isArray bool) (qvalue.QValue, error) { return qvalue.QValueJSON{Val: string(jsonVal), IsArray: isArray}, nil } -func parseUUID(value interface{}) (qvalue.QValue, error) { +func parseUUID(value interface{}) (uuid.UUID, error) { switch v := value.(type) { case string: - id, err := uuid.Parse(v) - if err != nil { - return nil, fmt.Errorf("invalid UUID string: %w", err) - } - return qvalue.QValueUUID{Val: id}, nil + return uuid.Parse(v) case [16]byte: - return qvalue.QValueUUID{Val: uuid.UUID(v)}, nil + return uuid.UUID(v), nil case uuid.UUID: - return qvalue.QValueUUID{Val: v}, nil + return v, nil default: - return nil, fmt.Errorf("unsupported type for UUID: %T", value) + return uuid.UUID{}, fmt.Errorf("unsupported type for UUID: %T", value) } } @@ -277,6 +273,16 @@ func parseUUIDArray(value interface{}) (qvalue.QValue, error) { return qvalue.QValueArrayUUID{Val: uuids}, nil case []uuid.UUID: return qvalue.QValueArrayUUID{Val: v}, nil + case []interface{}: + uuids := make([]uuid.UUID, 0, len(v)) + for _, v := range v { + id, err := parseUUID(v) + if err != nil { + return nil, fmt.Errorf("invalid UUID interface{} value in array: %w", err) + } + uuids = append(uuids, id) + } + return qvalue.QValueArrayUUID{Val: uuids}, nil default: return nil, fmt.Errorf("unsupported type for UUID array: %T", value) } @@ -428,7 +434,7 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( if err != nil { return nil, fmt.Errorf("failed to parse UUID: %w", err) } - return tmp, nil + return qvalue.QValueUUID{Val: tmp}, nil case qvalue.QValueKindArrayUUID: tmp, err := parseUUIDArray(value) if err != nil {