Skip to content

Commit

Permalink
Merge pull request #348 from lesismal/dialer_online
Browse files Browse the repository at this point in the history
Dialer online, async close
  • Loading branch information
lesismal authored Sep 14, 2023
2 parents f6372ad + 19275a0 commit eb81029
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 9 deletions.
27 changes: 27 additions & 0 deletions nbhttp/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ func (c *ClientConn) closeWithErrorWithoutLock(err error) {
}
c.handlers = nil
if c.conn != nil {
nbc, ok := c.conn.(*nbio.Conn)
if !ok {
if tlsConn, ok2 := c.conn.(*tls.Conn); ok2 {
nbc, ok = tlsConn.Conn().(*nbio.Conn)
}
}
if ok {
key, _ := conn2Array(nbc)
c.Engine.mux.Lock()
delete(c.Engine.dialerConns, key)
c.Engine.mux.Unlock()
}
c.conn.Close()
c.conn = nil
}
Expand Down Expand Up @@ -247,6 +259,11 @@ func (c *ClientConn) Do(req *http.Request, handler func(res *http.Response, conn
return
}

key, _ := conn2Array(nbc)
engine.mux.Lock()
engine.dialerConns[key] = struct{}{}
engine.mux.Unlock()

c.conn = nbc
processor := NewClientProcessor(c, c.onResponse)
parser := NewParser(processor, true, engine.ReadLimit, nbc.Execute)
Expand Down Expand Up @@ -288,6 +305,16 @@ func (c *ClientConn) Do(req *http.Request, handler func(res *http.Response, conn
return
}

key, err := conn2Array(nbc)
if err != nil {
logging.Error("add dialer conn failed: %v", err)
c.closeWithErrorWithoutLock(err)
return
}
engine.mux.Lock()
engine.dialerConns[key] = struct{}{}
engine.mux.Unlock()

isNonblock := true
tlsConn.ResetConn(nbc, isNonblock)

Expand Down
18 changes: 15 additions & 3 deletions nbhttp/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,9 @@ type Engine struct {
_onClose func(c net.Conn, err error)
_onStop func()

mux sync.Mutex
conns map[connValue]struct{}
mux sync.Mutex
conns map[connValue]struct{}
dialerConns map[connValue]struct{}

// tlsBuffers [][]byte
// getTLSBuffer func(c *nbio.Conn) []byte
Expand Down Expand Up @@ -258,6 +259,11 @@ func (e *Engine) Online() int {
return len(e.conns)
}

// DialerOnline .
func (e *Engine) DialerOnline() int {
return len(e.dialerConns)
}

func (e *Engine) closeAllConns() {
e.mux.Lock()
defer e.mux.Unlock()
Expand All @@ -266,6 +272,11 @@ func (e *Engine) closeAllConns() {
c.Close()
}
}
for key := range e.dialerConns {
if c, err := array2Conn(key); err == nil {
c.Close()
}
}
}

func (e *Engine) listen(ln net.Listener, tlsConfig *tls.Config, addConn func(net.Conn, *tls.Config, func()), decrease func()) {
Expand Down Expand Up @@ -459,7 +470,7 @@ func (e *Engine) Shutdown(ctx context.Context) error {
logging.Info("NBIO[%v] shutdown timeout", e.Engine.Name)
return ctx.Err()
case <-ticker.C:
if len(e.conns) == 0 {
if len(e.conns)+len(e.dialerConns) == 0 {
goto Exit
}
}
Expand Down Expand Up @@ -965,6 +976,7 @@ func NewEngine(conf Config) *Engine {
_onStop: func() {},
CheckUtf8: utf8.Valid,
conns: map[connValue]struct{}{},
dialerConns: map[connValue]struct{}{},
ExecuteClient: clientExecutor,

emptyRequest: (&http.Request{}).WithContext(baseCtx),
Expand Down
12 changes: 6 additions & 6 deletions nbhttp/websocket/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,17 @@ func (c *Conn) Close() error {
if c.Conn == nil {
return nil
}
if c.IsAsyncWrite() {
c.Engine.AfterFunc(time.Second, func() { c.Conn.Close() })
return nil
}
return c.Conn.Close()
}

// CloseWithError .
func (c *Conn) CloseWithError(err error) error {
c.SetCloseError(err)
return c.Conn.Close()
return c.Close()
}

// SetCloseError .
Expand Down Expand Up @@ -237,11 +241,7 @@ func (c *Conn) handleWsMessage(opcode MessageType, data []byte) {
}

ErrExit:
if c.IsAsyncWrite() {
c.Engine.AfterFunc(time.Second, func() { c.Conn.Close() })
} else {
c.Conn.Close()
}
c.Close()
}

func (c *Conn) nextFrame() (opcode MessageType, body []byte, ok, fin, res1, res2, res3 bool, err error) {
Expand Down

0 comments on commit eb81029

Please sign in to comment.