From 815b60e114156d02b92d8cf3373f8036a9bbb527 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 27 Dec 2024 17:02:12 +0000 Subject: [PATCH] fix up stream closing --- flow/connectors/postgres/qrep_query_executor.go | 5 ++--- flow/connectors/postgres/sink_q.go | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index c548c9d76..7c2cf728f 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -161,9 +161,7 @@ func (qe *QRepQueryExecutor) processRowsStream( record, err := qe.mapRowToQRecord(rows, fieldDescriptions) if err != nil { qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err)) - err := fmt.Errorf("failed to map row to QRecord: %w", err) - stream.Close(err) - return 0, err + return 0, fmt.Errorf("failed to map row to QRecord: %w", err) } stream.Records <- record @@ -204,6 +202,7 @@ func (qe *QRepQueryExecutor) processFetchedRows( numRows, err := qe.processRowsStream(ctx, cursorName, stream, rows, fieldDescriptions) if err != nil { + stream.Close(err) qe.logger.Error("[pg_query_executor] failed to process rows", slog.Any("error", err)) return 0, fmt.Errorf("failed to process rows: %w", err) } diff --git a/flow/connectors/postgres/sink_q.go b/flow/connectors/postgres/sink_q.go index 5c5c97e04..c4204b63c 100644 --- a/flow/connectors/postgres/sink_q.go +++ b/flow/connectors/postgres/sink_q.go @@ -26,8 +26,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( defer shared.RollbackTx(tx, qe.logger) if qe.snapshot != "" { - _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)) - if err != nil { + if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil { qe.logger.Error("[pg_query_executor] failed to set snapshot", slog.Any("error", err), slog.String("query", query)) err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)