SyncedAt Column for QRep (#854)
Implements the `_PEERDB_SYNCED_AT` column for Query Replication (QRep) mirrors
from PostgreSQL to PostgreSQL, BigQuery, and Snowflake destinations.
Amogh-Bharadwaj authored Dec 19, 2023
1 parent 77dea98 commit 531e1c2
Showing 16 changed files with 271 additions and 27 deletions.
7 changes: 7 additions & 0 deletions flow/cmd/handler.go
@@ -268,6 +268,13 @@ func (h *FlowRequestHandler) CreateQRepFlow(
} else {
workflowFn = peerflow.QRepFlowWorkflow
}

+ if req.QrepConfig.SyncedAtColName == "" {
+ cfg.SyncedAtColName = "_PEERDB_SYNCED_AT"
+ } else {
+ // make them all uppercase
+ cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName)
+ }
_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
if err != nil {
slog.Error("unable to start QRepFlow workflow",
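
To see the defaulting behavior above in isolation, here is a minimal, self-contained sketch of the same normalization (the function name and inputs are illustrative, not part of the handler):

package main

import (
	"fmt"
	"strings"
)

// normalizeSyncedAtCol mirrors the hunk above: an empty request value falls
// back to _PEERDB_SYNCED_AT, and a user-supplied name is uppercased.
func normalizeSyncedAtCol(requested string) string {
	if requested == "" {
		return "_PEERDB_SYNCED_AT"
	}
	return strings.ToUpper(requested)
}

func main() {
	fmt.Println(normalizeSyncedAtCol(""))          // _PEERDB_SYNCED_AT
	fmt.Println(normalizeSyncedAtCol("synced_at")) // SYNCED_AT
}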
3 changes: 2 additions & 1 deletion flow/connectors/bigquery/qrep.go
@@ -46,7 +46,8 @@ func (c *BigQueryConnector) SyncQRepRecords(
partition.PartitionId, destTable))

avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
- return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)
+ return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition,
+ tblMetadata, stream, config.SyncedAtColName)
}

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,
17 changes: 13 additions & 4 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -48,7 +48,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
flowJobName, dstTableName, syncBatchID),
)
// You will need to define your Avro schema as a string
- avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata)
+ avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "")
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
@@ -107,6 +107,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
partition *protos.QRepPartition,
dstTableMetadata *bigquery.TableMetadata,
stream *model.QRecordStream,
+ syncedAtCol string,
) (int, error) {
startTime := time.Now()
flowLog := slog.Group("sync_metadata",
@@ -115,7 +116,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
slog.String("destinationTable", dstTableName),
)
// You will need to define your Avro schema as a string
- avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata)
+ avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol)
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
@@ -137,9 +138,13 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
// Start a transaction
stmts := []string{"BEGIN TRANSACTION;"}

+ selector := "*"
+ if syncedAtCol != "" { // PeerDB column
+ selector = "*, CURRENT_TIMESTAMP"
+ }
// Insert the records from the staging table into the destination table
- insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT * FROM `%s.%s`;",
- datasetID, dstTableName, datasetID, stagingTable)
+ insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;",
+ datasetID, dstTableName, selector, datasetID, stagingTable)

stmts = append(stmts, insertStmt)

@@ -181,11 +186,15 @@ type AvroSchema struct {

func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata,
+ syncedAtCol string,
) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := make(map[string]struct{})

for _, bqField := range dstTableMetadata.Schema {
+ if bqField.Name == syncedAtCol {
+ continue
+ }
avroType, err := GetAvroType(bqField)
if err != nil {
return nil, err
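
Taken together, the changes in this file write the Avro staging data without the synced-at column and then append the timestamp positionally when the staged rows are copied into the destination. A self-contained sketch of the statement the selector logic builds (dataset and table names are illustrative):

package main

import "fmt"

func main() {
	datasetID, dstTableName, stagingTable := "peerdb_dataset", "dst", "dst_staging_1"
	syncedAtCol := "_PEERDB_SYNCED_AT"

	// Same selector logic as the hunk above.
	selector := "*"
	if syncedAtCol != "" {
		selector = "*, CURRENT_TIMESTAMP"
	}
	insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;",
		datasetID, dstTableName, selector, datasetID, stagingTable)

	fmt.Println(insertStmt)
	// INSERT INTO `peerdb_dataset.dst` SELECT *, CURRENT_TIMESTAMP FROM `peerdb_dataset.dst_staging_1`;
}

This works because DefineAvroSchema now skips the synced-at field: the staging table holds every destination column except the timestamp, which SELECT *, CURRENT_TIMESTAMP supplies positionally (assuming the synced-at column is the last column of the destination table).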
3 changes: 2 additions & 1 deletion flow/connectors/postgres/qrep.go
@@ -471,7 +471,8 @@ func (c *PostgresConnector) SyncQRepRecords(

stagingTableSync := &QRepStagingTableSync{connector: c}
return stagingTableSync.SyncQRepRecords(
- config.FlowJobName, dstTable, partition, stream, config.WriteMode)
+ config.FlowJobName, dstTable, partition, stream,
+ config.WriteMode, config.SyncedAtColName)
}

// SetupQRepMetadataTables function for postgres connector
26 changes: 21 additions & 5 deletions flow/connectors/postgres/qrep_sync_method.go
@@ -35,6 +35,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
partition *protos.QRepPartition,
stream *model.QRecordStream,
writeMode *protos.QRepWriteMode,
+ syncedAtCol string,
) (int, error) {
syncLog := slog.Group("sync-qrep-log",
slog.String(string(shared.FlowNameKey), flowJobName),
@@ -81,6 +82,19 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
if err != nil {
return -1, fmt.Errorf("failed to copy records into destination table: %v", err)
}

+ if syncedAtCol != "" {
+ updateSyncedAtStmt := fmt.Sprintf(
+ `UPDATE %s SET "%s" = CURRENT_TIMESTAMP WHERE "%s" IS NULL;`,
+ pgx.Identifier{dstTableName.Schema, dstTableName.Table}.Sanitize(),
+ syncedAtCol,
+ syncedAtCol,
+ )
+ _, err = tx.Exec(context.Background(), updateSyncedAtStmt)
+ if err != nil {
+ return -1, fmt.Errorf("failed to update synced_at column: %v", err)
+ }
+ }
} else {
// Step 2.1: Create a temp staging table
stagingTableName := fmt.Sprintf("_peerdb_staging_%s", shared.RandomString(8))
@@ -128,16 +142,18 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
}
selectStrArray = append(selectStrArray, fmt.Sprintf(`"%s"`, col))
}

+ setClauseArray = append(setClauseArray,
+ fmt.Sprintf(`"%s" = CURRENT_TIMESTAMP`, syncedAtCol))
setClause := strings.Join(setClauseArray, ",")
- selectStr := strings.Join(selectStrArray, ",")
+ selectSQL := strings.Join(selectStrArray, ",")

// Step 2.3: Perform the upsert operation, ON CONFLICT UPDATE
upsertStmt := fmt.Sprintf(
- "INSERT INTO %s (%s) SELECT %s FROM %s ON CONFLICT (%s) DO UPDATE SET %s;",
+ `INSERT INTO %s (%s, "%s") SELECT %s, CURRENT_TIMESTAMP FROM %s ON CONFLICT (%s) DO UPDATE SET %s;`,
dstTableIdentifier.Sanitize(),
- selectStr,
- selectStr,
+ selectSQL,
+ syncedAtCol,
+ selectSQL,
stagingTableIdentifier.Sanitize(),
strings.Join(writeMode.UpsertKeyColumns, ", "),
setClause,
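
A self-contained sketch of the two statements this file now emits (table names are illustrative, and the EXCLUDED-style set clauses for the data columns are assumed from typical upsert code, since their construction is outside this diff):

package main

import (
	"fmt"
	"strings"
)

func main() {
	syncedAtCol := "_PEERDB_SYNCED_AT"
	dst, staging := `"public"."dst"`, `"public"."_peerdb_staging_abc12345"`
	cols := []string{"id", "name"}

	// Append path: stamp the rows that were just copied in, i.e. those
	// whose synced-at column is still NULL.
	updateSyncedAtStmt := fmt.Sprintf(
		`UPDATE %s SET "%s" = CURRENT_TIMESTAMP WHERE "%s" IS NULL;`,
		dst, syncedAtCol, syncedAtCol)
	fmt.Println(updateSyncedAtStmt)

	// Upsert path: insert the timestamp alongside the data columns and
	// refresh it in the ON CONFLICT branch via the extra set clause.
	quotedCols := make([]string, 0, len(cols))
	setClauseArray := make([]string, 0, len(cols)+1)
	for _, col := range cols {
		quotedCols = append(quotedCols, fmt.Sprintf(`"%s"`, col))
		setClauseArray = append(setClauseArray,
			fmt.Sprintf(`"%s" = EXCLUDED."%s"`, col, col)) // assumed shape
	}
	setClauseArray = append(setClauseArray,
		fmt.Sprintf(`"%s" = CURRENT_TIMESTAMP`, syncedAtCol))
	selectSQL := strings.Join(quotedCols, ",")

	upsertStmt := fmt.Sprintf(
		`INSERT INTO %s (%s, "%s") SELECT %s, CURRENT_TIMESTAMP FROM %s ON CONFLICT (%s) DO UPDATE SET %s;`,
		dst, selectSQL, syncedAtCol, selectSQL, staging,
		"id", strings.Join(setClauseArray, ","))
	fmt.Println(upsertStmt)
}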
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/qrep.go
@@ -249,7 +249,7 @@ func (c *SnowflakeConnector) createExternalStage(stageName string, config *proto
}

func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig) error {
- c.logger.Error("Consolidating partitions")
+ c.logger.Info("Consolidating partitions")

destTable := config.DestinationTableIdentifier
stageName := c.getStageNameForJob(config.FlowJobName)
@@ -272,7 +272,7 @@ func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig

// CleanupQRepFlow function for snowflake connector
func (c *SnowflakeConnector) CleanupQRepFlow(config *protos.QRepConfig) error {
- c.logger.Error("Cleaning up flow job")
+ c.logger.Info("Cleaning up flow job")
return c.dropStage(config.StagingPath, config.FlowJobName)
}

7 changes: 6 additions & 1 deletion flow/connectors/snowflake/qrep_avro_sync.go
@@ -300,6 +300,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage

func (c *SnowflakeConnector) GetCopyTransformation(
dstTableName string,
+ syncedAtCol string,
) (*CopyInfo, error) {
colInfo, colsErr := c.getColsFromTable(dstTableName)
if colsErr != nil {
@@ -310,6 +311,10 @@
columnOrder := make([]string, 0, len(colInfo.ColumnMap))
for colName, colType := range colInfo.ColumnMap {
columnOrder = append(columnOrder, fmt.Sprintf("\"%s\"", colName))
+ if colName == syncedAtCol {
+ transformations = append(transformations, fmt.Sprintf("CURRENT_TIMESTAMP AS \"%s\"", colName))
+ continue
+ }
switch colType {
case "GEOGRAPHY":
transformations = append(transformations,
@@ -354,7 +359,7 @@ func CopyStageToDestination(
}
}

- copyTransformation, err := connector.GetCopyTransformation(dstTableName)
+ copyTransformation, err := connector.GetCopyTransformation(dstTableName, config.SyncedAtColName)
if err != nil {
return fmt.Errorf("failed to get copy transformation: %w", err)
}
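
A sketch of the transformation list GetCopyTransformation now produces when the destination table carries a synced-at column. The column map is illustrative, and the $1:... accessor used for the ordinary columns is an assumption about the surrounding code (only the synced-at branch appears in this diff):

package main

import "fmt"

func main() {
	syncedAtCol := "_PEERDB_SYNCED_AT"
	columnMap := map[string]string{
		"ID":                "NUMBER",
		"NAME":              "TEXT",
		"_PEERDB_SYNCED_AT": "TIMESTAMP_LTZ",
	}

	transformations := make([]string, 0, len(columnMap))
	columnOrder := make([]string, 0, len(columnMap))
	for colName := range columnMap {
		columnOrder = append(columnOrder, fmt.Sprintf("\"%s\"", colName))
		if colName == syncedAtCol {
			// The synced-at column never appears in the staged Avro file,
			// so it is synthesized during the COPY rather than read from it.
			transformations = append(transformations,
				fmt.Sprintf("CURRENT_TIMESTAMP AS \"%s\"", colName))
			continue
		}
		// Assumed accessor for ordinary columns staged as Avro.
		transformations = append(transformations,
			fmt.Sprintf("$1:\"%s\" AS \"%s\"", colName, colName))
	}

	fmt.Println(transformations)
	fmt.Println(columnOrder)
}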
19 changes: 13 additions & 6 deletions flow/e2e/bigquery/peer_flow_bq_test.go
@@ -52,17 +52,22 @@ func (s PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string {
return fmt.Sprintf("%s_%s", input, s.bqSuffix)
}

- func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, rowID int8) error {
+ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDelete bool) error {
qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, dstQualified)
- query := fmt.Sprintf("SELECT `_PEERDB_IS_DELETED`,`_PEERDB_SYNCED_AT` FROM %s WHERE id = %d",
- qualifiedTableName, rowID)
+ selector := "`_PEERDB_SYNCED_AT`"
+ if softDelete {
+ selector += ", `_PEERDB_IS_DELETED`"
+ }
+ query := fmt.Sprintf("SELECT %s FROM %s",
+ selector, qualifiedTableName)

recordBatch, err := s.bqHelper.ExecuteAndProcessQuery(query)
if err != nil {
return err
}

+ recordCount := 0

for _, record := range recordBatch.Records {
for _, entry := range record.Entries {
if entry.Kind == qvalue.QValueKindBoolean {
@@ -78,12 +83,14 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, rowID i
if !ok {
return fmt.Errorf("peerdb column failed: _PEERDB_SYNCED_AT is not valid")
}

+ recordCount += 1
}
}
}
- if recordCount != 2 {
- return fmt.Errorf("peerdb column failed: _PEERDB_IS_DELETED or _PEERDB_SYNCED_AT not present")

+ if recordCount == 0 {
+ return fmt.Errorf("peerdb column check failed: no records found")
}

return nil
@@ -1191,7 +1198,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

- err = s.checkPeerdbColumns(dstTableName, 1)
+ err = s.checkPeerdbColumns(dstTableName, true)
require.NoError(s.t, err)

env.AssertExpectations(s.t)
39 changes: 38 additions & 1 deletion flow/e2e/bigquery/qrep_flow_bq_test.go
@@ -10,7 +10,7 @@ import (
)

func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) {
- err := e2e.CreateSourceTableQRep(s.pool, s.bqSuffix, tableName)
+ err := e2e.CreateTableForQRep(s.pool, s.bqSuffix, tableName)
require.NoError(s.t, err)
err = e2e.PopulateSourceTable(s.pool, s.bqSuffix, tableName, rowCount)
require.NoError(s.t, err)
@@ -64,6 +64,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
tblName,
query,
s.bqHelper.Peer,
+ "",
+ false,
"")
require.NoError(s.t, err)
e2e.RunQrepFlowWorkflow(env, qrepConfig)
@@ -78,3 +80,38 @@

env.AssertExpectations(s.t)
}

+ func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
+ env := e2e.NewTemporalTestWorkflowEnvironment()
+ e2e.RegisterWorkflowsAndActivities(env, s.t)
+
+ numRows := 10
+
+ tblName := "test_columns_bq_qrep"
+ s.setupSourceTable(tblName, numRows)
+
+ query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
+ s.bqSuffix, tblName)
+
+ qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro",
+ fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName),
+ tblName,
+ query,
+ s.bqHelper.Peer,
+ "",
+ true,
+ "_PEERDB_SYNCED_AT")
+ require.NoError(s.t, err)
+ e2e.RunQrepFlowWorkflow(env, qrepConfig)
+
+ // Verify workflow completes without error
+ s.True(env.IsWorkflowCompleted())
+
+ err = env.GetWorkflowError()
+ require.NoError(s.t, err)
+
+ err = s.checkPeerdbColumns(tblName, false)
+ require.NoError(s.t, err)
+
+ env.AssertExpectations(s.t)
+ }
