Skip to content

Commit

Permalink
move schema changes into activity before normalize
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 7, 2024
1 parent 2a58842 commit 98ba6f2
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 70 deletions.
6 changes: 4 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,10 @@ func (a *FlowableActivity) MaintainPull(
}
close(normDone)
}()
defer func() { <-normDone }()
defer close(normalize) // TODO race, this will cause sync to panic if it tries to send to normalize after maintainpull ends
defer func() {
close(normalize) // TODO race, this will cause sync to panic if it tries to send to normalize after maintainpull ends
<-normDone
}()

for {
select {
Expand Down
20 changes: 20 additions & 0 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,26 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)

tableSchemaDeltasCount := len(res.TableSchemaDeltas)
if tableSchemaDeltasCount > 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
for _, tableSchemaDelta := range res.TableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
}

if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{
PeerName: config.SourceName,
TableIdentifiers: modifiedSrcTables,
TableMappings: options.TableMappings,
FlowName: config.FlowJobName,
System: config.System,
}); err != nil {
err = fmt.Errorf("failed to execute schema update at source: %w", err)
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
}

if recordBatchSync.NeedsNormalize() {
parallel, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, config.Env)
if err != nil {
Expand Down
14 changes: 0 additions & 14 deletions flow/workflows/local_activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,6 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

// getParallelSyncNormalize looks up the PeerDBEnableParallelSyncNormalize
// dynamic setting through a short local activity. If the lookup fails, a
// warning is logged and false is returned, i.e. parallel sync-normalize is
// conservatively treated as disabled.
func getParallelSyncNormalize(wCtx workflow.Context, logger log.Logger, env map[string]string) bool {
	laCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{
		StartToCloseTimeout: time.Minute,
	})

	var enabled bool
	future := workflow.ExecuteLocalActivity(laCtx, peerdbenv.PeerDBEnableParallelSyncNormalize, env)
	if err := future.Get(laCtx, &enabled); err != nil {
		logger.Warn("Failed to get status of parallel sync-normalize", slog.Any("error", err))
		return false
	}
	return enabled
}

func getQRepOverwriteFullRefreshMode(wCtx workflow.Context, logger log.Logger, env map[string]string) bool {
checkCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{
StartToCloseTimeout: time.Minute,
Expand Down
55 changes: 1 addition & 54 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,13 @@ func SyncFlowWorkflow(
stop = true
})

var waitSelector workflow.Selector
parallel := getParallelSyncNormalize(ctx, logger, config.Env)
if !parallel {
waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait")
waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
stopChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {
stop = true
})
}

syncFlowCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
StartToCloseTimeout: 72 * time.Hour,
HeartbeatTimeout: time.Minute,
WaitForCancellation: true,
})
for !stop && ctx.Err() == nil {
var syncDone bool
mustWait := waitSelector != nil

currentSyncFlowNum += 1
logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum))
Expand All @@ -102,56 +91,14 @@ func SyncFlowWorkflow(
} else if childSyncFlowRes != nil {
totalRecordsSynced += childSyncFlowRes.NumRecordsSynced
logger.Info("Total records synced", slog.Int64("totalRecordsSynced", totalRecordsSynced))

// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)
if tableSchemaDeltasCount > 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
}

getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.SetupTableSchema,
&protos.SetupTableSchemaBatchInput{
PeerName: config.SourceName,
TableIdentifiers: modifiedSrcTables,
TableMappings: options.TableMappings,
FlowName: config.FlowJobName,
System: config.System,
})

if err := getModifiedSchemaFuture.Get(ctx, nil); err != nil {
logger.Error("failed to execute schema update at source", slog.Any("error", err))
}
}

mustWait = false
} else {
mustWait = false
}
})

for ctx.Err() == nil && ((!syncDone && !syncErr) || selector.HasPending()) {
selector.Select(ctx)
}
if ctx.Err() != nil {
break
}

restart := syncErr || workflow.GetInfo(ctx).GetContinueAsNewSuggested()
if !stop && !syncErr && mustWait {
waitSelector.Select(ctx)
if restart {
// must flush selector for signals received while waiting
for ctx.Err() == nil && selector.HasPending() {
selector.Select(ctx)
}
break
}
} else if restart {
if syncErr || ctx.Err() != nil || workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
break
}
}
Expand Down

0 comments on commit 98ba6f2

Please sign in to comment.