From e0d7e7a392b67ddaf1bb5ff122a36040854798c6 Mon Sep 17 00:00:00 2001
From: Luke Judd <156633949+lukejudd-lux@users.noreply.github.com>
Date: Tue, 17 Dec 2024 01:58:34 +1100
Subject: [PATCH] Handle syncing UUID arrays into BQ (#2358)

Building on top of #2327 there are some missing pieces required to sync this data into BQ

Have tested these changes locally

![image](https://github.com/user-attachments/assets/81d97117-91f7-40c9-8cfc-5c7b5c65a35f)

Using this setup

```sh
#!/usr/bin/env bash
set -xeuo pipefail

# This script creates databases on the PeerDB internal cluster to be used as peers later.

CONNECTION_STRING="${1:-postgres://postgres:postgres@localhost:9901/postgres}"

if ! type psql >/dev/null 2>&1; then
  echo "psql not found on PATH, exiting"
  exit 1
fi

psql "$CONNECTION_STRING" << EOF

--- Create the databases
DROP DATABASE IF EXISTS source;
CREATE DATABASE source;
DROP DATABASE IF EXISTS target;
CREATE DATABASE target;

--- Switch to source database
\c source

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

--- Create the source table
DROP TABLE IF EXISTS source CASCADE;
CREATE TABLE source (
  id uuid DEFAULT uuid_generate_v4(),
  related_ids uuid[],
  PRIMARY KEY (id)
);

CREATE PUBLICATION source_publication FOR TABLE source;

-- insert mock rows into source with valid uuid values
-- INSERT INTO source (related_ids) VALUES (ARRAY[uuid_generate_v4(), uuid_generate_v4()]);

-- Switch to target database
\c target

EOF
```
---
 flow/connectors/bigquery/merge_stmt_generator.go |  2 +-
 flow/model/qvalue/avro_converter.go              | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go
index 5ee4f883c2..3acf864785 100644
--- a/flow/connectors/bigquery/merge_stmt_generator.go
+++ b/flow/connectors/bigquery/merge_stmt_generator.go
@@ -45,7 +45,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab
 		case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16,
 			qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString,
 			qvalue.QValueKindArrayBoolean, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ,
-			qvalue.QValueKindArrayDate:
+			qvalue.QValueKindArrayDate, qvalue.QValueKindArrayUUID:
 			castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
 				"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
 				bqTypeString, column.Name, shortCol)
diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go
index 80c8d9b822..df5aaee040 100644
--- a/flow/model/qvalue/avro_converter.go
+++ b/flow/model/qvalue/avro_converter.go
@@ -402,6 +402,8 @@ func QValueToAvro(
 		return c.processArrayDate(v.Val), nil
 	case QValueUUID:
 		return c.processUUID(v.Val), nil
+	case QValueArrayUUID:
+		return c.processArrayUUID(v.Val), nil
 	case QValueGeography, QValueGeometry, QValuePoint:
 		return c.processGeospatial(v.Value().(string)), nil
 	default:
@@ -614,6 +616,22 @@ func (c *QValueAvroConverter) processUUID(byteData uuid.UUID) interface{} {
 	return uuidString
 }
 
+// processArrayUUID converts arrayData into a slice holding the String() form of
+// each UUID. When c.Nullable is set the slice is wrapped in a goavro "array"
+// union; otherwise the plain []string is returned.
+func (c *QValueAvroConverter) processArrayUUID(arrayData []uuid.UUID) interface{} {
+	uuidStrings := make([]string, 0, len(arrayData))
+	for _, id := range arrayData {
+		uuidStrings = append(uuidStrings, id.String())
+	}
+
+	if c.Nullable {
+		return goavro.Union("array", uuidStrings)
+	}
+
+	return uuidStrings
+}
+
 func (c *QValueAvroConverter) processGeospatial(geoString string) interface{} {
 	if c.Nullable {
 		return goavro.Union("string", geoString)