From 2f1a69184988f97bbb30c06a9aa9d743d9833937 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 19 Sep 2018 09:36:39 +0200 Subject: [PATCH] proxy: use context for proxy check and timeouts Use a context with deadline of proxyTimeout instead of a timer to handle checks and timeouts. As an additional effect the check function will exit or will not update the proxy address if the context is done. Also handle signals to stop proxy. --- cmd/proxy/cmd/proxy.go | 156 ++++++++++++++++++++------------ tests/integration/proxy_test.go | 2 + 2 files changed, 98 insertions(+), 60 deletions(-) diff --git a/cmd/proxy/cmd/proxy.go b/cmd/proxy/cmd/proxy.go index 39f1f3b99..9b1f02f30 100644 --- a/cmd/proxy/cmd/proxy.go +++ b/cmd/proxy/cmd/proxy.go @@ -19,7 +19,10 @@ import ( "fmt" "net" "net/http" + "os" + "os/signal" "sync" + "syscall" "time" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -163,15 +166,15 @@ func (c *ClusterChecker) stopPollonProxy() { } } -func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) { +func (c *ClusterChecker) updateDestAddress(destAddr *net.TCPAddr) { c.pollonMutex.Lock() defer c.pollonMutex.Unlock() if c.pp != nil { - c.pp.C <- confData + c.pp.C <- pollon.ConfData{DestAddr: destAddr} } } -func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTimeout time.Duration) error { +func (c *ClusterChecker) setProxyInfo(ctx context.Context, e store.Store, generation int64, proxyTimeout time.Duration) error { proxyInfo := &cluster.ProxyInfo{ InfoUID: common.UID(), UID: c.uid, @@ -180,15 +183,15 @@ func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTime } log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo)) - if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, 2*proxyTimeout); err != nil { + if err := c.e.SetProxyInfo(ctx, proxyInfo, 2*proxyTimeout); err != nil { return err } return nil } -// Check reads the cluster data and applies the right pollon configuration. -func (c *ClusterChecker) Check() error { - cd, _, err := c.e.GetClusterData(context.TODO()) +// check reads the cluster data and applies the right pollon configuration. +func (c *ClusterChecker) check(ctx context.Context) error { + cd, _, err := c.e.GetClusterData(ctx) if err != nil { return fmt.Errorf("cannot get cluster data: %v", err) } @@ -201,15 +204,15 @@ func (c *ClusterChecker) Check() error { log.Debugf("cd dump: %s", spew.Sdump(cd)) if cd == nil { log.Infow("no clusterdata available, closing connections to master") - c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + c.updateDestAddress(nil) return nil } if cd.FormatVersion != cluster.CurrentCDFormatVersion { - c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + c.updateDestAddress(nil) return fmt.Errorf("unsupported clusterdata format version: %d", cd.FormatVersion) } if err = cd.Cluster.Spec.Validate(); err != nil { - c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + c.updateDestAddress(nil) return fmt.Errorf("clusterdata validation failed: %v", err) } @@ -228,9 +231,9 @@ func (c *ClusterChecker) Check() error { proxy := cd.Proxy if proxy == nil { log.Infow("no proxy object available, closing connections to master") - c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + c.updateDestAddress(nil) // ignore errors on setting proxy info - if err = c.SetProxyInfo(c.e, cluster.NoGeneration, proxyTimeout); err != nil { + if err = c.setProxyInfo(ctx, c.e, cluster.NoGeneration, proxyTimeout); err != nil { log.Errorw("failed to update proxyInfo", zap.Error(err)) } else { // update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info @@ -245,9 +248,9 @@ func (c *ClusterChecker) Check() error { db, ok := cd.DBs[proxy.Spec.MasterDBUID] if !ok { log.Infow("no db object available, closing connections to master", "db", proxy.Spec.MasterDBUID) - c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + c.updateDestAddress(nil) // ignore errors on setting proxy info - if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil { + if err = c.setProxyInfo(ctx, c.e, proxy.Generation, proxyTimeout); err != nil { log.Errorw("failed to update proxyInfo", zap.Error(err)) } else { // update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info @@ -259,14 +262,15 @@ func (c *ClusterChecker) Check() error { return nil } + // TODO(sgotti) use a resolver with a context if it exists addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port)) if err != nil { log.Errorw("cannot resolve db address", zap.Error(err)) - c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + c.updateDestAddress(nil) return nil } - log.Infow("master address", "address", addr) - if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil { + + if err = c.setProxyInfo(ctx, c.e, proxy.Generation, proxyTimeout); err != nil { // if we failed to update our proxy info when a master is defined we // cannot ignore this error since the sentinel won't know that we exist // and are sending connections to a master so, when electing a new @@ -282,84 +286,111 @@ func (c *ClusterChecker) Check() error { // start proxing only if we are inside enabledProxies, this ensures that the // sentinel has read our proxyinfo and knows we are alive - if util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) { - log.Infow("proxying to master address", "address", addr) - c.sendPollonConfData(pollon.ConfData{DestAddr: addr}) - } else { + if !util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) { log.Infow("not proxying to master address since we aren't in the enabled proxies list", "address", addr) - c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + c.updateDestAddress(nil) + return nil } - return nil -} - -func (c *ClusterChecker) TimeoutChecker(checkOkCh chan struct{}) { - c.configMutex.Lock() - timeoutTimer := time.NewTimer(c.proxyTimeout) - c.configMutex.Unlock() - - for { - select { - case <-timeoutTimer.C: - log.Infow("check timeout timer fired") - // if the check timeouts close all connections and stop listening - // (for example to avoid load balancers forward connections to us - // since we aren't ready or in a bad state) - c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) - if c.stopListening { - c.stopPollonProxy() - } + // before updating the pollon address, check that the context isn't timed + // out, usually if the context is timeout out one of the above calls will + // return an error but libkv stores doesn't handle contexts so we should + // check here. + select { + case <-ctx.Done(): + log.Infow("not updating proxy address since context is done: %v", ctx.Err()) + return nil + default: + } - case <-checkOkCh: - log.Debugw("check ok message received") + log.Infow("proxying to master address", "address", addr) + c.updateDestAddress(addr) - // ignore if stop succeeded or not due to timer already expired - timeoutTimer.Stop() + return nil +} - c.configMutex.Lock() - timeoutTimer = time.NewTimer(c.proxyTimeout) - c.configMutex.Unlock() +// timeoutChecker will forcefully close connections when the context times +// out. +func (c *ClusterChecker) timeoutChecker(ctx context.Context) { + <-ctx.Done() + if ctx.Err() == context.DeadlineExceeded { + log.Infow("check timeout fired") + // if the check timeouts close all connections and stop listening + // (for example to avoid load balancers forward connections to us + // since we aren't ready or in a bad state) + c.updateDestAddress(nil) + if c.stopListening { + c.stopPollonProxy() } } } -func (c *ClusterChecker) Start() error { - checkOkCh := make(chan struct{}) +// checkLoop executes at predefined intervals the Check function. It'll force +// close connections when a check function continuosly fails for more than a +// timeout. +func (c *ClusterChecker) checkLoop(pctx context.Context) error { checkCh := make(chan error) timerCh := time.NewTimer(0).C - // TODO(sgotti) TimeoutCecker is needed to forcefully close connection also - // if the Check method is blocked somewhere. - // The idomatic/cleaner solution will be to use a context instead of this - // TimeoutChecker but we have to change the libkv stores to support contexts. - go c.TimeoutChecker(checkOkCh) + c.configMutex.Lock() + ctx, cancel := context.WithTimeout(pctx, c.proxyTimeout) + c.configMutex.Unlock() for { select { + case <-pctx.Done(): + cancel() + return nil case <-timerCh: + // start a new context if it's already done, this happens when the + // context is timed out or cancelled. + select { + case <-ctx.Done(): + c.configMutex.Lock() + ctx, cancel = context.WithTimeout(pctx, c.proxyTimeout) + c.configMutex.Unlock() + default: + } + go func() { - checkCh <- c.Check() + checkCh <- c.check(ctx) }() case err := <-checkCh: if err != nil { - // don't report check ok since it returned an error + // if the check function returned an error then don't stop the + // context so if it times out the TimeoutChecker will close + // connections or it could be cancelled if the next check + // succeeds before the timeout log.Infow("check function error", zap.Error(err)) } else { - // report that check was ok - checkOkCh <- struct{}{} + // check was ok, so cancel the context and start a new one with a new TimeoutChecker + cancel() + + c.configMutex.Lock() + ctx, cancel = context.WithTimeout(pctx, c.proxyTimeout) + c.configMutex.Unlock() + go c.timeoutChecker(ctx) } + c.configMutex.Lock() timerCh = time.NewTimer(c.proxyCheckInterval).C c.configMutex.Unlock() case err := <-c.endPollonProxyCh: if err != nil { + cancel() return fmt.Errorf("proxy error: %v", err) } } } } +func sigHandler(sigs chan os.Signal, cancel context.CancelFunc) { + s := <-sigs + log.Debugw("got signal", "signal", s) + cancel() +} + func Execute() { if err := flagutil.SetFlagsFromEnv(CmdProxy.PersistentFlags(), "STPROXY"); err != nil { log.Fatal(err) @@ -428,11 +459,16 @@ func proxy(c *cobra.Command, args []string) { }() } + ctx, cancel := context.WithCancel(context.Background()) + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go sigHandler(sigs, cancel) + clusterChecker, err := NewClusterChecker(uid, cfg) if err != nil { log.Fatalf("cannot create cluster checker: %v", err) } - if err = clusterChecker.Start(); err != nil { + if err = clusterChecker.checkLoop(ctx); err != nil { log.Fatalf("cluster checker ended with error: %v", err) } } diff --git a/tests/integration/proxy_test.go b/tests/integration/proxy_test.go index f3bb6e5de..5f54a12a5 100644 --- a/tests/integration/proxy_test.go +++ b/tests/integration/proxy_test.go @@ -89,6 +89,8 @@ func TestProxyListening(t *testing.T) { Spec: &cluster.ClusterSpec{ InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), FailInterval: &cluster.Duration{Duration: 10 * time.Second}, + // user faster check interval for tests + ProxyCheckInterval: &cluster.Duration{Duration: 1 * time.Second}, }, Status: cluster.ClusterStatus{ CurrentGeneration: 1,