Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy: use context for proxy check and timeouts #762

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 96 additions & 60 deletions cmd/proxy/cmd/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"fmt"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -163,15 +166,15 @@ func (c *ClusterChecker) stopPollonProxy() {
}
}

func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) {
func (c *ClusterChecker) updateDestAddress(destAddr *net.TCPAddr) {
c.pollonMutex.Lock()
defer c.pollonMutex.Unlock()
if c.pp != nil {
c.pp.C <- confData
c.pp.C <- pollon.ConfData{DestAddr: destAddr}
}
}

func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTimeout time.Duration) error {
func (c *ClusterChecker) setProxyInfo(ctx context.Context, e store.Store, generation int64, proxyTimeout time.Duration) error {
proxyInfo := &cluster.ProxyInfo{
InfoUID: common.UID(),
UID: c.uid,
Expand All @@ -180,15 +183,15 @@ func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTime
}
log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo))

if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, 2*proxyTimeout); err != nil {
if err := c.e.SetProxyInfo(ctx, proxyInfo, 2*proxyTimeout); err != nil {
return err
}
return nil
}

// Check reads the cluster data and applies the right pollon configuration.
func (c *ClusterChecker) Check() error {
cd, _, err := c.e.GetClusterData(context.TODO())
// check reads the cluster data and applies the right pollon configuration.
func (c *ClusterChecker) check(ctx context.Context) error {
cd, _, err := c.e.GetClusterData(ctx)
if err != nil {
return fmt.Errorf("cannot get cluster data: %v", err)
}
Expand All @@ -201,15 +204,15 @@ func (c *ClusterChecker) Check() error {
log.Debugf("cd dump: %s", spew.Sdump(cd))
if cd == nil {
log.Infow("no clusterdata available, closing connections to master")
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
return nil
}
if cd.FormatVersion != cluster.CurrentCDFormatVersion {
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
return fmt.Errorf("unsupported clusterdata format version: %d", cd.FormatVersion)
}
if err = cd.Cluster.Spec.Validate(); err != nil {
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
return fmt.Errorf("clusterdata validation failed: %v", err)
}

Expand All @@ -228,9 +231,9 @@ func (c *ClusterChecker) Check() error {
proxy := cd.Proxy
if proxy == nil {
log.Infow("no proxy object available, closing connections to master")
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
// ignore errors on setting proxy info
if err = c.SetProxyInfo(c.e, cluster.NoGeneration, proxyTimeout); err != nil {
if err = c.setProxyInfo(ctx, c.e, cluster.NoGeneration, proxyTimeout); err != nil {
log.Errorw("failed to update proxyInfo", zap.Error(err))
} else {
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
Expand All @@ -245,9 +248,9 @@ func (c *ClusterChecker) Check() error {
db, ok := cd.DBs[proxy.Spec.MasterDBUID]
if !ok {
log.Infow("no db object available, closing connections to master", "db", proxy.Spec.MasterDBUID)
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
// ignore errors on setting proxy info
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
if err = c.setProxyInfo(ctx, c.e, proxy.Generation, proxyTimeout); err != nil {
log.Errorw("failed to update proxyInfo", zap.Error(err))
} else {
// update proxyCheckinterval and proxyTimeout only if we successfully updated our proxy info
Expand All @@ -259,14 +262,15 @@ func (c *ClusterChecker) Check() error {
return nil
}

// TODO(sgotti) use a resolver with a context if it exists
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port))
if err != nil {
log.Errorw("cannot resolve db address", zap.Error(err))
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
return nil
}
log.Infow("master address", "address", addr)
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {

if err = c.setProxyInfo(ctx, c.e, proxy.Generation, proxyTimeout); err != nil {
// if we failed to update our proxy info when a master is defined we
// cannot ignore this error since the sentinel won't know that we exist
// and are sending connections to a master so, when electing a new
Expand All @@ -282,84 +286,111 @@ func (c *ClusterChecker) Check() error {

// start proxying only if we are inside enabledProxies, this ensures that the
// sentinel has read our proxyinfo and knows we are alive
if util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) {
log.Infow("proxying to master address", "address", addr)
c.sendPollonConfData(pollon.ConfData{DestAddr: addr})
} else {
if !util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) {
log.Infow("not proxying to master address since we aren't in the enabled proxies list", "address", addr)
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
c.updateDestAddress(nil)
return nil
}

return nil
}

func (c *ClusterChecker) TimeoutChecker(checkOkCh chan struct{}) {
c.configMutex.Lock()
timeoutTimer := time.NewTimer(c.proxyTimeout)
c.configMutex.Unlock()

for {
select {
case <-timeoutTimer.C:
log.Infow("check timeout timer fired")
// if the check timeouts close all connections and stop listening
// (for example to avoid load balancers forward connections to us
// since we aren't ready or in a bad state)
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
if c.stopListening {
c.stopPollonProxy()
}
// before updating the pollon address, check that the context hasn't timed
// out. Usually, if the context has timed out, one of the above calls will
// return an error, but libkv stores don't handle contexts, so we should
// check here.
select {
case <-ctx.Done():
log.Infow("not updating proxy address since context is done: %v", ctx.Err())
return nil
default:
}

case <-checkOkCh:
log.Debugw("check ok message received")
log.Infow("proxying to master address", "address", addr)
c.updateDestAddress(addr)

// ignore if stop succeeded or not due to timer already expired
timeoutTimer.Stop()
return nil
}

c.configMutex.Lock()
timeoutTimer = time.NewTimer(c.proxyTimeout)
c.configMutex.Unlock()
// timeoutChecker waits for ctx to be done and, only when it ended because its
// deadline was exceeded, forcefully closes connections. A context that was
// cancelled for any other reason is ignored.
func (c *ClusterChecker) timeoutChecker(ctx context.Context) {
	<-ctx.Done()

	// Anything other than a deadline expiry (e.g. an explicit cancel) is not
	// a check timeout, so there is nothing to tear down.
	if ctx.Err() != context.DeadlineExceeded {
		return
	}

	log.Infow("check timeout fired")
	// When the check times out, close all connections and stop listening
	// (for example to avoid load balancers forwarding connections to us
	// since we aren't ready or are in a bad state).
	c.updateDestAddress(nil)
	if c.stopListening {
		c.stopPollonProxy()
	}
}

func (c *ClusterChecker) Start() error {
checkOkCh := make(chan struct{})
// checkLoop executes the check function at predefined intervals. It'll force
// close connections when the check function continuously fails for more than a
// timeout.
func (c *ClusterChecker) checkLoop(pctx context.Context) error {
checkCh := make(chan error)
timerCh := time.NewTimer(0).C

// TODO(sgotti) TimeoutChecker is needed to forcefully close connections also
// if the Check method is blocked somewhere.
// The idiomatic/cleaner solution will be to use a context instead of this
// TimeoutChecker but we have to change the libkv stores to support contexts.
go c.TimeoutChecker(checkOkCh)
c.configMutex.Lock()
ctx, cancel := context.WithTimeout(pctx, c.proxyTimeout)
c.configMutex.Unlock()

for {
select {
case <-pctx.Done():
cancel()
return nil
case <-timerCh:
// start a new context if it's already done, this happens when the
// context is timed out or cancelled.
select {
case <-ctx.Done():
c.configMutex.Lock()
ctx, cancel = context.WithTimeout(pctx, c.proxyTimeout)
c.configMutex.Unlock()
default:
}

go func() {
checkCh <- c.Check()
checkCh <- c.check(ctx)
}()
case err := <-checkCh:
if err != nil {
// don't report check ok since it returned an error
// if the check function returned an error then don't stop the
// context so if it times out the TimeoutChecker will close
// connections or it could be cancelled if the next check
// succeeds before the timeout
log.Infow("check function error", zap.Error(err))
} else {
// report that check was ok
checkOkCh <- struct{}{}
// check was ok, so cancel the context and start a new one with a new TimeoutChecker
cancel()

c.configMutex.Lock()
ctx, cancel = context.WithTimeout(pctx, c.proxyTimeout)
c.configMutex.Unlock()
go c.timeoutChecker(ctx)
}

c.configMutex.Lock()
timerCh = time.NewTimer(c.proxyCheckInterval).C
c.configMutex.Unlock()

case err := <-c.endPollonProxyCh:
if err != nil {
cancel()
return fmt.Errorf("proxy error: %v", err)
}
}
}
}

// sigHandler blocks until one signal is received on sigs, logs it at debug
// level and then invokes cancel. It handles a single signal and returns.
func sigHandler(sigs chan os.Signal, cancel context.CancelFunc) {
	received := <-sigs
	log.Debugw("got signal", "signal", received)
	cancel()
}

func Execute() {
if err := flagutil.SetFlagsFromEnv(CmdProxy.PersistentFlags(), "STPROXY"); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -428,11 +459,16 @@ func proxy(c *cobra.Command, args []string) {
}()
}

ctx, cancel := context.WithCancel(context.Background())
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go sigHandler(sigs, cancel)

clusterChecker, err := NewClusterChecker(uid, cfg)
if err != nil {
log.Fatalf("cannot create cluster checker: %v", err)
}
if err = clusterChecker.Start(); err != nil {
if err = clusterChecker.checkLoop(ctx); err != nil {
log.Fatalf("cluster checker ended with error: %v", err)
}
}
2 changes: 2 additions & 0 deletions tests/integration/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func TestProxyListening(t *testing.T) {
Spec: &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
FailInterval: &cluster.Duration{Duration: 10 * time.Second},
// use a faster check interval for tests
ProxyCheckInterval: &cluster.Duration{Duration: 1 * time.Second},
},
Status: cluster.ClusterStatus{
CurrentGeneration: 1,
Expand Down