Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize header download across peers to fetch headers within a chain's checkpointed region. #282

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
545 changes: 516 additions & 29 deletions blockmanager.go

Large diffs are not rendered by default.

913 changes: 890 additions & 23 deletions blockmanager_test.go

Large diffs are not rendered by default.

52 changes: 43 additions & 9 deletions neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ func NewServerPeer(s *ChainService, isPersistent bool) *ServerPeer {
}
}

// IsSyncCandidate returns whether or not the peer is a candidate to consider
// syncing from.
func (sp *ServerPeer) IsSyncCandidate() bool {
// The peer is not a candidate for sync if it's not a full node.
return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork
}

// IsPeerBehindStartHeight returns a boolean indicating if the peer's last block height
// is behind the start height of the request. If the peer is not behind the request start
// height false is returned, otherwise, true is.
func (sp *ServerPeer) IsPeerBehindStartHeight(req query.ReqMessage) bool {
queryGetHeaders, ok := req.(*headerQuery)

if !ok {
log.Debugf("request is not type headerQuery")

return true
}

if sp.LastBlock() < queryGetHeaders.startHeight {
return true
}
return false
}
Comment on lines +208 to +221
Copy link

@ProofOfKeags ProofOfKeags Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just pass in the start height, this function should take two arguments: the server peer, and the start height. This is also doing a downcasting operation and that clutters the logic, do that at the call site. You are duplicating the downcasting logic in a lot of places and it isn't necessary. Also returning true in the case that the downcast fails clashes strongly with the name here. This function could just be

func (sp *ServerPeeer) IsPeerBehindStartHeight(h int32) bool {
    return sp.LastBlock() < h
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// newestBlock returns the current best block hash and height using the format
// required by the configuration for the peer package.
func (sp *ServerPeer) newestBlock() (*chainhash.Hash, int32, error) {
Expand Down Expand Up @@ -800,15 +825,21 @@ func NewChainService(cfg Config) (*ChainService, error) {
}

bm, err := newBlockManager(&blockManagerCfg{
ChainParams: s.chainParams,
BlockHeaders: s.BlockHeaders,
RegFilterHeaders: s.RegFilterHeaders,
TimeSource: s.timeSource,
QueryDispatcher: s.workManager,
BanPeer: s.BanPeer,
GetBlock: s.GetBlock,
firstPeerSignal: s.firstPeerConnect,
queryAllPeers: s.queryAllPeers,
ChainParams: s.chainParams,
BlockHeaders: s.BlockHeaders,
RegFilterHeaders: s.RegFilterHeaders,
TimeSource: s.timeSource,
cfHeaderQueryDispatcher: s.workManager,
BanPeer: s.BanPeer,
GetBlock: s.GetBlock,
firstPeerSignal: s.firstPeerConnect,
queryAllPeers: s.queryAllPeers,
blkHdrCheckptQueryDispatcher: query.NewWorkManager(&query.Config{
ConnectedPeers: s.ConnectedPeers,
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
IsEligibleWorkerFunc: query.IsWorkerEligibleForBlkHdrFetch,
}),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -1610,6 +1641,9 @@ func (s *ChainService) Start() error {
s.addrManager.Start()
s.blockManager.Start()
s.blockSubscriptionMgr.Start()
if err := s.blockManager.cfg.blkHdrCheckptQueryDispatcher.Start(); err != nil {
return fmt.Errorf("unable to start block header work manager: %v", err)
}
if err := s.workManager.Start(); err != nil {
return fmt.Errorf("unable to start work manager: %v", err)
}
Expand Down
7 changes: 7 additions & 0 deletions query/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,11 @@ type Peer interface {
// OnDisconnect returns a channel that will be closed when this peer is
// disconnected.
OnDisconnect() <-chan struct{}

// IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind
// the request's start Height which it receives as an argument.
IsPeerBehindStartHeight(req ReqMessage) bool

// IsSyncCandidate returns true if the peer is a sync candidate.
IsSyncCandidate() bool
}
26 changes: 24 additions & 2 deletions query/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
msgChan, cancel := peer.SubscribeRecvMsg()
defer cancel()

nexJobLoop:
nextJobLoop:
for {
log.Tracef("Worker %v waiting for more work", peer.Addr())

Expand Down Expand Up @@ -154,7 +154,7 @@ nexJobLoop:
case <-quit:
return
}
goto nexJobLoop
goto nextJobLoop
}
}

Expand Down Expand Up @@ -308,6 +308,28 @@ nexJobLoop:
}
}

func (w *worker) IsSyncCandidate() bool {
return w.peer.IsSyncCandidate()
}

func (w *worker) IsPeerBehindStartHeight(req ReqMessage) bool {
return w.peer.IsPeerBehindStartHeight(req)
}

// IsWorkerEligibleForBlkHdrFetch is the eligibility function used for the BlockHdrWorkManager to determine workers
// eligible to receive jobs (the job is to fetch headers). If the peer is not a sync candidate or if its last known
// block height is behind the job query's start height, it returns false. Otherwise, it returns true.
func IsWorkerEligibleForBlkHdrFetch(r *activeWorker, next *queryJob) bool {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be needed. The workers should never be given peers that aren't eligible to be synced off of in the first place.

Copy link
Contributor Author

@Chinwendu20 Chinwendu20 Sep 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The work manager is not exclusively for fetching block headers. So it would receive connected peer messages for all peers. It just happens that in this use case where it is fetching block headers we are only interested in peers that fulfill the condition as specified in that function . That is why I included that there. So a worker could not be useful in fetching block headers but it could be eligible to fetch cfheaders or filters for example.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

judging by the name, the "next" argument should be known to be a block header fetch. If you accept all types of queries here then the function should be renamed and reworked to decide if the peer is eligible for that request in general

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the function is literally, IsWorkerEligibleForBlkHdrFetch does it still ring as generic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is difference between the queryJob for fetching block headers and that of fetching cfheaders is the Req field in Request.
https://github.com/Chinwendu20/neutrino/blob/64b278771ff75da0b30136af7d4ab0ace06d897e/query/interface.go#L152-L175
which is defined as an interface, its implementations are defined in the blockmanager.go. So dealing with its concrete implementations in query package would lead to a circular dependence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there is a better way to go about it.

if !r.w.IsSyncCandidate() {
return false
}

if r.w.IsPeerBehindStartHeight(next.Req) {
return false
}
return true
}

// NewJob returns a channel where work that is to be handled by the worker can
// be sent. If the worker reads a queryJob from this channel, it is guaranteed
// that a response will eventually be deliverd on the results channel (except
Expand Down
18 changes: 15 additions & 3 deletions query/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
)

type mockQueryEncoded struct {
message *wire.MsgGetData
encoding wire.MessageEncoding
index float64
message *wire.MsgGetData
encoding wire.MessageEncoding
index float64
startHeight int
}

func (m *mockQueryEncoded) Message() wire.Message {
Expand Down Expand Up @@ -52,6 +53,8 @@ type mockPeer struct {
responses chan<- wire.Message
subscriptions chan chan wire.Message
quit chan struct{}
bestHeight int
fullNode bool
err error
}

Expand All @@ -72,6 +75,15 @@ func (m *mockPeer) Addr() string {
return m.addr
}

func (m *mockPeer) IsPeerBehindStartHeight(request ReqMessage) bool {
r := request.(*mockQueryEncoded)
return m.bestHeight < r.startHeight
}

func (m *mockPeer) IsSyncCandidate() bool {
return m.fullNode
}

// makeJob returns a new query job that will be done when it is given the
// finalResp message. Similarly ot will progress on being given the
// progressResp message, while any other message will be ignored.
Expand Down
7 changes: 7 additions & 0 deletions query/workmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ type Worker interface {
// delivered on the results channel (except when the quit channel has
// been closed).
NewJob() chan<- *queryJob

// IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind
// the request's start Height which it receives as an argument.
IsPeerBehindStartHeight(req ReqMessage) bool

// IsSyncCandidate returns if the peer is a sync candidate.
IsSyncCandidate() bool
}

// PeerRanking is an interface that must be satisfied by the underlying module
Expand Down
110 changes: 110 additions & 0 deletions query/workmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ type mockWorker struct {
results chan *jobResult
}

func (m *mockWorker) IsPeerBehindStartHeight(req ReqMessage) bool {
return m.peer.IsPeerBehindStartHeight(req)
}

func (m *mockWorker) IsSyncCandidate() bool {
return m.peer.IsSyncCandidate()
}

var _ Worker = (*mockWorker)(nil)

func (m *mockWorker) NewJob() chan<- *queryJob {
Expand Down Expand Up @@ -985,3 +993,105 @@ func TestWorkManagerResultUnfinished(t *testing.T) {
t.Fatalf("nothing received on errChan")
}
}

// TestIsWorkerEligibleForBlkHdrFetch tests the IsWorkerEligibleForBlkHdrFetch function.
func TestIsWorkerEligibleForBlkHdrFetch(t *testing.T) {
type testArgs struct {
name string
activeWorker *activeWorker
job *queryJob
expectedEligibility bool
}

testCases := []testArgs{
{
name: "peer sync candidate, best height behind job start Height",
activeWorker: &activeWorker{
w: &mockWorker{
peer: &mockPeer{
bestHeight: 5,
fullNode: true,
},
},
},
job: &queryJob{
Request: &Request{
Req: &mockQueryEncoded{
startHeight: 10,
},
},
},
expectedEligibility: false,
},

{
name: "peer sync candidate, best height ahead job start Height",
activeWorker: &activeWorker{
w: &mockWorker{
peer: &mockPeer{
bestHeight: 10,
fullNode: true,
},
},
},
job: &queryJob{
Request: &Request{
Req: &mockQueryEncoded{
startHeight: 5,
},
},
},
expectedEligibility: true,
},

{
name: "peer not sync candidate, best height behind job start Height",
activeWorker: &activeWorker{
w: &mockWorker{
peer: &mockPeer{
bestHeight: 5,
fullNode: false,
},
},
},
job: &queryJob{
Request: &Request{
Req: &mockQueryEncoded{
startHeight: 10,
},
},
},
expectedEligibility: false,
},

{
name: "peer not sync candidate, best height ahead job start Height",
activeWorker: &activeWorker{
w: &mockWorker{
peer: &mockPeer{
bestHeight: 10,
fullNode: false,
},
},
},
job: &queryJob{
Request: &Request{
Req: &mockQueryEncoded{
startHeight: 5,
},
},
},
expectedEligibility: false,
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
isEligible := IsWorkerEligibleForBlkHdrFetch(test.activeWorker, test.job)
if isEligible != test.expectedEligibility {
t.Fatalf("Expected '%v'for eligibility check but got"+
"'%v'\n", test.expectedEligibility, isEligible)
}
})
}
}