From 35fd4bbb178fa70e99b43a1c51a93bd021396090 Mon Sep 17 00:00:00 2001 From: Luke Judd Date: Mon, 16 Dec 2024 14:14:36 +1100 Subject: [PATCH] Handle syncing UUID arrays in BQ --- flow/connectors/bigquery/merge_stmt_generator.go | 2 +- flow/model/qvalue/avro_converter.go | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index 5ee4f883c2..3acf864785 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -45,7 +45,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString, qvalue.QValueKindArrayBoolean, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ, - qvalue.QValueKindArrayDate: + qvalue.QValueKindArrayDate, qvalue.QValueKindArrayUUID: castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+ "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element WHERE element IS NOT null) AS `%s`", bqTypeString, column.Name, shortCol) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 80c8d9b822..df5aaee040 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -402,6 +402,8 @@ func QValueToAvro( return c.processArrayDate(v.Val), nil case QValueUUID: return c.processUUID(v.Val), nil + case QValueArrayUUID: + return c.processArrayUUID(v.Val), nil case QValueGeography, QValueGeometry, QValuePoint: return c.processGeospatial(v.Value().(string)), nil default: @@ -614,6 +616,19 @@ func (c *QValueAvroConverter) processUUID(byteData uuid.UUID) interface{} { return uuidString } +func (c *QValueAvroConverter) processArrayUUID(arrayData []uuid.UUID) interface{} { + UUIDData := make([]string, 0, len(arrayData)) + for _, uuid := range arrayData { + UUIDData = append(UUIDData, uuid.String()) + } + + if c.Nullable { + return goavro.Union("array", UUIDData) + } + + return UUIDData +} + func (c *QValueAvroConverter) processGeospatial(geoString string) interface{} { if c.Nullable { return goavro.Union("string", geoString)