From b57b2e68382446fd538d93776b549e13badfa0ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 22 Oct 2024 03:29:13 +0000 Subject: [PATCH] bring back tid scan logic, defer to ParentMirrorName for lookup --- flow/activities/flowable.go | 6 ++- flow/activities/flowable_core.go | 6 ++- flow/activities/snapshot_activity.go | 8 ++++ flow/workflows/snapshot_flow.go | 68 +++++++++++++--------------- 4 files changed, 49 insertions(+), 39 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 267e024373..4b6eab5e3f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -489,7 +489,11 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, }) defer shutdown() - _, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.FlowJobName) + snapshotFlowName := config.ParentMirrorName + if snapshotFlowName == "" { + snapshotFlowName = config.FlowJobName + } + _, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, snapshotFlowName) if err != nil { return nil, err } diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 96ba492dc1..b885e7e79f 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -424,7 +424,11 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn var rowsSynced int errGroup, errCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { - _, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.FlowJobName) + snapshotFlowName := config.ParentMirrorName + if snapshotFlowName == "" { + snapshotFlowName = config.FlowJobName + } + _, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, snapshotFlowName) if err != nil { return err } diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 00759b8683..9a9338fc66 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -129,6 +129,14 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, flo } } +func (a *SnapshotActivity) LoadSupportsTidScan( + ctx context.Context, + flowJobName string, +) (bool, error) { + _, _, supportsTidScan, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, flowJobName) + return supportsTidScan, err +} + func (a *SnapshotActivity) LoadTableSchema( ctx context.Context, flowName string, diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 4b971ece3d..8e5ce951c2 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -18,14 +18,6 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -type snapshotType int8 - -const ( - SNAPSHOT_TYPE_UNKNOWN snapshotType = iota - SNAPSHOT_TYPE_SLOT - SNAPSHOT_TYPE_TX -) - type SnapshotFlowExecution struct { config *protos.FlowConnectionConfigs logger log.Logger @@ -67,9 +59,7 @@ func (s *SnapshotFlowExecution) setupReplication(ctx workflow.Context) error { return nil } -func (s *SnapshotFlowExecution) closeSlotKeepAlive( - ctx workflow.Context, -) error { +func (s *SnapshotFlowExecution) closeSlotKeepAlive(ctx workflow.Context) error { s.logger.Info("closing slot keep alive for peer flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -207,25 +197,35 @@ func (s *SnapshotFlowExecution) cloneTable( return nil } -func (s *SnapshotFlowExecution) cloneTables( - ctx workflow.Context, - snapshotType snapshotType, - maxParallelClones int, -) error { - if snapshotType == SNAPSHOT_TYPE_SLOT { - s.logger.Info("cloning tables for slot") - } else if snapshotType == SNAPSHOT_TYPE_TX { - s.logger.Info("cloning tables for snapshot") - } - +func (s *SnapshotFlowExecution) cloneTables(ctx workflow.Context, maxParallelClones int) error { boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones) + supportsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + supportsFuture := workflow.ExecuteActivity( + supportsCtx, + snapshot.LoadSupportsTidScan, + s.config.FlowJobName, + ) + var supportsTIDScans bool + supportsFuture.Get(supportsCtx, &supportsTIDScans) + + defaultPartitionCol := "ctid" + if !supportsTIDScans { + s.logger.Info("Postgres version too old for TID scans, might use full table partitions!") + defaultPartitionCol = "" + } + for _, v := range s.config.TableMappings { source := v.SourceTableIdentifier destination := v.DestinationTableIdentifier s.logger.Info( fmt.Sprintf("Cloning table with source table %s and destination table name %s", source, destination), ) + if v.PartitionKey == "" { + v.PartitionKey = defaultPartitionCol + } if err := s.cloneTable(ctx, boundSelector, v); err != nil { s.logger.Error("failed to start clone child workflow", slog.Any("error", err)) continue @@ -251,10 +251,7 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( } s.logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel)) - if err := s.cloneTables(ctx, - SNAPSHOT_TYPE_SLOT, - numTablesInParallel, - ); err != nil { + if err := s.cloneTables(ctx, numTablesInParallel); err != nil { return fmt.Errorf("failed to clone tables: %w", err) } @@ -320,22 +317,19 @@ func SnapshotFlowWorkflow( ) var sessionError error - sessionSelector := workflow.NewNamedSelector(ctx, "ExportSnapshotSetup") - sessionSelector.AddFuture(fMaintain, func(f workflow.Future) { + cancelCtx, cancel := workflow.WithCancel(ctx) + workflow.GoNamed(ctx, "ExportSnapshotGoroutine", func(ctx workflow.Context) { // MaintainTx should never exit without an error before this point - sessionError = f.Get(exportCtx, nil) + sessionError = fMaintain.Get(exportCtx, nil) + cancel() }) - sessionSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { - sessionError = ctx.Err() - }) - sessionSelector.Select(ctx) - if sessionError != nil { - return sessionError - } - if err := se.cloneTables(ctx, SNAPSHOT_TYPE_TX, numTablesInParallel); err != nil { + if err := se.cloneTables(cancelCtx, numTablesInParallel); err != nil { return fmt.Errorf("failed to clone tables: %w", err) } + if sessionError != nil { + return sessionError + } } else if err := se.cloneTablesWithSlot(ctx, sessionCtx, numTablesInParallel); err != nil { return fmt.Errorf("failed to clone slots and create replication slot: %w", err) }