Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle syncing UUID arrays into BQ #2358

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString,
qvalue.QValueKindArrayBoolean, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ,
qvalue.QValueKindArrayDate:
qvalue.QValueKindArrayDate, qvalue.QValueKindArrayUUID:
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
bqTypeString, column.Name, shortCol)
Expand Down
15 changes: 15 additions & 0 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ func QValueToAvro(
return c.processArrayDate(v.Val), nil
case QValueUUID:
return c.processUUID(v.Val), nil
case QValueArrayUUID:
return c.processArrayUUID(v.Val), nil
case QValueGeography, QValueGeometry, QValuePoint:
return c.processGeospatial(v.Value().(string)), nil
default:
Expand Down Expand Up @@ -614,6 +616,19 @@ func (c *QValueAvroConverter) processUUID(byteData uuid.UUID) interface{} {
return uuidString
}

func (c *QValueAvroConverter) processArrayUUID(arrayData []uuid.UUID) interface{} {
UUIDData := make([]string, 0, len(arrayData))
for _, uuid := range arrayData {
UUIDData = append(UUIDData, uuid.String())
}

if c.Nullable {
return goavro.Union("array", UUIDData)
}

return UUIDData
}

func (c *QValueAvroConverter) processGeospatial(geoString string) interface{} {
if c.Nullable {
return goavro.Union("string", geoString)
Expand Down
Loading