Skip to content

Commit

Permalink
server close now closes the conns
Browse files Browse the repository at this point in the history
  • Loading branch information
John Doak authored and John Doak committed Feb 9, 2021
1 parent 5c45df9 commit 16a5adb
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
7 changes: 7 additions & 0 deletions ipc/uds/highlevel/chunk/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,13 @@ func (s *Server) handleRequests(conn *uds.Conn) {
buff, err := chunker.Read()
if err != nil {
if err != io.EOF {
// If the server has been closed, then this will be a *net.Op error.
// We shouldn't error if the server was stopped.
select {
case <-s.stop:
return
default:
}
log.Println(err)
}
return
Expand Down
42 changes: 32 additions & 10 deletions ipc/uds/uds.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"os/user"
"strconv"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -100,6 +101,8 @@ type Cred struct {
// after being received.
type Conn struct {
Cred Cred
id uint32
serv *Server
conn *net.UnixConn
buffer *bufio.Reader
readDeadline time.Time
Expand All @@ -109,8 +112,10 @@ type Conn struct {
readMu, writeMu sync.Mutex
}

func newConn(conn *net.UnixConn, cred Cred) *Conn {
func newConn(conn *net.UnixConn, cred Cred, id uint32, serv *Server) *Conn {
c := &Conn{
id: id,
serv: serv,
Cred: cred,
conn: conn,
buffer: bufio.NewReaderSize(conn, 1024),
Expand All @@ -121,7 +126,12 @@ func newConn(conn *net.UnixConn, cred Cred) *Conn {

// Close implements io.Closer.Close().
func (c *Conn) Close() error {
return c.conn.Close()
err := c.conn.Close()

c.serv.openMU.Lock()
delete(c.serv.openConns, c.id)
c.serv.openMU.Unlock()
return err
}

// WriteOnly let's the Conn know that this Conn will only be used for writing. This will cause a
Expand Down Expand Up @@ -214,7 +224,10 @@ type Server struct {
oneByte []byte
errCh chan error
connCh chan *Conn
closed chan struct{}

connID uint32 // atomic guard
openMU sync.Mutex // guards openConns
openConns map[uint32]*Conn
}

// NewServer creates a new UDS server that creates and listens to the file at socketPath. uid and gid are
Expand Down Expand Up @@ -247,11 +260,11 @@ func NewServer(socketAddr string, uid, gid int, fileMode os.FileMode) (*Server,
}

serv := &Server{
l: l.(*net.UnixListener),
oneByte: make([]byte, 1),
errCh: make(chan error, 1),
connCh: make(chan *Conn, 1),
closed: make(chan struct{}),
l: l.(*net.UnixListener),
oneByte: make([]byte, 1),
errCh: make(chan error, 1),
connCh: make(chan *Conn, 1),
openConns: map[uint32]*Conn{},
}
go serv.accept()
return serv, nil
Expand All @@ -265,7 +278,11 @@ func (c *Server) Conn() chan *Conn {

// Close stops listening for connections on the socket.
func (c *Server) Close() error {
return c.l.Close()
err := c.l.Close()
for _, conn := range c.openConns {
conn.Close()
}
return err
}

// Closed returns a channel that returns an error when the connection to the server is closed.
Expand Down Expand Up @@ -300,7 +317,12 @@ func (c *Server) accept() {
conn.Close()
continue
}
c.connCh <- newConn(uc, cred)
connID := atomic.AddUint32(&c.connID, 1)
co := newConn(uc, cred, connID, c)
c.openMU.Lock()
c.openConns[connID] = co
c.openMU.Unlock()
c.connCh <- co
}
}()
}
Expand Down

0 comments on commit 16a5adb

Please sign in to comment.