Skip to content

Commit

Permalink
neutrino: parallelized block header download.
Browse files Browse the repository at this point in the history
This commit distributes header download across peers leveraging
checckpoints and the workmanager.

Signed-off-by: Maureen Ononiwu <[email protected]>
  • Loading branch information
Chinwendu20 committed Sep 2, 2023
1 parent f1f8897 commit 57c289f
Show file tree
Hide file tree
Showing 8 changed files with 1,685 additions and 66 deletions.
576 changes: 547 additions & 29 deletions blockmanager.go

Large diffs are not rendered by default.

955 changes: 932 additions & 23 deletions blockmanager_test.go

Large diffs are not rendered by default.

52 changes: 43 additions & 9 deletions neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ func NewServerPeer(s *ChainService, isPersistent bool) *ServerPeer {
}
}

// IsSyncCandidate returns whether or not the peer is a candidate to consider
// syncing from.
func (sp *ServerPeer) IsSyncCandidate() bool {
// The peer is not a candidate for sync if it's not a full node.
return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork
}

// IsPeerBehindStartHeight returns a boolean indicating if the peer's last block height
// is behind the start height of the request. If the peer is not behind the request start
// height false is returned, otherwise, true is.
func (sp *ServerPeer) IsPeerBehindStartHeight(req query.ReqMessage) bool {
queryGetHeaders, ok := req.(*headerQuery)

if !ok {
log.Debugf("request is not type headerQuery")

return true
}

if sp.LastBlock() < queryGetHeaders.startHeight {
return true
}
return false
}

// newestBlock returns the current best block hash and height using the format
// required by the configuration for the peer package.
func (sp *ServerPeer) newestBlock() (*chainhash.Hash, int32, error) {
Expand Down Expand Up @@ -800,15 +825,21 @@ func NewChainService(cfg Config) (*ChainService, error) {
}

bm, err := newBlockManager(&blockManagerCfg{
ChainParams: s.chainParams,
BlockHeaders: s.BlockHeaders,
RegFilterHeaders: s.RegFilterHeaders,
TimeSource: s.timeSource,
QueryDispatcher: s.workManager,
BanPeer: s.BanPeer,
GetBlock: s.GetBlock,
firstPeerSignal: s.firstPeerConnect,
queryAllPeers: s.queryAllPeers,
ChainParams: s.chainParams,
BlockHeaders: s.BlockHeaders,
RegFilterHeaders: s.RegFilterHeaders,
TimeSource: s.timeSource,
cfHeaderQueryDispatcher: s.workManager,
BanPeer: s.BanPeer,
GetBlock: s.GetBlock,
firstPeerSignal: s.firstPeerConnect,
queryAllPeers: s.queryAllPeers,
blkHdrCheckptQueryDispatcher: query.NewWorkManager(&query.Config{
ConnectedPeers: s.ConnectedPeers,
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
IsEligibleWorkerFunc: query.IsWorkerEligibleForBlkHdrFetch,
}),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -1610,6 +1641,9 @@ func (s *ChainService) Start() error {
s.addrManager.Start()
s.blockManager.Start()
s.blockSubscriptionMgr.Start()
if err := s.blockManager.cfg.blkHdrCheckptQueryDispatcher.Start(); err != nil {
return fmt.Errorf("unable to start block header work manager: %v", err)
}
if err := s.workManager.Start(); err != nil {
return fmt.Errorf("unable to start work manager: %v", err)
}
Expand Down
7 changes: 7 additions & 0 deletions query/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,11 @@ type Peer interface {
// OnDisconnect returns a channel that will be closed when this peer is
// disconnected.
OnDisconnect() <-chan struct{}

// IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind
// the request's start Height which it receives as an argument.
IsPeerBehindStartHeight(req ReqMessage) bool

// IsSyncCandidate returns if the peer is a sync candidate.
IsSyncCandidate() bool
}
26 changes: 24 additions & 2 deletions query/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
msgChan, cancel := peer.SubscribeRecvMsg()
defer cancel()

nexJobLoop:
nextJobLoop:
for {
log.Tracef("Worker %v waiting for more work", peer.Addr())

Expand Down Expand Up @@ -153,7 +153,7 @@ nexJobLoop:
case <-quit:
return
}
goto nexJobLoop
goto nextJobLoop
}
}

Expand Down Expand Up @@ -302,6 +302,28 @@ nexJobLoop:
}
}

func (w *worker) IsSyncCandidate() bool {
return w.peer.IsSyncCandidate()
}

func (w *worker) IsPeerBehindStartHeight(req ReqMessage) bool {
return w.peer.IsPeerBehindStartHeight(req)
}

// IsWorkerEligibleForBlkHdrFetch is the eligibility function used for the BlockHdrWorkManager to determine workers
// eligible to receive jobs (the job is to fetch headers). If the peer is not a sync candidate or if its last known
// block height is behind the job query's start height, it returns false. Otherwise, it returns true.
func IsWorkerEligibleForBlkHdrFetch(r *activeWorker, next *queryJob) bool {
if !r.w.IsSyncCandidate() {
return false
}

if r.w.IsPeerBehindStartHeight(next.Req) {
return false
}
return true
}

// NewJob returns a channel where work that is to be handled by the worker can
// be sent. If the worker reads a queryJob from this channel, it is guaranteed
// that a response will eventually be deliverd on the results channel (except
Expand Down
18 changes: 15 additions & 3 deletions query/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
)

type mockQueryEncoded struct {
message *wire.MsgGetData
encoding wire.MessageEncoding
index float64
message *wire.MsgGetData
encoding wire.MessageEncoding
index float64
startHeight int
}

func (m *mockQueryEncoded) Message() wire.Message {
Expand Down Expand Up @@ -49,6 +50,8 @@ type mockPeer struct {
responses chan<- wire.Message
subscriptions chan chan wire.Message
quit chan struct{}
bestHeight int
fullNode bool
err error
}

Expand All @@ -69,6 +72,15 @@ func (m *mockPeer) Addr() string {
return m.addr
}

func (m *mockPeer) IsPeerBehindStartHeight(request ReqMessage) bool {
r := request.(*mockQueryEncoded)
return m.bestHeight < r.startHeight
}

func (m *mockPeer) IsSyncCandidate() bool {
return m.fullNode
}

// makeJob returns a new query job that will be done when it is given the
// finalResp message. Similarly ot will progress on being given the
// progressResp message, while any other message will be ignored.
Expand Down
7 changes: 7 additions & 0 deletions query/workmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ type Worker interface {
// delivered on the results channel (except when the quit channel has
// been closed).
NewJob() chan<- *queryJob

// IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind
// the request's start Height which it receives as an argument.
IsPeerBehindStartHeight(req ReqMessage) bool

// IsSyncCandidate returns if the peer is a sync candidate.
IsSyncCandidate() bool
}

// PeerRanking is an interface that must be satisfied by the underlying module
Expand Down
110 changes: 110 additions & 0 deletions query/workmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ type mockWorker struct {
results chan *jobResult
}

func (m *mockWorker) IsPeerBehindStartHeight(req ReqMessage) bool {
return m.peer.IsPeerBehindStartHeight(req)
}

func (m *mockWorker) IsSyncCandidate() bool {
return m.peer.IsSyncCandidate()
}

var _ Worker = (*mockWorker)(nil)

func (m *mockWorker) NewJob() chan<- *queryJob {
Expand Down Expand Up @@ -977,3 +985,105 @@ func TestWorkManagerResultUnfinished(t *testing.T) {
t.Fatalf("nothing received on errChan")
}
}

// TestIsWorkerEligibleForBlkHdrFetch tests the IsWorkerEligibleForBlkHdrFetch function.
func TestIsWorkerEligibleForBlkHdrFetch(t *testing.T) {
type testArgs struct {
name string
activeWorker *activeWorker
job *queryJob
expectedEligibility bool
}

testCases := []testArgs{
{
name: "peer sync candidate, best height behind job start Height",
activeWorker: &activeWorker{
w: &mockWorker{
peer: &mockPeer{
bestHeight: 5,
fullNode: true,
},
},
},
job: &queryJob{
Request: &Request{
Req: &mockQueryEncoded{
startHeight: 10,
},
},
},
expectedEligibility: false,
},

{
name: "peer sync candidate, best height ahead job start Height",
activeWorker: &activeWorker{
w: &mockWorker{
peer: &mockPeer{
bestHeight: 10,
fullNode: true,
},
},
},
job: &queryJob{
Request: &Request{
Req: &mockQueryEncoded{
startHeight: 5,
},
},
},
expectedEligibility: true,
},

{
name: "peer not sync candidate, best height behind job start Height",
activeWorker: &activeWorker{
w: &mockWorker{
peer: &mockPeer{
bestHeight: 5,
fullNode: false,
},
},
},
job: &queryJob{
Request: &Request{
Req: &mockQueryEncoded{
startHeight: 10,
},
},
},
expectedEligibility: false,
},

{
name: "peer not sync candidate, best height ahead job start Height",
activeWorker: &activeWorker{
w: &mockWorker{
peer: &mockPeer{
bestHeight: 10,
fullNode: false,
},
},
},
job: &queryJob{
Request: &Request{
Req: &mockQueryEncoded{
startHeight: 5,
},
},
},
expectedEligibility: false,
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
isEligible := IsWorkerEligibleForBlkHdrFetch(test.activeWorker, test.job)
if isEligible != test.expectedEligibility {
t.Fatalf("Expected '%v'for eligibility check but got"+
"'%v'\n", test.expectedEligibility, isEligible)
}
})
}
}

0 comments on commit 57c289f

Please sign in to comment.