From 5a47b6635da66cc808dcabe228add13bc6889f82 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 15 Nov 2024 01:33:12 +0530 Subject: [PATCH 1/2] use disconnected ctx to ensure snapshot connection drops --- flow/workflows/snapshot_flow.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index c8b6a3fd29..00cbbda05d 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -274,6 +274,13 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( if err != nil { return fmt.Errorf("failed to setup replication: %w", err) } + defer func() { + dCtx, cancel := workflow.NewDisconnectedContext(ctx) + defer cancel() + if err := s.closeSlotKeepAlive(dCtx); err != nil { + s.logger.Error("failed to close slot keep alive", slog.Any("error", err)) + } + }() s.logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel)) if err := s.cloneTables(ctx, @@ -283,13 +290,10 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( slotInfo.SupportsTidScans, numTablesInParallel, ); err != nil { + s.logger.Error("failed to clone tables", slog.Any("error", err)) return fmt.Errorf("failed to clone tables: %w", err) } - if err := s.closeSlotKeepAlive(sessionCtx); err != nil { - return fmt.Errorf("failed to close slot keep alive: %w", err) - } - return nil } From bee19f4ef81bfa051b1682f43b4ee5cee4368440 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 15 Nov 2024 01:47:13 +0530 Subject: [PATCH 2/2] review feedback --- flow/workflows/snapshot_flow.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 00cbbda05d..d4f494d1ff 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -275,7 +275,7 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( return fmt.Errorf("failed to setup replication: %w", err) } defer func() { - dCtx, cancel := workflow.NewDisconnectedContext(ctx) + dCtx, cancel := workflow.NewDisconnectedContext(sessionCtx) defer cancel() if err := s.closeSlotKeepAlive(dCtx); err != nil { s.logger.Error("failed to close slot keep alive", slog.Any("error", err))