Skip to content

Commit

Permalink
bring back tid scan logic, defer to ParentMirrorName for lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Oct 22, 2024
1 parent 9d0add2 commit 2696b10
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 42 deletions.
6 changes: 5 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,11 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
})
defer shutdown()

_, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.FlowJobName)
snapshotFlowName := config.ParentMirrorName
if snapshotFlowName == "" {
snapshotFlowName = config.FlowJobName
}
_, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, snapshotFlowName)
if err != nil {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,11 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
var rowsSynced int
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
_, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.FlowJobName)
snapshotFlowName := config.ParentMirrorName
if snapshotFlowName == "" {
snapshotFlowName = config.FlowJobName
}
_, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, snapshotFlowName)
if err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, flo
}
}

func (a *SnapshotActivity) LoadSupportsTidScan(
ctx context.Context,
flowJobName string,
) (bool, error) {
_, _, supportsTidScan, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, flowJobName)
return supportsTidScan, err
}

func (a *SnapshotActivity) LoadTableSchema(
ctx context.Context,
flowName string,
Expand Down
68 changes: 32 additions & 36 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

type snapshotType int8

const (
SNAPSHOT_TYPE_UNKNOWN snapshotType = iota
SNAPSHOT_TYPE_SLOT
SNAPSHOT_TYPE_TX
)

type SnapshotFlowExecution struct {
config *protos.FlowConnectionConfigs
logger log.Logger
Expand Down Expand Up @@ -67,9 +59,7 @@ func (s *SnapshotFlowExecution) setupReplication(ctx workflow.Context) error {
return nil
}

func (s *SnapshotFlowExecution) closeSlotKeepAlive(
ctx workflow.Context,
) error {
func (s *SnapshotFlowExecution) closeSlotKeepAlive(ctx workflow.Context) error {
s.logger.Info("closing slot keep alive for peer flow")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
Expand Down Expand Up @@ -207,25 +197,37 @@ func (s *SnapshotFlowExecution) cloneTable(
return nil
}

func (s *SnapshotFlowExecution) cloneTables(
ctx workflow.Context,
snapshotType snapshotType,
maxParallelClones int,
) error {
if snapshotType == SNAPSHOT_TYPE_SLOT {
s.logger.Info("cloning tables for slot")
} else if snapshotType == SNAPSHOT_TYPE_TX {
s.logger.Info("cloning tables for snapshot")
func (s *SnapshotFlowExecution) cloneTables(ctx workflow.Context, maxParallelClones int) error {
boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones)

supportsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
supportsFuture := workflow.ExecuteActivity(
supportsCtx,
snapshot.LoadSupportsTidScan,
s.config.FlowJobName,
)
var supportsTIDScans bool
if err := supportsFuture.Get(supportsCtx, &supportsTIDScans); err != nil {
return err
}

boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones)
defaultPartitionCol := "ctid"
if !supportsTIDScans {
s.logger.Info("Postgres version too old for TID scans, might use full table partitions!")
defaultPartitionCol = ""
}

for _, v := range s.config.TableMappings {
source := v.SourceTableIdentifier
destination := v.DestinationTableIdentifier
s.logger.Info(
fmt.Sprintf("Cloning table with source table %s and destination table name %s", source, destination),
)
if v.PartitionKey == "" {
v.PartitionKey = defaultPartitionCol
}
if err := s.cloneTable(ctx, boundSelector, v); err != nil {
s.logger.Error("failed to start clone child workflow", slog.Any("error", err))
continue
Expand All @@ -251,10 +253,7 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot(
}

s.logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel))
if err := s.cloneTables(ctx,
SNAPSHOT_TYPE_SLOT,
numTablesInParallel,
); err != nil {
if err := s.cloneTables(ctx, numTablesInParallel); err != nil {
return fmt.Errorf("failed to clone tables: %w", err)
}

Expand Down Expand Up @@ -320,22 +319,19 @@ func SnapshotFlowWorkflow(
)

var sessionError error
sessionSelector := workflow.NewNamedSelector(ctx, "ExportSnapshotSetup")
sessionSelector.AddFuture(fMaintain, func(f workflow.Future) {
cancelCtx, cancel := workflow.WithCancel(ctx)
workflow.GoNamed(ctx, "ExportSnapshotGoroutine", func(ctx workflow.Context) {
// MaintainTx should never exit without an error before this point
sessionError = f.Get(exportCtx, nil)
sessionError = fMaintain.Get(exportCtx, nil)
cancel()
})
sessionSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
sessionError = ctx.Err()
})
sessionSelector.Select(ctx)
if sessionError != nil {
return sessionError
}

if err := se.cloneTables(ctx, SNAPSHOT_TYPE_TX, numTablesInParallel); err != nil {
if err := se.cloneTables(cancelCtx, numTablesInParallel); err != nil {
return fmt.Errorf("failed to clone tables: %w", err)
}
if sessionError != nil {
return sessionError
}
} else if err := se.cloneTablesWithSlot(ctx, sessionCtx, numTablesInParallel); err != nil {
return fmt.Errorf("failed to clone slots and create replication slot: %w", err)
}
Expand Down
4 changes: 0 additions & 4 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,6 @@ message SetupReplicationInput {
string destination_name = 9;
}

message SetupReplicationOutput {
string slot_name = 1;
}

message CreateRawTableInput {
string flow_job_name = 2;
map<string, string> table_name_mapping = 3;
Expand Down

0 comments on commit 2696b10

Please sign in to comment.