Commit 0a17d0f

Merge branch 'main' into stable

iskakaushik committed Jun 4, 2024
2 parents bbb3d23 + 75133ca
Showing 12 changed files with 116 additions and 112 deletions.
23 changes: 2 additions & 21 deletions flow/cmd/handler.go
@@ -4,7 +4,6 @@ import (
     "context"
     "fmt"
     "log/slog"
-    "strconv"
     "strings"
     "time"

@@ -224,26 +223,8 @@ func (h *FlowRequestHandler) CreateQRepFlow(
             return nil, fmt.Errorf("unable to create flow job entry: %w", err)
         }
     }
-
-    state := peerflow.NewQRepFlowState()
-    preColon, postColon, hasColon := strings.Cut(cfg.WatermarkColumn, "::")
     var workflowFn interface{}
-    if cfg.SourcePeer.Type == protos.DBType_POSTGRES &&
-        preColon == "xmin" {
-        state.LastPartition.PartitionId = uuid.New().String()
-        if hasColon {
-            // hack to facilitate migrating from existing xmin sync
-            txid, err := strconv.ParseInt(postColon, 10, 64)
-            if err != nil {
-                slog.Error("invalid xmin txid for xmin rep",
-                    slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
-                return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err)
-            }
-            state.LastPartition.Range = &protos.PartitionRange{
-                Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}},
-            }
-        }
-
+    if cfg.SourcePeer.Type == protos.DBType_POSTGRES && cfg.WatermarkColumn == "xmin" {
         workflowFn = peerflow.XminFlowWorkflow
     } else {
         workflowFn = peerflow.QRepFlowWorkflow
@@ -253,7 +234,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
         cfg.SyncedAtColName = "_PEERDB_SYNCED_AT"
     }

-    _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
+    _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, nil)
     if err != nil {
         slog.Error("unable to start QRepFlow workflow",
             slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
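With the xmin "::txid" migration hack removed, CreateQRepFlow no longer seeds a QRepFlowState: the workflow receives nil and is left to initialize its own state. A minimal standalone sketch of starting a workflow this way with Temporal's Go SDK; the config struct, task-queue name, and string workflow name here are illustrative stand-ins, not PeerDB's actual definitions:

package main

import (
	"context"
	"log"

	"go.temporal.io/sdk/client"
)

// QRepConfig is a stand-in for protos.QRepConfig.
type QRepConfig struct {
	FlowJobName     string
	WatermarkColumn string
}

func main() {
	// client.Dial connects to localhost:7233 by default.
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalln("unable to create Temporal client:", err)
	}
	defer c.Close()

	cfg := &QRepConfig{FlowJobName: "demo-flow", WatermarkColumn: "xmin"}
	opts := client.StartWorkflowOptions{
		ID:        "qrep-" + cfg.FlowJobName,
		TaskQueue: "peer-flow", // illustrative task queue name
	}

	// As in the diff above, the trailing state argument is nil; the
	// workflow is expected to build its initial state on first run.
	if _, err := c.ExecuteWorkflow(context.Background(), opts, "QRepFlowWorkflow", cfg, nil); err != nil {
		log.Fatalln("unable to start workflow:", err)
	}
}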
11 changes: 9 additions & 2 deletions flow/connectors/postgres/qrep.go
@@ -514,11 +514,18 @@ func syncQRepRecords(
     } else {
         // Step 2.1: Create a temp staging table
         stagingTableName := "_peerdb_staging_" + shared.RandomString(8)
-        stagingTableIdentifier := pgx.Identifier{c.metadataSchema, stagingTableName}
+        stagingTableIdentifier := pgx.Identifier{stagingTableName}
         dstTableIdentifier := pgx.Identifier{dstTable.Schema, dstTable.Table}

+        // From PG docs: The cost of setting a large value in sessions that do not actually need many
+        // temporary buffers is only a buffer descriptor, or about 64 bytes, per increment in temp_buffers.
+        _, err = tx.Exec(ctx, "SET temp_buffers = '4GB';")
+        if err != nil {
+            return -1, fmt.Errorf("failed to set temp_buffers: %w", err)
+        }
+
         createStagingTableStmt := fmt.Sprintf(
-            "CREATE TEMP UNLOGGED TABLE %s (LIKE %s);",
+            "CREATE TEMP TABLE %s (LIKE %s);",
             stagingTableIdentifier.Sanitize(),
             dstTableIdentifier.Sanitize(),
         )
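Two details of this hunk are worth spelling out: a TEMP table lives in the session-private pg_temp schema, so its identifier needs no schema qualifier (hence the single-element pgx.Identifier), and PostgreSQL honors temp_buffers changes only before the session first touches a temporary table, so the SET runs ahead of CREATE TEMP TABLE. A self-contained sketch of the same pattern with pgx/v5; the DSN and table names are placeholders:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()

	// Placeholder DSN; point this at a real database to run the sketch.
	conn, err := pgx.Connect(ctx, "postgres://postgres@localhost:5432/postgres")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	tx, err := conn.Begin(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback(ctx)

	// temp_buffers can only be raised before the session touches its first
	// temporary table, so it is set before CREATE TEMP TABLE.
	if _, err := tx.Exec(ctx, "SET temp_buffers = '4GB';"); err != nil {
		log.Fatal(err)
	}

	// TEMP tables live in the session-private pg_temp schema, so the staging
	// identifier carries no schema qualifier.
	staging := pgx.Identifier{"_peerdb_staging_demo"}
	dst := pgx.Identifier{"public", "dst_table"} // placeholder destination table
	stmt := fmt.Sprintf("CREATE TEMP TABLE %s (LIKE %s);", staging.Sanitize(), dst.Sanitize())
	if _, err := tx.Exec(ctx, stmt); err != nil {
		log.Fatal(err)
	}

	if err := tx.Commit(ctx); err != nil {
		log.Fatal(err)
	}
}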
18 changes: 9 additions & 9 deletions flow/connectors/pubsub/pubsub.go
@@ -60,7 +60,7 @@ func (c *PubSubConnector) Close() error {
 func (c *PubSubConnector) ConnectionActive(ctx context.Context) error {
     topic := c.client.Topic("test")
     _, err := topic.Exists(ctx)
-    return err
+    return fmt.Errorf("pubsub connection active check failure: %w", err)
 }

 func (c *PubSubConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
@@ -143,7 +143,7 @@ func (c *PubSubConnector) createPool(
             _ = c.LogFlowInfo(ctx, flowJobName, s)
         }))
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("[pubsub] error loading script: %w", err)
     }
     if script == "" {
         ls.Env.RawSetString("onRecord", ls.NewFunction(utils.DefaultOnRecord))
@@ -170,7 +170,7 @@ func (c *PubSubConnector) createPool(
             return topicClient, nil
         })
         if err != nil {
-            queueErr(err)
+            queueErr(fmt.Errorf("[pubsub] error getting topic: %w", err))
             return
         }

@@ -253,7 +253,7 @@ func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecord
         if curpub.PublishResult == nil {
             shared.AtomicInt64Max(&lastSeenLSN, curpub.lsn)
         } else if _, err := curpub.Get(ctx); err != nil {
-            queueErr(err)
+            queueErr(fmt.Errorf("[pubsub] error publishing message: %w", err))
             break
         }
     }
@@ -299,7 +299,7 @@ Loop:
     lfn := ls.Env.RawGetString("onRecord")
     fn, ok := lfn.(*lua.LFunction)
     if !ok {
-        queueErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn))
+        queueErr(fmt.Errorf("script should define `onRecord` as function, not %v", lfn))
         return poolResult{}
     }

@@ -316,7 +316,7 @@ Loop:
     for i := range args {
         msg, err := lvalueToPubSubMessage(ls, ls.Get(i-args))
         if err != nil {
-            queueErr(err)
+            queueErr(fmt.Errorf("[pubsub] error creating message: %w", err))
             return poolResult{}
         }
         if msg.Message != nil {
@@ -342,19 +342,19 @@

     close(flushLoopDone)
     if err := pool.Wait(queueCtx); err != nil {
-        return nil, err
+        return nil, fmt.Errorf("[pubsub] pool.Wait error: %w", err)
     }
     close(publish)
     topiccache.Stop(queueCtx)
     select {
     case <-queueCtx.Done():
-        return nil, queueCtx.Err()
+        return nil, fmt.Errorf("[pubsub] queueCtx.Done: %w", queueCtx.Err())
     case <-waitChan:
     }

     lastCheckpoint := req.Records.GetLastCheckpoint()
     if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil {
-        return nil, err
+        return nil, fmt.Errorf("[pubsub] FinishBatch error: %w", err)
     }

     return &model.SyncResponse{
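The pubsub changes are mostly error hygiene: bare err returns are wrapped with a "[pubsub]"-prefixed message using %w, which adds context while keeping the original error matchable down the call chain. A small illustration of why %w (rather than %v or %s) matters to callers:

package main

import (
	"context"
	"errors"
	"fmt"
)

// publish stands in for a Pub/Sub publish that failed.
func publish(ctx context.Context) error {
	return context.DeadlineExceeded
}

func syncRecords(ctx context.Context) error {
	if err := publish(ctx); err != nil {
		// %w keeps the original error in the chain; %v or %s would flatten
		// it to text and break errors.Is/errors.As for callers.
		return fmt.Errorf("[pubsub] error publishing message: %w", err)
	}
	return nil
}

func main() {
	err := syncRecords(context.Background())
	fmt.Println(err)
	// Prints: [pubsub] error publishing message: context deadline exceeded
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}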
37 changes: 19 additions & 18 deletions flow/connectors/snowflake/snowflake.go
@@ -737,25 +737,31 @@ func (c *SnowflakeConnector) RenameTables(ctx context.Context, req *protos.Renam
         }
     }()

-    if req.SyncedAtColName != nil {
-        for _, renameRequest := range req.RenameTableOptions {
-            resyncTblName := renameRequest.CurrentName
+    for _, renameRequest := range req.RenameTableOptions {
+        srcTable, err := utils.ParseSchemaTable(renameRequest.CurrentName)
+        if err != nil {
+            return nil, fmt.Errorf("unable to parse source %s: %w", renameRequest.CurrentName, err)
+        }
+
+        dstTable, err := utils.ParseSchemaTable(renameRequest.NewName)
+        if err != nil {
+            return nil, fmt.Errorf("unable to parse destination %s: %w", renameRequest.NewName, err)
+        }

-        c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", resyncTblName))
+        src := snowflakeSchemaTableNormalize(srcTable)
+        dst := snowflakeSchemaTableNormalize(dstTable)

+        c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", src))
+
+        if req.SyncedAtColName != nil {
             _, err = renameTablesTx.ExecContext(ctx,
-                fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP", resyncTblName, *req.SyncedAtColName))
+                fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP", src, *req.SyncedAtColName))
             if err != nil {
-                return nil, fmt.Errorf("unable to set synced at column for table %s: %w", resyncTblName, err)
+                return nil, fmt.Errorf("unable to set synced at column for table %s: %w", src, err)
             }
         }
-    }

-    if req.SoftDeleteColName != nil {
-        for _, renameRequest := range req.RenameTableOptions {
-            src := renameRequest.CurrentName
-            dst := renameRequest.NewName
-
+        if req.SoftDeleteColName != nil {
             columnNames := make([]string, 0, len(renameRequest.TableSchema.Columns))
             for _, col := range renameRequest.TableSchema.Columns {
                 columnNames = append(columnNames, SnowflakeIdentifierNormalize(col.Name))
@@ -779,13 +785,8 @@ func (c *SnowflakeConnector) RenameTables(ctx context.Context, req *protos.Renam
                 return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", dst, err)
             }
         }
-    }

-    // renaming and dropping such that the _resync table is the new destination
-    for _, renameRequest := range req.RenameTableOptions {
-        src := renameRequest.CurrentName
-        dst := renameRequest.NewName
-
+        // renaming and dropping such that the _resync table is the new destination
         c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", src, dst))

         // drop the dst table if exists
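RenameTables now makes a single pass over req.RenameTableOptions, parsing each name into schema and table and normalizing it for Snowflake before the synced-at update, soft-delete handling, and rename steps that previously lived in separate loops. A rough sketch of the parse-and-normalize step; these helpers are simplified stand-ins for utils.ParseSchemaTable and snowflakeSchemaTableNormalize, whose exact quoting rules may differ:

package main

import (
	"fmt"
	"strings"
)

// SchemaTable is a simplified stand-in for the repo's schema/table pair.
type SchemaTable struct {
	Schema string
	Table  string
}

// parseSchemaTable mimics utils.ParseSchemaTable: split "schema.table" and
// reject anything that does not have both parts.
func parseSchemaTable(s string) (*SchemaTable, error) {
	schema, table, ok := strings.Cut(s, ".")
	if !ok || schema == "" || table == "" {
		return nil, fmt.Errorf("invalid schema.table identifier: %q", s)
	}
	return &SchemaTable{Schema: schema, Table: table}, nil
}

// normalize mimics snowflakeSchemaTableNormalize: Snowflake folds unquoted
// identifiers to upper case, so upper-case and quote both parts.
func normalize(st *SchemaTable) string {
	return fmt.Sprintf(`"%s"."%s"`, strings.ToUpper(st.Schema), strings.ToUpper(st.Table))
}

func main() {
	st, err := parseSchemaTable("public.users_resync")
	if err != nil {
		panic(err)
	}
	fmt.Println(normalize(st)) // "PUBLIC"."USERS_RESYNC"
}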
7 changes: 2 additions & 5 deletions flow/e2e/test_utils.go
@@ -428,14 +428,11 @@ func CreateQRepWorkflowConfig(
 }

 func RunQRepFlowWorkflow(tc client.Client, config *protos.QRepConfig) WorkflowRun {
-    state := peerflow.NewQRepFlowState()
-    return ExecutePeerflow(tc, peerflow.QRepFlowWorkflow, config, state)
+    return ExecutePeerflow(tc, peerflow.QRepFlowWorkflow, config, nil)
 }

 func RunXminFlowWorkflow(tc client.Client, config *protos.QRepConfig) WorkflowRun {
-    state := peerflow.NewQRepFlowState()
-    state.LastPartition.PartitionId = uuid.New().String()
-    return ExecutePeerflow(tc, peerflow.XminFlowWorkflow, config, state)
+    return ExecutePeerflow(tc, peerflow.XminFlowWorkflow, config, nil)
 }

 func GetOwnersSchema() *qvalue.QRecordSchema {
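The test helpers mirror the handler change: they stop constructing QRepFlowState and pass nil. If the workflow initializes its own state on a nil input, that behavior is easy to pin down in a unit test; a hedged sketch using Temporal's test suite with a stand-in workflow (not PeerDB's actual QRepFlowWorkflow):

package peerflow_test

import (
	"testing"

	"github.com/stretchr/testify/require"
	"go.temporal.io/sdk/testsuite"
	"go.temporal.io/sdk/workflow"
)

// QRepFlowState is a stand-in for the real state type.
type QRepFlowState struct {
	PartitionsDone int
}

// DemoQRepWorkflow initializes its own state when none is passed,
// mirroring the pattern the callers above now rely on.
func DemoQRepWorkflow(ctx workflow.Context, state *QRepFlowState) (*QRepFlowState, error) {
	if state == nil {
		state = &QRepFlowState{}
	}
	return state, nil
}

func TestNilStateIsInitialized(t *testing.T) {
	var suite testsuite.WorkflowTestSuite
	env := suite.NewTestWorkflowEnvironment()

	// Run the workflow with a nil state, as the helpers above now do.
	env.ExecuteWorkflow(DemoQRepWorkflow, (*QRepFlowState)(nil))

	require.True(t, env.IsWorkflowCompleted())
	require.NoError(t, env.GetWorkflowError())

	var out QRepFlowState
	require.NoError(t, env.GetWorkflowResult(&out))
}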
22 changes: 13 additions & 9 deletions flow/shared/bound_selector.go
@@ -13,24 +13,28 @@ type BoundSelector struct {
     count int
 }

-func NewBoundSelector(ctx workflow.Context, limit int) *BoundSelector {
+func NewBoundSelector(ctx workflow.Context, selectorName string, limit int) *BoundSelector {
     return &BoundSelector{
         limit: limit,
-        selector: workflow.NewSelector(ctx),
+        selector: workflow.NewNamedSelector(ctx, selectorName),
     }
 }

-func (s *BoundSelector) SpawnChild(ctx workflow.Context, w interface{}, args ...interface{}) {
-    if s.count >= s.limit {
+func (s *BoundSelector) SpawnChild(ctx workflow.Context, w interface{}, futureCallback func(workflow.Future), args ...interface{}) {
+    if s.limit > 0 && s.count >= s.limit {
         s.waitOne(ctx)
     }

     future := workflow.ExecuteChildWorkflow(ctx, w, args...)
-    s.selector.AddFuture(future, func(f workflow.Future) {
-        if err := f.Get(ctx, nil); err != nil {
-            s.ferrors = append(s.ferrors, err)
-        }
-    })
+    if futureCallback != nil {
+        s.selector.AddFuture(future, futureCallback)
+    } else {
+        s.selector.AddFuture(future, func(f workflow.Future) {
+            if err := f.Get(ctx, nil); err != nil {
+                s.ferrors = append(s.ferrors, err)
+            }
+        })
+    }
     s.count += 1
 }

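BoundSelector picks up three behavioral tweaks: the selector is now named (which surfaces in Temporal debugging output), a non-positive limit means SpawnChild never blocks, and callers may supply their own future callback instead of the default error-collecting one. A hypothetical call site inside a workflow; ChildWorkflow and the sel.Wait call are assumptions for illustration, since the rest of the file is not shown in this hunk:

package shared

import (
	"go.temporal.io/sdk/workflow"
)

// ChildWorkflow is a stand-in child workflow for the sketch.
func ChildWorkflow(ctx workflow.Context, n int) error {
	return nil
}

// ParentWorkflow fans out 32 children, at most 8 in flight at a time.
func ParentWorkflow(ctx workflow.Context) error {
	ctx = workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{})

	sel := NewBoundSelector(ctx, "example-selector", 8)

	for i := 0; i < 32; i++ {
		// nil callback: fall back to the default error-collecting handler.
		sel.SpawnChild(ctx, ChildWorkflow, nil, i)
	}

	// Assumed: BoundSelector also exposes a Wait method (not shown in this
	// hunk) that drains outstanding futures and joins collected errors.
	return sel.Wait(ctx)
}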