Skip to content

Commit

Permalink
Merge branch 'main' into always-init-childrelid-map
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 20, 2024
2 parents 5c87b48 + 61cd7db commit 3e86866
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,4 @@ jobs:
PEERDB_CATALOG_DATABASE: postgres
PEERDB_QUEUE_FORCE_TOPIC_CREATION: "true"
ELASTICSEARCH_TEST_ADDRESS: http://localhost:9200
CI_PG_VERSION: ${{ matrix.postgres-version }}
11 changes: 9 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,9 +657,16 @@ func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (pglogrepl.LSN, e
var result pgtype.Text
err := row.Scan(&result)
if err != nil {
return 0, fmt.Errorf("error while running query: %w", err)
return 0, fmt.Errorf("error while running query for current LSN: %w", err)
}
return pglogrepl.ParseLSN(result.String)
if !result.Valid || result.String == "" {
return 0, errors.New("error while getting current LSN: no LSN available")
}
lsn, err := pglogrepl.ParseLSN(result.String)
if err != nil {
return 0, fmt.Errorf("error while parsing LSN %s: %w", result.String, err)
}
return lsn, nil
}

func (c *PostgresConnector) getDefaultPublicationName(jobName string) string {
Expand Down
14 changes: 7 additions & 7 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,18 +584,18 @@ func customTypeToQKind(typeName string) qvalue.QValueKind {
// in tstzrange.
// convertTimeRangeBound removes the +0000 UTC part
func convertTimeRangeBound(timeBound interface{}) (string, error) {
if timeBound, isInfinite := timeBound.(pgtype.InfinityModifier); isInfinite {
return timeBound.String(), nil
}

layout := "2006-01-02 15:04:05 -0700 MST"
postgresFormat := "2006-01-02 15:04:05"
var convertedTime string
if timeBound != nil {
lowerParsed, err := time.Parse(layout, fmt.Sprint(timeBound))
if err != nil {
return "", fmt.Errorf("unexpected lower bound value in tstzrange. Error: %v", err)
return "", fmt.Errorf("unexpected bound value in tstzrange. Error: %v", err)
}
convertedTime = lowerParsed.Format(postgresFormat)
} else {
convertedTime = ""
return lowerParsed.Format(postgresFormat), nil
}

return convertedTime, nil
return "", nil
}
3 changes: 1 addition & 2 deletions flow/e2e/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
t.Fatalf("Failed to create helper: %v", err)
}

err = bqHelper.RecreateDataset()
if err != nil {
if err := bqHelper.RecreateDataset(); err != nil {
t.Fatalf("Failed to recreate dataset: %v", err)
}

Expand Down
5 changes: 5 additions & 0 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"
"testing"
"time"

Expand All @@ -20,6 +21,10 @@ import (
)

func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
if val, ok := os.LookupEnv("CI_PG_VERSION"); ok && val != "16" {
t.Skip("Only running in PG16 to reduce flakiness from high concurrency")
}

e2eshared.RunSuite(t, SetupSuite)
}

Expand Down
5 changes: 5 additions & 0 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package e2e_snowflake
import (
"context"
"fmt"
"os"
"testing"
"time"

Expand All @@ -17,6 +18,10 @@ import (
)

func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
if val, ok := os.LookupEnv("CI_PG_VERSION"); ok && val != "17" {
t.Skip("Only running in PG17 to reduce flakiness from high concurrency")
}

e2eshared.RunSuite(t, SetupSuite)
}

Expand Down

0 comments on commit 3e86866

Please sign in to comment.