From 9d4065b10a794dd0f1a7029548a9ed94a3fe7300 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 26 Jan 2024 14:25:45 +0000 Subject: [PATCH] Don't call ticker.Reset (#1162) Tickers are periodic: https://pkg.go.dev/time#NewTicker > The ticker will adjust the time interval or drop ticks to make up for slow receivers --- flow/activities/flowable.go | 4 +--- flow/activities/slot.go | 4 +--- flow/connectors/eventhub/eventhub.go | 6 +----- flow/connectors/utils/heartbeat.go | 1 - 4 files changed, 3 insertions(+), 12 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index c5680ebe6c..de3f2bcfeb 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -721,8 +721,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { return nil } - sendTimeout := 10 * time.Minute - ticker := time.NewTicker(sendTimeout) + ticker := time.NewTicker(10 * time.Minute) defer ticker.Stop() activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes") @@ -768,7 +767,6 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { slog.InfoContext(ctx, fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name)) } } - ticker.Reset(sendTimeout) } } diff --git a/flow/activities/slot.go b/flow/activities/slot.go index b748e67e45..90c20deeb4 100644 --- a/flow/activities/slot.go +++ b/flow/activities/slot.go @@ -75,8 +75,7 @@ func (a *FlowableActivity) recordSlotSizePeriodically( return } - timeout := 5 * time.Minute - ticker := time.NewTicker(timeout) + ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for { @@ -89,6 +88,5 @@ func (a *FlowableActivity) recordSlotSizePeriodically( case <-ctx.Done(): return } - ticker.Reset(timeout) } } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 66a9131676..cf10e72c96 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -124,9 +124,7 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false) - eventHubFlushTimeout := peerdbenv.PeerDBEventhubFlushTimeoutSeconds() - - ticker := time.NewTicker(eventHubFlushTimeout) + ticker := time.NewTicker(peerdbenv.PeerDBEventhubFlushTimeoutSeconds()) defer ticker.Stop() lastSeenLSN := int64(0) @@ -219,8 +217,6 @@ func (c *EventHubConnector) processBatch( return 0, fmt.Errorf("failed to update last offset: %w", err) } } - - ticker.Reset(eventHubFlushTimeout) } } } diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index c12a0dce6f..c5bf14cdaf 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -29,7 +29,6 @@ func HeartbeatRoutine( case <-ctx.Done(): return case <-ticker.C: - ticker.Reset(15 * time.Second) } } }()