diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index aa8f5c80da..08cba887d9 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -251,7 +251,7 @@ func (p *PostgresCDCSource) consumeStream( consumedXLogPos = proposedConsumedXLogPos err := p.SetLastOffset(int64(consumedXLogPos)) if err != nil { - return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err) + return fmt.Errorf("storing updated LSN failed: %w", err) } } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 5cf9f0e76f..281fb8dbb2 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -195,9 +195,9 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) { // SetLastOffset updates the last synced offset for a job. func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) error { _, err := c.pool. - Exec(c.ctx, fmt.Sprintf(setLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName, lastOffset) + Exec(c.ctx, fmt.Sprintf(setLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName) if err != nil { - return fmt.Errorf("error getting last offset for job %s: %w", jobName, err) + return fmt.Errorf("error setting last offset for job %s: %w", jobName, err) } return nil