Skip to content

Commit

Permalink
Revert "Use constant port for MistUtilLoad and reconcile periodically…
Browse files Browse the repository at this point in the history
… to make sure it's running (#1319)"

This reverts commit 9ad676e.
  • Loading branch information
victorges committed Jul 18, 2024
1 parent baf4607 commit 8669baf
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 65 deletions.
58 changes: 9 additions & 49 deletions balancer/mist/mist_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/golang/glog"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"golang.org/x/sync/errgroup"
)

var mistUtilLoadSingleRequestTimeout = 15 * time.Second
Expand Down Expand Up @@ -49,12 +50,14 @@ func NewBalancer(config *balancer.Config) balancer.Balancer {

// start this load balancer instance, execing MistUtilLoad if necessary
func (b *MistBalancer) Start(ctx context.Context) error {
b.killPreviousBalancer(ctx)

go func() {
b.reconcileBalancerLoop(ctx, b.config.Args)
}()
return b.waitForStartup(ctx)
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return b.execBalancer(ctx, b.config.Args)
})
group.Go(func() error {
return b.waitForStartup(ctx)
})
return group.Wait()
}

// wait for the mist LB to be available. can be called multiple times.
Expand Down Expand Up @@ -242,49 +245,6 @@ func (b *MistBalancer) formatNodeAddress(server string) string {
return fmt.Sprintf(b.config.MistLoadBalancerTemplate, server)
}

// killPreviousBalancer cleans up the previous MistUtilLoad process if it exists.
// It uses pkill to kill the process.
func (b *MistBalancer) killPreviousBalancer(ctx context.Context) {
cmd := exec.CommandContext(ctx, "pkill", "-f", "MistUtilLoad")
err := cmd.Run()
if err != nil {
glog.V(6).Infof("Killing MistUtilLoad failed, most probably it was not running, err=%v", err)
}
}

// reconcileBalancerLoop makes sure that MistUtilLoad is up and running all the time.
func (b *MistBalancer) reconcileBalancerLoop(ctx context.Context, balancerArgs []string) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

for {
b.reconcileBalancer(ctx, balancerArgs)
select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}

// reconcileBalancer makes sure that MistUtilLoad is up and running.
func (b *MistBalancer) reconcileBalancer(ctx context.Context, balancerArgs []string) {
if !b.isBalancerRunning(ctx) {
glog.Info("Starting MistUtilLoad")
err := b.execBalancer(ctx, balancerArgs)
if err != nil {
glog.Warningf("Error starting MistUtilLoad: %v", err)
}
}
}

// isBalancerRunning checks if MistUtilLoad is running.
func (b *MistBalancer) isBalancerRunning(ctx context.Context) bool {
cmd := exec.CommandContext(ctx, "pgrep", "-f", "MistUtilLoad")
err := cmd.Run()
return err == nil
}

func (b *MistBalancer) execBalancer(ctx context.Context, balancerArgs []string) error {
args := append(balancerArgs, "-p", fmt.Sprintf("%d", b.config.MistUtilLoadPort), "-g", "4")
glog.Infof("Running MistUtilLoad with %v", args)
Expand Down
4 changes: 2 additions & 2 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Member struct {
Status string `json:"status"`
}

var MediaFilter = map[string]string{"node": "media"}
var mediaFilter = map[string]string{"node": "media"}

// Create a connection to a new Cluster that will immediately connect
func NewCluster(config *config.Cli) Cluster {
Expand Down Expand Up @@ -272,7 +272,7 @@ func (c *ClusterImpl) handleEvents(ctx context.Context) error {
return nil
}

members, err := c.MembersFiltered(MediaFilter, "alive", "")
members, err := c.MembersFiltered(mediaFilter, "alive", "")

if err != nil {
glog.Errorf("Error getting serf, crashing: %v\n", err)
Expand Down
19 changes: 5 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"flag"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"strings"
Expand Down Expand Up @@ -110,7 +111,7 @@ func main() {
fs.Float64Var(&cli.NodeLongitude, "node-longitude", 0, "Longitude of this Catalyst node. Used for load balancing.")
config.CommaSliceFlag(fs, &cli.RedirectPrefixes, "redirect-prefixes", []string{}, "Set of valid prefixes of playback id which are handled by mistserver")
config.CommaMapFlag(fs, &cli.Tags, "tags", map[string]string{"node": "media"}, "Serf tags for Catalyst nodes")
fs.IntVar(&cli.MistLoadBalancerPort, "mist-load-balancer-port", 40010, "MistUtilLoad port (default random)")
fs.IntVar(&cli.MistLoadBalancerPort, "mist-load-balancer-port", rand.Intn(10000)+40000, "MistUtilLoad port (default random)")
fs.StringVar(&cli.MistLoadBalancerTemplate, "mist-load-balancer-template", "http://%s:4242", "template for specifying the host that should be queried for Prometheus stat output for this node")
config.CommaSliceFlag(fs, &cli.RetryJoin, "retry-join", []string{}, "An agent to join with. This flag be specified multiple times. Does not exit on failure like -join, used to retry until success.")
fs.StringVar(&cli.EncryptKey, "encrypt", "", "Key for encrypting network traffic within Serf. Must be a base64-encoded 32-byte key.")
Expand Down Expand Up @@ -342,25 +343,15 @@ func main() {
// Eventually this will be the main loop of the state machine, but we just have one variable right now.
func reconcileBalancer(ctx context.Context, bal balancer.Balancer, c cluster.Cluster) error {
memberCh := c.MemberChan()
ticker := time.NewTicker(1 * time.Minute)
for {
var members []cluster.Member
var err error
select {
case <-ctx.Done():
return nil
case <-ticker.C:
members, err = c.MembersFiltered(cluster.MediaFilter, "alive", "")
case list := <-memberCh:
err := bal.UpdateMembers(ctx, list)
if err != nil {
glog.Errorf("Error getting serf members: %v", err)
continue
return fmt.Errorf("failed to update load balancer from member list: %w", err)
}
case members = <-memberCh:
}
err = bal.UpdateMembers(ctx, members)
if err != nil {
glog.Errorf("Failed to update load balancer from member list: %v", err)
continue
}
}
}
Expand Down

0 comments on commit 8669baf

Please sign in to comment.