Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pause/resume logic #6

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 137 additions & 24 deletions pollon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,52 +19,68 @@ import (
"io"
"net"
"sync"
"time"
)

type ConfData struct {
DestAddr *net.TCPAddr
}

type Proxy struct {
C chan ConfData
listener *net.TCPListener
confMutex sync.Mutex
destAddr *net.TCPAddr
listener *net.TCPListener

configMutex sync.Mutex
C chan ConfData
destAddr *net.TCPAddr

connMutex sync.Mutex
closeConns chan struct{}
stop chan struct{}
stopCh chan struct{}
endCh chan error
connMutex sync.Mutex

limiter *time.Ticker
state int
}

const (
proxyActive = iota
proxyPaused
)

func NewProxy(listener *net.TCPListener) (*Proxy, error) {
return &Proxy{
C: make(chan ConfData),
listener: listener,
closeConns: make(chan struct{}),
stop: make(chan struct{}),
endCh: make(chan error),
connMutex: sync.Mutex{},
listener: listener,
configMutex: sync.Mutex{},
C: make(chan ConfData),
connMutex: sync.Mutex{},
closeConns: make(chan struct{}),
stopCh: make(chan struct{}),
endCh: make(chan error),
}, nil
}

func (p *Proxy) proxyConn(conn *net.TCPConn) {
p.connMutex.Lock()
if p.limiter != nil {
<-p.limiter.C
}
closeConns := p.closeConns
destAddr := p.destAddr
p.connMutex.Unlock()

defer func() {
log.Printf("closing source connection: %v", conn)
conn.Close()
}()
defer conn.Close()

if destAddr == nil {
log.Print("can't open connection: destination address empty")
return
}

destConn, err := net.DialTCP("tcp", nil, p.destAddr)
if err != nil {
conn.Close()
log.Printf("failed to open connection to destination: %v", err)
return
}
defer func() {
Expand All @@ -73,36 +89,39 @@ func (p *Proxy) proxyConn(conn *net.TCPConn) {
}()

var wg sync.WaitGroup
end := make(chan bool)
end := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
n, err := io.Copy(destConn, conn)
if err != nil {
log.Printf("error copying from source to dest: %v", err)
}
conn.Close()
conn.CloseWrite()
destConn.CloseRead()
log.Printf("ending. copied %d bytes from source to dest", n)
wg.Done()
}()

wg.Add(1)
go func() {
defer wg.Done()
n, err := io.Copy(conn, destConn)
if err != nil {
log.Printf("error copying from dest to source: %v", err)
}
destConn.Close()
destConn.CloseWrite()
conn.CloseRead()
log.Printf("ending. copied %d bytes from dest to source", n)
wg.Done()
}()

go func() {
wg.Wait()
end <- true
close(end)
}()

select {
case <-end:
log.Printf("all io copy goroutines done")
log.Printf("connection closing - all copy goroutines done")
return
case <-closeConns:
log.Printf("closing all connections")
Expand All @@ -113,12 +132,14 @@ func (p *Proxy) proxyConn(conn *net.TCPConn) {
func (p *Proxy) confCheck() {
for {
select {
case <-p.stop:
case <-p.stopCh:
return
case confData := <-p.C:
if confData.DestAddr.String() != p.destAddr.String() {
p.connMutex.Lock()
close(p.closeConns)
if p.closeConns != nil {
close(p.closeConns)
}
p.closeConns = make(chan struct{})
p.destAddr = confData.DestAddr
p.connMutex.Unlock()
Expand Down Expand Up @@ -146,9 +167,101 @@ func (p *Proxy) Start() error {
go p.confCheck()
go p.accepter()
err := <-p.endCh
close(p.stop)
close(p.stopCh)
if err != nil {
return fmt.Errorf("proxy error: %v", err)
}
return nil
}

// Pause makes proxy continue accepting new connections, but prevents it
// from opening new connections to destination address until proxy status
// changes.
// It doesn't affect already open connections in any way.
func (p *Proxy) Pause() {
p.configMutex.Lock()
if p.state == proxyActive {
p.state = proxyPaused
p.connMutex.Unlock()
}
p.configMutex.Unlock()
}

// Resume makes a paused proxy resume opening news connections to destination
// address.
// It makes no attempt to spread backlogged connection opens over time, so it
// can cause thundering herd problem - use with caution
func (p *Proxy) Resume() {
p.configMutex.Lock()
if p.state == proxyPaused {
p.state = proxyActive
if p.closeConns == nil {
p.closeConns = make(chan struct{})
}
p.connMutex.Unlock()
}
p.configMutex.Unlock()
}

// PauseAndDisconnect first pauses the proxy and then, after timeout t has
// passed, closes all connections opened prior to the proxy being paused.
// It can be used for a graceful switchover when paired with client-side
// connection timeout
func (p *Proxy) PauseAndDisconnect(t time.Duration) {
p.configMutex.Lock()

if p.state == proxyActive {
p.state = proxyPaused
p.connMutex.Lock()

closeConns := p.closeConns
p.closeConns = nil
go func() {
<-time.NewTimer(t).C
close(closeConns)
}()
}

p.configMutex.Unlock()
}

// GradualResume makes the proxy resume opening connections to destination,
// but to avoid thundering herd problem by opening at the rate of one
// connection every interval i for the period of duration d
func (p *Proxy) GradualResume(i, d time.Duration) {
p.configMutex.Lock()
if p.state == proxyPaused {
p.state = proxyActive
if p.closeConns == nil {
p.closeConns = make(chan struct{})
}
p.limit(i, d)
p.connMutex.Unlock()
}
p.configMutex.Unlock()
}

func (p *Proxy) limit(i, d time.Duration) {
if i.Nanoseconds() <= 0 || d.Nanoseconds() <= 0 {
return
}
p.limiter = time.NewTicker(i)
go func() {
<-time.NewTimer(d).C
p.connMutex.Lock()
p.limiter = nil
p.connMutex.Unlock()
}()
}

// Config atomically updates proxy destination address without affecting
// already open connections
func (p *Proxy) Config(conf ConfData) {
p.configMutex.Lock()
if conf.DestAddr.String() != p.destAddr.String() {
p.connMutex.Lock()
p.destAddr = conf.DestAddr
p.connMutex.Unlock()
}
p.configMutex.Unlock()
}