Skip to content

Commit

Permalink
Merge branch 'main' into setup-partitioning-and-clustering-bq
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 27, 2023
2 parents 6f243cc + a8f8061 commit 759e86f
Show file tree
Hide file tree
Showing 25 changed files with 107 additions and 93 deletions.
8 changes: 6 additions & 2 deletions flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ run:
linters:
enable:
- dogsled
- durationcheck
- errcheck
- gofumpt
- gosec
- gosimple
- misspell
- nakedret
- nolintlint
- staticcheck
- stylecheck
- sqlclosecheck
- unconvert
- unparam
- whitespace
- errcheck
- prealloc
- staticcheck
- thelper
- ineffassign
- unparam
- unused
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,6 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
} else {
// RelationMessages don't contain an LSN, so we use current clientXlogPos instead.
//nolint:lll
// https://github.com/postgres/postgres/blob/8b965c549dc8753be8a38c4a1b9fabdb535a4338/src/backend/replication/logical/proto.c#L670
return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg))
}
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"golang.org/x/exp/maps"
)

//nolint:stylecheck
const (
mirrorJobsTableIdentifier = "peerdb_mirror_jobs"
createMirrorJobsTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(mirror_job_name TEXT PRIMARY KEY,
Expand Down
10 changes: 6 additions & 4 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ func TestGetQRepPartitions(t *testing.T) {
}

// returns the number of rows inserted
func prepareTestData(test *testing.T, pool *pgxpool.Pool, schema string) int {
func prepareTestData(t *testing.T, pool *pgxpool.Pool, schema string) int {
t.Helper()

// Define the start and end times
startTime := time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC)
endTime := time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC)
Expand All @@ -223,12 +225,12 @@ func prepareTestData(test *testing.T, pool *pgxpool.Pool, schema string) int {
}

// Insert the test data
for i, t := range times {
for i, time := range times {
_, err := pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s.test (value, "from") VALUES ($1, $2)
`, schema), i+1, t)
`, schema), i+1, time)
if err != nil {
test.Fatalf("Failed to insert test data: %v", err)
t.Fatalf("Failed to insert test data: %v", err)
}
}

Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
)

func setupDB(t *testing.T) (*pgxpool.Pool, string) {
t.Helper()

config, err := pgxpool.ParseConfig("postgres://postgres:postgres@localhost:7132/postgres")
if err != nil {
t.Fatalf("unable to parse config: %v", err)
Expand All @@ -36,6 +38,8 @@ func setupDB(t *testing.T) (*pgxpool.Pool, string) {
}

func teardownDB(t *testing.T, pool *pgxpool.Pool, schemaName string) {
t.Helper()

_, err := pool.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA %s CASCADE;", schemaName))
if err != nil {
t.Fatalf("error while dropping schema: %v", err)
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/lib/pq/oid"

//nolint:all
geom "github.com/twpayne/go-geos"
)

Expand Down
6 changes: 5 additions & 1 deletion flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

// createQValue creates a QValue of the appropriate kind for a given placeholder.
func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue.QValue {
t.Helper()

var value interface{}
switch kind {
case qvalue.QValueKindInt16, qvalue.QValueKindInt32, qvalue.QValueKindInt64:
Expand Down Expand Up @@ -55,13 +57,15 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue.
}
}

// nolint:unparam
//nolint:unparam
func generateRecords(
t *testing.T,
nullable bool,
numRows uint32,
allnulls bool,
) (*model.QRecordStream, *model.QRecordSchema) {
t.Helper()

allQValueKinds := []qvalue.QValueKind{
qvalue.QValueKindFloat32,
qvalue.QValueKindFloat64,
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func (c *SnowflakeConnector) getTableCounts(tables []string) (int64, error) {
if err != nil {
return 0, fmt.Errorf("failed to parse table name %s: %w", table, err)
}
//nolint:gosec
row := c.database.QueryRowContext(c.ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table))
var count pgtype.Int8
err = row.Scan(&count)
Expand Down
6 changes: 5 additions & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"golang.org/x/sync/errgroup"
)

//nolint:stylecheck
const (
mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS"
createMirrorJobsTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(MIRROR_JOB_NAME STRING NOT NULL,OFFSET INT NOT NULL,
Expand Down Expand Up @@ -326,6 +325,7 @@ func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
if err != nil {
return 0, fmt.Errorf("error querying Snowflake peer for last syncBatchId: %w", err)
}
defer rows.Close()

var result pgtype.Int8
if !rows.Next() {
Expand All @@ -346,6 +346,7 @@ func (c *SnowflakeConnector) GetLastSyncAndNormalizeBatchID(jobName string) (mod
return model.SyncAndNormalizeBatchID{},
fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err)
}
defer rows.Close()

var syncResult, normResult pgtype.Int8
if !rows.Next() {
Expand All @@ -372,6 +373,7 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, sy
if err != nil {
return nil, fmt.Errorf("error while retrieving table names for normalization: %w", err)
}
defer rows.Close()

var result pgtype.Text
destinationTableNames := make([]string, 0)
Expand All @@ -395,6 +397,8 @@ func (c *SnowflakeConnector) getTableNametoUnchangedCols(flowJobName string, syn
if err != nil {
return nil, fmt.Errorf("error while retrieving table names for normalization: %w", err)
}
defer rows.Close()

// Create a map to store the results
resultMap := make(map[string][]string)
// Process the rows and populate the map
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,6 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) {
if v, ok := val.(*sql.NullString); ok {
if v.Valid {
numeric := new(big.Rat)
//nolint:gosec
if _, ok := numeric.SetString(v.String); !ok {
return qvalue.QValue{}, fmt.Errorf("failed to parse numeric: %v", v.String)
}
Expand Down
3 changes: 0 additions & 3 deletions flow/connectors/sqlserver/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (c *SQLServerConnector) GetQRepPartitions(
}

// Query to get the total number of rows in the table
//nolint:gosec
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s %s", config.WatermarkTable, whereClause)
var minVal interface{} = nil
var totalRows pgtype.Int8
Expand Down Expand Up @@ -91,7 +90,6 @@ func (c *SQLServerConnector) GetQRepPartitions(
var rows *sqlx.Rows
if minVal != nil {
// Query to get partitions using window functions
//nolint:gosec
partitionsQuery := fmt.Sprintf(
`SELECT bucket_v, MIN(v_from) AS start_v, MAX(v_from) AS end_v
FROM (
Expand All @@ -112,7 +110,6 @@ func (c *SQLServerConnector) GetQRepPartitions(
}
rows, err = c.db.NamedQuery(partitionsQuery, params)
} else {
//nolint:gosec
partitionsQuery := fmt.Sprintf(
`SELECT bucket_v, MIN(v_from) AS start_v, MAX(v_from) AS end_v
FROM (
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:stylecheck
package cdc_records

import (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:stylecheck
package cdc_records

import (
Expand Down
Loading

0 comments on commit 759e86f

Please sign in to comment.