Merge remote-tracking branch 'origin/main' into stable
iskakaushik committed Jul 1, 2023
2 parents 774426c + c53e2d8 commit c425923
Showing 39 changed files with 1,455 additions and 181 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/flow.yml
@@ -60,5 +60,8 @@ jobs:
gotestsum --format testname -- -p 1 ./...
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ secrets.AWS_REGION }}
TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json
TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json
2 changes: 2 additions & 0 deletions .gitignore
@@ -1 +1,3 @@
.vscode
.env

12 changes: 11 additions & 1 deletion docker-compose.yml
@@ -72,9 +72,16 @@ services:
ports:
- 8112:8112
environment:
CATALOG_DSN: postgres://postgres:postgres@catalog:5432/postgres?sslmode=disable
TEMPORAL_HOST_PORT: temporalite:7233
PEERDB_CATALOG_DB: postgres
PEERDB_CATALOG_HOST: catalog
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_PORT: 5432
PEERDB_CATALOG_USER: postgres
GIN_MODE: release
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-""}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-""}
AWS_REGION: ${AWS_REGION:-""}
depends_on:
catalog:
condition: service_healthy
@@ -95,6 +102,9 @@ services:
ENABLE_PROFILING: true
PROFILING_SERVER: 0.0.0.0:6060
TEMPORAL_HOST_PORT: temporalite:7233
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-""}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-""}
AWS_REGION: ${AWS_REGION:-""}
ports:
- 6060:6060
depends_on:
12 changes: 12 additions & 0 deletions flow/activities/flowable.go
@@ -58,6 +58,9 @@ type IFlowable interface {
// ConsolidateQRepPartitions consolidates the QRepPartitions into the destination.
ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig) error

// CleanupQrepFlow cleans up the QRep flow.
CleanupQrepFlow(ctx context.Context, config *protos.QRepConfig) error

DropFlow(ctx context.Context, config *protos.DropFlowInput) error
}

@@ -358,6 +361,15 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
return dst.ConsolidateQRepPartitions(config)
}

func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error {
dst, err := connectors.GetConnector(ctx, config.DestinationPeer)
if err != nil {
return fmt.Errorf("failed to get destination connector: %w", err)
}

return dst.CleanupQRepFlow(config)
}

func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.FlowConnectionConfigs) error {
src, err := connectors.GetConnector(ctx, config.Source)
defer connectors.CloseConnector(src)
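
Only the activity side of CleanupQRepFlow is added in this file; the workflow that schedules it is not part of the hunks shown here. A minimal sketch of how a Temporal workflow could invoke the new activity — the import paths, package name, and timeout value are assumptions for illustration, not taken from this diff:

    package peerflow

    import (
    	"time"

    	"go.temporal.io/sdk/workflow"

    	// Import paths below are assumptions about this repository's module layout.
    	"github.com/PeerDB-io/peer-flow/activities"
    	"github.com/PeerDB-io/peer-flow/generated/protos"
    )

    // cleanupQRepFlow is a hypothetical workflow helper that schedules the new
    // CleanupQRepFlow activity after the partitions have been consolidated.
    func cleanupQRepFlow(ctx workflow.Context, config *protos.QRepConfig) error {
    	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
    		StartToCloseTimeout: 5 * time.Minute, // illustrative value only
    	})
    	var a *activities.FlowableActivity
    	return workflow.ExecuteActivity(ctx, a.CleanupQRepFlow, config).Get(ctx, nil)
    }
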
6 changes: 6 additions & 0 deletions flow/cmd/api.go
@@ -103,6 +103,12 @@ func genConfigForQRepFlow(config *protos.QRepConfig, flowOptions map[string]inte
config.WaitBetweenBatchesSeconds = uint32(flowOptions["refresh_interval"].(float64))
if flowOptions["sync_data_format"].(string) == "avro" {
config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO
if _, ok := flowOptions["staging_path"]; ok {
config.StagingPath = flowOptions["staging_path"].(string)
} else {
// if staging_path is not present, set it to empty string
config.StagingPath = ""
}
} else if flowOptions["sync_data_format"].(string) == "default" {
config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT
} else {
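
For context, genConfigForQRepFlow reads mirror settings out of a generic map. A minimal sketch of the kind of flowOptions value the avro branch above expects — the key names and types come from the diff, the literal values are invented:

    // Illustrative only: keys mirror the ones read above, values are made up.
    flowOptions := map[string]interface{}{
    	"refresh_interval": float64(30),          // seconds between batches, read as float64
    	"sync_data_format": "avro",               // selects QREP_SYNC_MODE_STORAGE_AVRO
    	"staging_path":     "s3://my-bucket/tmp", // optional; defaults to "" when absent
    }
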
41 changes: 35 additions & 6 deletions flow/cmd/main.go
@@ -2,7 +2,9 @@ package main

import (
"context"
"fmt"
"log"
"net/url"
"os"
"os/signal"
"syscall"
@@ -70,17 +72,44 @@ func main() {
Aliases: []string{"p"},
Value: 8110,
},
// JDBC connection string for catalog database
&cli.StringFlag{
Name: "catalog-dsn",
Aliases: []string{"c"},
Value: "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable",
EnvVars: []string{"CATALOG_DSN"},
Name: "catalog-host",
Value: "localhost",
EnvVars: []string{"PEERDB_CATALOG_HOST"},
},
&cli.UintFlag{
Name: "catalog-port",
Value: 5432,
EnvVars: []string{"PEERDB_CATALOG_PORT"},
},
&cli.StringFlag{
Name: "catalog-user",
Value: "postgres",
EnvVars: []string{"PEERDB_CATALOG_USER"},
},
&cli.StringFlag{
Name: "catalog-password",
Value: "postgres",
EnvVars: []string{"PEERDB_CATALOG_PASSWORD"},
},
&cli.StringFlag{
Name: "catalog-db",
Value: "postgres",
EnvVars: []string{"PEERDB_CATALOG_DB"},
},
temporalHostPortFlag,
},
Action: func(ctx *cli.Context) error {
catalogURL := ctx.String("catalog-dsn")
catalogHost := ctx.String("catalog-host")
catalogPort := ctx.String("catalog-port")
catalogUser := ctx.String("catalog-user")
catalogPassword := url.QueryEscape(ctx.String("catalog-password"))
catalogDB := ctx.String("catalog-db")

// create the catalogURL from the host, port, user, password, and db
catalogURL := fmt.Sprintf("postgresql://%s:%s@%s:%s/%s?sslmode=disable",
catalogUser, catalogPassword, catalogHost, catalogPort, catalogDB)

temporalHostPort := ctx.String("temporal-host-port")

return APIMain(&APIServerParams{
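
With the flag defaults above, the assembled connection string matches the old single-flag default. A minimal sketch of the construction, using only the default values shown in this diff:

    package main

    import (
    	"fmt"
    	"net/url"
    )

    func main() {
    	// Defaults taken from the new catalog-* flags above.
    	host, port, user, db := "localhost", "5432", "postgres", "postgres"
    	// QueryEscape keeps passwords with special characters valid inside the URL.
    	password := url.QueryEscape("postgres")

    	catalogURL := fmt.Sprintf("postgresql://%s:%s@%s:%s/%s?sslmode=disable",
    		user, password, host, port, db)
    	fmt.Println(catalogURL)
    	// Output: postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable
    }
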
7 changes: 7 additions & 0 deletions flow/connectors/bigquery/qrep.go
@@ -128,6 +128,13 @@ func (c *BigQueryConnector) ConsolidateQRepPartitions(config *protos.QRepConfig)
return nil
}

// CleanupQRepFlow function for bigquery connector
func (c *BigQueryConnector) CleanupQRepFlow(config *protos.QRepConfig) error {
log.Infof("Cleaning up flow job %s", config.FlowJobName)
log.Infof("This is a no-op for BigQuery")
return nil
}

func (c *BigQueryConnector) isPartitionSynced(partitionID string) (bool, error) {
queryString := fmt.Sprintf(
"SELECT COUNT(*) FROM %s._peerdb_query_replication_metadata WHERE partitionID = '%s';",
3 changes: 3 additions & 0 deletions flow/connectors/core.go
@@ -70,6 +70,9 @@ type Connector interface {
// ConsolidateQRepPartitions consolidates the partitions for a given table.
ConsolidateQRepPartitions(config *protos.QRepConfig) error

// CleanupQRepFlow cleans up the QRep flow for a given table.
CleanupQRepFlow(config *protos.QRepConfig) error

PullFlowCleanup(jobName string) error
SyncFlowCleanup(jobName string) error
}
28 changes: 17 additions & 11 deletions flow/connectors/postgres/cdc.go
@@ -27,7 +27,7 @@ type PostgresCDCSource struct {
startLSN pglogrepl.LSN
}

type PostrgesCDCConfig struct {
type PostgresCDCConfig struct {
AppContext context.Context
Connection *pgxpool.Pool
Slot string
@@ -37,14 +37,14 @@ type PostrgesCDCConfig struct {
}

// Create a new PostgresCDCSource
func NewPostgresCDCSource(cdcConfing *PostrgesCDCConfig) (*PostgresCDCSource, error) {
func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error) {
return &PostgresCDCSource{
ctx: cdcConfing.AppContext,
conn: cdcConfing.Connection,
SrcTableIDNameMapping: cdcConfing.SrcTableIDNameMapping,
TableNameMapping: cdcConfing.TableNameMapping,
slot: cdcConfing.Slot,
publication: cdcConfing.Publication,
ctx: cdcConfig.AppContext,
conn: cdcConfig.Connection,
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
slot: cdcConfig.Slot,
publication: cdcConfig.Publication,
relations: make(map[uint32]*pglogrepl.RelationMessage),
typeMap: pgtype.NewMap(),
}, nil
@@ -114,6 +114,12 @@ func (p *PostgresCDCSource) consumeStream(

standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
defer func() {
err := conn.Close(p.ctx)
if err != nil {
log.Errorf("unexpected error closing replication connection: %v", err)
}
}()

for {
if time.Now().After(nextStandbyMessageDeadline) {
@@ -388,7 +394,7 @@ func (p *PostgresCDCSource) convertTupleToMap(

// create empty map of string to interface{}
items := make(map[string]interface{})
unchangeToastColumns := make(map[string]bool)
unchangedToastColumns := make(map[string]bool)

for idx, col := range tuple.Columns {
colName := rel.Columns[idx].Name
@@ -409,12 +415,12 @@ }
}
items[colName] = data
case 'u': // unchanged toast
unchangeToastColumns[colName] = true
unchangedToastColumns[colName] = true
default:
return nil, nil, fmt.Errorf("unknown column data type: %s", string(col.DataType))
}
}
return items, unchangeToastColumns, nil
return items, unchangedToastColumns, nil
}

func (p *PostgresCDCSource) decodeTextColumnData(data []byte, dataType uint32) (interface{}, error) {
45 changes: 31 additions & 14 deletions flow/connectors/postgres/postgres.go
@@ -61,7 +61,10 @@ func (c *PostgresConnector) Close() error {

// ConnectionActive returns true if the connection is active.
func (c *PostgresConnector) ConnectionActive() bool {
return c.pool != nil
if c.pool == nil {
return false
}
return c.pool.Ping(c.ctx) == nil
}

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
@@ -99,6 +102,18 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) (*model.R
// Publication name would be the job name prefixed with "peerflow_pub_"
publicationName := fmt.Sprintf("peerflow_pub_%s", req.FlowJobName)

// Check if the replication slot and publication exist
exists, err := c.checkSlotAndPublication(slotName, publicationName)
if err != nil {
return nil, fmt.Errorf("error checking for replication slot and publication: %w", err)
}
if !exists.PublicationExists {
return nil, fmt.Errorf("publication %s does not exist", publicationName)
}
if !exists.SlotExists {
return nil, fmt.Errorf("replication slot %s does not exist", slotName)
}

// ensure that replication is set to database
connConfig, err := pgxpool.ParseConfig(c.connStr)
if err != nil {
@@ -117,7 +132,7 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) (*model.R
return nil, fmt.Errorf("failed to create connection pool: %w", err)
}

cdc, err := NewPostgresCDCSource(&PostrgesCDCConfig{
cdc, err := NewPostgresCDCSource(&PostgresCDCConfig{
AppContext: c.ctx,
Connection: replPool,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
@@ -194,15 +209,6 @@ func (c *PostgresConnector) createSlotAndPublication(
publication string,
tableNameMapping map[string]string,
) error {
if !s.SlotExists {
// Create the logical replication slot
_, err := c.pool.Exec(c.ctx,
"SELECT * FROM pg_create_logical_replication_slot($1, 'pgoutput')",
slot)
if err != nil {
return fmt.Errorf("error creating replication slot: %w", err)
}
}
/*
iterating through source tables and creating a publication.
expecting tablenames to be schema qualified
@@ -225,6 +231,17 @@ }
}
}

// create slot only after we succeeded in creating publication.
if !s.SlotExists {
// Create the logical replication slot
_, err := c.pool.Exec(c.ctx,
"SELECT * FROM pg_create_logical_replication_slot($1, 'pgoutput')",
slot)
if err != nil {
return fmt.Errorf("error creating replication slot: %w", err)
}
}

return nil
}

@@ -256,7 +273,7 @@ func (c *PostgresConnector) GetTableSchema(req *protos.GetTableSchemaInput) (*pr

relID, err := c.getRelIDForTable(schemaTable)
if err != nil {
return nil, fmt.Errorf("failed to get relation id for table %s: %w", schemaTable, err)
return nil, err
}

// Get the column names and types
@@ -327,7 +344,7 @@ func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityInput
// check if the table exists by getting the relation ID
relID, err := c.getRelIDForTable(schemaTable)
if err != nil {
return nil, fmt.Errorf("error getting relation ID for table %s: %w", schemaTable, err)
return nil, err
}
return &protos.EnsurePullabilityOutput{TableIdentifier: &protos.TableIdentifier{
TableIdentifier: &protos.TableIdentifier_PostgresTableIdentifier{
@@ -378,7 +395,7 @@ func (c *PostgresConnector) PullFlowCleanup(jobName string) error {
log.Errorf("unexpected error rolling back transaction for flow cleanup: %v", err)
}
}()
_, err = pullFlowCleanupTx.Exec(c.ctx, fmt.Sprintf("DROP PUBLICATION %s", publicationName))
_, err = pullFlowCleanupTx.Exec(c.ctx, fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", publicationName))
if err != nil {
return fmt.Errorf("error dropping publication: %w", err)
}
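
The checkSlotAndPublication helper called from PullRecords is outside the hunks shown here. A minimal sketch of how such a check can be performed against the Postgres catalogs — the result struct name, its fields, and the method body are assumptions inferred only from the exists.SlotExists / exists.PublicationExists usage above, not this commit's actual implementation:

    // SlotCheckResult is a hypothetical name for the struct implied by the usage above.
    type SlotCheckResult struct {
    	SlotExists        bool
    	PublicationExists bool
    }

    // Sketch only: queries pg_replication_slots and pg_publication, reusing the
    // connector's existing pool and ctx fields.
    func (c *PostgresConnector) checkSlotAndPublication(slot string, publication string) (*SlotCheckResult, error) {
    	result := &SlotCheckResult{}

    	err := c.pool.QueryRow(c.ctx,
    		"SELECT EXISTS(SELECT 1 FROM pg_replication_slots WHERE slot_name = $1)", slot).
    		Scan(&result.SlotExists)
    	if err != nil {
    		return nil, fmt.Errorf("error checking replication slot: %w", err)
    	}

    	err = c.pool.QueryRow(c.ctx,
    		"SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname = $1)", publication).
    		Scan(&result.PublicationExists)
    	if err != nil {
    		return nil, fmt.Errorf("error checking publication: %w", err)
    	}
    	return result, nil
    }
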