Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jun 28, 2023
2 parents 7ff2cb5 + 653dfa3 commit 794d846
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:

jobs:
flow_test:
runs-on: ubuntu-latest-16-cores
runs-on: ubuntu-latest
timeout-minutes: 30
services:
pg_cdc:
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (c *PostgresConnector) getMinMaxValues(
last *protos.QRepPartition,
) (interface{}, interface{}, error) {
var minValue, maxValue interface{}
quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn)

if last != nil && last.Range != nil {
// If there's a last partition, start from its end
Expand All @@ -59,7 +60,7 @@ func (c *PostgresConnector) getMinMaxValues(
}
} else {
// Otherwise get the minimum value from the database
minQuery := fmt.Sprintf("SELECT MIN(%[1]s) FROM %[2]s", config.WatermarkColumn, config.WatermarkTable)
minQuery := fmt.Sprintf("SELECT MIN(%[1]s) FROM %[2]s", quotedWatermarkColumn, config.WatermarkTable)
row := c.pool.QueryRow(c.ctx, minQuery)
if err := row.Scan(&minValue); err != nil {
log.Errorf("failed to query [%s] for min value: %v", minQuery, err)
Expand All @@ -68,7 +69,7 @@ func (c *PostgresConnector) getMinMaxValues(
}

// Get the maximum value from the database
maxQuery := fmt.Sprintf("SELECT MAX(%[1]s) FROM %[2]s", config.WatermarkColumn, config.WatermarkTable)
maxQuery := fmt.Sprintf("SELECT MAX(%[1]s) FROM %[2]s", quotedWatermarkColumn, config.WatermarkTable)
row := c.pool.QueryRow(c.ctx, maxQuery)
if err := row.Scan(&maxValue); err != nil {
return nil, nil, fmt.Errorf("failed to query for max value: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type testCase struct {
func newTestCase(schema string, name string, duration uint32, wantErr bool) *testCase {
schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
query := fmt.Sprintf(
"SELECT * FROM %s WHERE timestamp >= {{.start}} AND timestamp < {{.end}}",
`SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`,
schemaQualifiedTable)
return &testCase{
name: name,
Expand All @@ -33,7 +33,7 @@ func newTestCase(schema string, name string, duration uint32, wantErr bool) *tes
BatchDurationSeconds: duration,
Query: query,
WatermarkTable: schemaQualifiedTable,
WatermarkColumn: "timestamp",
WatermarkColumn: "from",
},
want: []*protos.QRepPartition{},
wantErr: wantErr,
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestGetQRepPartitions(t *testing.T) {
CREATE TABLE IF NOT EXISTS %s.test (
id SERIAL PRIMARY KEY,
value INT NOT NULL,
timestamp TIMESTAMP NOT NULL
"from" TIMESTAMP NOT NULL
)
`, schemaName))
if err != nil {
Expand Down Expand Up @@ -209,7 +209,7 @@ func prepareTestData(test *testing.T, pool *pgxpool.Pool, schema string) {
// Insert the test data
for i, t := range times {
_, err := pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s.test (value, timestamp) VALUES ($1, $2)
INSERT INTO %s.test (value, "from") VALUES ($1, $2)
`, schema), i+1, t)
if err != nil {
test.Fatalf("Failed to insert test data: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (s *SnowflakeClient) CreateTable(schema *model.QRecordSchema, schemaName st
if err != nil {
return err
}
fields = append(fields, fmt.Sprintf("%s %s", field.Name, snowflakeType))
fields = append(fields, fmt.Sprintf(`"%s" %s`, field.Name, snowflakeType))
}

command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", schemaName, tableName, strings.Join(fields, ", "))
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord
if err != nil {
return err
}
fields = append(fields, fmt.Sprintf("%s %s", field.Name, bqType))
fields = append(fields, fmt.Sprintf("`%s` %s", field.Name, bqType))
}

command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", b.datasetName, tableName, strings.Join(fields, ", "))
Expand Down
24 changes: 13 additions & 11 deletions flow/e2e/qrep_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func (s *E2EPeerFlowTestSuite) createSourceTable(tableName string) {
tblFields := []string{
"id UUID NOT NULL PRIMARY KEY",
"card_id UUID",
`from_v TIMESTAMP NOT NULL`,
`"from" TIMESTAMP NOT NULL`,
"price NUMERIC",
"created_at TIMESTAMP NOT NULL",
"updated_at TIMESTAMP NOT NULL",
Expand Down Expand Up @@ -61,7 +62,7 @@ func (s *E2EPeerFlowTestSuite) populateSourceTable(tableName string, rowCount in
for i := 0; i < rowCount; i++ {
_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO e2e_test.%s (
id, card_id, from_v, price, created_at,
id, card_id, "from", price, created_at,
updated_at, transaction_hash, ownerable_type, ownerable_id,
user_nonce, transfer_type, blockchain, deal_type,
deal_id, ethereum_transaction_id, ignore_price, card_eth_value,
Expand Down Expand Up @@ -93,7 +94,7 @@ func getOwnersSchema() *model.QRecordSchema {
Fields: []*model.QField{
{Name: "id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "card_id", Type: qvalue.QValueKindString, Nullable: true},
{Name: "from_v", Type: qvalue.QValueKindETime, Nullable: true},
{Name: "from", Type: qvalue.QValueKindETime, Nullable: true},
{Name: "price", Type: qvalue.QValueKindNumeric, Nullable: true},
{Name: "created_at", Type: qvalue.QValueKindETime, Nullable: true},
{Name: "updated_at", Type: qvalue.QValueKindETime, Nullable: true},
Expand Down Expand Up @@ -127,7 +128,8 @@ func getOwnersSelectorString() string {
schema := getOwnersSchema()
var fields []string
for _, field := range schema.Fields {
fields = append(fields, field.Name)
// append quoted field name
fields = append(fields, fmt.Sprintf(`"%s"`, field.Name))
}
return strings.Join(fields, ",")
}
Expand All @@ -137,7 +139,7 @@ func (s *E2EPeerFlowTestSuite) setupBQDestinationTable(dstTable string) {
err := s.bqHelper.CreateTable(dstTable, schema)

// fail if table creation fails
s.NoError(err)
require.NoError(s.T(), err)

fmt.Printf("created table on bigquery: %s.%s. %v\n", s.bqHelper.Config.DatasetId, dstTable, err)
}
Expand All @@ -148,7 +150,7 @@ func (s *E2EPeerFlowTestSuite) setupSFDestinationTable(dstTable string) {

// fail if table creation fails
if err != nil {
s.Fail("unable to create table on snowflake", err)
s.FailNow("unable to create table on snowflake", err)
}

fmt.Printf("created table on snowflake: %s.%s. %v\n", s.sfHelper.testSchemaName, dstTable, err)
Expand Down Expand Up @@ -204,14 +206,14 @@ func (s *E2EPeerFlowTestSuite) compareTableContentsSF(tableName string, selector
pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery(
fmt.Sprintf("SELECT %s FROM e2e_test.%s ORDER BY id", selector, tableName),
)
s.NoError(err)
require.NoError(s.T(), err)

// read rows from destination table
qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName)
sfRows, err := s.sfHelper.ExecuteAndProcessQuery(
fmt.Sprintf("SELECT %s FROM %s ORDER BY id", selector, qualifiedTableName),
)
s.NoError(err)
sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY "id"`, selector, qualifiedTableName)
fmt.Printf("running query on snowflake: %s\n", sfSelQuery)
sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery)
require.NoError(s.T(), err)

s.True(pgRows.Equals(sfRows), "rows from source and destination tables are not equal")
}
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-bigquery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
}))
})?;

// add LIMIT 1 to the root level query.
// add LIMIT 1 to the root level query.
// this is a workaround for the bigquery API not supporting DESCRIBE
// queries.
query.limit = Some(Expr::Value(Value::Number("1".to_owned(), false)));
Expand Down

0 comments on commit 794d846

Please sign in to comment.