From 8698c48919695af5e9f3d06119ea271cc2d7e1ff Mon Sep 17 00:00:00 2001 From: Andrey Kiskyak Date: Fri, 8 Oct 2021 17:07:11 +0300 Subject: [PATCH] Add proxy to multi backends feature --- basic.go | 11 +++ examples/simple/simple.go | 2 +- pollon.go | 166 ++++++++++++++++++++++++++++++++------ 3 files changed, 153 insertions(+), 26 deletions(-) create mode 100644 basic.go diff --git a/basic.go b/basic.go new file mode 100644 index 0000000..7486e65 --- /dev/null +++ b/basic.go @@ -0,0 +1,11 @@ +package pollon + +// Contains tells whether a contains x. +func Contains(a []string, x string) bool { + for _, n := range a { + if x == n { + return true + } + } + return false +} diff --git a/examples/simple/simple.go b/examples/simple/simple.go index 572856a..cfde873 100644 --- a/examples/simple/simple.go +++ b/examples/simple/simple.go @@ -38,7 +38,7 @@ func Check(c chan pollon.ConfData) { return } log.Printf("address: %s", addr) - c <- pollon.ConfData{DestAddr: addr} + c <- pollon.ConfData{DestAddr: []*net.TCPAddr{addr}} } func main() { diff --git a/pollon.go b/pollon.go index a7abeb2..eb6661e 100644 --- a/pollon.go +++ b/pollon.go @@ -17,61 +17,125 @@ package pollon import ( "fmt" "io" + "math/rand" "net" "sync" + "sync/atomic" "time" ) +type LBType string + +const ( + Random LBType = "random" + LeastQueue LBType = "leastqueue" +) + type ConfData struct { - DestAddr *net.TCPAddr + DestAddr []*net.TCPAddr } -type Proxy struct { - C chan ConfData - listener *net.TCPListener - confMutex sync.Mutex +type Backend struct { destAddr *net.TCPAddr closeConns chan struct{} - stop chan struct{} - endCh chan error - connMutex sync.Mutex + needClean bool + connNum int32 +} +type Proxy struct { + C chan ConfData + listener *net.TCPListener + confMutex sync.Mutex + connMutex sync.Mutex + stop chan struct{} + endCh chan error keepAlive bool keepAliveIdle time.Duration keepAliveCount int keepAliveInterval time.Duration + backends []*Backend + lbType LBType } func NewProxy(listener *net.TCPListener) (*Proxy, error) { return &Proxy{ - C: make(chan ConfData), - listener: listener, - closeConns: make(chan struct{}), - stop: make(chan struct{}), - endCh: make(chan error), - connMutex: sync.Mutex{}, + C: make(chan ConfData), + listener: listener, + stop: make(chan struct{}), + endCh: make(chan error), + lbType: Random, }, nil } +func newBackend(destAddr *net.TCPAddr) *Backend { + return &Backend{ + destAddr: destAddr, + closeConns: make(chan struct{}), + } +} + +func (p *Proxy) GetBackend() *Backend { + if p.lbType == LeastQueue { + var backResult *Backend = nil + var connNum int32 + for _, b := range p.backends { + if backResult == nil { + backResult = b + connNum = atomic.LoadInt32(&b.connNum) + continue + } + currConnNum := atomic.LoadInt32(&b.connNum) + if connNum > currConnNum { + backResult = b + connNum = currConnNum + } + } + return backResult + } + if len(p.backends) > 0 { + return p.backends[rand.Intn(len(p.backends))] + } + return nil +} + +func (b *Backend) incConn() { + atomic.AddInt32(&b.connNum, 1) +} +func (b *Backend) decConn() { + atomic.AddInt32(&b.connNum, -1) +} + +// proxy client connection func (p *Proxy) proxyConn(conn *net.TCPConn) { - p.connMutex.Lock() - closeConns := p.closeConns - destAddr := p.destAddr - p.connMutex.Unlock() + log.Printf("INFO start source connection: %v", conn) defer func() { log.Printf("closing source connection: %v", conn) conn.Close() }() defer conn.Close() - + p.connMutex.Lock() + back := p.GetBackend() + p.connMutex.Unlock() + if back == nil { + log.Printf("ERR no backends, closing source connection: %v", conn) + return + } + p.confMutex.Lock() + closeConns := back.closeConns + destAddr := back.destAddr + p.confMutex.Unlock() if destAddr == nil { + log.Printf("ERR bad destAddr, closing source connection: %v", conn) return } + back.incConn() + defer back.decConn() var d net.Dialer d.Cancel = closeConns destConnInterface, err := d.Dial("tcp", destAddr.String()) if err != nil { + log.Printf("ERR destAddr dial, closing source connection: %v", conn) conn.Close() return } @@ -119,19 +183,38 @@ func (p *Proxy) proxyConn(conn *net.TCPConn) { } } +//reconfig backends func (p *Proxy) confCheck() { for { select { case <-p.stop: + p.confMutex.Lock() + // Is last iteration before func() exit, use defer + defer p.confMutex.Unlock() + for _, back := range p.backends { + back.needClean = true + } + p.BackendCleaning() return case confData := <-p.C: - if confData.DestAddr.String() != p.destAddr.String() { - p.connMutex.Lock() - close(p.closeConns) - p.closeConns = make(chan struct{}) - p.destAddr = confData.DestAddr - p.connMutex.Unlock() + var dAddrStr []string + p.confMutex.Lock() + // Add new backends + for _, dAddr := range confData.DestAddr { + // if New backend exists + if !Contains(p.GetBackendsString(), dAddr.String()) { + p.backends = append(p.backends, newBackend(dAddr)) + } + dAddrStr = append(dAddrStr, dAddr.String()) + } + // Delete stale backends & force close connections + for _, back := range p.backends { + if !Contains(dAddrStr, back.destAddr.String()) { + back.needClean = true + } } + p.BackendCleaning() + p.confMutex.Unlock() } } } @@ -168,6 +251,35 @@ func (p *Proxy) Start() error { return nil } +func (p *Proxy) GetBackendsString() []string { + var result []string + for _, b := range p.backends { + result = append(result, b.destAddr.String()) + } + return result +} + +func (p *Proxy) BackendCleaning() { + last := len(p.backends) - 1 + if last < 0 { + return + } + for i := last; i >= 0; i-- { + if p.backends[i].needClean { + close(p.backends[i].closeConns) + if i != last { + p.backends[i], p.backends[last] = p.backends[last], p.backends[i] + } + last-- + } + } + if last < 0 { + p.backends = nil + } else { + p.backends = p.backends[:last+1] + } +} + func (p *Proxy) SetKeepAlive(keepalive bool) { p.keepAlive = keepalive } @@ -183,3 +295,7 @@ func (p *Proxy) SetKeepAliveCount(n int) { func (p *Proxy) SetKeepAliveInterval(d time.Duration) { p.keepAliveInterval = d } + +func (p *Proxy) SetLBType(lbt LBType) { + p.lbType = lbt +}