diff --git a/QTransport/swarm_QTransport.go b/QTransport/swarm_QTransport.go index a187c61a..b1ce9643 100644 --- a/QTransport/swarm_QTransport.go +++ b/QTransport/swarm_QTransport.go @@ -1,4 +1,5 @@ package QTransport + // This file contains the whole Transport to QTransport abstraction layer. import ( @@ -14,6 +15,7 @@ import ( // 2^31 var defaultNonProxyQuality uint32 = 2147483648 + // 2^31+2^30 var defaultProxyQuality uint32 = 3221225472 @@ -30,9 +32,7 @@ func (t TransportUpgrader) Dial(ctx context.Context, raddr ma.Multiaddr, p peer. if err != nil { return nil, err } - return upgradedCapableConn{ - listenedUpgradedCapableConn{BaseCapableConn: conn, t: t}, - }, nil + return upgradedCapableConn{BaseCapableConn: conn, t: t}, nil } func (t TransportUpgrader) Listen(laddr ma.Multiaddr) (transport.QListener, error) { @@ -50,48 +50,43 @@ func (t TransportUpgrader) Score(raddr ma.Multiaddr, _ peer.ID) (transport.Score if t.Proxy() { if manet.IsIPLoopback(raddr) { return transport.Score{ - Quality: defaultProxyQuality >> 16, - IsQuality: true, - Fd: 1, + Quality: defaultProxyQuality >> 16, + Fd: 1, }, nil } if manet.IsPrivateAddr(raddr) { return transport.Score{ - Quality: defaultProxyQuality >> 8, - IsQuality: true, - Fd: 1, + Quality: defaultProxyQuality >> 8, + Fd: 1, }, nil } return transport.Score{ - Quality: defaultProxyQuality, - IsQuality: true, - Fd: 1, + Quality: defaultProxyQuality, + Fd: 1, }, nil } if manet.IsIPLoopback(raddr) { return transport.Score{ - Quality: defaultNonProxyQuality >> 16, - IsQuality: true, - Fd: 1, + Quality: defaultNonProxyQuality >> 16, + Fd: 1, }, nil } if manet.IsPrivateAddr(raddr) { return transport.Score{ - Quality: defaultNonProxyQuality >> 8, - IsQuality: true, - Fd: 1, + Quality: defaultNonProxyQuality >> 8, + Fd: 1, }, nil } return transport.Score{ - Quality: defaultNonProxyQuality, - IsQuality: true, - Fd: 1, + Quality: defaultNonProxyQuality, + Fd: 1, }, nil } // Used to upgrade `transport.CapableConn` to `transport.QCapableConn`. type upgradedCapableConn struct { - listenedUpgradedCapableConn + transport.BaseCapableConn + t transport.QTransport } func (c upgradedCapableConn) Quality() uint32 { @@ -114,12 +109,7 @@ func (c upgradedCapableConn) Quality() uint32 { return defaultNonProxyQuality } -type listenedUpgradedCapableConn struct { - transport.BaseCapableConn - t transport.QTransport -} - -func (c listenedUpgradedCapableConn) Transport() transport.QTransport { +func (c upgradedCapableConn) Transport() transport.QTransport { return c.t } @@ -128,10 +118,10 @@ type upgradedListener struct { t transport.QTransport } -func (l upgradedListener) Accept() (transport.ListenedQCapableConn, error) { +func (l upgradedListener) Accept() (transport.QCapableConn, error) { c, err := l.BaseListener.(transport.Listener).Accept() if err != nil { return nil, err } - return listenedUpgradedCapableConn{BaseCapableConn: c, t: l.t}, nil + return upgradedCapableConn{BaseCapableConn: c, t: l.t}, nil } diff --git a/dial_bus.go b/dial_bus.go new file mode 100644 index 00000000..cd8aa229 --- /dev/null +++ b/dial_bus.go @@ -0,0 +1,279 @@ +package swarm + +import ( + "context" + "fmt" + "sync" + + ma "github.com/multiformats/go-multiaddr" + + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/transport" +) + +// DialBus manage dialing. +type dialBus struct { + p peer.ID + s *Swarm + + // Used to cancel if no more dial are requested to this peer. 
+	wanting struct {
+		sync.Mutex
+		refcount uint
+	}
+
+	c struct {
+		sync.RWMutex
+		// Closed once conn != nil.
+		available chan struct{}
+		// The current best connection available.
+		conn *Conn
+	}
+
+	// Used by dials to cancel others if a better connection is found.
+	dials struct {
+		sync.Mutex
+		d *dialJob
+	}
+
+	err struct {
+		sync.RWMutex
+		// If available is closed, no more new dials are to be expected.
+		available chan struct{}
+		// Value of the error.
+		err error
+	}
+}
+
+func (s *Swarm) newDialBus(p peer.ID) *dialBus {
+	db := &dialBus{p: p, s: s}
+	db.c.available = make(chan struct{})
+	db.err.available = make(chan struct{})
+	return db
+}
+
+func (s *Swarm) getOrCreateDialBus(p peer.ID) *dialBus {
+	s.conns.RLock()
+	if db := s.conns.m[p]; db != nil {
+		s.conns.RUnlock()
+		return db
+	}
+	s.conns.RUnlock()
+	s.conns.Lock()
+	db := s.newDialBus(p)
+	s.conns.m[p] = db
+	s.conns.Unlock()
+	return db
+}
+
+var ErrDialCanceled = fmt.Errorf("all dial contexts were canceled")
+
+// doDial starts a dialing operation.
+func (d *dialBus) doDial(ctx context.Context) error {
+	// TODO: Emit events
+	s := d.s
+	if d.p == s.local {
+		return &DialError{Peer: d.p, Cause: ErrDialToSelf}
+	}
+
+	sk := s.peers.PrivKey(s.local)
+	if sk == nil {
+		// fine for sk to be nil, just log.
+		log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
+	}
+
+	peerAddrs := s.peers.Addrs(d.p)
+	if len(peerAddrs) == 0 {
+		return &DialError{Peer: d.p, Cause: ErrNoAddresses}
+	}
+	goodAddrs := s.filterKnownUndialables(peerAddrs)
+	if len(goodAddrs) == 0 {
+		return &DialError{Peer: d.p, Cause: ErrNoGoodAddresses}
+	}
+
+	d.dials.Lock()
+	d.c.RLock()
+	var currentQuality uint32
+	if d.c.conn != nil {
+		currentQuality = d.c.conn.conn.Quality()
+	}
+	// False if another dial is already on the way or if we start one.
+	isNotDialling := d.dials.d == nil
+AddrIterator:
+	for _, addr := range goodAddrs {
+		// Iterate over the linked list.
+		current := d.dials.d
+		for current != nil {
+			// Check if we are already dialing this address.
+			if current.raddr == addr {
+				continue AddrIterator
+			}
+			current = current.next
+		}
+		// No dial to this address yet, try to start one.
+		tpt := s.TransportForDialing(addr)
+		if tpt == nil {
+			continue
+		}
+		score, err := tpt.Score(addr, d.p)
+		if err != nil || (d.c.conn != nil && score.Quality >= currentQuality) {
+			continue
+		}
+		// Only create the dial context once we know we will actually dial.
+		ctxDial, cancel := context.WithCancel(context.Background())
+		// Prepend the new job to the list (current is nil here, we walked past the end).
+		d.dials.d = &dialJob{
+			cancel:  cancel,
+			raddr:   addr,
+			quality: score.Quality,
+			next:    d.dials.d,
+		}
+		isNotDialling = false
+		// TODO: Implement fd limiting.
+		go d.watchDial(ctxDial, d.dials.d, tpt)
+	}
+	d.c.RUnlock()
+	d.dials.Unlock()
+
+	if isNotDialling {
+		return &DialError{Peer: d.p, Cause: fmt.Errorf("can't connect to %s, no dial started", d.p.Pretty())}
+	}
+
+	// Register this caller as wanting the connection; the monitor goroutine
+	// below decrements the count once its context is done.
+	d.wanting.Lock()
+	d.wanting.refcount++
+	d.wanting.Unlock()
+
+	// Start a context manager, this will monitor the status of the dial and
+	// respond to the ctx.
+	go func() {
+		select {
+		// Check if we get a connection.
+		case <-d.c.available:
+			// Great, nothing to do.
+		// Check if all dials were bad.
+		case <-d.err.available:
+			// Sad, but still not our problem.
+		// Finally, check whether this dial is still wanted.
+		case <-ctx.Done():
+			d.wanting.Lock()
+			defer d.wanting.Unlock()
+			d.wanting.refcount--
+			// Check if we were the last caller wanting this dial.
+			if d.wanting.refcount == 0 {
+				// If so, cancel all dials.
+				// Acquire all locks to ensure a complete stop of the dialBus.
+				d.dials.Lock()
+				d.c.Lock()
+				defer d.c.Unlock()
+				d.err.Lock()
+				defer d.err.Unlock()
+				// Iterate over the linked list.
+				current := d.dials.d
+				// TODO: move teardown logic to an external function
+				for current != nil {
+					// Cancel each dial.
+					current.cancel()
+					current = current.next
+				}
+				d.dials.Unlock()
+				if d.c.conn != nil {
+					d.c.conn.Close()
+				}
+				// Safely close.
+				select {
+				case <-d.err.available:
+				default:
+					d.err.err = ErrDialCanceled
+					close(d.err.available)
+				}
+			}
+		}
+	}()
+	return nil
+}
+
+func (d *dialBus) watchDial(ctx context.Context, di *dialJob, tpt transport.QTransport) {
+	endingBad := true
+	defer func() {
+		// We have finished dialing; remove ourselves from the job list.
+		d.dials.Lock()
+		defer d.dials.Unlock()
+		// Start iterating.
+		current := d.dials.d
+		var past *dialJob
+		for current != nil {
+			if current == di {
+				if past == nil {
+					d.dials.d = current.next
+				} else {
+					past.next = current.next
+				}
+			}
+			past, current = current, current.next
+		}
+		if endingBad {
+			// Check if we were the last dial.
+			if d.dials.d == nil {
+				// If so, raise an error.
+				d.err.Lock()
+				defer d.err.Unlock()
+				// Safely close.
+				select {
+				case <-d.err.available:
+				default:
+					d.err.err = ErrAllDialsFailed
+					close(d.err.available)
+				}
+			}
+		}
+	}()
+	conn, err := tpt.Dial(ctx, di.raddr, d.p)
+	if err != nil {
+		log.Error(fmt.Errorf("Error dialing %s with transport %T: %s", d.p, tpt, err))
+		return
+	}
+	// Check if we should die (e.g. a badly implemented transport not respecting
+	// the context).
+	select {
+	case <-ctx.Done():
+		conn.Close()
+		return
+	default:
+	}
+	// Trust the transport? Yeah... right.
+	if conn.RemotePeer() != d.p {
+		log.Error(fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", tpt, d.p, conn.RemotePeer()))
+		conn.Close()
+		return
+	}
+	// Upgrade to a swarm connection.
+	err = d.s.addConn(conn, network.DirOutbound)
+	if err != nil {
+		conn.Close()
+		log.Error(fmt.Errorf("Error upgrading to network.Conn %s with transport %T: %s", d.p, tpt, err))
+		return
+	}
+	endingBad = false
+}
+
+func (d *dialBus) watchForConn(ctx context.Context) (*Conn, error) {
+	// Wait for a response.
+	select {
+	// First try to get the conn.
+	case <-d.c.available:
+		d.c.RLock()
+		defer d.c.RUnlock()
+		return d.c.conn, nil
+	// Else try to get the error message.
+	case <-d.err.available:
+		d.err.RLock()
+		defer d.err.RUnlock()
+		return nil, d.err.err
+	// And finally verify whether the connection is still wanted.
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+// dialJob is a linked list of dial items; each in-flight dial owns one node.
+// A linked list allows lightweight, fast item deletion.
+type dialJob struct {
+	quality uint32
+	cancel  func()
+	raddr   ma.Multiaddr
+	next    *dialJob
+}
diff --git a/dial_sync.go b/dial_sync.go
deleted file mode 100644
index 4c1230f5..00000000
--- a/dial_sync.go
+++ /dev/null
@@ -1,136 +0,0 @@
-package swarm
-
-import (
-	"context"
-	"errors"
-	"sync"
-
-	"github.com/libp2p/go-libp2p-core/peer"
-)
-
-// TODO: change this text when we fix the bug
-var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW")
-
-// DialFunc is the type of function expected by DialSync.
-type DialFunc func(context.Context, peer.ID) (*Conn, error)
-
-// NewDialSync constructs a new DialSync
-func NewDialSync(dfn DialFunc) *DialSync {
-	return &DialSync{
-		dials:    make(map[peer.ID]*activeDial),
-		dialFunc: dfn,
-	}
-}
-
-// DialSync is a dial synchronization helper that ensures that at most one dial
-// to any given peer is active at any given time.
-type DialSync struct { - dials map[peer.ID]*activeDial - dialsLk sync.Mutex - dialFunc DialFunc -} - -type activeDial struct { - id peer.ID - refCnt int - refCntLk sync.Mutex - cancel func() - - err error - conn *Conn - waitch chan struct{} - - ds *DialSync -} - -func (ad *activeDial) wait(ctx context.Context) (*Conn, error) { - defer ad.decref() - select { - case <-ad.waitch: - return ad.conn, ad.err - case <-ctx.Done(): - return nil, ctx.Err() - } -} - -func (ad *activeDial) incref() { - ad.refCntLk.Lock() - defer ad.refCntLk.Unlock() - ad.refCnt++ -} - -func (ad *activeDial) decref() { - ad.refCntLk.Lock() - ad.refCnt-- - maybeZero := (ad.refCnt <= 0) - ad.refCntLk.Unlock() - - // make sure to always take locks in correct order. - if maybeZero { - ad.ds.dialsLk.Lock() - ad.refCntLk.Lock() - // check again after lock swap drop to make sure nobody else called incref - // in between locks - if ad.refCnt <= 0 { - ad.cancel() - delete(ad.ds.dials, ad.id) - } - ad.refCntLk.Unlock() - ad.ds.dialsLk.Unlock() - } -} - -func (ad *activeDial) start(ctx context.Context) { - ad.conn, ad.err = ad.ds.dialFunc(ctx, ad.id) - - // This isn't the user's context so we should fix the error. - switch ad.err { - case context.Canceled: - // The dial was canceled with `CancelDial`. - ad.err = errDialCanceled - case context.DeadlineExceeded: - // We hit an internal timeout, not a context timeout. - ad.err = ErrDialTimeout - } - close(ad.waitch) - ad.cancel() -} - -func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { - ds.dialsLk.Lock() - defer ds.dialsLk.Unlock() - - actd, ok := ds.dials[p] - if !ok { - adctx, cancel := context.WithCancel(context.Background()) - actd = &activeDial{ - id: p, - cancel: cancel, - waitch: make(chan struct{}), - ds: ds, - } - ds.dials[p] = actd - - go actd.start(adctx) - } - - // increase ref count before dropping dialsLk - actd.incref() - - return actd -} - -// DialLock initiates a dial to the given peer if there are none in progress -// then waits for the dial to that peer to complete. -func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) { - return ds.getActiveDial(p).wait(ctx) -} - -// CancelDial cancels all in-progress dials to the given peer. -func (ds *DialSync) CancelDial(p peer.ID) { - ds.dialsLk.Lock() - defer ds.dialsLk.Unlock() - if ad, ok := ds.dials[p]; ok { - ad.cancel() - } -} diff --git a/dial_sync_test.go b/dial_sync_test.go deleted file mode 100644 index 485d1a31..00000000 --- a/dial_sync_test.go +++ /dev/null @@ -1,229 +0,0 @@ -package swarm_test - -import ( - "context" - "fmt" - "sync" - "testing" - "time" - - . 
"github.com/libp2p/go-libp2p-swarm" - - "github.com/libp2p/go-libp2p-core/peer" -) - -func getMockDialFunc() (DialFunc, func(), context.Context, <-chan struct{}) { - dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care - dialctx, cancel := context.WithCancel(context.Background()) - ch := make(chan struct{}) - f := func(ctx context.Context, p peer.ID) (*Conn, error) { - dfcalls <- struct{}{} - defer cancel() - select { - case <-ch: - return new(Conn), nil - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - o := new(sync.Once) - - return f, func() { o.Do(func() { close(ch) }) }, dialctx, dfcalls -} - -func TestBasicDialSync(t *testing.T) { - df, done, _, callsch := getMockDialFunc() - - dsync := NewDialSync(df) - - p := peer.ID("testpeer") - - ctx := context.Background() - - finished := make(chan struct{}) - go func() { - _, err := dsync.DialLock(ctx, p) - if err != nil { - t.Error(err) - } - finished <- struct{}{} - }() - - go func() { - _, err := dsync.DialLock(ctx, p) - if err != nil { - t.Error(err) - } - finished <- struct{}{} - }() - - // short sleep just to make sure we've moved around in the scheduler - time.Sleep(time.Millisecond * 20) - done() - - <-finished - <-finished - - if len(callsch) > 1 { - t.Fatal("should only have called dial func once!") - } -} - -func TestDialSyncCancel(t *testing.T) { - df, done, _, dcall := getMockDialFunc() - - dsync := NewDialSync(df) - - p := peer.ID("testpeer") - - ctx1, cancel1 := context.WithCancel(context.Background()) - - finished := make(chan struct{}) - go func() { - _, err := dsync.DialLock(ctx1, p) - if err != ctx1.Err() { - t.Error("should have gotten context error") - } - finished <- struct{}{} - }() - - // make sure the above makes it through the wait code first - select { - case <-dcall: - case <-time.After(time.Second): - t.Fatal("timed out waiting for dial to start") - } - - // Add a second dialwait in so two actors are waiting on the same dial - go func() { - _, err := dsync.DialLock(context.Background(), p) - if err != nil { - t.Error(err) - } - finished <- struct{}{} - }() - - time.Sleep(time.Millisecond * 20) - - // cancel the first dialwait, it should not affect the second at all - cancel1() - select { - case <-finished: - case <-time.After(time.Second): - t.Fatal("timed out waiting for wait to exit") - } - - // short sleep just to make sure we've moved around in the scheduler - time.Sleep(time.Millisecond * 20) - done() - - <-finished -} - -func TestDialSyncAllCancel(t *testing.T) { - df, done, dctx, _ := getMockDialFunc() - - dsync := NewDialSync(df) - - p := peer.ID("testpeer") - - ctx1, cancel1 := context.WithCancel(context.Background()) - - finished := make(chan struct{}) - go func() { - _, err := dsync.DialLock(ctx1, p) - if err != ctx1.Err() { - t.Error("should have gotten context error") - } - finished <- struct{}{} - }() - - // Add a second dialwait in so two actors are waiting on the same dial - go func() { - _, err := dsync.DialLock(ctx1, p) - if err != ctx1.Err() { - t.Error("should have gotten context error") - } - finished <- struct{}{} - }() - - cancel1() - for i := 0; i < 2; i++ { - select { - case <-finished: - case <-time.After(time.Second): - t.Fatal("timed out waiting for wait to exit") - } - } - - // the dial should have exited now - select { - case <-dctx.Done(): - case <-time.After(time.Second): - t.Fatal("timed out waiting for dial to return") - } - - // should be able to successfully dial that peer again - done() - _, err := dsync.DialLock(context.Background(), p) - if 
err != nil { - t.Fatal(err) - } -} - -func TestFailFirst(t *testing.T) { - var count int - f := func(ctx context.Context, p peer.ID) (*Conn, error) { - if count > 0 { - return new(Conn), nil - } - count++ - return nil, fmt.Errorf("gophers ate the modem") - } - - ds := NewDialSync(f) - - p := peer.ID("testing") - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - _, err := ds.DialLock(ctx, p) - if err == nil { - t.Fatal("expected gophers to have eaten the modem") - } - - c, err := ds.DialLock(ctx, p) - if err != nil { - t.Fatal(err) - } - - if c == nil { - t.Fatal("should have gotten a 'real' conn back") - } -} - -func TestStressActiveDial(t *testing.T) { - ds := NewDialSync(func(ctx context.Context, p peer.ID) (*Conn, error) { - return nil, nil - }) - - wg := sync.WaitGroup{} - - pid := peer.ID("foo") - - makeDials := func() { - for i := 0; i < 10000; i++ { - ds.DialLock(context.Background(), pid) - } - wg.Done() - } - - for i := 0; i < 100; i++ { - wg.Add(1) - go makeDials() - } - - wg.Wait() -} diff --git a/limiter.go b/limiter.go deleted file mode 100644 index 6808dd71..00000000 --- a/limiter.go +++ /dev/null @@ -1,226 +0,0 @@ -package swarm - -import ( - "context" - "os" - "strconv" - "sync" - "time" - - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/transport" - - addrutil "github.com/libp2p/go-addr-util" - ma "github.com/multiformats/go-multiaddr" -) - -type dialResult struct { - Conn transport.CapableConn - Addr ma.Multiaddr - Err error -} - -type dialJob struct { - addr ma.Multiaddr - peer peer.ID - ctx context.Context - resp chan dialResult -} - -func (dj *dialJob) cancelled() bool { - return dj.ctx.Err() != nil -} - -func (dj *dialJob) dialTimeout() time.Duration { - timeout := transport.DialTimeout - if lowTimeoutFilters.AddrBlocked(dj.addr) { - timeout = DialTimeoutLocal - } - - return timeout -} - -type dialLimiter struct { - lk sync.Mutex - - fdConsuming int - fdLimit int - waitingOnFd []*dialJob - - dialFunc dialfunc - - activePerPeer map[peer.ID]int - perPeerLimit int - waitingOnPeerLimit map[peer.ID][]*dialJob -} - -type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error) - -func newDialLimiter(df dialfunc) *dialLimiter { - fd := ConcurrentFdDials - if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" { - if n, err := strconv.ParseInt(env, 10, 32); err == nil { - fd = int(n) - } - } - return newDialLimiterWithParams(df, fd, DefaultPerPeerRateLimit) -} - -func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter { - return &dialLimiter{ - fdLimit: fdLimit, - perPeerLimit: perPeerLimit, - waitingOnPeerLimit: make(map[peer.ID][]*dialJob), - activePerPeer: make(map[peer.ID]int), - dialFunc: df, - } -} - -// freeFDToken frees FD token and if there are any schedules another waiting dialJob -// in it's place -func (dl *dialLimiter) freeFDToken() { - log.Debugf("[limiter] freeing FD token; waiting: %d; consuming: %d", len(dl.waitingOnFd), dl.fdConsuming) - dl.fdConsuming-- - - for len(dl.waitingOnFd) > 0 { - next := dl.waitingOnFd[0] - dl.waitingOnFd[0] = nil // clear out memory - dl.waitingOnFd = dl.waitingOnFd[1:] - - if len(dl.waitingOnFd) == 0 { - // clear out memory. - dl.waitingOnFd = nil - } - - // Skip over canceled dials instead of queuing up a goroutine. 
- if next.cancelled() { - dl.freePeerToken(next) - continue - } - dl.fdConsuming++ - - // we already have activePerPeer token at this point so we can just dial - go dl.executeDial(next) - return - } -} - -func (dl *dialLimiter) freePeerToken(dj *dialJob) { - log.Debugf("[limiter] freeing peer token; peer %s; addr: %s; active for peer: %d; waiting on peer limit: %d", - dj.peer, dj.addr, dl.activePerPeer[dj.peer], len(dl.waitingOnPeerLimit[dj.peer])) - // release tokens in reverse order than we take them - dl.activePerPeer[dj.peer]-- - if dl.activePerPeer[dj.peer] == 0 { - delete(dl.activePerPeer, dj.peer) - } - - waitlist := dl.waitingOnPeerLimit[dj.peer] - for len(waitlist) > 0 { - next := waitlist[0] - waitlist[0] = nil // clear out memory - waitlist = waitlist[1:] - - if len(waitlist) == 0 { - delete(dl.waitingOnPeerLimit, next.peer) - } else { - dl.waitingOnPeerLimit[next.peer] = waitlist - } - - if next.cancelled() { - continue - } - - dl.activePerPeer[next.peer]++ // just kidding, we still want this token - - dl.addCheckFdLimit(next) - return - } -} - -func (dl *dialLimiter) finishedDial(dj *dialJob) { - dl.lk.Lock() - defer dl.lk.Unlock() - - if addrutil.IsFDCostlyTransport(dj.addr) { - dl.freeFDToken() - } - - dl.freePeerToken(dj) -} - -func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) { - if addrutil.IsFDCostlyTransport(dj.addr) { - if dl.fdConsuming >= dl.fdLimit { - log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+ - "limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd)) - dl.waitingOnFd = append(dl.waitingOnFd, dj) - return - } - - log.Debugf("[limiter] taking FD token: peer: %s; addr: %s; prev consuming: %d", - dj.peer, dj.addr, dl.fdConsuming) - // take token - dl.fdConsuming++ - } - - log.Debugf("[limiter] executing dial; peer: %s; addr: %s; FD consuming: %d; waiting: %d", - dj.peer, dj.addr, dl.fdConsuming, len(dl.waitingOnFd)) - go dl.executeDial(dj) -} - -func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) { - if dl.activePerPeer[dj.peer] >= dl.perPeerLimit { - log.Debugf("[limiter] blocked dial waiting on peer limit; peer: %s; addr: %s; active: %d; "+ - "peer limit: %d; waiting: %d", dj.peer, dj.addr, dl.activePerPeer[dj.peer], dl.perPeerLimit, - len(dl.waitingOnPeerLimit[dj.peer])) - wlist := dl.waitingOnPeerLimit[dj.peer] - dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj) - return - } - dl.activePerPeer[dj.peer]++ - - dl.addCheckFdLimit(dj) -} - -// AddDialJob tries to take the needed tokens for starting the given dial job. -// If it acquires all needed tokens, it immediately starts the dial, otherwise -// it will put it on the waitlist for the requested token. -func (dl *dialLimiter) AddDialJob(dj *dialJob) { - dl.lk.Lock() - defer dl.lk.Unlock() - - log.Debugf("[limiter] adding a dial job through limiter: %v", dj.addr) - dl.addCheckPeerLimit(dj) -} - -func (dl *dialLimiter) clearAllPeerDials(p peer.ID) { - dl.lk.Lock() - defer dl.lk.Unlock() - delete(dl.waitingOnPeerLimit, p) - log.Debugf("[limiter] clearing all peer dials: %v", p) - // NB: the waitingOnFd list doesn't need to be cleaned out here, we will - // remove them as we encounter them because they are 'cancelled' at this - // point -} - -// executeDial calls the dialFunc, and reports the result through the response -// channel when finished. Once the response is sent it also releases all tokens -// it held during the dial. 
-func (dl *dialLimiter) executeDial(j *dialJob) { - defer dl.finishedDial(j) - if j.cancelled() { - return - } - - dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout()) - defer cancel() - - con, err := dl.dialFunc(dctx, j.peer, j.addr) - select { - case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}: - case <-j.ctx.Done(): - if err == nil { - con.Close() - } - } -} diff --git a/swarm.go b/swarm.go index 3fac0b3f..d70799fb 100644 --- a/swarm.go +++ b/swarm.go @@ -57,7 +57,7 @@ type Swarm struct { conns struct { sync.RWMutex - m map[peer.ID][]*Conn + m map[peer.ID]*dialBus } listeners struct { @@ -66,7 +66,7 @@ type Swarm struct { ifaceListenAddres []ma.Multiaddr cacheEOL time.Time - m map[transport.Listener]struct{} + m map[transport.QListener]struct{} } notifs struct { @@ -84,9 +84,7 @@ type Swarm struct { streamh atomic.Value // dialing helpers - dsync *DialSync - backf DialBackoff - limiter *dialLimiter + backf DialBackoff // filters for addresses that shouldnt be dialed (or accepted) Filters *filter.Filters @@ -105,13 +103,11 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc Filters: filter.NewFilters(), } - s.conns.m = make(map[peer.ID][]*Conn) - s.listeners.m = make(map[transport.Listener]struct{}) + s.conns.m = make(map[peer.ID]*dialBus) + s.listeners.m = make(map[transport.QListener]struct{}) s.transports.m = make(map[int]transport.QTransport) s.notifs.m = make(map[network.Notifiee]struct{}) - s.dsync = NewDialSync(s.doDial) - s.limiter = newDialLimiter(s.dialAddr) s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown) s.ctx = goprocessctx.OnClosingContext(s.proc) @@ -140,21 +136,28 @@ func (s *Swarm) teardown() error { // possible. for l := range listeners { - go func(l transport.Listener) { + go func(l transport.QListener) { if err := l.Close(); err != nil { log.Errorf("error when shutting down listener: %s", err) } }(l) } - for _, cs := range conns { - for _, c := range cs { - go func(c *Conn) { - if err := c.Close(); err != nil { - log.Errorf("error when shutting down connection: %s", err) - } - }(c) - } + for _, db := range conns { + go func(db *dialBus) { + db.dials.Lock() + defer db.dials.Unlock() + db.c.Lock() + defer db.c.Unlock() + current := db.dials.d + for current != nil { + current.cancel() + current = current.next + } + if err := db.c.conn.Close(); err != nil { + log.Errorf("error when shutting down connection: %s", err) + } + }(db) } // Wait for everything to finish. @@ -180,13 +183,13 @@ func (s *Swarm) Process() goprocess.Process { return s.proc } -func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) { +func (s *Swarm) addConn(tc transport.QCapableConn, dir network.Direction) error { // The underlying transport (or the dialer) *should* filter it's own // connections but we should double check anyways. raddr := tc.RemoteMultiaddr() if s.Filters.AddrBlocked(raddr) { tc.Close() - return nil, ErrAddrFiltered + return ErrAddrFiltered } p := tc.RemotePeer() @@ -205,7 +208,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, if s.conns.m == nil { s.conns.Unlock() tc.Close() - return nil, ErrSwarmClosed + return ErrSwarmClosed } // Wrap and register the connection. 
@@ -216,7 +219,46 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
 		stat:  stat,
 	}
 	c.streams.m = make(map[*Stream]struct{})
-	s.conns.m[p] = append(s.conns.m[p], c)
+
+	// Set up the dialBus.
+	d, ok := s.conns.m[p]
+	if !ok {
+		d = s.newDialBus(p)
+		s.conns.m[p] = d
+	}
+	// Fetch the quality before locking: some transports (e.g. webrtc-aside) may
+	// not return it until the quality is available.
+	quality := tc.Quality()
+	d.c.Lock()
+	if d.c.conn == nil {
+		// We are first, good!
+		d.c.conn = c
+		close(d.c.available)
+		d.c.Unlock()
+	} else {
+		// Check which connection is better.
+		if d.c.conn.conn.Quality() <= quality {
+			// We are not the best; release the locks and terminate.
+			d.c.Unlock()
+			s.conns.Unlock()
+			tc.Close()
+			// Good, we are finished.
+			return ErrAlreadyFoundBetter
+		}
+		// The new connection is better: replace the old one and shut it down.
+		oldConn := d.c.conn
+		d.c.conn = c
+		d.c.Unlock()
+		// foundBetter waits for the grace period before closing the old
+		// connection, so run it asynchronously; we still hold s.conns here.
+		go oldConn.foundBetter()
+		// Cancel dials that are no better than the new connection.
+		d.dials.Lock()
+		current := d.dials.d
+		for current != nil {
+			// Cancel dials of equal or worse quality.
+			if current.quality >= quality {
+				current.cancel()
+			}
+			current = current.next
+		}
+		d.dials.Unlock()
+	}
 
 	// Add two swarm refs:
 	// * One will be decremented after the close notifications fire in Conn.doClose
@@ -228,10 +270,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
 	c.notifyLk.Lock()
 	s.conns.Unlock()
 
-	// We have a connection now. Cancel all other in-progress dials.
-	// This should be fast, no reason to wait till later.
-	s.dsync.CancelDial(p)
-
 	s.notifyAll(func(f network.Notifiee) {
 		f.Connected(s, c)
 	})
@@ -246,7 +284,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
 		go h(c)
 	}
 
-	return c, nil
+	return nil
 }
 
 // Peerstore returns this swarms internal Peerstore.
@@ -336,46 +374,31 @@ func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error)
 	}
 }
 
-// ConnsToPeer returns all the live connections to peer.
 func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn {
-	// TODO: Consider sorting the connection list best to worst. Currently,
-	// it's sorted oldest to newest.
-	s.conns.RLock()
-	defer s.conns.RUnlock()
-	conns := s.conns.m[p]
-	output := make([]network.Conn, len(conns))
-	for i, c := range conns {
-		output[i] = c
+	c := s.bestConnToPeer(p)
+	if c == nil {
+		return []network.Conn{}
 	}
-	return output
+	return []network.Conn{c}
 }
 
 // bestConnToPeer returns the best connection to peer.
 func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
 	// Selects the best connection we have to the peer.
-	// TODO: Prefer some transports over others. Currently, we just select
-	// the newest non-closed connection with the most streams.
 	s.conns.RLock()
-	defer s.conns.RUnlock()
-
-	var best *Conn
-	bestLen := 0
-	for _, c := range s.conns.m[p] {
-		if c.conn.IsClosed() {
-			// We *will* garbage collect this soon anyways.
-			continue
-		}
-		c.streams.Lock()
-		cLen := len(c.streams.m)
-		c.streams.Unlock()
-
-		if cLen >= bestLen {
-			best = c
-			bestLen = cLen
-		}
-
+	db, ok := s.conns.m[p]
+	s.conns.RUnlock()
+	if !ok {
+		return nil
 	}
-	return best
+	select {
+	case <-db.c.available:
+		db.c.RLock()
+		defer db.c.RUnlock()
+		return db.c.conn
+	default:
+	}
+	return nil
 }
 
 // Connectedness returns our "connectedness" state with the given peer.
@@ -394,11 +417,9 @@ func (s *Swarm) Conns() []network.Conn {
 	s.conns.RLock()
 	defer s.conns.RUnlock()
 
-	conns := make([]network.Conn, 0, len(s.conns.m))
-	for _, cs := range s.conns.m {
-		for _, c := range cs {
-			conns = append(conns, c)
-		}
+	conns := make([]network.Conn, 0, len(s.conns.m))
+	for _, db := range s.conns.m {
+		if db.c.conn != nil {
+			conns = append(conns, db.c.conn)
+		}
 	}
 	return conns
 }
@@ -490,23 +511,20 @@ func (s *Swarm) removeConn(c *Conn) {
 	p := c.RemotePeer()
 
 	s.conns.Lock()
-	defer s.conns.Unlock()
-	cs := s.conns.m[p]
-	for i, ci := range cs {
-		if ci == c {
-			if len(cs) == 1 {
-				delete(s.conns.m, p)
-			} else {
-				// NOTE: We're intentionally preserving order.
-				// This way, connections to a peer are always
-				// sorted oldest to newest.
-				copy(cs[i:], cs[i+1:])
-				cs[len(cs)-1] = nil
-				s.conns.m[p] = cs[:len(cs)-1]
-			}
-			return
+	db := s.conns.m[p]
+	db.c.Lock()
+	if c == db.c.conn {
+		delete(s.conns.m, p)
+		db.dials.Lock()
+		current := db.dials.d
+		for current != nil {
+			current.cancel()
+			current = current.next
 		}
+		db.dials.Unlock()
 	}
+	s.conns.Unlock()
+	db.c.Unlock()
 }
 
 // String returns a string representation of Network.
diff --git a/swarm_conn.go b/swarm_conn.go
index c09957c1..06df87e8 100644
--- a/swarm_conn.go
+++ b/swarm_conn.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"sync"
+	"time"
 
 	ic "github.com/libp2p/go-libp2p-core/crypto"
 	"github.com/libp2p/go-libp2p-core/mux"
@@ -22,7 +23,7 @@ var ErrConnClosed = errors.New("connection closed")
 // Conn is the connection type used by swarm. In general, you won't use this
 // type directly.
 type Conn struct {
-	conn  transport.CapableConn
+	conn  transport.QCapableConn
 	swarm *Swarm
 
 	closeOnce sync.Once
@@ -36,6 +37,14 @@ type Conn struct {
 	}
 
 	stat network.Stat
+
+	onBetter struct {
+		sync.Mutex
+		// Non-zero once a better conn has been found.
+		hardCloseDeadline time.Time
+		// List of functions to call when a better conn is found, each one in its own goroutine.
+		handlers []network.OnBetterHandler
+	}
 }
 
 // Close closes this connection.
@@ -221,3 +230,41 @@ func (c *Conn) GetStreams() []network.Stream {
 	}
 	return streams
 }
+
+// OnBetter calls the handler when a better connection is found.
+// Thread-safe; the callback is invoked even if it is registered after the
+// event happened, and calls to OnBetter are non-blocking.
+// The callback is only called once.
+func (c *Conn) OnBetter(h network.OnBetterHandler) {
+	c.onBetter.Lock()
+	defer c.onBetter.Unlock()
+	// If a better conn has already been found, yield the event.
+	if !c.onBetter.hardCloseDeadline.IsZero() {
+		go h(c.onBetter.hardCloseDeadline)
+		return
+	}
+	// Else add to the list.
+	c.onBetter.handlers = append(c.onBetter.handlers, h)
+}
+
+var ErrAlreadyFoundBetter = fmt.Errorf("a better connection has already been found")
+
+// foundBetter raises the OnBetter event; it must be called after replacing the
+// pointer to the new conn in the corresponding dialBus.
+func (c *Conn) foundBetter() {
+	c.onBetter.Lock()
+	if !c.onBetter.hardCloseDeadline.IsZero() {
+		c.onBetter.Unlock()
+		log.Error(ErrAlreadyFoundBetter)
+		return
+	}
+	// TODO: Support real deadline
+	c.onBetter.hardCloseDeadline = time.Now().Add(time.Second * 5)
+	for _, v := range c.onBetter.handlers {
+		go v(c.onBetter.hardCloseDeadline)
+	}
+	// Release the lock before waiting so that late OnBetter registrations are
+	// not blocked during the grace period.
+	c.onBetter.Unlock()
+	time.Sleep(time.Second * 5)
+	// TODO: support asyncClose
+	c.Close()
+}
diff --git a/swarm_dial.go b/swarm_dial.go
index 475a7e25..214dd0ae 100644
--- a/swarm_dial.go
+++ b/swarm_dial.go
@@ -3,31 +3,16 @@ package swarm
 import (
 	"context"
 	"errors"
-	"fmt"
 	"sync"
 	"time"
 
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
-	"github.com/libp2p/go-libp2p-core/transport"
-	lgbl "github.com/libp2p/go-libp2p-loggables"
-	logging "github.com/ipfs/go-log"
 	addrutil "github.com/libp2p/go-addr-util"
 	ma "github.com/multiformats/go-multiaddr"
 )
 
-// Diagram of dial sync:
-//
-//   many callers of Dial()   synched w. dials many addrs    results to callers
-//  ----------------------\    dialsync     use earliest      /--------------
-//  -----------------------\              |----------\       /----------------
-//  ------------------------>------------<-------     >---------<-----------------
-//  -----------------------|              \----x              \----------------
-//  ----------------------|                \-----x             \---------------
-//                                          any may fail      if no addr at end
-//                                                             retry dialAttempt x
-
 var (
 	// ErrDialBackoff is returned by the backoff code when a given peer has
 	// been dialed too frequently
@@ -185,98 +170,10 @@ func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (network.Conn, error) {
 	return s.dialPeer(ctx, p)
 }
 
-// internal dial method that returns an unwrapped conn
-//
-// It is gated by the swarm's dial synchronization systems: dialsync and
-// dialbackoff.
 func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
-	log.Debugf("[%s] swarm dialing peer [%s]", s.local, p)
-	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
-	err := p.Validate()
-	if err != nil {
-		return nil, err
-	}
-
-	if p == s.local {
-		log.Event(ctx, "swarmDialSelf", logdial)
-		return nil, ErrDialToSelf
-	}
-
-	defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()
-
-	// check if we already have an open connection first
-	conn := s.bestConnToPeer(p)
-	if conn != nil {
-		return conn, nil
-	}
-
-	// if this peer has been backed off, lets get out of here
-	if s.backf.Backoff(p) {
-		log.Event(ctx, "swarmDialBackoff", p)
-		return nil, ErrDialBackoff
-	}
-
-	// apply the DialPeer timeout
-	ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx))
-	defer cancel()
-
-	conn, err = s.dsync.DialLock(ctx, p)
-	if err == nil {
-		return conn, nil
-	}
-
-	log.Debugf("network for %s finished dialing %s", s.local, p)
-
-	if ctx.Err() != nil {
-		// Context error trumps any dial errors as it was likely the ultimate cause.
-		return nil, ctx.Err()
-	}
-
-	if s.ctx.Err() != nil {
-		// Ok, so the swarm is shutting down.
-		return nil, ErrSwarmClosed
-	}
-
-	return nil, err
-}
-
-// doDial is an ugly shim method to retain all the logging and backoff logic
-// of the old dialsync code
-func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
-	// Short circuit.
-	// By the time we take the dial lock, we may already *have* a connection
-	// to the peer.
-	c := s.bestConnToPeer(p)
-	if c != nil {
-		return c, nil
-	}
-
-	logdial := lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
-
-	// ok, we have been charged to dial! let's do it.
- // if it succeeds, dial will add the conn to the swarm itself. - defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done() - - conn, err := s.dial(ctx, p) - if err != nil { - conn = s.bestConnToPeer(p) - if conn != nil { - // Hm? What error? - // Could have canceled the dial because we received a - // connection or some other random reason. - // Just ignore the error and return the connection. - log.Debugf("ignoring dial error because we have a connection: %s", err) - return conn, nil - } - if err != context.Canceled { - log.Event(ctx, "swarmDialBackoffAdd", logdial) - s.backf.AddBackoff(p) // let others know to backoff - } - - // ok, we failed. - return nil, err - } - return conn, nil + db := s.getOrCreateDialBus(p) + db.doDial(ctx) + return db.watchForConn(ctx) } func (s *Swarm) canDial(addr ma.Multiaddr) bool { @@ -284,76 +181,6 @@ func (s *Swarm) canDial(addr ma.Multiaddr) bool { return t != nil && t.CanDial(addr) } -// dial is the actual swarm's dial logic, gated by Dial. -func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { - var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) - if p == s.local { - log.Event(ctx, "swarmDialDoDialSelf", logdial) - return nil, ErrDialToSelf - } - defer log.EventBegin(ctx, "swarmDialDo", logdial).Done() - logdial["dial"] = "failure" // start off with failure. set to "success" at the end. - - sk := s.peers.PrivKey(s.local) - logdial["encrypted"] = sk != nil // log whether this will be an encrypted dial or not. - if sk == nil { - // fine for sk to be nil, just log. - log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.") - } - - ////// - /* - This slice-to-chan code is temporary, the peerstore can currently provide - a channel as an interface for receiving addresses, but more thought - needs to be put into the execution. For now, this allows us to use - the improved rate limiter, while maintaining the outward behaviour - that we previously had (halting a dial when we run out of addrs) - */ - peerAddrs := s.peers.Addrs(p) - if len(peerAddrs) == 0 { - return nil, &DialError{Peer: p, Cause: ErrNoAddresses} - } - goodAddrs := s.filterKnownUndialables(peerAddrs) - if len(goodAddrs) == 0 { - return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses} - } - goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs)) - for _, a := range goodAddrs { - goodAddrsChan <- a - } - close(goodAddrsChan) - ///////// - - // try to get a connection to any addr - connC, dialErr := s.dialAddrs(ctx, p, goodAddrsChan) - if dialErr != nil { - logdial["error"] = dialErr.Cause.Error() - switch dialErr.Cause { - case context.Canceled, context.DeadlineExceeded: - // Always prefer the context errors as we rely on being - // able to check them. - // - // Removing this will BREAK backoff (causing us to - // backoff when canceling dials). - return nil, dialErr.Cause - } - return nil, dialErr - } - logdial["conn"] = logging.Metadata{ - "localAddr": connC.LocalMultiaddr(), - "remoteAddr": connC.RemoteMultiaddr(), - } - swarmC, err := s.addConn(connC, network.DirOutbound) - if err != nil { - logdial["error"] = err.Error() - connC.Close() // close the connection. 
didn't work out :( - return nil, &DialError{Peer: p, Cause: err} - } - - logdial["dial"] = "success" - return swarmC, nil -} - // filterKnownUndialables takes a list of multiaddrs, and removes those // that we definitely don't want to dial: addresses configured to be blocked, // IPv6 link-local addresses, addresses without a dial-capable transport, @@ -378,112 +205,3 @@ func (s *Swarm) filterKnownUndialables(addrs []ma.Multiaddr) []ma.Multiaddr { addrutil.FilterNeg(s.Filters.AddrBlocked), ) } - -func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (transport.CapableConn, *DialError) { - log.Debugf("%s swarm dialing %s", s.local, p) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() // cancel work when we exit func - - // use a single response type instead of errs and conns, reduces complexity *a ton* - respch := make(chan dialResult) - err := new(DialError) - - defer s.limiter.clearAllPeerDials(p) - - var active int -dialLoop: - for remoteAddrs != nil || active > 0 { - // Check for context cancellations and/or responses first. - select { - case <-ctx.Done(): - break dialLoop - case resp := <-respch: - active-- - if resp.Err != nil { - // Errors are normal, lots of dials will fail - log.Infof("got error on dial: %s", resp.Err) - err.recordErr(resp.Addr, resp.Err) - } else if resp.Conn != nil { - return resp.Conn, nil - } - - // We got a result, try again from the top. - continue - default: - } - - // Now, attempt to dial. - select { - case addr, ok := <-remoteAddrs: - if !ok { - remoteAddrs = nil - continue - } - - s.limitedDial(ctx, p, addr, respch) - active++ - case <-ctx.Done(): - break dialLoop - case resp := <-respch: - active-- - if resp.Err != nil { - // Errors are normal, lots of dials will fail - log.Infof("got error on dial: %s", resp.Err) - err.recordErr(resp.Addr, resp.Err) - } else if resp.Conn != nil { - return resp.Conn, nil - } - } - } - - if ctxErr := ctx.Err(); ctxErr != nil { - err.Cause = ctxErr - } else if len(err.DialErrors) == 0 { - err.Cause = network.ErrNoRemoteAddrs - } else { - err.Cause = ErrAllDialsFailed - } - return nil, err -} - -// limitedDial will start a dial to the given peer when -// it is able, respecting the various different types of rate -// limiting that occur without using extra goroutines per addr -func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) { - s.limiter.AddDialJob(&dialJob{ - addr: a, - peer: p, - resp: resp, - ctx: ctx, - }) -} - -func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) { - // Just to double check. Costs nothing. - if s.local == p { - return nil, ErrDialToSelf - } - log.Debugf("%s swarm dialing %s %s", s.local, p, addr) - - tpt := s.TransportForDialing(addr) - if tpt == nil { - return nil, ErrNoTransport - } - - connC, err := tpt.Dial(ctx, addr, p) - if err != nil { - return nil, err - } - - // Trust the transport? Yeah... right. - if connC.RemotePeer() != p { - connC.Close() - err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", p, connC.RemotePeer(), tpt) - log.Error(err) - return nil, err - } - - // success! we got one! 
- return connC, nil -} diff --git a/swarm_listen.go b/swarm_listen.go index 09d411df..a9acd16c 100644 --- a/swarm_listen.go +++ b/swarm_listen.go @@ -93,7 +93,7 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error { s.refs.Add(1) go func() { defer s.refs.Done() - _, err := s.addConn(c, network.DirInbound) + err := s.addConn(c, network.DirInbound) switch err { case nil: case ErrSwarmClosed: