From 593093e522406e09fa1a30c0a30998f5761bd524 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 15 Nov 2024 15:02:26 +0530 Subject: [PATCH] use disconnected ctx to ensure snapshot connection drops (#2258) Temporal has a thing for everything may close #2162 --- flow/workflows/snapshot_flow.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index c8b6a3fd29..d4f494d1ff 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -274,6 +274,13 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( if err != nil { return fmt.Errorf("failed to setup replication: %w", err) } + defer func() { + dCtx, cancel := workflow.NewDisconnectedContext(sessionCtx) + defer cancel() + if err := s.closeSlotKeepAlive(dCtx); err != nil { + s.logger.Error("failed to close slot keep alive", slog.Any("error", err)) + } + }() s.logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel)) if err := s.cloneTables(ctx, @@ -283,13 +290,10 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( slotInfo.SupportsTidScans, numTablesInParallel, ); err != nil { + s.logger.Error("failed to clone tables", slog.Any("error", err)) return fmt.Errorf("failed to clone tables: %w", err) } - if err := s.closeSlotKeepAlive(sessionCtx); err != nil { - return fmt.Errorf("failed to close slot keep alive: %w", err) - } - return nil }