Skip to content

Commit

Permalink
Handle syncing UUID arrays in BQ
Browse files Browse the repository at this point in the history
  • Loading branch information
lukejudd-lux committed Dec 16, 2024
1 parent 12aa300 commit 35fd4bb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString,
qvalue.QValueKindArrayBoolean, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ,
qvalue.QValueKindArrayDate:
qvalue.QValueKindArrayDate, qvalue.QValueKindArrayUUID:
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
bqTypeString, column.Name, shortCol)
Expand Down
15 changes: 15 additions & 0 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ func QValueToAvro(
return c.processArrayDate(v.Val), nil
case QValueUUID:
return c.processUUID(v.Val), nil
case QValueArrayUUID:
return c.processArrayUUID(v.Val), nil
case QValueGeography, QValueGeometry, QValuePoint:
return c.processGeospatial(v.Value().(string)), nil
default:
Expand Down Expand Up @@ -614,6 +616,19 @@ func (c *QValueAvroConverter) processUUID(byteData uuid.UUID) interface{} {
return uuidString
}

func (c *QValueAvroConverter) processArrayUUID(arrayData []uuid.UUID) interface{} {
UUIDData := make([]string, 0, len(arrayData))
for _, uuid := range arrayData {
UUIDData = append(UUIDData, uuid.String())
}

if c.Nullable {
return goavro.Union("array", UUIDData)
}

return UUIDData
}

func (c *QValueAvroConverter) processGeospatial(geoString string) interface{} {
if c.Nullable {
return goavro.Union("string", geoString)
Expand Down

0 comments on commit 35fd4bb

Please sign in to comment.