From 12aa30037ec99ea470011e1f2239acce2e46b827 Mon Sep 17 00:00:00 2001
From: Kaushik Iska
Date: Fri, 13 Dec 2024 12:45:03 -0800
Subject: [PATCH 1/4] add throttling when there is a syncErr (#2356)

---
 flow/workflows/sync_flow.go | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
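The change below throttles the sync loop with a durable timer instead of retrying immediately on error. A rough standalone sketch of the pattern, assuming Temporal's Go SDK (`go.temporal.io/sdk/workflow`); `runSyncOnce` and `ThrottledSyncLoop` are hypothetical names for illustration, not PeerDB code:

```go
package syncflow

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

// runSyncOnce stands in for the sync activity invocation that may fail.
func runSyncOnce(ctx workflow.Context) error {
	// ... execute the sync activity and return its error ...
	return nil
}

// ThrottledSyncLoop retries a failing sync without hot-looping: on error it
// waits on a durable timer via workflow.Sleep, so the backoff is recorded in
// workflow history and survives worker restarts.
func ThrottledSyncLoop(ctx workflow.Context) error {
	logger := workflow.GetLogger(ctx)
	for {
		if err := runSyncOnce(ctx); err != nil {
			logger.Info("sync failed, throttling before retry")
			// workflow.Sleep only returns an error if the workflow context
			// is canceled, so treat that as a signal to stop.
			if sleepErr := workflow.Sleep(ctx, 30*time.Second); sleepErr != nil {
				return sleepErr
			}
			continue
		}
		return nil
	}
}
```

Using `workflow.Sleep` rather than `time.Sleep` matters here: blocking calls outside the SDK's scheduler stall the workflow task and break determinism.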
diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go
index 8b00364dd2..7dce9ffec5 100644
--- a/flow/workflows/sync_flow.go
+++ b/flow/workflows/sync_flow.go
@@ -79,7 +79,7 @@ func SyncFlowWorkflow(
 	}
 
 	syncFlowCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
-		StartToCloseTimeout: 72 * time.Hour,
+		StartToCloseTimeout: 7 * 24 * time.Hour,
 		HeartbeatTimeout:    time.Minute,
 		WaitForCancellation: true,
 	})
@@ -162,6 +162,15 @@
 	}
 
 	restart := syncErr || workflow.GetInfo(ctx).GetContinueAsNewSuggested()
+
+	if syncErr {
+		logger.Info("sync flow error, sleeping for 30 seconds...")
+		err := workflow.Sleep(ctx, 30*time.Second)
+		if err != nil {
+			logger.Error("failed to sleep", slog.Any("error", err))
+		}
+	}
+
 	if !stop && !syncErr && mustWait {
 		waitSelector.Select(ctx)
 		if restart {

From e0d7e7a392b67ddaf1bb5ff122a36040854798c6 Mon Sep 17 00:00:00 2001
From: Luke Judd <156633949+lukejudd-lux@users.noreply.github.com>
Date: Tue, 17 Dec 2024 01:58:34 +1100
Subject: [PATCH 2/4] Handle syncing UUID arrays into BQ (#2358)

Building on top of #2327, there are some missing pieces required to sync
this data into BQ.

Have tested these changes locally:

![image](https://github.com/user-attachments/assets/81d97117-91f7-40c9-8cfc-5c7b5c65a35f)

Using this setup:

```sh
#!/usr/bin/env bash
set -xeuo pipefail

# This script creates databases on the PeerDB internal cluster to be used as peers later.

CONNECTION_STRING="${1:-postgres://postgres:postgres@localhost:9901/postgres}"

if ! type psql >/dev/null 2>&1; then
  echo "psql not found on PATH, exiting"
  exit 1
fi

psql "$CONNECTION_STRING" << EOF
--- Create the databases
DROP DATABASE IF EXISTS source;
CREATE DATABASE source;

DROP DATABASE IF EXISTS target;
CREATE DATABASE target;

--- Switch to source database
\c source

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

--- Create the source table
DROP TABLE IF EXISTS source CASCADE;
CREATE TABLE source (
  id uuid DEFAULT uuid_generate_v4(),
  related_ids uuid[],
  PRIMARY KEY (id)
);

CREATE PUBLICATION source_publication FOR TABLE source;

-- insert mock rows into source with valid uuid values --
INSERT INTO source (related_ids) VALUES (ARRAY[uuid_generate_v4(), uuid_generate_v4()]);

-- Switch to target database
\c target
EOF
```
---
 flow/connectors/bigquery/merge_stmt_generator.go |  2 +-
 flow/model/qvalue/avro_converter.go              | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)
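The Avro half of this change hinges on goavro's union encoding: a nullable column maps to an Avro union, and a non-null value must be wrapped with its branch name, which is what `processArrayUUID` below does with `goavro.Union("array", ...)`. A minimal sketch of the same conversion, assuming `github.com/linkedin/goavro/v2` and `github.com/google/uuid`; `uuidArrayToAvro` is a hypothetical helper mirroring the shape of `processArrayUUID`, not the PeerDB implementation:

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
	"github.com/linkedin/goavro/v2"
)

// uuidArrayToAvro converts UUIDs to their string form; for nullable columns
// the slice is wrapped in a union branch so goavro can encode it.
func uuidArrayToAvro(ids []uuid.UUID, nullable bool) interface{} {
	strs := make([]string, 0, len(ids))
	for _, id := range ids {
		strs = append(strs, id.String())
	}
	if nullable {
		// goavro.Union tags the datum with its union branch name ("array").
		return goavro.Union("array", strs)
	}
	return strs
}

func main() {
	ids := []uuid.UUID{uuid.New(), uuid.New()}
	fmt.Println(uuidArrayToAvro(ids, true))
}
```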
diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go
index 5ee4f883c2..3acf864785 100644
--- a/flow/connectors/bigquery/merge_stmt_generator.go
+++ b/flow/connectors/bigquery/merge_stmt_generator.go
@@ -45,7 +45,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab
 		case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16,
 			qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString,
 			qvalue.QValueKindArrayBoolean, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ,
-			qvalue.QValueKindArrayDate:
+			qvalue.QValueKindArrayDate, qvalue.QValueKindArrayUUID:
 			castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
 				"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
 				bqTypeString, column.Name, shortCol)
diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go
index 80c8d9b822..df5aaee040 100644
--- a/flow/model/qvalue/avro_converter.go
+++ b/flow/model/qvalue/avro_converter.go
@@ -402,6 +402,8 @@ func QValueToAvro(
 		return c.processArrayDate(v.Val), nil
 	case QValueUUID:
 		return c.processUUID(v.Val), nil
+	case QValueArrayUUID:
+		return c.processArrayUUID(v.Val), nil
 	case QValueGeography, QValueGeometry, QValuePoint:
 		return c.processGeospatial(v.Value().(string)), nil
 	default:
@@ -614,6 +616,19 @@ func (c *QValueAvroConverter) processUUID(byteData uuid.UUID) interface{} {
 	return uuidString
 }
 
+func (c *QValueAvroConverter) processArrayUUID(arrayData []uuid.UUID) interface{} {
+	UUIDData := make([]string, 0, len(arrayData))
+	for _, uuid := range arrayData {
+		UUIDData = append(UUIDData, uuid.String())
+	}
+
+	if c.Nullable {
+		return goavro.Union("array", UUIDData)
+	}
+
+	return UUIDData
+}
+
 func (c *QValueAvroConverter) processGeospatial(geoString string) interface{} {
 	if c.Nullable {
 		return goavro.Union("string", geoString)

From 63b905cd4ce2845a936af233c53f072bd5555a4e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Mon, 16 Dec 2024 17:03:22 +0000
Subject: [PATCH 3/4] port recent SF fix to BQ/PG/CH (#2357)

previously: https://github.com/PeerDB-io/peerdb/pull/2317

came up with pg: https://github.com/PeerDB-io/peerdb/issues/2354
---
 flow/connectors/bigquery/bigquery.go    |  8 +++++++-
 flow/connectors/clickhouse/normalize.go |  8 +++++++-
 flow/connectors/postgres/client.go      | 10 +++++++++-
 flow/connectors/postgres/postgres.go    |  2 +-
 4 files changed, 24 insertions(+), 4 deletions(-)
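All three connectors get the same guard: destination table names read back from the raw table are filtered against the table-to-schema mapping, so a table dropped from the mapping mid-flow no longer breaks normalize. The Postgres variant uses `slices.DeleteFunc` (Go 1.21+); a minimal standalone sketch of that filtering pattern, where `filterKnownTables` is a hypothetical name for illustration, not PeerDB code:

```go
package main

import (
	"fmt"
	"slices"
)

// filterKnownTables keeps only destination tables that still have a schema
// in the mapping, logging (here via fmt.Println) anything stale. Note that
// slices.DeleteFunc filters in place and returns the shortened slice.
func filterKnownTables(names []string, schemas map[string]struct{}) []string {
	return slices.DeleteFunc(names, func(name string) bool {
		if _, ok := schemas[name]; !ok {
			fmt.Println("table not found in table to schema mapping:", name)
			return true // delete the stale entry
		}
		return false
	})
}

func main() {
	schemas := map[string]struct{}{"public.users": {}}
	got := filterKnownTables([]string{"public.users", "public.orders"}, schemas)
	fmt.Println(got) // [public.users]
}
```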
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index d6504322ca..94155343e2 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -253,6 +253,7 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 	ctx context.Context,
 	flowJobName string,
 	batchId int64,
+	tableToSchema map[string]*protos.TableSchema,
 ) ([]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
@@ -283,7 +284,11 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 		}
 		if len(row) > 0 {
 			value := row[0].(string)
-			distinctTableNames = append(distinctTableNames, value)
+			if _, ok := tableToSchema[value]; ok {
+				distinctTableNames = append(distinctTableNames, value)
+			} else {
+				c.logger.Warn("table not found in table to schema mapping", "table", value)
+			}
 		}
 	}
 
@@ -446,6 +451,7 @@ func (c *BigQueryConnector) mergeTablesInThisBatch(
 		ctx,
 		flowName,
 		batchId,
+		tableToSchema,
 	)
 	if err != nil {
 		return fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index fabe07a35f..f7f6d8f980 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -259,6 +259,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
 		req.FlowJobName,
 		req.SyncBatchID,
 		normBatchID,
+		req.TableNameSchemaMapping,
 	)
 	if err != nil {
 		c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err)
@@ -484,6 +485,7 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
 	flowJobName string,
 	syncBatchID int64,
 	normalizeBatchID int64,
+	tableToSchema map[string]*protos.TableSchema,
 ) ([]string, error) {
 	rawTbl := c.getRawTableName(flowJobName)
 
@@ -507,7 +509,11 @@
 			return nil, errors.New("table name is not valid")
 		}
 
-		tableNames = append(tableNames, tableName.String)
+		if _, ok := tableToSchema[tableName.String]; ok {
+			tableNames = append(tableNames, tableName.String)
+		} else {
+			c.logger.Warn("table not found in table to schema mapping", "table", tableName.String)
+		}
 	}
 
 	if err := rows.Err(); err != nil {
diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go
index 70b0d15d1d..172a45c6b8 100644
--- a/flow/connectors/postgres/client.go
+++ b/flow/connectors/postgres/client.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
+	"slices"
 	"strings"
 
 	"github.com/jackc/pgerrcode"
@@ -594,6 +595,7 @@ func (c *PostgresConnector) getDistinctTableNamesInBatch(
 	flowJobName string,
 	syncBatchID int64,
 	normalizeBatchID int64,
+	tableToSchema map[string]*protos.TableSchema,
 ) ([]string, error) {
 	rawTableIdentifier := getRawTableIdentifier(flowJobName)
 
@@ -607,7 +609,13 @@
 	if err != nil {
 		return nil, fmt.Errorf("failed to scan row: %w", err)
 	}
-	return destinationTableNames, nil
+	return slices.DeleteFunc(destinationTableNames, func(name string) bool {
+		if _, ok := tableToSchema[name]; !ok {
+			c.logger.Warn("table not found in table to schema mapping", "table", name)
+			return true
+		}
+		return false
+	}), nil
 }
 
 func (c *PostgresConnector) getTableNametoUnchangedCols(
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 435df65ed4..02c61b2ebb 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -643,7 +643,7 @@ func (c *PostgresConnector) NormalizeRecords(
 	}
 
 	destinationTableNames, err := c.getDistinctTableNamesInBatch(
-		ctx, req.FlowJobName, req.SyncBatchID, normBatchID)
+		ctx, req.FlowJobName, req.SyncBatchID, normBatchID, req.TableNameSchemaMapping)
 	if err != nil {
 		return nil, err
 	}

From 793f4072dbc6ed96f8c880b67bae6e81c68014a9 Mon Sep 17 00:00:00 2001
From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com>
Date: Mon, 16 Dec 2024 17:12:02 +0000
Subject: [PATCH 4/4] chore(deps): update postgres:17-alpine docker digest to
 d37d2c1 (#2364)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR contains the following updates:

| Package | Update | Change |
|---|---|---|
| postgres | digest | `ccfa992` -> `d37d2c1` |

---

> [!WARNING]
> Some dependencies could not be looked up. Check the Dependency Dashboard for more information.

---

### Configuration

📅 **Schedule**: Branch creation - "after 5pm on monday" in timezone Etc/UTC, Automerge - At any time (no schedule defined).

🚦 **Automerge**: Enabled.

♻ **Rebasing**: Whenever PR is behind base branch, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about this update again.

---

- [ ] If you want to rebase/retry this PR, check this box

---

This PR was generated by [Mend Renovate](https://mend.io/renovate/). View the [repository job log](https://developer.mend.io/github/PeerDB-io/peerdb).

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
---
 docker-compose-dev.yml | 2 +-
 docker-compose.yml     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml
index 1d88de3672..289b980582 100644
--- a/docker-compose-dev.yml
+++ b/docker-compose-dev.yml
@@ -39,7 +39,7 @@ x-flow-worker-env: &flow-worker-env
 services:
   catalog:
     container_name: catalog
-    image: postgres:17-alpine@sha256:ccfa992a46925f976709844d552aecb316eab4fb512b699bc0f990a489fed463
+    image: postgres:17-alpine@sha256:d37d2c160d34430877c802e5adc22824a2ad453499db9bab1a2ceb2be6c1a46f
     command: -c config_file=/etc/postgresql.conf
     ports:
       - 9901:5432
diff --git a/docker-compose.yml b/docker-compose.yml
index 998de8cba3..5bfb4c3e65 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -32,7 +32,7 @@ x-flow-worker-env: &flow-worker-env
 services:
   catalog:
     container_name: catalog
-    image: postgres:17-alpine@sha256:ccfa992a46925f976709844d552aecb316eab4fb512b699bc0f990a489fed463
+    image: postgres:17-alpine@sha256:d37d2c160d34430877c802e5adc22824a2ad453499db9bab1a2ceb2be6c1a46f
     command: -c config_file=/etc/postgresql.conf
     restart: unless-stopped
     ports: