Commit

poc

serprex committed Dec 19, 2024
1 parent c0e5fba commit bb103e9
Showing 7 changed files with 107 additions and 270 deletions.
128 changes: 73 additions & 55 deletions flow/activities/flowable.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"

@@ -16,7 +15,6 @@ import (
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/alerting"
@@ -43,7 +41,7 @@ type NormalizeBatchRequest struct {
BatchID int64
}

type CdcCacheEntry struct {
type CdcState struct {
connector connectors.CDCPullConnectorCore
syncDone chan struct{}
normalize chan NormalizeBatchRequest
@@ -53,9 +51,7 @@ type CdcCacheEntry struct {
type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
CdcCache map[string]CdcCacheEntry
OtelManager *otel_metrics.OtelManager
CdcCacheRw sync.RWMutex
}

func (a *FlowableActivity) CheckConnection(
@@ -253,91 +249,115 @@ func (a *FlowableActivity) CreateNormalizedTable(
}, nil
}

func (a *FlowableActivity) MaintainPull(
func (a *FlowableActivity) maintainPull(
ctx context.Context,
config *protos.FlowConnectionConfigs,
sessionID string,
) error {
) (CdcState, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
return CdcState{}, err
}
defer connectors.CloseConnector(ctx, srcConn)

if err := srcConn.SetupReplConn(ctx); err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
connectors.CloseConnector(ctx, srcConn)
return CdcState{}, err
}

normalizeBufferSize, err := peerdbenv.PeerDBNormalizeChannelBufferSize(ctx, config.Env)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
connectors.CloseConnector(ctx, srcConn)
return CdcState{}, err
}

// syncDone will be closed by UnmaintainPull,
// whereas normalizeDone will be closed by the normalize goroutine
// syncDone will be closed by SyncFlow,
// whereas normalizeDone will be closed by normalizing goroutine
// Wait on normalizeDone at end to not interrupt final normalize
syncDone := make(chan struct{})
normalize := make(chan NormalizeBatchRequest, normalizeBufferSize)
normalizeDone := make(chan struct{})
a.CdcCacheRw.Lock()
a.CdcCache[sessionID] = CdcCacheEntry{

go a.normalizeLoop(ctx, config, syncDone, normalize, normalizeDone)
go func() {
defer connectors.CloseConnector(ctx, srcConn)
err := a.maintainReplConn(ctx, config.FlowJobName, srcConn, syncDone)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
}
// TODO propagate error
}()

return CdcState{
connector: srcConn,
syncDone: syncDone,
normalize: normalize,
normalizeDone: normalizeDone,
}, nil
}

func (a *FlowableActivity) SyncFlow(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

cdcState, err := a.maintainPull(ctx, config)
if err != nil {
logger.Error("MaintainPull failed", slog.Any("error", err))
return err
}
a.CdcCacheRw.Unlock()

ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
currentSyncFlowNum := int32(0)
totalRecordsSynced := int64(0)

go a.normalizeLoop(ctx, config, syncDone, normalize, normalizeDone)
for ctx.Err() == nil {
currentSyncFlowNum += 1
logger.Info("executing sync flow", slog.Int("count", int(currentSyncFlowNum)))

for {
select {
case <-ticker.C:
activity.RecordHeartbeat(ctx, "keep session alive")
if err := srcConn.ReplPing(ctx); err != nil {
a.CdcCacheRw.Lock()
delete(a.CdcCache, sessionID)
a.CdcCacheRw.Unlock()
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", err)
var numRecordsSynced int64
var syncErr error
if config.System == protos.TypeSystem_Q {
numRecordsSynced, syncErr = a.SyncRecords(ctx, config, options, cdcState)
} else {
numRecordsSynced, syncErr = a.SyncPg(ctx, config, options, cdcState)
}

if syncErr != nil {
logger.Error("failed to execute sync flow, sleeping for 30 seconds...", slog.Any("error", syncErr))
time.Sleep(30 * time.Second)
break
} else {
totalRecordsSynced += numRecordsSynced
logger.Info("Total records synced",
slog.Int64("numRecordsSynced", numRecordsSynced), slog.Int64("totalRecordsSynced", totalRecordsSynced))

if options.NumberOfSyncs > 0 && currentSyncFlowNum >= options.NumberOfSyncs {
break
}
case <-syncDone:
return nil
case <-ctx.Done():
a.CdcCacheRw.Lock()
delete(a.CdcCache, sessionID)
a.CdcCacheRw.Unlock()
return nil
}
}
}

func (a *FlowableActivity) UnmaintainPull(ctx context.Context, sessionID string) error {
var normalizeDone chan struct{}
a.CdcCacheRw.Lock()
if entry, ok := a.CdcCache[sessionID]; ok {
close(entry.syncDone)
delete(a.CdcCache, sessionID)
normalizeDone = entry.normalizeDone
close(cdcState.syncDone)
<-cdcState.normalizeDone

if err := ctx.Err(); err != nil {
logger.Info("sync canceled", slog.Any("error", err))
return err
}
a.CdcCacheRw.Unlock()
<-normalizeDone
return nil
}

func (a *FlowableActivity) SyncRecords(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
) (model.SyncRecordsResult, error) {
cdcState CdcState,
) (int64, error) {
var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
if config.Script != "" {
var onErr context.CancelCauseFunc
Expand Down Expand Up @@ -368,22 +388,20 @@ func (a *FlowableActivity) SyncRecords(
return stream, nil
}
}
numRecords, err := syncCore(ctx, a, config, options, sessionID, adaptStream,
return syncCore(ctx, a, config, options, cdcState, adaptStream,
connectors.CDCPullConnector.PullRecords,
connectors.CDCSyncConnector.SyncRecords)
return model.SyncRecordsResult{NumRecordsSynced: numRecords}, err
}

func (a *FlowableActivity) SyncPg(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
) (model.SyncRecordsResult, error) {
numRecords, err := syncCore(ctx, a, config, options, sessionID, nil,
cdcState CdcState,
) (int64, error) {
return syncCore(ctx, a, config, options, cdcState, nil,
connectors.CDCPullPgConnector.PullPg,
connectors.CDCSyncPgConnector.SyncPg)
return model.SyncRecordsResult{NumRecordsSynced: numRecords}, err
}

func (a *FlowableActivity) StartNormalize(
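
Note on flowable.go: the session-keyed CdcCache (and its CdcCacheRw mutex) is removed; maintainPull now returns a CdcState that the long-running SyncFlow activity consumes directly. Coordination happens over two channels: closing syncDone tells the keepalive and normalize goroutines to wind down, and waiting on normalizeDone lets the final normalize pass finish before the activity returns. Below is a minimal, self-contained sketch of that handshake in plain Go; it is illustrative only, not the repository's code, and the batch IDs and buffer size are made up.

// Illustrative sketch of the syncDone/normalizeDone handshake (not PeerDB code).
package main

import (
	"fmt"
	"time"
)

func main() {
	syncDone := make(chan struct{})      // closed by the sync side when it is done
	normalizeDone := make(chan struct{}) // closed by the normalize goroutine after its final pass
	normalize := make(chan int, 4)       // stand-in for the buffered NormalizeBatchRequest channel

	// Normalize goroutine: drain requests until syncDone is closed, then do one final pass.
	go func() {
		defer close(normalizeDone)
		for {
			select {
			case batchID := <-normalize:
				fmt.Println("normalizing batch", batchID)
			case <-syncDone:
				for {
					select {
					case batchID := <-normalize:
						fmt.Println("final normalize of batch", batchID)
					default:
						return
					}
				}
			}
		}
	}()

	// Sync side: hand off a couple of batches, then signal shutdown and wait.
	normalize <- 1
	normalize <- 2
	time.Sleep(10 * time.Millisecond)
	close(syncDone)
	<-normalizeDone // waiting here is what keeps the final normalize from being interrupted
}

The buffered normalize channel plays the role that PeerDBNormalizeChannelBufferSize configures in the diff: sync can queue a bounded number of normalize requests without blocking on the normalizer.
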
68 changes: 25 additions & 43 deletions flow/activities/flowable_core.go
@@ -5,7 +5,6 @@ import (
"context"
"fmt"
"log/slog"
"reflect"
"sync/atomic"
"time"

@@ -50,43 +49,6 @@ func heartbeatRoutine(
)
}

func waitForCdcCache[TPull connectors.CDCPullConnectorCore](
ctx context.Context, a *FlowableActivity, sessionID string,
) (TPull, chan NormalizeBatchRequest, error) {
var none TPull
logger := activity.GetLogger(ctx)
attempt := 0
waitInterval := time.Second
// try for 5 minutes, once per second
// after that, try indefinitely every minute
for {
a.CdcCacheRw.RLock()
entry, ok := a.CdcCache[sessionID]
a.CdcCacheRw.RUnlock()
if ok {
if conn, ok := entry.connector.(TPull); ok {
return conn, entry.normalize, nil
}
return none, nil, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector)
}
activity.RecordHeartbeat(ctx, fmt.Sprintf("wait %s for source connector", waitInterval))
attempt += 1
if attempt > 2 {
logger.Info("waiting on source connector setup",
slog.Int("attempt", attempt), slog.String("sessionID", sessionID))
}
if err := ctx.Err(); err != nil {
return none, nil, err
}
time.Sleep(waitInterval)
if attempt == 300 {
logger.Info("source connector not setup in time, transition to slow wait",
slog.String("sessionID", sessionID))
waitInterval = time.Minute
}
}
}

func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) {
rows, err := a.CatalogPool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName)
if err != nil {
@@ -142,7 +104,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
a *FlowableActivity,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
cdcState CdcState,
adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error),
pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error,
sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
@@ -160,10 +122,8 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

srcConn, normChan, err := waitForCdcCache[TPull](ctx, a, sessionID)
if err != nil {
return 0, err
}
srcConn := cdcState.connector.(TPull)
normChan := cdcState.normalize
if err := srcConn.ConnectionActive(ctx); err != nil {
return 0, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}
@@ -630,6 +590,28 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
return currentSnapshotXmin, nil
}

func (a *FlowableActivity) maintainReplConn(
ctx context.Context, flowName string, srcConn connectors.CDCPullConnector, syncDone <-chan struct{},
) error {
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
activity.RecordHeartbeat(ctx, "keep session alive")
if err := srcConn.ReplPing(ctx); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("connection to source down: %w", err)
}
case <-syncDone:
return nil
case <-ctx.Done():
return nil
}
}
}

// Suitable to be run as goroutine
func (a *FlowableActivity) normalizeLoop(
ctx context.Context,
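
Note on flowable_core.go: waitForCdcCache is deleted because syncCore now receives the connector inside CdcState and only needs a type assertion, and the replication keepalive moved into maintainReplConn, which maintainPull runs in a goroutine whose error is currently only logged (the new code carries a "TODO propagate error" comment). One conventional way to surface such a background failure, sketched below under the assumption that the sync loop can poll for it between batches, is a buffered error channel; keepalive and replErr are invented names, not part of the repository.

// Illustrative sketch: report a background keepalive failure through a buffered channel.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// keepalive stands in for maintainReplConn: it pings until stop is closed or ctx ends.
func keepalive(ctx context.Context, stop <-chan struct{}) error {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Pretend the replication connection dropped on this ping.
			return errors.New("connection to source down")
		case <-stop:
			return nil
		case <-ctx.Done():
			return nil
		}
	}
}

func main() {
	ctx := context.Background()
	stop := make(chan struct{})
	replErr := make(chan error, 1) // capacity 1 so the goroutine never blocks on send

	go func() {
		replErr <- keepalive(ctx, stop)
	}()

	// A sync loop could check this channel between batches instead of only logging.
	select {
	case err := <-replErr:
		fmt.Println("keepalive failed:", err)
	case <-time.After(200 * time.Millisecond):
		close(stop)
		fmt.Println("keepalive stopped cleanly:", <-replErr)
	}
}
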
1 change: 0 additions & 1 deletion flow/cmd/worker.go
@@ -167,7 +166,6 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(context.Background(), conn),
CdcCache: make(map[string]activities.CdcCacheEntry),
OtelManager: otelManager,
})

4 changes: 0 additions & 4 deletions flow/model/model.go
@@ -169,10 +169,6 @@ type SyncResponse struct {
CurrentSyncBatchID int64
}

type SyncRecordsResult struct {
NumRecordsSynced int64
}

type NormalizeResponse struct {
StartBatchID int64
EndBatchID int64
33 changes: 9 additions & 24 deletions flow/workflows/cdc_flow.go
@@ -473,21 +473,13 @@ func CDCFlowWorkflow(
state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
}

syncFlowID := GetChildWorkflowID("sync-flow", cfg.FlowJobName, originalRunID)

var restart, finished bool
syncFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: syncFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
TypedSearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
syncCtx := workflow.WithChildOptions(ctx, syncFlowOpts)

syncFlowFuture := workflow.ExecuteChildWorkflow(syncCtx, SyncFlowWorkflow, cfg, state.SyncFlowOptions)
var finished bool
syncCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 365 * 24 * time.Hour,
HeartbeatTimeout: time.Minute,
WaitForCancellation: true,
})
syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions)

mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
@@ -509,7 +501,6 @@ func CDCFlowWorkflow(
logger.Info("sync finished")
}
syncFlowFuture = nil
restart = true
finished = true
if state.SyncFlowOptions.NumberOfSyncs > 0 {
state.ActiveSignal = model.PauseSignal
@@ -537,16 +528,10 @@ func CDCFlowWorkflow(
}

if shared.ShouldWorkflowContinueAsNew(ctx) {
restart = true
if syncFlowFuture != nil {
if err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil); err != nil {
logger.Warn("failed to send sync-stop, finishing", slog.Any("error", err))
finished = true
}
}
finished = true
}

if restart || finished {
if finished {
for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) {
mainLoopSelector.Select(ctx)
}
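
Note on cdc_flow.go: SyncFlow is no longer a child workflow; CDCFlowWorkflow now executes it as a single long-running activity with a one-year StartToCloseTimeout, a one-minute HeartbeatTimeout, and WaitForCancellation. That contract only holds if the activity heartbeats at least once per minute, which the flowable side does from its 15-second tickers via activity.RecordHeartbeat. Below is a generic Temporal Go SDK sketch of both halves of the pattern; LongSync and ParentWorkflow are placeholder names, and this is not the PeerDB code.

// Generic sketch of a long-running heartbeating activity driven from a workflow.
package example

import (
	"context"
	"time"

	"go.temporal.io/sdk/activity"
	"go.temporal.io/sdk/workflow"
)

// LongSync loops until canceled, heartbeating often enough to stay inside HeartbeatTimeout.
func LongSync(ctx context.Context) error {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			activity.RecordHeartbeat(ctx, "still syncing")
		case <-ctx.Done():
			return nil
		}
	}
}

// ParentWorkflow relies on the heartbeat timeout, not the overall deadline, for liveness.
func ParentWorkflow(ctx workflow.Context) error {
	actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 365 * 24 * time.Hour,
		HeartbeatTimeout:    time.Minute,
		WaitForCancellation: true,
	})
	return workflow.ExecuteActivity(actCtx, LongSync).Get(ctx, nil)
}

WaitForCancellation makes the workflow wait for the canceled activity to actually return, which lines up with the diff's goal of letting the final normalize complete instead of cutting it off.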