Skip to content

Commit

Permalink
Increase serf message buffer sizes (#1032)
Browse files Browse the repository at this point in the history
* Increase serf message buffer sizes

* Add metrics for current buffer sizes

* try fix build
  • Loading branch information
mjh1 authored Dec 15, 2023
1 parent 289817b commit f216f9e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ jobs:
tar -czvf "../releases/livepeer-catalyst-api-${GOOS}-${GOARCH}.tar.gz" .
- name: Upload artifacts for cutting release
- uses: actions/upload-artifact@master
+ uses: actions/upload-artifact@v3
with:
name: release-artifacts
path: releases/
Expand Down
14 changes: 9 additions & 5 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/metrics"
)

type Cluster interface {
Expand Down Expand Up @@ -54,9 +55,9 @@ var mediaFilter = map[string]string{"node": "media"}
func NewCluster(config *config.Cli) Cluster {
c := ClusterImpl{
config: config,
- serfCh: make(chan serf.Event, 64),
+ serfCh: make(chan serf.Event, 4096),
memberCh: make(chan []Member),
- eventCh: make(chan serf.UserEvent, 64),
+ eventCh: make(chan serf.UserEvent, 4096),
}
return &c
}
Expand Down Expand Up @@ -256,13 +257,16 @@ func (c *ClusterImpl) BroadcastEvent(event serf.UserEvent) error {
}

func (c *ClusterImpl) handleEvents(ctx context.Context) error {
- inbox := make(chan serf.Event, 1)
+ inbox := make(chan serf.Event, 4096)
go func() {
for {
select {
case <-ctx.Done():
return
case e := <-c.serfCh:
+ metrics.Metrics.UserEventBufferSize.Set(float64(len(c.eventCh)))
+ metrics.Metrics.MemberEventBufferSize.Set(float64(len(inbox)))
+
switch evt := e.(type) {
case serf.UserEvent:
select {
Expand All @@ -272,7 +276,7 @@ func (c *ClusterImpl) handleEvents(ctx context.Context) error {
// Event moved to eventCh
default:
// Overflow event gets dropped
- glog.V(3).Infof("Overflow UserEvent, dropped: %v", evt)
+ glog.Infof("Overflow UserEvent, dropped: %v", evt)
}
case serf.MemberEvent:
select {
Expand All @@ -282,7 +286,7 @@ func (c *ClusterImpl) handleEvents(ctx context.Context) error {
// Event is now in the inbox
default:
// Overflow event gets dropped
- glog.V(3).Infof("Overflow MemberEvent, dropped: %v", evt)
+ glog.Infof("Overflow MemberEvent, dropped: %v", evt)
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type CatalystAPIMetrics struct {
PlaybackRequestDurationSec *prometheus.SummaryVec
CDNRedirectCount *prometheus.CounterVec
CDNRedirectWebRTC406 *prometheus.CounterVec
+ UserEventBufferSize prometheus.Gauge
+ MemberEventBufferSize prometheus.Gauge

JobsInFlight prometheus.Gauge
HTTPRequestsInFlight prometheus.Gauge
Expand Down Expand Up @@ -60,6 +62,14 @@ func NewMetrics() *CatalystAPIMetrics {
Name: "http_requests_in_flight",
Help: "A count of the http requests in flight",
}),
+ UserEventBufferSize: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "user_event_buffer_size",
+ Help: "A count of the user events currently held in the buffer",
+ }),
+ MemberEventBufferSize: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "member_event_buffer_size",
+ Help: "A count of the member events currently held in the buffer",
+ }),

// /api/vod request metrics
UploadVODRequestCount: promauto.NewCounter(prometheus.CounterOpts{
Expand Down

0 comments on commit f216f9e

Please sign in to comment.