Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix initial snapshot replay #2168

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,17 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
return "getting partitions for job"
})
defer shutdown()
partitions, err := srcConn.GetQRepPartitions(ctx, config, last)

snapshotName := ""
if config.ParentMirrorName != "" {
_, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName)
if err != nil {
a.Alerter.LogFlowError(ctx, "[GetQRepPartitions] "+config.FlowJobName, err)
return nil, fmt.Errorf("[GetQRepPartitions] failed to LoadSnapshotNameFromCatalog: %w", err)
}
}

partitions, err := srcConn.GetQRepPartitions(ctx, config, last, snapshotName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
Expand Down
33 changes: 28 additions & 5 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,10 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
outstream TRead,
pullRecords func(
TPull,
context.Context, *protos.QRepConfig,
context.Context,
*protos.QRepConfig,
*protos.QRepPartition,
string,
TWrite,
) (int, error),
syncRecords func(TSync, context.Context, *protos.QRepConfig, *protos.QRepPartition, TRead) (int, error),
Expand Down Expand Up @@ -422,14 +424,23 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
var rowsSynced int
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
snapshotName := ""
if config.ParentMirrorName != "" {
_, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName)
if err != nil {
serprex marked this conversation as resolved.
Show resolved Hide resolved
a.Alerter.LogFlowError(ctx, "[replicateQRepPartition] "+config.FlowJobName, err)
return fmt.Errorf("[replicateQRepPartition] failed to LoadSnapshotNameFromCatalog: %w", err)
}
}

srcConn, err := connectors.GetByNameAs[TPull](ctx, config.Env, a.CatalogPool, config.SourceName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

tmp, err := pullRecords(srcConn, errCtx, config, partition, stream)
tmp, err := pullRecords(srcConn, errCtx, config, partition, snapshotName, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to pull records: %w", err)
Expand Down Expand Up @@ -479,8 +490,10 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
outstream TRead,
pullRecords func(
*connpostgres.PostgresConnector,
context.Context, *protos.QRepConfig,
context.Context,
*protos.QRepConfig,
*protos.QRepPartition,
string,
TWrite,
) (int, int64, error),
syncRecords func(TSync, context.Context, *protos.QRepConfig, *protos.QRepPartition, TRead) (int, error),
Expand All @@ -501,6 +514,16 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
var currentSnapshotXmin int64
var rowsSynced int
errGroup.Go(func() error {
snapshotName := ""
if config.ParentMirrorName != "" {
var err error
_, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName)
serprex marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
a.Alerter.LogFlowError(ctx, "[replicateXminPartition] "+config.FlowJobName, err)
return fmt.Errorf("[replicateXminPartition] failed to LoadSnapshotNameFromCatalog: %w", err)
}
}

srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, config.Env, a.CatalogPool, config.SourceName)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
Expand All @@ -509,10 +532,10 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn

var pullErr error
var numRecords int
numRecords, currentSnapshotXmin, pullErr = pullRecords(srcConn, ctx, config, partition, stream)
numRecords, currentSnapshotXmin, pullErr = pullRecords(srcConn, ctx, config, partition, snapshotName, stream)
if pullErr != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, pullErr)
logger.Warn(fmt.Sprintf("[xmin] failed to pull recordS: %v", pullErr))
logger.Warn(fmt.Sprintf("[xmin] failed to pull records: %v", pullErr))
return pullErr
}

Expand Down
108 changes: 53 additions & 55 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"

Expand All @@ -19,21 +20,14 @@ import (
)

type SlotSnapshotState struct {
connector connectors.CDCPullConnector
signal connpostgres.SlotSignal
snapshotName string
}

type TxSnapshotState struct {
SnapshotName string
SupportsTIDScans bool
connector connectors.CDCPullConnector
signal connpostgres.SlotSignal
}

type SnapshotActivity struct {
Alerter *alerting.Alerter
CatalogPool *pgxpool.Pool
SlotSnapshotStates map[string]SlotSnapshotState
TxSnapshotStates map[string]TxSnapshotState
SnapshotStatesMutex sync.Mutex
}

Expand All @@ -55,7 +49,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
func (a *SnapshotActivity) SetupReplication(
ctx context.Context,
config *protos.SetupReplicationInput,
) (*protos.SetupReplicationOutput, error) {
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

Expand All @@ -65,9 +59,9 @@ func (a *SnapshotActivity) SetupReplication(
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
logger.Info("setup replication is no-op for non-postgres source")
return nil, nil
return nil
}
return nil, fmt.Errorf("failed to get connector: %w", err)
return fmt.Errorf("failed to get connector: %w", err)
}

closeConnectionForError := func(err error) {
Expand All @@ -84,85 +78,89 @@ func (a *SnapshotActivity) SetupReplication(

if slotInfo.Err != nil {
closeConnectionForError(slotInfo.Err)
return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
return fmt.Errorf("slot error: %w", slotInfo.Err)
} else {
logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName))
}

a.SnapshotStatesMutex.Lock()
defer a.SnapshotStatesMutex.Unlock()
for {
var slotName string
if err := a.CatalogPool.QueryRow(
ctx,
"select slot_name from snapshot_names where flow_name = $1",
config.FlowJobName,
).Scan(&slotName); err == nil && slotName != "" {
if err := conn.ExecuteCommand(
ctx,
"select pg_drop_replication_slot($1)",
slotName,
); err != nil && !shared.IsSQLStateError(err, pgerrcode.UndefinedObject) {
if shared.IsSQLStateError(err, pgerrcode.ObjectInUse) {
a.Alerter.LogFlowError(ctx, "[SetupReplication] "+config.FlowJobName, err)
time.Sleep(time.Second * 15)
continue
}
return fmt.Errorf("failed to drop slot from previous run: %w", err)
}
}
break
}

if _, err := a.CatalogPool.Exec(ctx,
`insert into snapshot_names (flow_name, slot_name, snapshot_name, supports_tid_scan) values ($1, $2, $3, $4)
on conflict (flow_name) do update set slot_name = $2, snapshot_name = $3, supports_tid_scan = $4`,
config.FlowJobName, slotInfo.SlotName, slotInfo.SnapshotName, slotInfo.SupportsTIDScans,
); err != nil {
return err
}

a.SnapshotStatesMutex.Lock()
a.SlotSnapshotStates[config.FlowJobName] = SlotSnapshotState{
signal: slotSignal,
snapshotName: slotInfo.SnapshotName,
connector: conn,
signal: slotSignal,
connector: conn,
}
a.SnapshotStatesMutex.Unlock()

return &protos.SetupReplicationOutput{
SlotName: slotInfo.SlotName,
SnapshotName: slotInfo.SnapshotName,
SupportsTidScans: slotInfo.SupportsTIDScans,
}, nil
return nil
}

func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer string) error {
func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, flowJobName string, peer string) error {
conn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, peer)
if err != nil {
return err
}
defer connectors.CloseConnector(ctx, conn)

exportSnapshotOutput, tx, err := conn.ExportTxSnapshot(ctx)
tx, err := conn.ExportTxSnapshot(ctx, flowJobName)
if err != nil {
return err
}

a.SnapshotStatesMutex.Lock()
a.TxSnapshotStates[sessionID] = TxSnapshotState{
SnapshotName: exportSnapshotOutput.SnapshotName,
SupportsTIDScans: exportSnapshotOutput.SupportsTidScans,
}
a.SnapshotStatesMutex.Unlock()

logger := activity.GetLogger(ctx)
start := time.Now()
for {
msg := fmt.Sprintf("maintaining export snapshot transaction %s", time.Since(start).Round(time.Second))
logger.Info(msg)
// this function relies on context cancellation to exit
// context is not explicitly cancelled, but workflow exit triggers an implicit cancel
// from activity.RecordBeat
// from activity.RecordHeartbeat
activity.RecordHeartbeat(ctx, msg)
if ctx.Err() != nil {
a.SnapshotStatesMutex.Lock()
delete(a.TxSnapshotStates, sessionID)
a.SnapshotStatesMutex.Unlock()
return conn.FinishExport(tx)
}
time.Sleep(time.Minute)
}
}

func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (*TxSnapshotState, error) {
logger := activity.GetLogger(ctx)
attempt := 0
for {
a.SnapshotStatesMutex.Lock()
tsc, ok := a.TxSnapshotStates[sessionID]
a.SnapshotStatesMutex.Unlock()
if ok {
return &tsc, nil
}
activity.RecordHeartbeat(ctx, "wait another second for snapshot export")
attempt += 1
if attempt > 2 {
logger.Info("waiting on snapshot export", slog.Int("attempt", attempt))
}
if err := ctx.Err(); err != nil {
return nil, err
}
time.Sleep(time.Second)
// LoadSupportsTidScan calls shared.LoadSnapshotNameFromCatalog with the
// activity's catalog pool and flowJobName, discards the first two results and
// returns the third as a bool.
// NOTE(review): presumably this is the supports_tid_scan value that
// SetupReplication writes to snapshot_names; confirm against
// shared.LoadSnapshotNameFromCatalog.
//
// A non-nil error is reported through a.Alerter.LogFlowError, tagged with
// "[LoadSupportsTidScan] " plus the flow name, and is then returned to the
// caller unchanged alongside the bool.
func (a *SnapshotActivity) LoadSupportsTidScan(
ctx context.Context,
flowJobName string,
) (bool, error) {
_, _, supportsTidScan, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, flowJobName)
if err != nil {
// Log only; the error is still returned to the caller below.
a.Alerter.LogFlowError(ctx, "[LoadSupportsTidScan] "+flowJobName, err)
}
return supportsTidScan, err
}

func (a *SnapshotActivity) LoadTableSchema(
Expand Down
1 change: 0 additions & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
// explicitly not initializing mutex, in line with design
w.RegisterActivity(&activities.SnapshotActivity{
SlotSnapshotStates: make(map[string]activities.SlotSnapshotState),
TxSnapshotStates: make(map[string]activities.TxSnapshotState),
Alerter: alerting.NewAlerter(context.Background(), conn),
CatalogPool: conn,
})
Expand Down
9 changes: 5 additions & 4 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type CDCPullConnectorCore interface {

// For InitialSnapshotOnly correctness without replication slot
// `any` is for returning transaction if necessary
ExportTxSnapshot(context.Context) (*protos.ExportTxSnapshotOutput, any, error)
ExportTxSnapshot(ctx context.Context, flowJobName string) (any, error)

// `any` from ExportTxSnapshot is passed here when done, allowing the transaction to commit
FinishExport(any) error
Expand Down Expand Up @@ -201,21 +201,22 @@ type QRepPullConnectorCore interface {
Connector

// GetQRepPartitions returns the partitions for a given table that haven't been synced yet.
GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error)
GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition,
snapshotName string) ([]*protos.QRepPartition, error)
}

type QRepPullConnector interface {
QRepPullConnectorCore

// PullQRepRecords returns the records for a given partition.
PullQRepRecords(context.Context, *protos.QRepConfig, *protos.QRepPartition, *model.QRecordStream) (int, error)
PullQRepRecords(context.Context, *protos.QRepConfig, *protos.QRepPartition, string, *model.QRecordStream) (int, error)
}

type QRepPullPgConnector interface {
QRepPullConnectorCore

// PullPgQRepRecords returns the records for a given partition.
PullPgQRepRecords(context.Context, *protos.QRepConfig, *protos.QRepPartition, connpostgres.PgCopyWriter) (int, error)
PullPgQRepRecords(context.Context, *protos.QRepConfig, *protos.QRepPartition, string, connpostgres.PgCopyWriter) (int, error)
}

type QRepSyncConnectorCore interface {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,8 @@ func (c *PostgresConnector) checkIfTableExistsWithTx(
return result.Bool, nil
}

func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string) error {
_, err := c.conn.Exec(ctx, command)
func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string, args ...any) error {
_, err := c.conn.Exec(ctx, command, args...)
return err
}

Expand Down
Loading
Loading