From f216f9ee428924830796a2fa2119be69503d1332 Mon Sep 17 00:00:00 2001 From: Max Holland Date: Fri, 15 Dec 2023 12:21:34 +0000 Subject: [PATCH] Increase serf message buffer sizes (#1032) * Increase serf message buffer sizes * Add metrics for current buffer sizes * try fix build --- .github/workflows/build.yaml | 2 +- cluster/cluster.go | 14 +++++++++----- metrics/metrics.go | 10 ++++++++++ 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index ef35744aa..f33d7b055 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -91,7 +91,7 @@ jobs: tar -czvf "../releases/livepeer-catalyst-api-${GOOS}-${GOARCH}.tar.gz" . - name: Upload artifacts for cutting release - uses: actions/upload-artifact@master + uses: actions/upload-artifact@v3 with: name: release-artifacts path: releases/ diff --git a/cluster/cluster.go b/cluster/cluster.go index d31a84be2..66d3dc6da 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" "github.com/livepeer/catalyst-api/config" + "github.com/livepeer/catalyst-api/metrics" ) type Cluster interface { @@ -54,9 +55,9 @@ var mediaFilter = map[string]string{"node": "media"} func NewCluster(config *config.Cli) Cluster { c := ClusterImpl{ config: config, - serfCh: make(chan serf.Event, 64), + serfCh: make(chan serf.Event, 4096), memberCh: make(chan []Member), - eventCh: make(chan serf.UserEvent, 64), + eventCh: make(chan serf.UserEvent, 4096), } return &c } @@ -256,13 +257,16 @@ func (c *ClusterImpl) BroadcastEvent(event serf.UserEvent) error { } func (c *ClusterImpl) handleEvents(ctx context.Context) error { - inbox := make(chan serf.Event, 1) + inbox := make(chan serf.Event, 4096) go func() { for { select { case <-ctx.Done(): return case e := <-c.serfCh: + metrics.Metrics.UserEventBufferSize.Set(float64(len(c.eventCh))) + metrics.Metrics.MemberEventBufferSize.Set(float64(len(inbox))) + switch evt := e.(type) { case serf.UserEvent: select { @@ -272,7 +276,7 @@ func (c *ClusterImpl) handleEvents(ctx context.Context) error { // Event moved to eventCh default: // Overflow event gets dropped - glog.V(3).Infof("Overflow UserEvent, dropped: %v", evt) + glog.Infof("Overflow UserEvent, dropped: %v", evt) } case serf.MemberEvent: select { @@ -282,7 +286,7 @@ func (c *ClusterImpl) handleEvents(ctx context.Context) error { // Event is now in the inbox default: // Overflow event gets dropped - glog.V(3).Infof("Overflow MemberEvent, dropped: %v", evt) + glog.Infof("Overflow MemberEvent, dropped: %v", evt) } } } diff --git a/metrics/metrics.go b/metrics/metrics.go index 82a855d5e..0c48815eb 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -29,6 +29,8 @@ type CatalystAPIMetrics struct { PlaybackRequestDurationSec *prometheus.SummaryVec CDNRedirectCount *prometheus.CounterVec CDNRedirectWebRTC406 *prometheus.CounterVec + UserEventBufferSize prometheus.Gauge + MemberEventBufferSize prometheus.Gauge JobsInFlight prometheus.Gauge HTTPRequestsInFlight prometheus.Gauge @@ -60,6 +62,14 @@ func NewMetrics() *CatalystAPIMetrics { Name: "http_requests_in_flight", Help: "A count of the http requests in flight", }), + UserEventBufferSize: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "user_event_buffer_size", + Help: "A count of the user events currently held in the buffer", + }), + MemberEventBufferSize: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "member_event_buffer_size", + Help: "A count of the member events currently held in the buffer", + }), // /api/vod request metrics UploadVODRequestCount: promauto.NewCounter(prometheus.CounterOpts{