diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index 5ee4f883c..3acf86478 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -45,7 +45,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString, qvalue.QValueKindArrayBoolean, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ, - qvalue.QValueKindArrayDate: + qvalue.QValueKindArrayDate, qvalue.QValueKindArrayUUID: castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+ "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element WHERE element IS NOT null) AS `%s`", bqTypeString, column.Name, shortCol) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 80c8d9b82..df5aaee04 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -402,6 +402,8 @@ func QValueToAvro( return c.processArrayDate(v.Val), nil case QValueUUID: return c.processUUID(v.Val), nil + case QValueArrayUUID: + return c.processArrayUUID(v.Val), nil case QValueGeography, QValueGeometry, QValuePoint: return c.processGeospatial(v.Value().(string)), nil default: @@ -614,6 +616,19 @@ func (c *QValueAvroConverter) processUUID(byteData uuid.UUID) interface{} { return uuidString } +func (c *QValueAvroConverter) processArrayUUID(arrayData []uuid.UUID) interface{} { + UUIDData := make([]string, 0, len(arrayData)) + for _, uuid := range arrayData { + UUIDData = append(UUIDData, uuid.String()) + } + + if c.Nullable { + return goavro.Union("array", UUIDData) + } + + return UUIDData +} + func (c *QValueAvroConverter) processGeospatial(geoString string) interface{} { if c.Nullable { return goavro.Union("string", geoString)