Skip to content

Commit

Permalink
Extract catabalancer timeouts to cli args (#1027)
Browse files Browse the repository at this point in the history
And bump metric timeout
  • Loading branch information
mjh1 authored Dec 14, 2023
1 parent 6a5c9ee commit 289817b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 14 deletions.
6 changes: 3 additions & 3 deletions balancer/catabalancer/catalyst_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ func (s ScoredNode) String() string {
)
}

func NewBalancer(nodeName string) *CataBalancer {
func NewBalancer(nodeName string, metricTimeout time.Duration, ingestStreamTimeout time.Duration) *CataBalancer {
return &CataBalancer{
NodeName: nodeName,
Nodes: make(map[string]*Node),
Streams: make(map[string]Streams),
IngestStreams: make(map[string]Streams),
NodeMetrics: make(map[string]NodeMetrics),
metricTimeout: 16 * time.Second,
ingestStreamTimeout: 20 * time.Minute,
metricTimeout: metricTimeout,
ingestStreamTimeout: ingestStreamTimeout,
}
}

Expand Down
18 changes: 9 additions & 9 deletions balancer/catabalancer/catalyst_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ var BandwidthOverloadedNode = ScoredNode{
}

func TestItReturnsItselfWhenNoOtherNodesPresent(t *testing.T) {
c := NewBalancer("me")
c := NewBalancer("me", time.Second, time.Second)
nodeName, prefix, err := c.GetBestNode(context.Background(), nil, "playbackID", "", "", "")
require.NoError(t, err)
require.Equal(t, "me", nodeName)
require.Equal(t, "video+playbackID", prefix)
}

func TestStaleNodes(t *testing.T) {
c := NewBalancer("me")
c := NewBalancer("me", time.Second, time.Second)
err := c.UpdateMembers(context.Background(), []cluster.Member{{Name: "node1"}})
require.NoError(t, err)

Expand Down Expand Up @@ -281,7 +281,7 @@ func scores(node1 ScoredNode, node2 ScoredNode) ScoredNode {

func TestSetMetrics(t *testing.T) {
// simple check that node metrics make it through to the load balancing algo
c := NewBalancer("")
c := NewBalancer("", time.Second, time.Second)
err := c.UpdateMembers(context.Background(), []cluster.Member{{Name: "node1"}, {Name: "node2"}})
require.NoError(t, err)

Expand All @@ -296,7 +296,7 @@ func TestSetMetrics(t *testing.T) {

func TestUnknownNode(t *testing.T) {
// check that the node metrics call creates the unknown node
c := NewBalancer("")
c := NewBalancer("", time.Second, time.Second)

c.UpdateNodes("node1", NodeMetrics{CPUUsagePercentage: 90})
c.UpdateNodes("bgw-node1", NodeMetrics{CPUUsagePercentage: 10})
Expand All @@ -307,7 +307,7 @@ func TestUnknownNode(t *testing.T) {
}

func TestNoIngestStream(t *testing.T) {
c := NewBalancer("")
c := NewBalancer("", time.Second, time.Second)
// first test no nodes available
c.UpdateNodes("id", NodeMetrics{})
c.UpdateStreams("id", "stream", false)
Expand All @@ -329,7 +329,7 @@ func TestNoIngestStream(t *testing.T) {
}

func TestMistUtilLoadSource(t *testing.T) {
c := NewBalancer("")
c := NewBalancer("", time.Second, time.Second)
err := c.UpdateMembers(context.Background(), []cluster.Member{{
Name: "node",
Tags: map[string]string{
Expand All @@ -356,7 +356,7 @@ func TestMistUtilLoadSource(t *testing.T) {
}

func TestStreamTimeout(t *testing.T) {
c := NewBalancer("")
c := NewBalancer("", time.Second, time.Second)
err := c.UpdateMembers(context.Background(), []cluster.Member{{
Name: "node",
Tags: map[string]string{
Expand Down Expand Up @@ -402,7 +402,7 @@ func TestStreamTimeout(t *testing.T) {

// needs to be run with go test -race
func TestConcurrentUpdates(t *testing.T) {
c := NewBalancer("")
c := NewBalancer("", time.Second, time.Second)

err := c.UpdateMembers(context.Background(), []cluster.Member{{Name: "node"}})
require.NoError(t, err)
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestSimulate(t *testing.T) {

updateEvery := 5 * time.Second

c := NewBalancer("node0")
c := NewBalancer("node0", time.Second, time.Second)
var nodes []cluster.Member
for i := 0; i < nodeCount; i++ {
nodes = append(nodes, cluster.Member{Name: fmt.Sprintf("node%d", i)})
Expand Down
5 changes: 4 additions & 1 deletion config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ type Cli struct {

C2PAPrivateKeyPath string
C2PACertsPath string
CataBalancer string

CataBalancer string
CataBalancerMetricTimeout time.Duration
CataBalancerIngestStreamTimeout time.Duration
}

// Return our own URL for callback trigger purposes
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func main() {
fs.IntVar(&config.MaxInFlightClipJobs, "max-inflight-clip-jobs", 20, "Maximum number of concurrent clipping jobs to support in catalyst-api")
fs.IntVar(&config.TranscodingParallelJobs, "parallel-transcode-jobs", 2, "Number of parallel transcode jobs")
fs.StringVar(&cli.CataBalancer, "catabalancer", "", "Enable catabalancer load balancer")
fs.DurationVar(&cli.CataBalancerMetricTimeout, "catabalancer-metric-timeout", 26*time.Second, "Catabalancer timeout for node metrics")
fs.DurationVar(&cli.CataBalancerIngestStreamTimeout, "catabalancer-ingest-stream-timeout", 20*time.Minute, "Catabalancer timeout for ingest stream metrics")

// mist-api-connector parameters
fs.IntVar(&cli.MistPort, "mist-port", 4242, "Port to connect to Mist")
Expand Down Expand Up @@ -276,7 +278,7 @@ func main() {

bal := mistBalancer
if cli.CataBalancer == "enabled" || cli.CataBalancer == "background" {
cataBalancer := catabalancer.NewBalancer(cli.NodeName)
cataBalancer := catabalancer.NewBalancer(cli.NodeName, cli.CataBalancerMetricTimeout, cli.CataBalancerIngestStreamTimeout)
// Temporary combined balancer to test cataBalancer logic alongside existing mist balancer
bal = balancer.CombinedBalancer{
Catabalancer: cataBalancer,
Expand Down

0 comments on commit 289817b

Please sign in to comment.