Commit

cleanup error handling, fixes 2 bugs (#2250)
Searched for `if rows.Err` and removed all instances, replacing them with the scoped form `if err := rows.Err(); err != nil`. The old pattern invites a typo: the error branch ends up referencing an outer `err` when the code expects the result of `rows.Err()`.
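A minimal, hypothetical sketch of the pitfall and of the scoped form this commit adopts (`db`, `listTables`, and the query are illustrative, not from this repository):

package example

import (
	"database/sql"
	"fmt"
)

func listTables(db *sql.DB) ([]string, error) {
	rows, err := db.Query("SELECT name FROM tables")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var names []string
	for rows.Next() {
		var name string
		// Scoped form: this err exists only inside the if, so code
		// after the loop cannot accidentally read a leftover Scan error.
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		names = append(names, name)
	}

	// Bug-prone shape this commit removes:
	//
	//	if rows.Err() != nil {
	//		return nil, fmt.Errorf("failed to read rows: %w", err)
	//	}
	//
	// It tests rows.Err() but wraps the outer err, which is nil after the
	// last successful Scan, so the real iteration error is dropped.
	// The scoped form makes that typo impossible:
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to read rows: %w", err)
	}
	return names, nil
}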
serprex authored Nov 13, 2024
1 parent 6c618df commit c37b057
Showing 6 changed files with 23 additions and 32 deletions.
14 changes: 6 additions & 8 deletions flow/connectors/clickhouse/clickhouse.go
@@ -381,8 +381,7 @@ func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tab
 	for rows.Next() {
 		var tableName, engine string
 		var totalRows uint64
-		err = rows.Scan(&tableName, &engine, &totalRows)
-		if err != nil {
+		if err := rows.Scan(&tableName, &engine, &totalRows); err != nil {
 			return fmt.Errorf("failed to scan information for tables: %w", err)
 		}
 		if totalRows != 0 && optedForInitialLoad {
@@ -393,8 +392,8 @@ func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tab
 			slog.String("table", tableName), slog.String("engine", engine))
 		}
 	}
-	if rows.Err() != nil {
-		return fmt.Errorf("failed to read rows: %w", rows.Err())
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("failed to read rows: %w", err)
 	}
 	return nil
 }
@@ -418,14 +417,13 @@ func (c *ClickHouseConnector) getTableColumnsMapping(ctx context.Context,
 	for rows.Next() {
 		var tableName string
 		var fieldDescription protos.FieldDescription
-		err = rows.Scan(&fieldDescription.Name, &fieldDescription.Type, &tableName)
-		if err != nil {
+		if err := rows.Scan(&fieldDescription.Name, &fieldDescription.Type, &tableName); err != nil {
 			return nil, fmt.Errorf("failed to scan columns for tables: %w", err)
 		}
 		tableColumnsMapping[tableName] = append(tableColumnsMapping[tableName], &fieldDescription)
 	}
-	if rows.Err() != nil {
-		return nil, fmt.Errorf("failed to read rows: %w", rows.Err())
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("failed to read rows: %w", err)
 	}
 	return tableColumnsMapping, nil
 }
8 changes: 4 additions & 4 deletions flow/connectors/clickhouse/normalize.go
@@ -6,6 +6,7 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
+	"log/slog"
 	"slices"
 	"strconv"
 	"strings"
@@ -441,7 +442,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
 
 	err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
 	if err != nil {
-		c.logger.Error("[clickhouse] error while updating normalize batch id", "error", err)
+		c.logger.Error("[clickhouse] error while updating normalize batch id", slog.Int64("BatchID", req.SyncBatchID), slog.Any("error", err))
 		return nil, err
 	}
 
@@ -472,8 +473,7 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
 	var tableNames []string
 	for rows.Next() {
 		var tableName sql.NullString
-		err = rows.Scan(&tableName)
-		if err != nil {
+		if err := rows.Scan(&tableName); err != nil {
 			return nil, fmt.Errorf("error while scanning table name: %w", err)
 		}
 
@@ -484,7 +484,7 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
 		tableNames = append(tableNames, tableName.String)
 	}
 
-	if rows.Err() != nil {
+	if err := rows.Err(); err != nil {
 		return nil, fmt.Errorf("failed to read rows: %w", err)
 	}

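Note the unchanged return line in the last hunk above: before this commit, getDistinctTableNamesInBatch tested `rows.Err() != nil` but wrapped the outer `err`, which is nil once the final Scan has succeeded, so the real iteration error was lost. That is precisely the typo the commit message describes, and it appears to be one of the two bug fixes; rebinding `err` in the if header makes the existing return line wrap the value `rows.Err()` actually returned.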
9 changes: 4 additions & 5 deletions flow/connectors/external_metadata/store.go
@@ -172,11 +172,10 @@ func (p *PostgresMetadata) FinishBatch(ctx context.Context, jobName string, sync
 
 func (p *PostgresMetadata) UpdateNormalizeBatchID(ctx context.Context, jobName string, batchID int64) error {
 	p.logger.Info("updating normalize batch id for job", slog.Int64("batchID", batchID))
-	_, err := p.pool.Exec(ctx,
-		`UPDATE `+lastSyncStateTableName+
-			` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID)
-	if err != nil {
-		p.logger.Error("failed to update normalize batch id", slog.Any("error", err))
+	if _, err := p.pool.Exec(ctx,
+		`UPDATE `+lastSyncStateTableName+` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID,
+	); err != nil {
+		p.logger.Error("failed to update normalize batch id", slog.Int64("batchID", batchID), slog.Any("error", err))
 		return err
 	}

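The hunk above also shows the scoped form with a multi-value call: the blank identifier discards the command tag while `err` stays local to the branch, and the batch ID is now attached to the error log. A minimal sketch, assuming pgx v5 and illustrative names (the real code reads the table name from the lastSyncStateTableName constant):

package example

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"
)

func updateNormalizeBatchID(ctx context.Context, pool *pgxpool.Pool, jobName string, batchID int64) error {
	// The blank identifier drops the pgconn.CommandTag; err is scoped
	// to the if, so nothing after the branch can misuse it.
	if _, err := pool.Exec(ctx,
		`UPDATE last_sync_state SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID,
	); err != nil {
		// Structured attributes make the failing batch identifiable in logs.
		slog.Error("failed to update normalize batch id",
			slog.Int64("batchID", batchID), slog.Any("error", err))
		return fmt.Errorf("failed to update normalize batch id: %w", err)
	}
	return nil
}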
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep_query_executor.go
@@ -115,8 +115,8 @@ func (qe *QRepQueryExecutor) ProcessRows(
 	}
 
 	// Check for any errors encountered during iteration
-	if rows.Err() != nil {
-		return nil, fmt.Errorf("row iteration failed: %w", rows.Err())
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("row iteration failed: %w", err)
 	}
 
 	batch := &model.QRecordBatch{
14 changes: 5 additions & 9 deletions flow/connectors/snowflake/qrep.go
@@ -89,14 +89,12 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(ctx context.Context, config
 	}
 
 	stageName := c.getStageNameForJob(config.FlowJobName)
-	err = c.createStage(ctx, stageName, config)
-	if err != nil {
+	if err := c.createStage(ctx, stageName, config); err != nil {
 		return err
 	}
 
 	if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
-		_, err = c.execWithLogging(ctx, "TRUNCATE TABLE "+config.DestinationTableIdentifier)
-		if err != nil {
+		if _, err := c.execWithLogging(ctx, "TRUNCATE TABLE "+config.DestinationTableIdentifier); err != nil {
 			return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
 		}
 	}
@@ -224,8 +222,7 @@ func (c *SnowflakeConnector) getColsFromTable(ctx context.Context, tableName str
 		})
 	}
 
-	err = rows.Err()
-	if err != nil {
+	if err := rows.Err(); err != nil {
 		return nil, fmt.Errorf("failed to read rows: %w", err)
 	}
 
@@ -280,11 +277,10 @@ func (c *SnowflakeConnector) dropStage(ctx context.Context, stagingPath string,
 	}
 
 	for _, object := range page.Contents {
-		_, err = s3svc.DeleteObject(ctx, &s3.DeleteObjectInput{
+		if _, err := s3svc.DeleteObject(ctx, &s3.DeleteObjectInput{
 			Bucket: aws.String(s3o.Bucket),
 			Key:    object.Key,
-		})
-		if err != nil {
+		}); err != nil {
 			c.logger.Error("failed to delete objects from bucket", slog.Any("error", err))
 			return fmt.Errorf("failed to delete objects from bucket: %w", err)
 		}
6 changes: 2 additions & 4 deletions flow/connectors/snowflake/snowflake.go
@@ -259,15 +259,13 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(
 	var result pgtype.Text
 	destinationTableNames := make([]string, 0)
 	for rows.Next() {
-		err = rows.Scan(&result)
-		if err != nil {
+		if err := rows.Scan(&result); err != nil {
 			return nil, fmt.Errorf("failed to read row: %w", err)
 		}
 		destinationTableNames = append(destinationTableNames, result.String)
 	}
 
-	err = rows.Err()
-	if err != nil {
+	if err := rows.Err(); err != nil {
 		return nil, fmt.Errorf("failed to read rows: %w", err)
 	}
 	return destinationTableNames, nil
