diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 2c9796fa1a..d65d61e9d7 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -3,8 +3,9 @@ package connclickhouse import ( "cmp" "context" + "database/sql" + "errors" "fmt" - "log/slog" "slices" "strconv" "strings" @@ -265,78 +266,22 @@ func (c *ClickHouseConnector) NormalizeRecords( return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err) } - rawTbl := c.getRawTableName(req.FlowJobName) - distinctTableNamesBatchMapping, err := c.getDistinctTableNamesInBatchRange( - ctx, req.FlowJobName, req.SyncBatchID, normBatchID) - if err != nil { - return nil, fmt.Errorf("failed to get distinct table names in batch range: %w", err) - } - - for batchID := normBatchID + 1; batchID <= req.SyncBatchID; batchID++ { - if err := c.syncTablesInThisBatch(ctx, req, rawTbl, batchID, distinctTableNamesBatchMapping[batchID]); err != nil { - c.logger.Error("[clickhouse] error while syncing tables in this batch", slog.Any("error", err), - slog.Int64("batchID", batchID)) - return nil, err - } - - if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchID); err != nil { - c.logger.Error("[clickhouse] error while updating normalize batch id", - slog.Any("error", err), - slog.Int64("batchID", batchID)) - return nil, err - } - } - - return &model.NormalizeResponse{ - Done: true, - StartBatchID: normBatchID + 1, - EndBatchID: req.SyncBatchID, - }, nil -} - -func (c *ClickHouseConnector) getDistinctTableNamesInBatchRange( - ctx context.Context, - flowJobName string, - syncBatchID int64, - normalizeBatchID int64, -) (map[int64][]string, error) { - rawTbl := c.getRawTableName(flowJobName) - - q := fmt.Sprintf( - `SELECT DISTINCT _peerdb_batch_id,groupArray(DISTINCT _peerdb_destination_table_name) - FROM %s WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d GROUP BY _peerdb_batch_id`, - rawTbl, normalizeBatchID, syncBatchID) - - rows, err := c.query(ctx, q) + destinationTableNames, err := c.getDistinctTableNamesInBatch( + ctx, + req.FlowJobName, + req.SyncBatchID, + normBatchID, + ) if err != nil { - return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err) - } - defer rows.Close() - distinctTableNamesBatchMapping := make(map[int64][]string) - for rows.Next() { - var batchID int32 - var tableNames []string - if err := rows.Scan(&batchID, &tableNames); err != nil { - return nil, fmt.Errorf("error while scanning rows: %w", err) - } - distinctTableNamesBatchMapping[int64(batchID)] = tableNames - } - if rows.Err() != nil { - return nil, fmt.Errorf("failed to read rows: %w", err) + c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err) + return nil, err } - return distinctTableNamesBatchMapping, nil -} + rawTbl := c.getRawTableName(req.FlowJobName) -func (c *ClickHouseConnector) syncTablesInThisBatch( - ctx context.Context, - req *model.NormalizeRecordsRequest, - rawTableName string, - batchID int64, - destinationTableNames []string, -) error { // model the raw table data as inserts. for _, tbl := range destinationTableNames { + // SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id selectQuery := strings.Builder{} selectQuery.WriteString("SELECT ") @@ -355,7 +300,7 @@ func (c *ClickHouseConnector) syncTablesInThisBatch( enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env) if err != nil { - return err + return nil, err } projection := strings.Builder{} @@ -392,7 +337,7 @@ func (c *ClickHouseConnector) syncTablesInThisBatch( var err error clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE) if err != nil { - return fmt.Errorf("error while converting column type to clickhouse type: %w", err) + return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err) } } if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() { @@ -451,9 +396,11 @@ func (c *ClickHouseConnector) syncTablesInThisBatch( selectQuery.WriteString(projection.String()) selectQuery.WriteString(" FROM ") - selectQuery.WriteString(rawTableName) - selectQuery.WriteString(" WHERE _peerdb_batch_id = ") - selectQuery.WriteString(strconv.FormatInt(batchID, 10)) + selectQuery.WriteString(rawTbl) + selectQuery.WriteString(" WHERE _peerdb_batch_id > ") + selectQuery.WriteString(strconv.FormatInt(normBatchID, 10)) + selectQuery.WriteString(" AND _peerdb_batch_id <= ") + selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10)) selectQuery.WriteString(" AND _peerdb_destination_table_name = '") selectQuery.WriteString(tbl) selectQuery.WriteString("'") @@ -468,9 +415,11 @@ func (c *ClickHouseConnector) syncTablesInThisBatch( selectQuery.WriteString("UNION ALL SELECT ") selectQuery.WriteString(projectionUpdate.String()) selectQuery.WriteString(" FROM ") - selectQuery.WriteString(rawTableName) - selectQuery.WriteString(" WHERE _peerdb_batch_id = ") - selectQuery.WriteString(strconv.FormatInt(batchID, 10)) + selectQuery.WriteString(rawTbl) + selectQuery.WriteString(" WHERE _peerdb_batch_id > ") + selectQuery.WriteString(strconv.FormatInt(normBatchID, 10)) + selectQuery.WriteString(" AND _peerdb_batch_id <= ") + selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10)) selectQuery.WriteString(" AND _peerdb_destination_table_name = '") selectQuery.WriteString(tbl) selectQuery.WriteString("' AND _peerdb_record_type = 1") @@ -486,11 +435,60 @@ func (c *ClickHouseConnector) syncTablesInThisBatch( q := insertIntoSelectQuery.String() if err := c.execWithLogging(ctx, q); err != nil { - return fmt.Errorf("error while inserting into normalized table: %w", err) + return nil, fmt.Errorf("error while inserting into normalized table: %w", err) } } - return nil + err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID) + if err != nil { + c.logger.Error("[clickhouse] error while updating normalize batch id", "error", err) + return nil, err + } + + return &model.NormalizeResponse{ + Done: true, + StartBatchID: normBatchID + 1, + EndBatchID: req.SyncBatchID, + }, nil +} + +func (c *ClickHouseConnector) getDistinctTableNamesInBatch( + ctx context.Context, + flowJobName string, + syncBatchID int64, + normalizeBatchID int64, +) ([]string, error) { + rawTbl := c.getRawTableName(flowJobName) + + q := fmt.Sprintf( + `SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d`, + rawTbl, normalizeBatchID, syncBatchID) + + rows, err := c.query(ctx, q) + if err != nil { + return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err) + } + defer rows.Close() + var tableNames []string + for rows.Next() { + var tableName sql.NullString + err = rows.Scan(&tableName) + if err != nil { + return nil, fmt.Errorf("error while scanning table name: %w", err) + } + + if !tableName.Valid { + return nil, errors.New("table name is not valid") + } + + tableNames = append(tableNames, tableName.String) + } + + if rows.Err() != nil { + return nil, fmt.Errorf("failed to read rows: %w", err) + } + + return tableNames, nil } func (c *ClickHouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error { diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 566bf4c4c7..515b622ee1 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -176,9 +176,7 @@ func (p *PostgresMetadata) UpdateNormalizeBatchID(ctx context.Context, jobName s `UPDATE `+lastSyncStateTableName+ ` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID) if err != nil { - p.logger.Error("failed to update normalize batch id", - slog.Any("error", err), - slog.Int64("batchID", batchID)) + p.logger.Error("failed to update normalize batch id", slog.Any("error", err)) return err }