From 16a5adb74718ad42124d214e003f4918d913d21e Mon Sep 17 00:00:00 2001 From: John Doak Date: Mon, 8 Feb 2021 23:27:25 -0800 Subject: [PATCH] server close now closes the conns --- ipc/uds/highlevel/chunk/rpc/rpc.go | 7 +++++ ipc/uds/uds.go | 42 +++++++++++++++++++++++------- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/ipc/uds/highlevel/chunk/rpc/rpc.go b/ipc/uds/highlevel/chunk/rpc/rpc.go index c594c97..b9e55d3 100644 --- a/ipc/uds/highlevel/chunk/rpc/rpc.go +++ b/ipc/uds/highlevel/chunk/rpc/rpc.go @@ -376,6 +376,13 @@ func (s *Server) handleRequests(conn *uds.Conn) { buff, err := chunker.Read() if err != nil { if err != io.EOF { + // If the server has been closed, then this will be a *net.Op error. + // We shouldn't error if the server was stopped. + select { + case <-s.stop: + return + default: + } log.Println(err) } return diff --git a/ipc/uds/uds.go b/ipc/uds/uds.go index b6058ff..755e544 100644 --- a/ipc/uds/uds.go +++ b/ipc/uds/uds.go @@ -39,6 +39,7 @@ import ( "os/user" "strconv" "sync" + "sync/atomic" "time" ) @@ -100,6 +101,8 @@ type Cred struct { // after being received. type Conn struct { Cred Cred + id uint32 + serv *Server conn *net.UnixConn buffer *bufio.Reader readDeadline time.Time @@ -109,8 +112,10 @@ type Conn struct { readMu, writeMu sync.Mutex } -func newConn(conn *net.UnixConn, cred Cred) *Conn { +func newConn(conn *net.UnixConn, cred Cred, id uint32, serv *Server) *Conn { c := &Conn{ + id: id, + serv: serv, Cred: cred, conn: conn, buffer: bufio.NewReaderSize(conn, 1024), @@ -121,7 +126,12 @@ func newConn(conn *net.UnixConn, cred Cred) *Conn { // Close implements io.Closer.Close(). func (c *Conn) Close() error { - return c.conn.Close() + err := c.conn.Close() + + c.serv.openMU.Lock() + delete(c.serv.openConns, c.id) + c.serv.openMU.Unlock() + return err } // WriteOnly let's the Conn know that this Conn will only be used for writing. This will cause a @@ -214,7 +224,10 @@ type Server struct { oneByte []byte errCh chan error connCh chan *Conn - closed chan struct{} + + connID uint32 // atomic guard + openMU sync.Mutex // guards openConns + openConns map[uint32]*Conn } // NewServer creates a new UDS server that creates and listens to the file at socketPath. uid and gid are @@ -247,11 +260,11 @@ func NewServer(socketAddr string, uid, gid int, fileMode os.FileMode) (*Server, } serv := &Server{ - l: l.(*net.UnixListener), - oneByte: make([]byte, 1), - errCh: make(chan error, 1), - connCh: make(chan *Conn, 1), - closed: make(chan struct{}), + l: l.(*net.UnixListener), + oneByte: make([]byte, 1), + errCh: make(chan error, 1), + connCh: make(chan *Conn, 1), + openConns: map[uint32]*Conn{}, } go serv.accept() return serv, nil @@ -265,7 +278,11 @@ func (c *Server) Conn() chan *Conn { // Close stops listening for connections on the socket. func (c *Server) Close() error { - return c.l.Close() + err := c.l.Close() + for _, conn := range c.openConns { + conn.Close() + } + return err } // Closed returns a channel that returns an error when the connection to the server is closed. @@ -300,7 +317,12 @@ func (c *Server) accept() { conn.Close() continue } - c.connCh <- newConn(uc, cred) + connID := atomic.AddUint32(&c.connID, 1) + co := newConn(uc, cred, connID, c) + c.openMU.Lock() + c.openConns[connID] = co + c.openMU.Unlock() + c.connCh <- co } }() }