Skip to content

Commit

Permalink
fix up stream closing
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 27, 2024
1 parent 9674a51 commit 815b60e
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 5 deletions.
5 changes: 2 additions & 3 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ func (qe *QRepQueryExecutor) processRowsStream(
record, err := qe.mapRowToQRecord(rows, fieldDescriptions)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
err := fmt.Errorf("failed to map row to QRecord: %w", err)
stream.Close(err)
return 0, err
return 0, fmt.Errorf("failed to map row to QRecord: %w", err)
}

stream.Records <- record
Expand Down Expand Up @@ -204,6 +202,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(

numRows, err := qe.processRowsStream(ctx, cursorName, stream, rows, fieldDescriptions)
if err != nil {
stream.Close(err)
qe.logger.Error("[pg_query_executor] failed to process rows", slog.Any("error", err))
return 0, fmt.Errorf("failed to process rows: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/sink_q.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
defer shared.RollbackTx(tx, qe.logger)

if qe.snapshot != "" {
_, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
if err != nil {
if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil {
qe.logger.Error("[pg_query_executor] failed to set snapshot",
slog.Any("error", err), slog.String("query", query))
err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
Expand Down

0 comments on commit 815b60e

Please sign in to comment.