Skip to content

Commit

Permalink
clean dead conns from channel buffer on new connection
Browse files Browse the repository at this point in the history
  • Loading branch information
davidnewhall committed Aug 26, 2023
1 parent 260b110 commit 91322fb
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 24 deletions.
1 change: 1 addition & 0 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (s *Server) HandleRequest(name string) http.Handler {
// Send the incoming http request to the peer through the WebSocket connection.
if err := connection.proxyRequest(resp, req); err != nil {
// An error occurred throw the connection away.
// This most commonly happens when the requestor gives up waiting for the request (client-side timeout elapses).
connection.Close(fmt.Sprintf("proxy error: %v", err))
// Try to return an error to the client.
// This might fail if response headers have already been sent.
Expand Down
44 changes: 20 additions & 24 deletions server/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
type Pool struct {
connected time.Time
handshake *mulch.Handshake
done bool
minSize int
idleTimeout time.Duration
id string
Expand All @@ -33,22 +32,17 @@ type clientID string
// NewPool creates a new Pool, and starts one go routine per pool to keep it clean and running.
// Each pool represents 1 client, and each client may have many connections.
func NewPool(server *Server, client *PoolConfig, altID string) *Pool {
// We increase the idle pool buffer size in case a client restarts.
// This allows the restarted client to reconnect before the previous connections get used up from the buffer.
// If the buffer fills, new connections are rejected.
const idlePoolMultiplier = 3

if altID == "" {
altID = client.ID
}

// update pool size; we add 1 so the pool may have 1 thread more than it's minimum idle.
// update pool size; we add 1 so the pool may have 1 threads more than it's minimum idle.
pool := &Pool{
connected: time.Now(),
handshake: client.Handshake,
id: altID,
minSize: client.Size + 1, // This 1 allows slightly less thread teardown/bringup.
idle: make(chan *Connection, client.MaxSize*idlePoolMultiplier),
idle: make(chan *Connection, client.MaxSize+1),
idleTimeout: server.Config.IdleTimeout,
newConn: make(chan *Connection),
askClean: make(chan struct{}),
Expand All @@ -67,13 +61,6 @@ func (pool *Pool) shutdown() {
pool.Debugf("Shutting down pool: %v", pool.id)
defer pool.Debugf("Done shutting down pool: %v", pool.id)

if pool.done {
pool.Debugf("Pool was already shut down? %v", pool.id)
return
}

pool.done = true

for _, connection := range pool.connections {
connection.Close("shutdown")
}
Expand All @@ -99,18 +86,17 @@ func (pool *Pool) keepRunning() {
return
}

if !pool.done {
pool.clean()
pool.connections = append(pool.connections, conn)
pool.Printf("Registering new connection from %s [%s], tunnels: %d, max: %d",
pool.id, conn.sock.RemoteAddr(), len(pool.connections), cap(pool.idle))
}
pool.clean()
pool.connections = append(pool.connections, conn)
pool.Printf("Registering new connection from %s [%s], tunnels: %d, idle: %d/%d",
pool.id, conn.sock.RemoteAddr(), len(pool.connections), len(pool.idle), cap(pool.idle))
}
}
}

// Register creates a new Connection and adds it to the pool.
func (pool *Pool) Register(ws *websocket.Conn) {
pool.cleanIdleChan()
pool.newConn <- NewConnection(pool, ws)
}

Expand All @@ -134,18 +120,28 @@ func (pool *Pool) clean() {
pool.connections = save
}

// cleanIdleChan removes all non-idle connections from the idle channel buffer.
// This should run every time a new connection registers; to clean out old dead connections.
func (pool *Pool) cleanIdleChan() {
for i := len(pool.idle); i > 0; i-- {
if conn := <-pool.idle; conn.Status() == Idle {
pool.idle <- conn
}
}
}

func (pool *Pool) cleanConnection(connection *Connection, idle int) (int, bool) {
// Ensure a busy connection is never closed.
connection.lock.Lock()
defer connection.lock.Unlock()

if connection.status == Idle {
idle++
// Terminate the connection if it is idle since more that IdleTimeout.
// Terminate the connection if it is idle since more than IdleTimeout.
if age := time.Since(connection.idleSince); idle > pool.minSize && age > pool.idleTimeout {
// We have enough idle connections in the pool, and this one is old.
pool.Printf("Closing idle connection: %s [%s], tunnels: %d , max: %d",
pool.id, connection.sock.RemoteAddr(), len(pool.connections), cap(pool.idle))
pool.Printf("Closing idle connection: %s [%s], tunnels: %d , idle: %d/%d",
pool.id, connection.sock.RemoteAddr(), len(pool.connections), len(pool.idle), cap(pool.idle))
connection.close("idle " + age.String())
}
}
Expand Down

0 comments on commit 91322fb

Please sign in to comment.