diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 3d89236444..8c5c523b0a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -271,7 +271,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, err = errGroup.Wait() if err != nil { a.Alerter.LogFlowError(ctx, flowName, err) - return nil, fmt.Errorf("failed to pull records: %w", err) + return nil, fmt.Errorf("failed in pull records when: %w", err) } slog.InfoContext(ctx, "no records to push") syncResponse := &model.SyncResponse{} @@ -615,7 +615,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, recordBatch, err := srcConn.PullQRepRecords(config, partition) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return fmt.Errorf("failed to pull records: %w", err) + return fmt.Errorf("failed to pull qrep records: %w", err) } numRecords := int64(recordBatch.NumRecords) slog.InfoContext(ctx, fmt.Sprintf("pulled %d records\n", len(recordBatch.Records))) @@ -955,7 +955,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, numRecords, currentSnapshotXmin, pullErr = pgConn.PullXminRecordStream(config, partition, stream) if pullErr != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - slog.InfoContext(ctx, fmt.Sprintf("failed to pull records: %v", err)) + slog.InfoContext(ctx, fmt.Sprintf("[xmin] failed to pull records: %v", err)) return err } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index ff2c9f5898..d5de8947c5 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "log/slog" + "regexp" "time" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -24,6 +25,8 @@ import ( "go.temporal.io/sdk/activity" ) +const maxRetriesForWalSegmentRemoved = 5 + type PostgresCDCSource struct { ctx context.Context replPool *pgxpool.Pool @@ -44,6 +47,8 @@ type PostgresCDCSource struct { // for storing chema delta audit logs to catalog catalogPool *pgxpool.Pool flowJobName string + + walSegmentRemovedRegex *regexp.Regexp } type PostgresCDCConfig struct { @@ -59,6 +64,12 @@ type PostgresCDCConfig struct { SetLastOffset func(int64) error } +type startReplicationOpts struct { + conn *pgconn.PgConn + startLSN pglogrepl.LSN + replicationOpts pglogrepl.StartReplicationOptions +} + // Create a new PostgresCDCSource func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) { childToParentRelIDMap, err := getChildToParentRelIDMap(cdcConfig.AppContext, cdcConfig.Connection) @@ -66,6 +77,9 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 return nil, fmt.Errorf("error getting child to parent relid map: %w", err) } + pattern := "requested WAL segment .* has already been removed.*" + regex := regexp.MustCompile(pattern) + flowName, _ := cdcConfig.AppContext.Value(shared.FlowNameKey).(string) return &PostgresCDCSource{ ctx: cdcConfig.AppContext, @@ -83,6 +97,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), catalogPool: cdcConfig.CatalogPool, flowJobName: cdcConfig.FlowJobName, + walSegmentRemovedRegex: regex, }, nil } @@ -120,21 +135,11 @@ func getChildToParentRelIDMap(ctx context.Context, pool *pgxpool.Pool) (map[uint // PullRecords pulls records from the cdc stream func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error { - // setup options - pluginArguments := []string{ - "proto_version '1'", - } - - if p.publication != "" { - pubOpt := fmt.Sprintf("publication_names '%s'", p.publication) - pluginArguments = append(pluginArguments, pubOpt) - } else { - return fmt.Errorf("publication name is not set") + replicationOpts, err := p.replicationOptions() + if err != nil { + return fmt.Errorf("error getting replication options: %w", err) } - replicationOpts := pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments} - replicationSlot := p.slot - // create replication connection replicationConn, err := p.replPool.Acquire(p.ctx) if err != nil { @@ -146,13 +151,6 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error { pgConn := replicationConn.Conn().PgConn() p.logger.Info("created replication connection") - sysident, err := pglogrepl.IdentifySystem(p.ctx, pgConn) - if err != nil { - return fmt.Errorf("IdentifySystem failed: %w", err) - } - p.logger.Debug(fmt.Sprintf("SystemID: %s, Timeline: %d, XLogPos: %d, DBName: %s", - sysident.SystemID, sysident.Timeline, sysident.XLogPos, sysident.DBName)) - // start replication var clientXLogPos, startLSN pglogrepl.LSN if req.LastOffset > 0 { @@ -161,15 +159,48 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error { startLSN = clientXLogPos + 1 } - err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, startLSN, replicationOpts) + opts := startReplicationOpts{ + conn: pgConn, + startLSN: startLSN, + replicationOpts: *replicationOpts, + } + + err = p.startReplication(opts) if err != nil { - return fmt.Errorf("error starting replication at startLsn - %d: %w", startLSN, err) + return fmt.Errorf("error starting replication: %w", err) } + p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, startLSN)) return p.consumeStream(pgConn, req, clientXLogPos, req.RecordStream) } +func (p *PostgresCDCSource) startReplication(opts startReplicationOpts) error { + err := pglogrepl.StartReplication(p.ctx, opts.conn, p.slot, opts.startLSN, opts.replicationOpts) + if err != nil { + p.logger.Error("error starting replication", slog.Any("error", err)) + return fmt.Errorf("error starting replication at startLsn - %d: %w", opts.startLSN, err) + } + + p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, opts.startLSN)) + return nil +} + +func (p *PostgresCDCSource) replicationOptions() (*pglogrepl.StartReplicationOptions, error) { + pluginArguments := []string{ + "proto_version '1'", + } + + if p.publication != "" { + pubOpt := fmt.Sprintf("publication_names '%s'", p.publication) + pluginArguments = append(pluginArguments, pubOpt) + } else { + return nil, fmt.Errorf("publication name is not set") + } + + return &pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments}, nil +} + // start consuming the cdc stream func (p *PostgresCDCSource) consumeStream( conn *pgconn.PgConn, @@ -250,6 +281,7 @@ func (p *PostgresCDCSource) consumeStream( pkmRequiresResponse := false waitingForCommit := false + retryAttemptForWALSegmentRemoved := 0 for { if pkmRequiresResponse { @@ -324,6 +356,20 @@ func (p *PostgresCDCSource) consumeStream( } if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok { + if errMsg.Severity == "ERROR" && errMsg.Code == "XX000" { + if p.walSegmentRemovedRegex.MatchString(errMsg.Message) { + retryAttemptForWALSegmentRemoved++ + if retryAttemptForWALSegmentRemoved > maxRetriesForWalSegmentRemoved { + return fmt.Errorf("max retries for WAL segment removed exceeded: %+v", errMsg) + } else { + p.logger.Warn(fmt.Sprintf( + "WAL segment removed, restarting replication retrying in 30 seconds..."), + slog.Any("error", errMsg), slog.Int("retryAttempt", retryAttemptForWALSegmentRemoved)) + time.Sleep(30 * time.Second) + continue + } + } + } return fmt.Errorf("received Postgres WAL error: %+v", errMsg) }