diff --git a/balancer/mist/mist_balancer.go b/balancer/mist/mist_balancer.go
index 7f30fe8b..b4017781 100644
--- a/balancer/mist/mist_balancer.go
+++ b/balancer/mist/mist_balancer.go
@@ -19,7 +19,6 @@ import (
 	"github.com/golang/glog"
 	"github.com/livepeer/catalyst-api/balancer"
 	"github.com/livepeer/catalyst-api/cluster"
-	"golang.org/x/sync/errgroup"
 )
 
 var mistUtilLoadSingleRequestTimeout = 15 * time.Second
@@ -50,14 +49,12 @@ func NewBalancer(config *balancer.Config) balancer.Balancer {
 
 // start this load balancer instance, execing MistUtilLoad if necessary
 func (b *MistBalancer) Start(ctx context.Context) error {
-	group, ctx := errgroup.WithContext(ctx)
-	group.Go(func() error {
-		return b.execBalancer(ctx, b.config.Args)
-	})
-	group.Go(func() error {
-		return b.waitForStartup(ctx)
-	})
-	return group.Wait()
+	b.killPreviousBalancer(ctx)
+
+	go func() {
+		b.reconcileBalancerLoop(ctx, b.config.Args)
+	}()
+	return b.waitForStartup(ctx)
 }
 
 // wait for the mist LB to be available. can be called multiple times.
@@ -245,6 +242,49 @@ func (b *MistBalancer) formatNodeAddress(server string) string {
 	return fmt.Sprintf(b.config.MistLoadBalancerTemplate, server)
 }
 
+// killPreviousBalancer cleans up the previous MistUtilLoad process if it exists.
+// It uses pkill to kill the process.
+func (b *MistBalancer) killPreviousBalancer(ctx context.Context) {
+	cmd := exec.CommandContext(ctx, "pkill", "-f", "MistUtilLoad")
+	err := cmd.Run()
+	if err != nil {
+		glog.V(6).Infof("Killing MistUtilLoad failed, most probably it was not running, err=%v", err)
+	}
+}
+
+// reconcileBalancerLoop makes sure that MistUtilLoad is up and running all the time.
+func (b *MistBalancer) reconcileBalancerLoop(ctx context.Context, balancerArgs []string) {
+	ticker := time.NewTicker(1 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		b.reconcileBalancer(ctx, balancerArgs)
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+		}
+	}
+}
+
+// reconcileBalancer makes sure that MistUtilLoad is up and running.
+func (b *MistBalancer) reconcileBalancer(ctx context.Context, balancerArgs []string) {
+	if !b.isBalancerRunning(ctx) {
+		glog.Info("Starting MistUtilLoad")
+		err := b.execBalancer(ctx, balancerArgs)
+		if err != nil {
+			glog.Warningf("Error starting MistUtilLoad: %v", err)
+		}
+	}
+}
+
+// isBalancerRunning checks if MistUtilLoad is running.
+func (b *MistBalancer) isBalancerRunning(ctx context.Context) bool {
+	cmd := exec.CommandContext(ctx, "pgrep", "-f", "MistUtilLoad")
+	err := cmd.Run()
+	return err == nil
+}
+
 func (b *MistBalancer) execBalancer(ctx context.Context, balancerArgs []string) error {
 	args := append(balancerArgs, "-p", fmt.Sprintf("%d", b.config.MistUtilLoadPort), "-g", "4")
 	glog.Infof("Running MistUtilLoad with %v", args)
diff --git a/cluster/cluster.go b/cluster/cluster.go
index 2c39d340..f9d83dc6 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -47,7 +47,7 @@ type Member struct {
 	Status string `json:"status"`
 }
 
-var mediaFilter = map[string]string{"node": "media"}
+var MediaFilter = map[string]string{"node": "media"}
 
 // Create a connection to a new Cluster that will immediately connect
 func NewCluster(config *config.Cli) Cluster {
@@ -272,7 +272,7 @@ func (c *ClusterImpl) handleEvents(ctx context.Context) error {
 			return nil
 		}
 
-		members, err := c.MembersFiltered(mediaFilter, "alive", "")
+		members, err := c.MembersFiltered(MediaFilter, "alive", "")
 		if err != nil {
 			glog.Errorf("Error getting serf, crashing: %v\n", err)
diff --git a/main.go b/main.go
index 0fbcf0e2..b55776b6 100644
--- a/main.go
+++ b/main.go
@@ -7,7 +7,6 @@ import (
 	"flag"
 	"fmt"
 	"log"
-	"math/rand"
 	"os"
 	"os/signal"
 	"strings"
@@ -111,7 +110,7 @@ func main() {
 	fs.Float64Var(&cli.NodeLongitude, "node-longitude", 0, "Longitude of this Catalyst node. Used for load balancing.")
 	config.CommaSliceFlag(fs, &cli.RedirectPrefixes, "redirect-prefixes", []string{}, "Set of valid prefixes of playback id which are handled by mistserver")
 	config.CommaMapFlag(fs, &cli.Tags, "tags", map[string]string{"node": "media"}, "Serf tags for Catalyst nodes")
-	fs.IntVar(&cli.MistLoadBalancerPort, "mist-load-balancer-port", rand.Intn(10000)+40000, "MistUtilLoad port (default random)")
+	fs.IntVar(&cli.MistLoadBalancerPort, "mist-load-balancer-port", 40010, "MistUtilLoad port")
 	fs.StringVar(&cli.MistLoadBalancerTemplate, "mist-load-balancer-template", "http://%s:4242", "template for specifying the host that should be queried for Prometheus stat output for this node")
 	config.CommaSliceFlag(fs, &cli.RetryJoin, "retry-join", []string{}, "An agent to join with. This flag be specified multiple times. Does not exit on failure like -join, used to retry until success.")
 	fs.StringVar(&cli.EncryptKey, "encrypt", "", "Key for encrypting network traffic within Serf. Must be a base64-encoded 32-byte key.")
@@ -343,15 +342,25 @@ func main() {
 
 // Eventually this will be the main loop of the state machine, but we just have one variable right now.
 func reconcileBalancer(ctx context.Context, bal balancer.Balancer, c cluster.Cluster) error {
 	memberCh := c.MemberChan()
+	ticker := time.NewTicker(1 * time.Minute)
 	for {
+		var members []cluster.Member
+		var err error
 		select {
 		case <-ctx.Done():
 			return nil
-		case list := <-memberCh:
-			err := bal.UpdateMembers(ctx, list)
+		case <-ticker.C:
+			members, err = c.MembersFiltered(cluster.MediaFilter, "alive", "")
 			if err != nil {
-				return fmt.Errorf("failed to update load balancer from member list: %w", err)
+				glog.Errorf("Error getting serf members: %v", err)
+				continue
 			}
+		case members = <-memberCh:
+		}
+		err = bal.UpdateMembers(ctx, members)
+		if err != nil {
+			glog.Errorf("Failed to update load balancer from member list: %v", err)
+			continue
+		}
 	}
 }