diff --git a/cmd/proxy/cmd/proxy.go b/cmd/proxy/cmd/proxy.go
index 39f1f3b99..9b1f02f30 100644
--- a/cmd/proxy/cmd/proxy.go
+++ b/cmd/proxy/cmd/proxy.go
@@ -19,7 +19,10 @@ import (
 	"fmt"
 	"net"
 	"net/http"
+	"os"
+	"os/signal"
 	"sync"
+	"syscall"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -163,15 +166,15 @@ func (c *ClusterChecker) stopPollonProxy() {
 	}
 }
 
-func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) {
+func (c *ClusterChecker) updateDestAddress(destAddr *net.TCPAddr) {
 	c.pollonMutex.Lock()
 	defer c.pollonMutex.Unlock()
 	if c.pp != nil {
-		c.pp.C <- confData
+		c.pp.C <- pollon.ConfData{DestAddr: destAddr}
 	}
 }
 
-func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTimeout time.Duration) error {
+func (c *ClusterChecker) setProxyInfo(ctx context.Context, e store.Store, generation int64, proxyTimeout time.Duration) error {
 	proxyInfo := &cluster.ProxyInfo{
 		InfoUID: common.UID(),
 		UID:     c.uid,
@@ -180,15 +183,15 @@ func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTime
 	}
 	log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo))
 
-	if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, 2*proxyTimeout); err != nil {
+	if err := c.e.SetProxyInfo(ctx, proxyInfo, 2*proxyTimeout); err != nil {
 		return err
 	}
 	return nil
 }
 
-// Check reads the cluster data and applies the right pollon configuration.
-func (c *ClusterChecker) Check() error {
-	cd, _, err := c.e.GetClusterData(context.TODO())
+// check reads the cluster data and applies the right pollon configuration.
+func (c *ClusterChecker) check(ctx context.Context) error {
+	cd, _, err := c.e.GetClusterData(ctx)
 	if err != nil {
 		return fmt.Errorf("cannot get cluster data: %v", err)
 	}
@@ -201,15 +204,15 @@ func (c *ClusterChecker) Check() error {
 	log.Debugf("cd dump: %s", spew.Sdump(cd))
 	if cd == nil {
 		log.Infow("no clusterdata available, closing connections to master")
-		c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
+		c.updateDestAddress(nil)
 		return nil
 	}
 	if cd.FormatVersion != cluster.CurrentCDFormatVersion {
-		c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
+		c.updateDestAddress(nil)
 		return fmt.Errorf("unsupported clusterdata format version: %d", cd.FormatVersion)
 	}
 	if err = cd.Cluster.Spec.Validate(); err != nil {
-		c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
+		c.updateDestAddress(nil)
 		return fmt.Errorf("clusterdata validation failed: %v", err)
 	}
 
@@ -228,9 +231,9 @@ func (c *ClusterChecker) Check() error {
 	proxy := cd.Proxy
 	if proxy == nil {
 		log.Infow("no proxy object available, closing connections to master")
-		c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
+		c.updateDestAddress(nil)
 		// ignore errors on setting proxy info
-		if err = c.SetProxyInfo(c.e, cluster.NoGeneration, proxyTimeout); err != nil {
+		if err = c.setProxyInfo(ctx, c.e, cluster.NoGeneration, proxyTimeout); err != nil {
 			log.Errorw("failed to update proxyInfo", zap.Error(err))
 		} else {
 			// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
@@ -245,9 +248,9 @@ func (c *ClusterChecker) Check() error {
 	db, ok := cd.DBs[proxy.Spec.MasterDBUID]
 	if !ok {
 		log.Infow("no db object available, closing connections to master", "db", proxy.Spec.MasterDBUID)
-		c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
+		c.updateDestAddress(nil)
 		// ignore errors on setting proxy info
-		if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
+		if err = c.setProxyInfo(ctx, c.e, proxy.Generation, proxyTimeout); err != nil {
 			log.Errorw("failed to update proxyInfo", zap.Error(err))
 		} else {
 			// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
@@ -259,14 +262,15 @@ func (c *ClusterChecker) Check() error {
 		return nil
 	}
 
+	// TODO(sgotti) use a resolver with a context if it exists
 	addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port))
 	if err != nil {
 		log.Errorw("cannot resolve db address", zap.Error(err))
-		c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
+		c.updateDestAddress(nil)
 		return nil
 	}
-	log.Infow("master address", "address", addr)
-	if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
+
+	if err = c.setProxyInfo(ctx, c.e, proxy.Generation, proxyTimeout); err != nil {
 		// if we failed to update our proxy info when a master is defined we
 		// cannot ignore this error since the sentinel won't know that we exist
 		// and are sending connections to a master so, when electing a new
@@ -282,84 +286,111 @@ func (c *ClusterChecker) Check() error {
 
 	// start proxing only if we are inside enabledProxies, this ensures that the
 	// sentinel has read our proxyinfo and knows we are alive
-	if util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) {
-		log.Infow("proxying to master address", "address", addr)
-		c.sendPollonConfData(pollon.ConfData{DestAddr: addr})
-	} else {
+	if !util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) {
 		log.Infow("not proxying to master address since we aren't in the enabled proxies list", "address", addr)
-		c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
+		c.updateDestAddress(nil)
+		return nil
 	}
 
-	return nil
-}
-
-func (c *ClusterChecker) TimeoutChecker(checkOkCh chan struct{}) {
-	c.configMutex.Lock()
-	timeoutTimer := time.NewTimer(c.proxyTimeout)
-	c.configMutex.Unlock()
-
-	for {
-		select {
-		case <-timeoutTimer.C:
-			log.Infow("check timeout timer fired")
-			// if the check timeouts close all connections and stop listening
-			// (for example to avoid load balancers forward connections to us
-			// since we aren't ready or in a bad state)
-			c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
-			if c.stopListening {
-				c.stopPollonProxy()
-			}
+	// before updating the pollon address, check that the context hasn't timed
+	// out; usually if the context has timed out one of the above calls will
+	// return an error, but libkv stores don't handle contexts so we should
	// check here.
+	select {
+	case <-ctx.Done():
+		log.Infow("not updating proxy address since context is done", "err", ctx.Err())
+		return nil
+	default:
+	}
 
-		case <-checkOkCh:
-			log.Debugw("check ok message received")
+	log.Infow("proxying to master address", "address", addr)
+	c.updateDestAddress(addr)
 
-			// ignore if stop succeeded or not due to timer already expired
-			timeoutTimer.Stop()
+	return nil
+}
 
-			c.configMutex.Lock()
-			timeoutTimer = time.NewTimer(c.proxyTimeout)
-			c.configMutex.Unlock()
+// timeoutChecker will forcefully close connections when the context times
+// out.
+func (c *ClusterChecker) timeoutChecker(ctx context.Context) {
+	<-ctx.Done()
+	if ctx.Err() == context.DeadlineExceeded {
+		log.Infow("check timeout fired")
+		// if the check times out, close all connections and stop listening
+		// (for example to avoid load balancers forwarding connections to us
+		// since we aren't ready or in a bad state)
+		c.updateDestAddress(nil)
+		if c.stopListening {
+			c.stopPollonProxy()
 		}
 	}
 }
 
-func (c *ClusterChecker) Start() error {
-	checkOkCh := make(chan struct{})
+// checkLoop executes the check function at predefined intervals. It'll force
+// close connections when the check function continuously fails for more than
+// a timeout.
+func (c *ClusterChecker) checkLoop(pctx context.Context) error {
 	checkCh := make(chan error)
 	timerCh := time.NewTimer(0).C
 
-	// TODO(sgotti) TimeoutCecker is needed to forcefully close connection also
-	// if the Check method is blocked somewhere.
-	// The idomatic/cleaner solution will be to use a context instead of this
-	// TimeoutChecker but we have to change the libkv stores to support contexts.
-	go c.TimeoutChecker(checkOkCh)
+	c.configMutex.Lock()
+	ctx, cancel := context.WithTimeout(pctx, c.proxyTimeout)
+	c.configMutex.Unlock()
 
 	for {
 		select {
+		case <-pctx.Done():
+			cancel()
+			return nil
 		case <-timerCh:
+			// start a new context if the current one is already done; this
+			// happens when it has timed out or been cancelled.
+			select {
+			case <-ctx.Done():
+				c.configMutex.Lock()
+				ctx, cancel = context.WithTimeout(pctx, c.proxyTimeout)
+				c.configMutex.Unlock()
+			default:
+			}
+
 			go func() {
-				checkCh <- c.Check()
+				checkCh <- c.check(ctx)
 			}()
 		case err := <-checkCh:
 			if err != nil {
-				// don't report check ok since it returned an error
+				// if the check function returned an error then don't cancel
+				// the context: if it times out the timeoutChecker will close
+				// connections; it can still be cancelled if the next check
+				// succeeds before the timeout
 				log.Infow("check function error", zap.Error(err))
 			} else {
-				// report that check was ok
-				checkOkCh <- struct{}{}
+				// check was ok, so cancel the context and start a new one with a new timeoutChecker
+				cancel()
+
+				c.configMutex.Lock()
+				ctx, cancel = context.WithTimeout(pctx, c.proxyTimeout)
+				c.configMutex.Unlock()
+				go c.timeoutChecker(ctx)
 			}
+
 			c.configMutex.Lock()
 			timerCh = time.NewTimer(c.proxyCheckInterval).C
 			c.configMutex.Unlock()
 		case err := <-c.endPollonProxyCh:
 			if err != nil {
+				cancel()
 				return fmt.Errorf("proxy error: %v", err)
 			}
 		}
 	}
 }
 
+func sigHandler(sigs chan os.Signal, cancel context.CancelFunc) {
+	s := <-sigs
+	log.Debugw("got signal", "signal", s)
+	cancel()
+}
+
 func Execute() {
 	if err := flagutil.SetFlagsFromEnv(CmdProxy.PersistentFlags(), "STPROXY"); err != nil {
 		log.Fatal(err)
 	}
@@ -428,11 +459,16 @@ func proxy(c *cobra.Command, args []string) {
 		}()
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+	go sigHandler(sigs, cancel)
+
 	clusterChecker, err := NewClusterChecker(uid, cfg)
 	if err != nil {
 		log.Fatalf("cannot create cluster checker: %v", err)
 	}
-	if err = clusterChecker.Start(); err != nil {
+	if err = clusterChecker.checkLoop(ctx); err != nil {
 		log.Fatalf("cluster checker ended with error: %v", err)
 	}
 }
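
For reference, the pattern the patch moves to — a per-cycle context.WithTimeout whose deadline, watched by a small goroutine, replaces the hand-rolled timer channel — can be reduced to a standalone sketch. Everything below (check, closeConnections, timeoutWatcher, the intervals) is illustrative only, not stolon code, and the check runs synchronously for brevity where the patch dispatches it on checkCh from a goroutine:

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// closeConnections stands in for ClusterChecker.updateDestAddress(nil).
func closeConnections() { fmt.Println("closing connections to master") }

// timeoutWatcher mirrors the patch's timeoutChecker: it acts only when the
// context ended because its deadline expired, not when it was cancelled
// after a successful check.
func timeoutWatcher(ctx context.Context) {
	<-ctx.Done()
	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
		closeConnections()
	}
}

// check stands in for ClusterChecker.check; it fails at random for the demo.
func check(ctx context.Context) error {
	if rand.Intn(3) == 0 {
		return errors.New("store unreachable")
	}
	return nil
}

func checkLoop(pctx context.Context, interval, timeout time.Duration) {
	ctx, cancel := context.WithTimeout(pctx, timeout)
	go timeoutWatcher(ctx)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-pctx.Done():
			cancel()
			return
		case <-ticker.C:
			// renew an already-done context, as the patch does at the top
			// of its timer case
			select {
			case <-ctx.Done():
				cancel() // no-op, ctx is done; releases its resources
				ctx, cancel = context.WithTimeout(pctx, timeout)
				go timeoutWatcher(ctx)
			default:
			}
			if err := check(ctx); err != nil {
				// keep the context running so its deadline can fire
				fmt.Println("check error:", err)
				continue
			}
			// check ok: cancel the pending timeout and arm a fresh one
			cancel()
			ctx, cancel = context.WithTimeout(pctx, timeout)
			go timeoutWatcher(ctx)
		}
	}
}

func main() {
	pctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go checkLoop(pctx, 200*time.Millisecond, 500*time.Millisecond)
	time.Sleep(2 * time.Second)
}

Cancelling on success is what distinguishes "check passed before the deadline" from "check kept failing": only the latter lets the deadline fire, so only then does the watcher tear down connections.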
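The shutdown wiring at the bottom of the patch is the common signal-to-context recipe; a minimal self-contained version (again illustrative, not taken from stolon) looks like this:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// buffered channel so the signal package never has to drop a signal
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		s := <-sigs
		fmt.Println("got signal:", s)
		cancel() // unblocks everything selecting on ctx.Done()
	}()

	<-ctx.Done() // stands in for checkLoop(ctx) returning on cancellation
	fmt.Println("shutting down")
}

On Go 1.16 and later the channel and goroutine can be collapsed into a single call: ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM).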