diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 8bb550449..46bec64fb 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -381,8 +381,7 @@ func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tab for rows.Next() { var tableName, engine string var totalRows uint64 - err = rows.Scan(&tableName, &engine, &totalRows) - if err != nil { + if err := rows.Scan(&tableName, &engine, &totalRows); err != nil { return fmt.Errorf("failed to scan information for tables: %w", err) } if totalRows != 0 && optedForInitialLoad { @@ -393,8 +392,8 @@ func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tab slog.String("table", tableName), slog.String("engine", engine)) } } - if rows.Err() != nil { - return fmt.Errorf("failed to read rows: %w", rows.Err()) + if err := rows.Err(); err != nil { + return fmt.Errorf("failed to read rows: %w", err) } return nil } @@ -418,14 +417,13 @@ func (c *ClickHouseConnector) getTableColumnsMapping(ctx context.Context, for rows.Next() { var tableName string var fieldDescription protos.FieldDescription - err = rows.Scan(&fieldDescription.Name, &fieldDescription.Type, &tableName) - if err != nil { + if err := rows.Scan(&fieldDescription.Name, &fieldDescription.Type, &tableName); err != nil { return nil, fmt.Errorf("failed to scan columns for tables: %w", err) } tableColumnsMapping[tableName] = append(tableColumnsMapping[tableName], &fieldDescription) } - if rows.Err() != nil { - return nil, fmt.Errorf("failed to read rows: %w", rows.Err()) + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to read rows: %w", err) } return tableColumnsMapping, nil } diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index d65d61e9d..4474c6118 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -6,6 +6,7 @@ import ( "database/sql" "errors" "fmt" + "log/slog" "slices" "strconv" "strings" @@ -441,7 +442,7 @@ func (c *ClickHouseConnector) NormalizeRecords( err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID) if err != nil { - c.logger.Error("[clickhouse] error while updating normalize batch id", "error", err) + c.logger.Error("[clickhouse] error while updating normalize batch id", slog.Int64("BatchID", req.SyncBatchID), slog.Any("error", err)) return nil, err } @@ -472,8 +473,7 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch( var tableNames []string for rows.Next() { var tableName sql.NullString - err = rows.Scan(&tableName) - if err != nil { + if err := rows.Scan(&tableName); err != nil { return nil, fmt.Errorf("error while scanning table name: %w", err) } @@ -484,7 +484,7 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch( tableNames = append(tableNames, tableName.String) } - if rows.Err() != nil { + if err := rows.Err(); err != nil { return nil, fmt.Errorf("failed to read rows: %w", err) } diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 515b622ee..f253bf228 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -172,11 +172,10 @@ func (p *PostgresMetadata) FinishBatch(ctx context.Context, jobName string, sync func (p *PostgresMetadata) UpdateNormalizeBatchID(ctx context.Context, jobName string, batchID int64) error { p.logger.Info("updating normalize batch id for job", slog.Int64("batchID", batchID)) - _, err := p.pool.Exec(ctx, - `UPDATE `+lastSyncStateTableName+ - ` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID) - if err != nil { - p.logger.Error("failed to update normalize batch id", slog.Any("error", err)) + if _, err := p.pool.Exec(ctx, + `UPDATE `+lastSyncStateTableName+` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID, + ); err != nil { + p.logger.Error("failed to update normalize batch id", slog.Int64("batchID", batchID), slog.Any("error", err)) return err } diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 2fa6ecd7f..bdfa7038b 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -115,8 +115,8 @@ func (qe *QRepQueryExecutor) ProcessRows( } // Check for any errors encountered during iteration - if rows.Err() != nil { - return nil, fmt.Errorf("row iteration failed: %w", rows.Err()) + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("row iteration failed: %w", err) } batch := &model.QRecordBatch{ diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 96a1fa911..556627699 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -89,14 +89,12 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(ctx context.Context, config } stageName := c.getStageNameForJob(config.FlowJobName) - err = c.createStage(ctx, stageName, config) - if err != nil { + if err := c.createStage(ctx, stageName, config); err != nil { return err } if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { - _, err = c.execWithLogging(ctx, "TRUNCATE TABLE "+config.DestinationTableIdentifier) - if err != nil { + if _, err := c.execWithLogging(ctx, "TRUNCATE TABLE "+config.DestinationTableIdentifier); err != nil { return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err) } } @@ -224,8 +222,7 @@ func (c *SnowflakeConnector) getColsFromTable(ctx context.Context, tableName str }) } - err = rows.Err() - if err != nil { + if err := rows.Err(); err != nil { return nil, fmt.Errorf("failed to read rows: %w", err) } @@ -280,11 +277,10 @@ func (c *SnowflakeConnector) dropStage(ctx context.Context, stagingPath string, } for _, object := range page.Contents { - _, err = s3svc.DeleteObject(ctx, &s3.DeleteObjectInput{ + if _, err := s3svc.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: aws.String(s3o.Bucket), Key: object.Key, - }) - if err != nil { + }); err != nil { c.logger.Error("failed to delete objects from bucket", slog.Any("error", err)) return fmt.Errorf("failed to delete objects from bucket: %w", err) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 124a5c65a..7a400d78a 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -259,15 +259,13 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch( var result pgtype.Text destinationTableNames := make([]string, 0) for rows.Next() { - err = rows.Scan(&result) - if err != nil { + if err := rows.Scan(&result); err != nil { return nil, fmt.Errorf("failed to read row: %w", err) } destinationTableNames = append(destinationTableNames, result.String) } - err = rows.Err() - if err != nil { + if err := rows.Err(); err != nil { return nil, fmt.Errorf("failed to read rows: %w", err) } return destinationTableNames, nil