Skip to content

Commit

Permalink
Merge pull request #250 from koinos/245-px
Browse files Browse the repository at this point in the history
Enable libp2p pubsub score
  • Loading branch information
mvandeberg authored Oct 13, 2022
2 parents 93e777e + 776949e commit 48a7752
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 17 deletions.
18 changes: 18 additions & 0 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ func NewKoinosP2PNode(ctx context.Context, listenAddr string, localRPC rpc.Local
ctx, node.Host,
pubsub.WithMessageIdFn(generateMessageID),
pubsub.WithPeerExchange(true),
pubsub.WithPeerScore(
&pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
rawScore := float64(node.PeerErrorHandler.GetPeerErrorScore(ctx, p))
return -rawScore + float64(node.PeerErrorHandler.GetOptions().ErrorScoreReconnectThreshold)
},
AppSpecificWeight: 1,
DecayInterval: 1 * time.Minute,
DecayToZero: 0.01,
},
&pubsub.PeerScoreThresholds{
GossipThreshold: -1,
PublishThreshold: -1,
GraylistThreshold: -1,
AcceptPXThreshold: 5000,
OpportunisticGraftThreshold: .1,
},
),
)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions internal/options/error_handler_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ const (
chainNotConnectedErrorScoreDefault = errorScoreThresholdDefault * 2
checkpointMismatchErrorScoreDefault = errorScoreThresholdDefault * 2
localRPCErrorScoreDefault = 0
peerRPCErrorScoreDefault = 1000
peerRPCErrorScoreDefault = 10000
localRPCTimeoutErrorScoreDefault = 0
peerRPCTimeoutErrorScoreDefault = 1000
peerRPCTimeoutErrorScoreDefault = 10000
processRequestTimeoutErrorScoreDefault = 0
forkBombErrorScoreDefault = 200000
forkBombErrorScoreDefault = errorScoreThresholdDefault * 2
maxHeightErrorScoreDefault = blockApplicationErrorScoreDefault
unknownErrorScoreDefault = blockApplicationErrorScoreDefault
)
Expand Down
59 changes: 49 additions & 10 deletions internal/p2p/error_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,20 @@ type canConnectRequest struct {
resultChan chan<- bool
}

type getPeerErrorScoreRequest struct {
addr ma.Multiaddr
resultChan chan<- uint64
}

// PeerErrorHandler handles PeerErrors and tracks errors over time
// to determine if a peer should be disconnected from
type PeerErrorHandler struct {
errorScores map[string]*errorScoreRecord
disconnectPeerChan chan<- peer.ID
peerErrorChan <-chan PeerError
canConnectChan chan canConnectRequest
addrProvider PeerAddressProvider
errorScores map[string]*errorScoreRecord
disconnectPeerChan chan<- peer.ID
peerErrorChan <-chan PeerError
canConnectChan chan canConnectRequest
getPeerErrorScoreChan chan getPeerErrorScoreRequest
addrProvider PeerAddressProvider

opts options.PeerErrorHandlerOptions
}
Expand All @@ -57,6 +63,7 @@ func (p *PeerErrorHandler) CanConnect(ctx context.Context, id peer.ID) bool {
return true
}

// CanConnectAddr to peer if the peer's error score is below the error score threshold
func (p *PeerErrorHandler) CanConnectAddr(ctx context.Context, addr ma.Multiaddr) bool {
resultChan := make(chan bool, 1)
p.canConnectChan <- canConnectRequest{
Expand All @@ -72,6 +79,26 @@ func (p *PeerErrorHandler) CanConnectAddr(ctx context.Context, addr ma.Multiaddr
}
}

// GetPeerErrorScore returns the current error score for a given peer ID
func (p *PeerErrorHandler) GetPeerErrorScore(ctx context.Context, id peer.ID) uint64 {
resultChan := make(chan uint64, 1)
if addr := p.addrProvider.GetPeerAddress(ctx, id); addr != nil {
p.getPeerErrorScoreChan <- getPeerErrorScoreRequest{addr, resultChan}

select {
case res := <-resultChan:
return res
case <-ctx.Done():
}
}

return 0
}

func (p *PeerErrorHandler) GetOptions() options.PeerErrorHandlerOptions {
return p.opts
}

func (p *PeerErrorHandler) handleCanConnect(addr ma.Multiaddr) bool {
if record, ok := p.errorScores[ma.Split(addr)[0].String()]; ok {
p.decayErrorScore(record)
Expand All @@ -81,6 +108,15 @@ func (p *PeerErrorHandler) handleCanConnect(addr ma.Multiaddr) bool {
return true
}

func (p *PeerErrorHandler) handleGetPeerErrorScore(addr ma.Multiaddr) uint64 {
if record, ok := p.errorScores[ma.Split(addr)[0].String()]; ok {
p.decayErrorScore(record)
return record.score
}

return 0
}

func (p *PeerErrorHandler) handleError(ctx context.Context, peerErr PeerError) {
if addr := p.addrProvider.GetPeerAddress(ctx, peerErr.id); addr != nil {
ipAddr := ma.Split(addr)[0].String()
Expand Down Expand Up @@ -201,6 +237,8 @@ func (p *PeerErrorHandler) Start(ctx context.Context) {
p.handleError(ctx, perr)
case req := <-p.canConnectChan:
req.resultChan <- p.handleCanConnect(req.addr)
case req := <-p.getPeerErrorScoreChan:
req.resultChan <- p.handleGetPeerErrorScore(req.addr)

case <-ctx.Done():
return
Expand All @@ -216,11 +254,12 @@ func NewPeerErrorHandler(
opts options.PeerErrorHandlerOptions) *PeerErrorHandler {

return &PeerErrorHandler{
errorScores: make(map[string]*errorScoreRecord),
disconnectPeerChan: disconnectPeerChan,
peerErrorChan: peerErrorChan,
canConnectChan: make(chan canConnectRequest),
opts: opts,
errorScores: make(map[string]*errorScoreRecord),
disconnectPeerChan: disconnectPeerChan,
peerErrorChan: peerErrorChan,
canConnectChan: make(chan canConnectRequest, 10),
getPeerErrorScoreChan: make(chan getPeerErrorScoreRequest, 10),
opts: opts,
}
}

Expand Down
12 changes: 8 additions & 4 deletions internal/rpc/peer_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func (p *PeerRPC) GetChainID(ctx context.Context) (id multihash.Multihash, err e
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
err = fmt.Errorf("%w, %s", p2perrors.ErrPeerRPCTimeout, err)
} else {
err = fmt.Errorf("%w, %s", p2perrors.ErrPeerRPC, err)
}
err = fmt.Errorf("%w, %s", p2perrors.ErrPeerRPC, err)
}
return rpcResp.ID, err
}
Expand All @@ -46,8 +47,9 @@ func (p *PeerRPC) GetHeadBlock(ctx context.Context) (id multihash.Multihash, hei
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
err = fmt.Errorf("%w, %s", p2perrors.ErrPeerRPCTimeout, err)
} else {
err = fmt.Errorf("%w, %s", p2perrors.ErrPeerRPC, err)
}
err = fmt.Errorf("%w, %s", p2perrors.ErrPeerRPC, err)
}
return rpcResp.ID, rpcResp.Height, err
}
Expand All @@ -63,8 +65,9 @@ func (p *PeerRPC) GetAncestorBlockID(ctx context.Context, parentID multihash.Mul
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
err = fmt.Errorf("%w, %s", p2perrors.ErrPeerRPCTimeout, err)
} else {
err = fmt.Errorf("%w, %s", p2perrors.ErrPeerRPC, err)
}
err = fmt.Errorf("%w, %s", p2perrors.ErrPeerRPC, err)
}
return rpcResp.ID, err
}
Expand All @@ -81,8 +84,9 @@ func (p *PeerRPC) GetBlocks(ctx context.Context, headBlockID multihash.Multihash
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, fmt.Errorf("%w, %s", p2perrors.ErrPeerRPCTimeout, err)
} else {
return nil, fmt.Errorf("%w, %s", p2perrors.ErrPeerRPC, err)
}
return nil, fmt.Errorf("%w, %s", p2perrors.ErrPeerRPC, err)
}

if uint32(len(rpcResp.Blocks)) != numBlocks {
Expand Down

0 comments on commit 48a7752

Please sign in to comment.