Skip to content

Commit

Permalink
sync state is not boolean
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 30, 2024
1 parent 5fb5652 commit 6d65399
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 21 deletions.
33 changes: 16 additions & 17 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,28 +250,26 @@ func (a *FlowableActivity) SyncFlow(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

currentSyncFlowNum := atomic.Int32{}
totalRecordsSynced := atomic.Int64{}
normalizingBatchID := atomic.Int64{}
normalizeWaiting := atomic.Bool{}
syncingBatchID := atomic.Int64{}
syncWaiting := atomic.Bool{}
var currentSyncFlowNum atomic.Int32
var totalRecordsSynced atomic.Int64
var normalizingBatchID atomic.Int64
var normalizeWaiting atomic.Bool
var syncingBatchID atomic.Int64
var syncState atomic.Pointer[string]
syncState.Store(shared.Ptr("setup"))

shutdown := heartbeatRoutine(ctx, func() string {
// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
sBatchID := syncingBatchID.Load()
nBatchID := normalizingBatchID.Load()
var sWaiting, nWaiting string
if syncWaiting.Load() {
sWaiting = " (W)"
}
var nWaiting string
if normalizeWaiting.Load() {
nWaiting = " (W)"
}
return fmt.Sprintf(
"currentSyncFlowNum:%d, totalRecordsSynced:%d, syncingBatchID:%d%s, normalizingBatchID:%d%s",
"currentSyncFlowNum:%d, totalRecordsSynced:%d, syncingBatchID:%d (%s), normalizingBatchID:%d%s",
currentSyncFlowNum.Load(), totalRecordsSynced.Load(),
sBatchID, sWaiting, nBatchID, nWaiting,
sBatchID, *syncState.Load(), nBatchID, nWaiting,
)
})
defer shutdown()
Expand Down Expand Up @@ -323,19 +321,19 @@ func (a *FlowableActivity) SyncFlow(
var syncErr error
if config.System == protos.TypeSystem_Q {
syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector),
normRequests, &syncingBatchID, &syncWaiting)
normRequests, &syncingBatchID, &syncState)
} else {
syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector),
normRequests, &syncingBatchID, &syncWaiting)
normRequests, &syncingBatchID, &syncState)
}
syncWaiting.Store(true)

if syncErr != nil {
if groupCtx.Err() != nil {
// need to return ctx.Err(), avoid returning syncErr that's wrapped context canceled
break
}
logger.Error("failed to sync records", slog.Any("error", syncErr))
syncState.Store(shared.Ptr("cleanup"))
close(syncDone)
return errors.Join(syncErr, group.Wait())
} else {
Expand All @@ -349,6 +347,7 @@ func (a *FlowableActivity) SyncFlow(
}
}

syncState.Store(shared.Ptr("cleanup"))
close(syncDone)
waitErr := group.Wait()
if err := ctx.Err(); err != nil {
Expand All @@ -368,7 +367,7 @@ func (a *FlowableActivity) syncRecords(
srcConn connectors.CDCPullConnector,
normRequests chan<- NormalizeBatchRequest,
syncingBatchID *atomic.Int64,
syncWaiting *atomic.Bool,
syncWaiting *atomic.Pointer[string],
) (*model.SyncResponse, error) {
var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
if config.Script != "" {
Expand Down Expand Up @@ -413,7 +412,7 @@ func (a *FlowableActivity) syncPg(
srcConn connectors.CDCPullPgConnector,
normRequests chan<- NormalizeBatchRequest,
syncingBatchID *atomic.Int64,
syncWaiting *atomic.Bool,
syncWaiting *atomic.Pointer[string],
) (*model.SyncResponse, error) {
return syncCore(ctx, a, config, options, srcConn, normRequests,
syncingBatchID, syncWaiting, nil,
Expand Down
10 changes: 6 additions & 4 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
srcConn TPull,
normRequests chan<- NormalizeBatchRequest,
syncingBatchID *atomic.Int64,
syncWaiting *atomic.Bool,
syncState *atomic.Pointer[string],
adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error),
pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error,
sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
Expand Down Expand Up @@ -167,7 +167,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}

startTime := time.Now()
syncWaiting.Store(false)
syncState.Store(shared.Ptr("syncing"))
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
Expand Down Expand Up @@ -206,14 +206,14 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
}
logger.Info("no records to push")
syncWaiting.Store(true)

dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

syncState.Store(shared.Ptr("updating schema"))
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil, fmt.Errorf("failed to pull records: %w", err)
}
}
syncWaiting.Store(true)
syncState.Store(shared.Ptr("bookkeeping"))

syncDuration := time.Since(syncStartTime)
lastCheckpoint := recordBatchSync.GetLastCheckpoint()
Expand Down Expand Up @@ -319,6 +319,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
}

syncState.Store(shared.Ptr("updating schema"))
if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil {
return nil, err
}
Expand All @@ -332,6 +333,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
if !parallel {
done = make(chan struct{})
}
syncState.Store(shared.Ptr("normalizing"))
select {
case normRequests <- NormalizeBatchRequest{BatchID: res.CurrentSyncBatchID, Done: done}:
case <-ctx.Done():
Expand Down
4 changes: 4 additions & 0 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@ const (
)

const FetchAndChannelSize = 256 * 1024

// Ptr returns a pointer to a fresh copy of x. It is a convenience for
// taking the address of a value in expression position (e.g. literals),
// where Go's & operator cannot be applied directly.
func Ptr[T any](x T) *T {
	p := new(T)
	*p = x
	return p
}

0 comments on commit 6d65399

Please sign in to comment.