Revert "ClickHouse: Normalize one batch at a time" (#2249)
Reverts #2219

The batch-at-a-time approach seems to degrade performance.
heavycrystal authored Nov 13, 2024
1 parent cea742d commit 6c618df
Showing 2 changed files with 76 additions and 80 deletions.
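For orientation, the reverted change (#2219) filtered the raw table one _peerdb_batch_id at a time and advanced the normalize batch ID after every batch, whereas the restored code issues a single pass per destination table covering the whole (normalizeBatchID, syncBatchID] range. The sketch below is a minimal, hypothetical illustration of the two WHERE-clause shapes; the table name, the elided projection, and the helper functions are placeholders, not the connector's actual code.

package main

import (
	"fmt"
	"strconv"
)

// Reverted behaviour (#2219): one SELECT per batch ID, run in a loop
// from normalizeBatchID+1 up to syncBatchID.
func perBatchFilter(rawTable string, batchID int64) string {
	return "SELECT ... FROM " + rawTable +
		" WHERE _peerdb_batch_id = " + strconv.FormatInt(batchID, 10)
}

// Restored behaviour: one SELECT per destination table covering the
// whole (normalizeBatchID, syncBatchID] range in a single statement.
func rangeFilter(rawTable string, normalizeBatchID, syncBatchID int64) string {
	return fmt.Sprintf(
		"SELECT ... FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d",
		rawTable, normalizeBatchID, syncBatchID)
}

func main() {
	// "raw_table" is a stand-in for the mirror's raw table name.
	fmt.Println(perBatchFilter("raw_table", 42))
	fmt.Println(rangeFilter("raw_table", 40, 42))
}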
152 changes: 75 additions & 77 deletions flow/connectors/clickhouse/normalize.go
@@ -3,8 +3,9 @@ package connclickhouse
import (
"cmp"
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"slices"
"strconv"
"strings"
@@ -265,78 +266,22 @@ func (c *ClickHouseConnector) NormalizeRecords(
return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
}

rawTbl := c.getRawTableName(req.FlowJobName)
distinctTableNamesBatchMapping, err := c.getDistinctTableNamesInBatchRange(
ctx, req.FlowJobName, req.SyncBatchID, normBatchID)
if err != nil {
return nil, fmt.Errorf("failed to get distinct table names in batch range: %w", err)
}

for batchID := normBatchID + 1; batchID <= req.SyncBatchID; batchID++ {
if err := c.syncTablesInThisBatch(ctx, req, rawTbl, batchID, distinctTableNamesBatchMapping[batchID]); err != nil {
c.logger.Error("[clickhouse] error while syncing tables in this batch", slog.Any("error", err),
slog.Int64("batchID", batchID))
return nil, err
}

if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchID); err != nil {
c.logger.Error("[clickhouse] error while updating normalize batch id",
slog.Any("error", err),
slog.Int64("batchID", batchID))
return nil, err
}
}

return &model.NormalizeResponse{
Done: true,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
}, nil
}

func (c *ClickHouseConnector) getDistinctTableNamesInBatchRange(
ctx context.Context,
flowJobName string,
syncBatchID int64,
normalizeBatchID int64,
) (map[int64][]string, error) {
rawTbl := c.getRawTableName(flowJobName)

q := fmt.Sprintf(
`SELECT DISTINCT _peerdb_batch_id,groupArray(DISTINCT _peerdb_destination_table_name)
FROM %s WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d GROUP BY _peerdb_batch_id`,
rawTbl, normalizeBatchID, syncBatchID)

rows, err := c.query(ctx, q)
destinationTableNames, err := c.getDistinctTableNamesInBatch(
ctx,
req.FlowJobName,
req.SyncBatchID,
normBatchID,
)
if err != nil {
return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err)
}
defer rows.Close()
distinctTableNamesBatchMapping := make(map[int64][]string)
for rows.Next() {
var batchID int32
var tableNames []string
if err := rows.Scan(&batchID, &tableNames); err != nil {
return nil, fmt.Errorf("error while scanning rows: %w", err)
}
distinctTableNamesBatchMapping[int64(batchID)] = tableNames
}
if rows.Err() != nil {
return nil, fmt.Errorf("failed to read rows: %w", err)
c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err)
return nil, err
}

return distinctTableNamesBatchMapping, nil
}
rawTbl := c.getRawTableName(req.FlowJobName)

func (c *ClickHouseConnector) syncTablesInThisBatch(
ctx context.Context,
req *model.NormalizeRecordsRequest,
rawTableName string,
batchID int64,
destinationTableNames []string,
) error {
// model the raw table data as inserts.
for _, tbl := range destinationTableNames {
// SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
selectQuery := strings.Builder{}
selectQuery.WriteString("SELECT ")

@@ -355,7 +300,7 @@ func (c *ClickHouseConnector) syncTablesInThisBatch(

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return err
return nil, err
}

projection := strings.Builder{}
@@ -392,7 +337,7 @@ func (c *ClickHouseConnector) syncTablesInThisBatch(
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
return fmt.Errorf("error while converting column type to clickhouse type: %w", err)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
}
if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
@@ -451,9 +396,11 @@ func (c *ClickHouseConnector) syncTablesInThisBatch(

selectQuery.WriteString(projection.String())
selectQuery.WriteString(" FROM ")
selectQuery.WriteString(rawTableName)
selectQuery.WriteString(" WHERE _peerdb_batch_id = ")
selectQuery.WriteString(strconv.FormatInt(batchID, 10))
selectQuery.WriteString(rawTbl)
selectQuery.WriteString(" WHERE _peerdb_batch_id > ")
selectQuery.WriteString(strconv.FormatInt(normBatchID, 10))
selectQuery.WriteString(" AND _peerdb_batch_id <= ")
selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10))
selectQuery.WriteString(" AND _peerdb_destination_table_name = '")
selectQuery.WriteString(tbl)
selectQuery.WriteString("'")
@@ -468,9 +415,11 @@ func (c *ClickHouseConnector) syncTablesInThisBatch(
selectQuery.WriteString("UNION ALL SELECT ")
selectQuery.WriteString(projectionUpdate.String())
selectQuery.WriteString(" FROM ")
selectQuery.WriteString(rawTableName)
selectQuery.WriteString(" WHERE _peerdb_batch_id = ")
selectQuery.WriteString(strconv.FormatInt(batchID, 10))
selectQuery.WriteString(rawTbl)
selectQuery.WriteString(" WHERE _peerdb_batch_id > ")
selectQuery.WriteString(strconv.FormatInt(normBatchID, 10))
selectQuery.WriteString(" AND _peerdb_batch_id <= ")
selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10))
selectQuery.WriteString(" AND _peerdb_destination_table_name = '")
selectQuery.WriteString(tbl)
selectQuery.WriteString("' AND _peerdb_record_type = 1")
@@ -486,11 +435,60 @@ func (c *ClickHouseConnector) syncTablesInThisBatch(
q := insertIntoSelectQuery.String()

if err := c.execWithLogging(ctx, q); err != nil {
return fmt.Errorf("error while inserting into normalized table: %w", err)
return nil, fmt.Errorf("error while inserting into normalized table: %w", err)
}
}

return nil
err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
if err != nil {
c.logger.Error("[clickhouse] error while updating normalize batch id", "error", err)
return nil, err
}

return &model.NormalizeResponse{
Done: true,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
}, nil
}

func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
ctx context.Context,
flowJobName string,
syncBatchID int64,
normalizeBatchID int64,
) ([]string, error) {
rawTbl := c.getRawTableName(flowJobName)

q := fmt.Sprintf(
`SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d`,
rawTbl, normalizeBatchID, syncBatchID)

rows, err := c.query(ctx, q)
if err != nil {
return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err)
}
defer rows.Close()
var tableNames []string
for rows.Next() {
var tableName sql.NullString
err = rows.Scan(&tableName)
if err != nil {
return nil, fmt.Errorf("error while scanning table name: %w", err)
}

if !tableName.Valid {
return nil, errors.New("table name is not valid")
}

tableNames = append(tableNames, tableName.String)
}

if rows.Err() != nil {
return nil, fmt.Errorf("failed to read rows: %w", err)
}

return tableNames, nil
}

func (c *ClickHouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error {
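As a side note on the restored getDistinctTableNamesInBatch above: each row is scanned into a sql.NullString and rejected when NULL, and iteration errors are checked via rows.Err(). The restored code wraps err rather than rows.Err() in that final check, which reports the wrong error value; the hedged sketch below shows the same pattern with rows.Err() used directly. The scanTableNames helper and its package are illustrative only, assuming a standard database/sql result set rather than the connector's own query helper.

package sketch

import (
	"database/sql"
	"errors"
	"fmt"
)

// scanTableNames drains a result set of single-column rows into a string
// slice, treating NULL names as an error. The rows value is assumed to come
// from the standard database/sql package; the connector's query helper may
// return a driver-specific type instead.
func scanTableNames(rows *sql.Rows) ([]string, error) {
	defer rows.Close()

	var tableNames []string
	for rows.Next() {
		var tableName sql.NullString
		if err := rows.Scan(&tableName); err != nil {
			return nil, fmt.Errorf("error while scanning table name: %w", err)
		}
		if !tableName.Valid {
			return nil, errors.New("table name is not valid")
		}
		tableNames = append(tableNames, tableName.String)
	}
	// Report any error encountered during iteration (note: rows.Err(),
	// not the stale err from the last Scan call).
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to read rows: %w", err)
	}
	return tableNames, nil
}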
4 changes: 1 addition & 3 deletions flow/connectors/external_metadata/store.go
@@ -176,9 +176,7 @@ func (p *PostgresMetadata) UpdateNormalizeBatchID(ctx context.Context, jobName s
`UPDATE `+lastSyncStateTableName+
` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID)
if err != nil {
p.logger.Error("failed to update normalize batch id",
slog.Any("error", err),
slog.Int64("batchID", batchID))
p.logger.Error("failed to update normalize batch id", slog.Any("error", err))
return err
}

