Skip to content

Commit

Permalink
port recent SF fix to BQ/PG/CH
Browse files Browse the repository at this point in the history
previously: #2317
came up with pg: #2354
  • Loading branch information
serprex committed Dec 16, 2024
1 parent e0d7e7a commit d16261f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
8 changes: 7 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
ctx context.Context,
flowJobName string,
batchId int64,
tableToSchema map[string]*protos.TableSchema,
) ([]string, error) {
rawTableName := c.getRawTableName(flowJobName)

Expand Down Expand Up @@ -283,7 +284,11 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
}
if len(row) > 0 {
value := row[0].(string)
distinctTableNames = append(distinctTableNames, value)
if _, ok := tableToSchema[value]; ok {
distinctTableNames = append(distinctTableNames, value)
} else {
c.logger.Warn("table not found in table to schema mapping", "table", value)
}
}
}

Expand Down Expand Up @@ -446,6 +451,7 @@ func (c *BigQueryConnector) mergeTablesInThisBatch(
ctx,
flowName,
batchId,
tableToSchema,
)
if err != nil {
return fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
Expand Down
8 changes: 7 additions & 1 deletion flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
req.FlowJobName,
req.SyncBatchID,
normBatchID,
req.TableNameSchemaMapping,
)
if err != nil {
c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err)
Expand Down Expand Up @@ -484,6 +485,7 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
flowJobName string,
syncBatchID int64,
normalizeBatchID int64,
tableToSchema map[string]*protos.TableSchema,
) ([]string, error) {
rawTbl := c.getRawTableName(flowJobName)

Expand All @@ -507,7 +509,11 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
return nil, errors.New("table name is not valid")
}

tableNames = append(tableNames, tableName.String)
if _, ok := tableToSchema[tableName.String]; ok {
tableNames = append(tableNames, tableName.String)
} else {
c.logger.Warn("table not found in table to schema mapping", "table", tableName.String)
}
}

if err := rows.Err(); err != nil {
Expand Down
10 changes: 9 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"slices"
"strings"

"github.com/jackc/pgerrcode"
Expand Down Expand Up @@ -594,6 +595,7 @@ func (c *PostgresConnector) getDistinctTableNamesInBatch(
flowJobName string,
syncBatchID int64,
normalizeBatchID int64,
tableToSchema map[string]*protos.TableSchema,
) ([]string, error) {
rawTableIdentifier := getRawTableIdentifier(flowJobName)

Expand All @@ -607,7 +609,13 @@ func (c *PostgresConnector) getDistinctTableNamesInBatch(
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
return destinationTableNames, nil
return slices.DeleteFunc(destinationTableNames, func(name string) bool {
if _, ok := tableToSchema[name]; !ok {
c.logger.Warn("table not found in table to schema mapping", "table", name)
return true
}
return false
}), nil
}

func (c *PostgresConnector) getTableNametoUnchangedCols(
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func (c *PostgresConnector) NormalizeRecords(
}

destinationTableNames, err := c.getDistinctTableNamesInBatch(
ctx, req.FlowJobName, req.SyncBatchID, normBatchID)
ctx, req.FlowJobName, req.SyncBatchID, normBatchID, req.TableNameSchemaMapping)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit d16261f

Please sign in to comment.