Merge branch 'main' into ch-validate-shared

heavycrystal authored Dec 16, 2024
2 parents 2f2939a + 793f407 commit c9704a1
Showing 9 changed files with 52 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
@@ -39,7 +39,7 @@ x-flow-worker-env: &flow-worker-env
 services:
   catalog:
     container_name: catalog
-    image: postgres:17-alpine@sha256:ccfa992a46925f976709844d552aecb316eab4fb512b699bc0f990a489fed463
+    image: postgres:17-alpine@sha256:d37d2c160d34430877c802e5adc22824a2ad453499db9bab1a2ceb2be6c1a46f
     command: -c config_file=/etc/postgresql.conf
     ports:
       - 9901:5432
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -32,7 +32,7 @@ x-flow-worker-env: &flow-worker-env
 services:
   catalog:
     container_name: catalog
-    image: postgres:17-alpine@sha256:ccfa992a46925f976709844d552aecb316eab4fb512b699bc0f990a489fed463
+    image: postgres:17-alpine@sha256:d37d2c160d34430877c802e5adc22824a2ad453499db9bab1a2ceb2be6c1a46f
     command: -c config_file=/etc/postgresql.conf
     restart: unless-stopped
     ports:
8 changes: 7 additions & 1 deletion flow/connectors/bigquery/bigquery.go
@@ -253,6 +253,7 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 	ctx context.Context,
 	flowJobName string,
 	batchId int64,
+	tableToSchema map[string]*protos.TableSchema,
 ) ([]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
@@ -283,7 +284,11 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 		}
 		if len(row) > 0 {
 			value := row[0].(string)
-			distinctTableNames = append(distinctTableNames, value)
+			if _, ok := tableToSchema[value]; ok {
+				distinctTableNames = append(distinctTableNames, value)
+			} else {
+				c.logger.Warn("table not found in table to schema mapping", "table", value)
+			}
 		}
 	}
 
@@ -446,6 +451,7 @@ func (c *BigQueryConnector) mergeTablesInThisBatch(
 		ctx,
 		flowName,
 		batchId,
+		tableToSchema,
 	)
 	if err != nil {
 		return fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_stmt_generator.go
@@ -45,7 +45,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab
 		case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16,
 			qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString,
 			qvalue.QValueKindArrayBoolean, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ,
-			qvalue.QValueKindArrayDate:
+			qvalue.QValueKindArrayDate, qvalue.QValueKindArrayUUID:
 			castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
 				"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
 				bqTypeString, column.Name, shortCol)
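For reference, a tiny standalone sketch of what this format string expands to for a UUID-array column, assuming the type map resolves QValueKindArrayUUID to BigQuery STRING; the column name "ids" is invented for illustration:

package main

import "fmt"

func main() {
	// Same format string as generateFlattenedCTE above; "STRING" and
	// "ids" are illustrative stand-ins for bqTypeString, column.Name,
	// and shortCol.
	castStmt := fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
		"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
		"STRING", "ids", "ids")
	fmt.Println(castStmt)
	// Prints:
	// ARRAY(SELECT CAST(element AS STRING) FROM UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.ids') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `ids`
}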
8 changes: 7 additions & 1 deletion flow/connectors/clickhouse/normalize.go
@@ -257,6 +257,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
 		req.FlowJobName,
 		req.SyncBatchID,
 		normBatchID,
+		req.TableNameSchemaMapping,
 	)
 	if err != nil {
 		c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err)
@@ -482,6 +483,7 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
 	flowJobName string,
 	syncBatchID int64,
 	normalizeBatchID int64,
+	tableToSchema map[string]*protos.TableSchema,
 ) ([]string, error) {
 	rawTbl := c.getRawTableName(flowJobName)
 
@@ -505,7 +507,11 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
 			return nil, errors.New("table name is not valid")
 		}
 
-		tableNames = append(tableNames, tableName.String)
+		if _, ok := tableToSchema[tableName.String]; ok {
+			tableNames = append(tableNames, tableName.String)
+		} else {
+			c.logger.Warn("table not found in table to schema mapping", "table", tableName.String)
+		}
 	}
 
 	if err := rows.Err(); err != nil {
10 changes: 9 additions & 1 deletion flow/connectors/postgres/client.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
+	"slices"
 	"strings"
 
 	"github.com/jackc/pgerrcode"
@@ -594,6 +595,7 @@ func (c *PostgresConnector) getDistinctTableNamesInBatch(
 	flowJobName string,
 	syncBatchID int64,
 	normalizeBatchID int64,
+	tableToSchema map[string]*protos.TableSchema,
 ) ([]string, error) {
 	rawTableIdentifier := getRawTableIdentifier(flowJobName)
 
@@ -607,7 +609,13 @@
 	if err != nil {
 		return nil, fmt.Errorf("failed to scan row: %w", err)
 	}
-	return destinationTableNames, nil
+	return slices.DeleteFunc(destinationTableNames, func(name string) bool {
+		if _, ok := tableToSchema[name]; !ok {
+			c.logger.Warn("table not found in table to schema mapping", "table", name)
+			return true
+		}
+		return false
+	}), nil
 }
 
 func (c *PostgresConnector) getTableNametoUnchangedCols(
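A standalone sketch of the slices.DeleteFunc idiom used above (standard library, Go 1.21+); the table names and schema map here are invented for illustration:

package main

import (
	"fmt"
	"slices"
)

func main() {
	// Hypothetical destination tables and a schema mapping that only
	// knows about two of them.
	tables := []string{"public.users", "public.orders", "public.stale"}
	schemas := map[string]struct{}{
		"public.users":  {},
		"public.orders": {},
	}

	// DeleteFunc removes, in place, every element for which the
	// predicate returns true and returns the shortened slice.
	tables = slices.DeleteFunc(tables, func(name string) bool {
		_, ok := schemas[name]
		return !ok // drop tables missing from the mapping
	})

	fmt.Println(tables) // [public.users public.orders]
}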
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
@@ -643,7 +643,7 @@ func (c *PostgresConnector) NormalizeRecords(
 	}
 
 	destinationTableNames, err := c.getDistinctTableNamesInBatch(
-		ctx, req.FlowJobName, req.SyncBatchID, normBatchID)
+		ctx, req.FlowJobName, req.SyncBatchID, normBatchID, req.TableNameSchemaMapping)
 	if err != nil {
 		return nil, err
 	}
15 changes: 15 additions & 0 deletions flow/model/qvalue/avro_converter.go
@@ -402,6 +402,8 @@ func QValueToAvro(
 		return c.processArrayDate(v.Val), nil
 	case QValueUUID:
 		return c.processUUID(v.Val), nil
+	case QValueArrayUUID:
+		return c.processArrayUUID(v.Val), nil
 	case QValueGeography, QValueGeometry, QValuePoint:
 		return c.processGeospatial(v.Value().(string)), nil
 	default:
@@ -614,6 +616,19 @@ func (c *QValueAvroConverter) processUUID(byteData uuid.UUID) interface{} {
 	return uuidString
 }
 
+func (c *QValueAvroConverter) processArrayUUID(arrayData []uuid.UUID) interface{} {
+	UUIDData := make([]string, 0, len(arrayData))
+	for _, uuid := range arrayData {
+		UUIDData = append(UUIDData, uuid.String())
+	}
+
+	if c.Nullable {
+		return goavro.Union("array", UUIDData)
+	}
+
+	return UUIDData
+}
+
 func (c *QValueAvroConverter) processGeospatial(geoString string) interface{} {
 	if c.Nullable {
 		return goavro.Union("string", geoString)
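A minimal sketch of the union wrapping done by processArrayUUID, assuming the github.com/google/uuid and github.com/linkedin/goavro/v2 dependencies this file appears to use; the values are made up:

package main

import (
	"fmt"

	"github.com/google/uuid"
	"github.com/linkedin/goavro/v2"
)

func main() {
	ids := []uuid.UUID{uuid.New(), uuid.New()}

	// Convert to strings, as processArrayUUID does above.
	strs := make([]string, 0, len(ids))
	for _, id := range ids {
		strs = append(strs, id.String())
	}

	// For a nullable field the Avro type is a union, so the datum must
	// name its branch; goavro.Union wraps it as {"array": ...}.
	datum := goavro.Union("array", strs)
	fmt.Println(datum)
}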
11 changes: 10 additions & 1 deletion flow/workflows/sync_flow.go
@@ -79,7 +79,7 @@ func SyncFlowWorkflow(
 	}
 
 	syncFlowCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
-		StartToCloseTimeout: 72 * time.Hour,
+		StartToCloseTimeout: 7 * 24 * time.Hour,
 		HeartbeatTimeout:    time.Minute,
 		WaitForCancellation: true,
 	})
@@ -162,6 +162,15 @@
 	}
 
 	restart := syncErr || workflow.GetInfo(ctx).GetContinueAsNewSuggested()
+
+	if syncErr {
+		logger.Info("sync flow error, sleeping for 30 seconds...")
+		err := workflow.Sleep(ctx, 30*time.Second)
+		if err != nil {
+			logger.Error("failed to sleep", slog.Any("error", err))
+		}
+	}
+
 	if !stop && !syncErr && mustWait {
 		waitSelector.Select(ctx)
 		if restart {
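For context, a minimal sketch of the sleep-on-error pattern above in an isolated Temporal workflow; the workflow name and argument are hypothetical, and error handling is trimmed:

package syncflow

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

// RetrySleepWorkflow is a hypothetical workflow sketching the same
// back-off idea: after a failed pass, sleep before continuing as new
// so a persistently failing sync doesn't restart in a tight loop.
func RetrySleepWorkflow(ctx workflow.Context, failed bool) error {
	logger := workflow.GetLogger(ctx)
	if failed {
		logger.Info("pass failed, sleeping for 30 seconds...")
		// workflow.Sleep is a durable timer that survives worker
		// restarts, unlike time.Sleep, which must not be used in
		// workflow code.
		if err := workflow.Sleep(ctx, 30*time.Second); err != nil {
			logger.Error("failed to sleep", "error", err)
		}
	}
	return workflow.NewContinueAsNewError(ctx, RetrySleepWorkflow, false)
}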
