-
-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added dial/handshake timeout for non-responsive connections #273
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ import ( | |
"io" | ||
"net" | ||
"sync" | ||
"time" | ||
|
||
"go.nanomsg.org/mangos/v3" | ||
) | ||
|
@@ -93,6 +94,10 @@ func (p *conn) Send(msg *Message) error { | |
func (p *conn) Close() error { | ||
p.Lock() | ||
defer p.Unlock() | ||
// TODO: This does nothing during dial/handshake. | ||
// Ultimately this results in a Dialer that cannot Close while handshake is in progress. | ||
// Even with a timeout mechanism it would be good for Dialer.Close to | ||
// be able to abort pending handshakes in the workQ. | ||
if p.open { | ||
p.open = false | ||
return p.c.Close() | ||
|
@@ -166,10 +171,26 @@ type connHeader struct { | |
// As a side effect, the peer's protocol number is stored in the conn. | ||
// Also, various properties are initialized. | ||
func (p *conn) handshake() error { | ||
|
||
// Rational for a timeout mechanism (which may need to move elsewhere) | ||
// The dial may have worked but that does not guarantee the | ||
// server is going to send us any data. | ||
// No/partial data will cause binary.Read to block indefinitely | ||
// if the socket remains open. Closing the dialer does not work | ||
// because the handshake is not complete (check for p.open). | ||
|
||
// TODO: Timeout should be configurable however that results in a | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this should be configurable. The handshake should go quickly, and 5 seconds is more than enough time to deal with even the crappiest networks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HOWEVER, I don't think we want to keep this deadline in place once the handshake is completed. Because we could have large BULK transfers (think a 4 GB message, which is perfectly legal) that might take quite a while to send. Or the connection might be backed up due to other large messages in the pipe ahead of it. Therefore we need to ensure that the timeout does not persist once the handshake is complete. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. That is why I defer SetDeadline to 0. That should disable the current deadline per docs "The deadline applies to all future and pending I/O". I figure the deadline set during handshake is done right after dialer connection creation which is prior to the conn being available for transport usage. After the dial/handshake completes other deadlines could be set on it specific to how mangos wants to use the connection. |
||
// large change because each transport dialer would need to | ||
// support it as an option to pass it to the connection | ||
// handshaker. | ||
p.c.SetDeadline(time.Now().Add(5 * time.Second)) | ||
defer p.c.SetDeadline(time.Time{}) | ||
|
||
var err error | ||
|
||
h := connHeader{S: 'S', P: 'P', Proto: p.proto.Self} | ||
if err = binary.Write(p.c, binary.BigEndian, &h); err != nil { | ||
// TODO: should this call _ = p.c.Close() | ||
return err | ||
} | ||
if err = binary.Read(p.c, binary.BigEndian, &h); err != nil { | ||
|
@@ -252,6 +273,8 @@ func (h *connHandshaker) Close() { | |
h.closed = true | ||
h.cv.Broadcast() | ||
for conn := range h.workq { | ||
// This does not do anything because conn.open is | ||
// false and conn.Close is a no-op | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this comment is necessarily true. But I need to review ... it may be that I've missed something here. The intent is to be able to abort the handshaker via Close, and that should close any connections that might not be fully established yet. This code was "recently" (in terms of release numbers, but not necessarily dates) refactored to better deal with stalled handshakes, and I may have messed this up. |
||
_ = conn.Close() | ||
} | ||
for len(h.doneq) != 0 { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
//go:build solaris && cgo | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my vscode editor added this |
||
// +build solaris,cgo | ||
|
||
// Copyright 2020 The Mangos Authors | ||
|
@@ -49,14 +50,20 @@ import ( | |
import "C" | ||
|
||
func getPeer(c *net.UnixConn, pipe transport.ConnPipe) { | ||
if f, err := c.File(); err == nil { | ||
mc := &C.mycred_t{} | ||
if C.getucred(C.int(f.Fd()), mc) == 0 { | ||
pipe.SetOption(mangos.OptionPeerPID, int(mc.pid)) | ||
pipe.SetOption(mangos.OptionPeerUID, int(mc.uid)) | ||
pipe.SetOption(mangos.OptionPeerGID, int(mc.gid)) | ||
pipe.SetOption(mangos.OptionPeerZone, int(mc.zid)) | ||
} | ||
// This change was necessary to support SetDeadline and Close | ||
// which aborts pending reads/writes. | ||
// The prior code was calling c.File().Fd(). It was not closing | ||
// the file. Additionally the docs say Fd() causes deadlines to not work. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I originally wrote this, the Control method didn't exist. I need to review the docs to understand what is happening here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here was an attempt to call Close on the connection without this change. Close blocked forever instead of aborting the read.
|
||
if sc, err := c.SyscallConn(); err == nil { | ||
sc.Control(func(fd uintptr) { | ||
mc := &C.mycred_t{} | ||
if C.getucred(C.int(fd), mc) == 0 { | ||
pipe.SetOption(mangos.OptionPeerPID, int(mc.pid)) | ||
pipe.SetOption(mangos.OptionPeerUID, int(mc.uid)) | ||
pipe.SetOption(mangos.OptionPeerGID, int(mc.gid)) | ||
pipe.SetOption(mangos.OptionPeerZone, int(mc.zid)) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
//go:build !windows && !plan9 && !js | ||
// +build !windows,!plan9,!js | ||
|
||
// Copyright 2021 The Mangos Authors | ||
|
@@ -50,6 +51,10 @@ type dialer struct { | |
// Dial implements the Dialer Dial method | ||
func (d *dialer) Dial() (transport.Pipe, error) { | ||
|
||
// TODO: It might be good to pass a context here to abort the dial after | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that that would be a better approach. |
||
// some timeout deadline. Ideally a dial/handshake would cover both | ||
// initial dial and handshake. This is a bigger change because | ||
// it would need to be implemented by each transport dialer. | ||
conn, err := net.DialUnix("unix", nil, d.addr) | ||
if err != nil { | ||
return nil, err | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.