Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement percentage node replacement for load balancer #1360

Merged
merged 5 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,8 @@ type Config struct {
MistHost string
OwnRegion string
OwnRegionTagAdjust int

ReplaceHostMatch string
ReplaceHostList []string
ReplaceHostPercent int
}
10 changes: 10 additions & 0 deletions balancer/mist/mist_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
"os/exec"
"regexp"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -335,6 +337,8 @@ func (b *MistBalancer) queryMistForClosestNode(ctx context.Context, playbackID,
return node, nil
}

var nodeHostRegex = regexp.MustCompile(`^.+?\.`) // matches the first part of the hostname before the first dot

// return the best node available for a given stream. will return any node if nobody has the stream.
func (b *MistBalancer) GetBestNode(ctx context.Context, redirectPrefixes []string, playbackID, lat, lon, fallbackPrefix string, isStudioReq bool) (string, string, error) {
var nodeAddr, fullPlaybackID, fallbackAddr string
Expand Down Expand Up @@ -366,6 +370,12 @@ func (b *MistBalancer) GetBestNode(ctx context.Context, redirectPrefixes []strin

// good path: we found the stream and a good node to play it back, yay!
if nodeAddr != "" {
if b.config.ReplaceHostMatch != "" && len(b.config.ReplaceHostList) > 0 && rand.Intn(100) < b.config.ReplaceHostPercent {
if strings.Contains(nodeHostRegex.FindString(nodeAddr), b.config.ReplaceHostMatch) {
nodeAddr = nodeHostRegex.ReplaceAllString(nodeAddr, b.config.ReplaceHostList[rand.Intn(len(b.config.ReplaceHostList))]+".")
}
}

return nodeAddr, fullPlaybackID, nil
}

Expand Down
27 changes: 27 additions & 0 deletions balancer/mist/mist_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,33 @@ func TestGetBestNode(t *testing.T) {
require.Contains(t, []string{"one.example.com", "two.example.com"}, node)
}

func TestGetBestNodeWithReplacement(t *testing.T) {
bal, mul := start(t)
defer mul.Close()

mul.BalancedHosts = map[string]string{
"http://one.example.com:4242": "Online",
"http://two.example.com:4242": "Online",
}
mul.StreamsLive = map[string][]string{"http://one.example.com:4242": {"prefix+fakeid"}}

// stream is live on host "one" but replace this with "two"
bal.config.ReplaceHostMatch = "one"
bal.config.ReplaceHostPercent = 100
bal.config.ReplaceHostList = []string{"two"}

node, streamName, err := bal.GetBestNode(context.Background(), []string{"prefix"}, "fakeid", "0", "0", "", false)
require.NoError(t, err)
require.Equal(t, streamName, "prefix+fakeid")
require.Contains(t, node, "two.example.com")

// set percent to zero, should not replace
bal.config.ReplaceHostPercent = 0
node, _, err = bal.GetBestNode(context.Background(), []string{"prefix"}, "fakeid", "0", "0", "", false)
require.NoError(t, err)
require.Contains(t, node, "one.example.com")
}

func TestGetBestNodeForWebRTC(t *testing.T) {
const webrtcStreamKey = "webr-tcst-ream-key1"
bal, mul := start(t)
Expand Down
4 changes: 4 additions & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ type Cli struct {
SerfQueueSize int
SerfEventBuffer int
SerfMaxQueueDepth int

LBReplaceHostMatch string
LBReplaceHostPercent int
LBReplaceHostList []string
}

// Return our own URL for callback trigger purposes
Expand Down
3 changes: 1 addition & 2 deletions handlers/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package handlers

import (
"encoding/json"
"io"
"net/http"

"github.com/julienschmidt/httprouter"
Expand All @@ -27,7 +26,7 @@ func (d *CatalystAPIHandlersCollection) Healthcheck() httprouter.Handle {
b = []byte(`{"status": "marshalling status failed"}`)
}

if _, err := io.Writer.Write(w, b); err != nil {
if _, err := w.Write(b); err != nil {
log.LogNoRequestID("Failed to write HTTP response for " + req.URL.RawPath)
}
}
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ func main() {
fs.StringVar(&cli.SerfMembersEndpoint, "serf-members-endpoint", "", "Endpoint to get the current members in the cluster")
fs.StringVar(&cli.EventsEndpoint, "events-endpoint", "", "Endpoint to send proxied events from catalyst-api into catalyst")
fs.StringVar(&cli.CatalystApiURL, "catalyst-api-url", "", "Endpoint for externally deployed catalyst-api; if not set, use local catalyst-api")
fs.StringVar(&cli.LBReplaceHostMatch, "lb-replace-host-match", "", "What to match on the hostname for node replacement e.g. sto")
config.CommaSliceFlag(fs, &cli.LBReplaceHostList, "lb-replace-host-list", []string{}, "List of hostnames to replace with for node replacement")
fs.IntVar(&cli.LBReplaceHostPercent, "lb-replace-host-percent", 0, "Percentage of matching requests to replace host on")
pprofPort := fs.Int("pprof-port", 6061, "Pprof listen port")

fs.String("send-audio", "", "[DEPRECATED] ignored, will be removed")
Expand Down Expand Up @@ -205,6 +208,10 @@ func main() {
NodeName: cli.NodeName,
OwnRegion: cli.OwnRegion,
OwnRegionTagAdjust: cli.OwnRegionTagAdjust,

ReplaceHostMatch: cli.LBReplaceHostMatch,
ReplaceHostPercent: cli.LBReplaceHostPercent,
ReplaceHostList: cli.LBReplaceHostList,
}
broker = misttriggers.NewTriggerBroker()

Expand Down
Loading