diff --git a/balancer/balancer.go b/balancer/balancer.go index d61060120..b0f6a233c 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -126,4 +126,8 @@ type Config struct { MistHost string OwnRegion string OwnRegionTagAdjust int + + ReplaceHostMatch string + ReplaceHostList []string + ReplaceHostPercent int } diff --git a/balancer/mist/mist_balancer.go b/balancer/mist/mist_balancer.go index 3951fe1d4..044db7c83 100644 --- a/balancer/mist/mist_balancer.go +++ b/balancer/mist/mist_balancer.go @@ -6,11 +6,13 @@ import ( "errors" "fmt" "io" + "math/rand" "net/http" "net/url" "os" "os/exec" "regexp" + "strings" "sync" "time" @@ -335,6 +337,8 @@ func (b *MistBalancer) queryMistForClosestNode(ctx context.Context, playbackID, return node, nil } +var nodeHostRegex = regexp.MustCompile(`^.+?\.`) // matches the first part of the hostname before the first dot + // return the best node available for a given stream. will return any node if nobody has the stream. func (b *MistBalancer) GetBestNode(ctx context.Context, redirectPrefixes []string, playbackID, lat, lon, fallbackPrefix string, isStudioReq bool) (string, string, error) { var nodeAddr, fullPlaybackID, fallbackAddr string @@ -366,6 +370,13 @@ func (b *MistBalancer) GetBestNode(ctx context.Context, redirectPrefixes []strin // good path: we found the stream and a good node to play it back, yay! if nodeAddr != "" { + if b.config.ReplaceHostMatch != "" && len(b.config.ReplaceHostList) > 0 && rand.Intn(100) < b.config.ReplaceHostPercent { + // replace the host for a percentage of requests based on the configured replacement list, choosing a random host from that list + if strings.Contains(nodeHostRegex.FindString(nodeAddr), b.config.ReplaceHostMatch) { + nodeAddr = nodeHostRegex.ReplaceAllString(nodeAddr, b.config.ReplaceHostList[rand.Intn(len(b.config.ReplaceHostList))]+".") + } + } + return nodeAddr, fullPlaybackID, nil } diff --git a/balancer/mist/mist_balancer_test.go b/balancer/mist/mist_balancer_test.go index de79e6cf0..3b663a8c5 100644 --- a/balancer/mist/mist_balancer_test.go +++ b/balancer/mist/mist_balancer_test.go @@ -202,6 +202,33 @@ func TestGetBestNode(t *testing.T) { require.Contains(t, []string{"one.example.com", "two.example.com"}, node) } +func TestGetBestNodeWithReplacement(t *testing.T) { + bal, mul := start(t) + defer mul.Close() + + mul.BalancedHosts = map[string]string{ + "http://one.example.com:4242": "Online", + "http://two.example.com:4242": "Online", + } + mul.StreamsLive = map[string][]string{"http://one.example.com:4242": {"prefix+fakeid"}} + + // stream is live on host "one" but replace this with "two" + bal.config.ReplaceHostMatch = "one" + bal.config.ReplaceHostPercent = 100 + bal.config.ReplaceHostList = []string{"two"} + + node, streamName, err := bal.GetBestNode(context.Background(), []string{"prefix"}, "fakeid", "0", "0", "", false) + require.NoError(t, err) + require.Equal(t, streamName, "prefix+fakeid") + require.Contains(t, node, "two.example.com") + + // set percent to zero, should not replace + bal.config.ReplaceHostPercent = 0 + node, _, err = bal.GetBestNode(context.Background(), []string{"prefix"}, "fakeid", "0", "0", "", false) + require.NoError(t, err) + require.Contains(t, node, "one.example.com") +} + func TestGetBestNodeForWebRTC(t *testing.T) { const webrtcStreamKey = "webr-tcst-ream-key1" bal, mul := start(t) diff --git a/config/cli.go b/config/cli.go index 5e697adbf..9cdc5e6eb 100644 --- a/config/cli.go +++ b/config/cli.go @@ -91,6 +91,10 @@ type Cli struct { SerfQueueSize int SerfEventBuffer int SerfMaxQueueDepth int + + LBReplaceHostMatch string + LBReplaceHostPercent int + LBReplaceHostList []string } // Return our own URL for callback trigger purposes diff --git a/handlers/healthcheck.go b/handlers/healthcheck.go index 8a25526a2..4dd569ad0 100644 --- a/handlers/healthcheck.go +++ b/handlers/healthcheck.go @@ -2,7 +2,6 @@ package handlers import ( "encoding/json" - "io" "net/http" "github.com/julienschmidt/httprouter" @@ -27,7 +26,7 @@ func (d *CatalystAPIHandlersCollection) Healthcheck() httprouter.Handle { b = []byte(`{"status": "marshalling status failed"}`) } - if _, err := io.Writer.Write(w, b); err != nil { + if _, err := w.Write(b); err != nil { log.LogNoRequestID("Failed to write HTTP response for " + req.URL.RawPath) } } diff --git a/main.go b/main.go index 9801a21ab..05507d594 100644 --- a/main.go +++ b/main.go @@ -135,6 +135,9 @@ func main() { fs.StringVar(&cli.SerfMembersEndpoint, "serf-members-endpoint", "", "Endpoint to get the current members in the cluster") fs.StringVar(&cli.EventsEndpoint, "events-endpoint", "", "Endpoint to send proxied events from catalyst-api into catalyst") fs.StringVar(&cli.CatalystApiURL, "catalyst-api-url", "", "Endpoint for externally deployed catalyst-api; if not set, use local catalyst-api") + fs.StringVar(&cli.LBReplaceHostMatch, "lb-replace-host-match", "", "What to match on the hostname for node replacement e.g. sto") + config.CommaSliceFlag(fs, &cli.LBReplaceHostList, "lb-replace-host-list", []string{}, "List of hostnames to replace with for node replacement") + fs.IntVar(&cli.LBReplaceHostPercent, "lb-replace-host-percent", 0, "Percentage of matching requests to replace host on") pprofPort := fs.Int("pprof-port", 6061, "Pprof listen port") fs.String("send-audio", "", "[DEPRECATED] ignored, will be removed") @@ -205,6 +208,10 @@ func main() { NodeName: cli.NodeName, OwnRegion: cli.OwnRegion, OwnRegionTagAdjust: cli.OwnRegionTagAdjust, + + ReplaceHostMatch: cli.LBReplaceHostMatch, + ReplaceHostPercent: cli.LBReplaceHostPercent, + ReplaceHostList: cli.LBReplaceHostList, } broker = misttriggers.NewTriggerBroker()