From 593bc9d73cf2b6d6d157f6da0bf94b1fd76e31f0 Mon Sep 17 00:00:00 2001 From: Maureen Ononiwu Date: Fri, 1 Sep 2023 21:12:49 +0100 Subject: [PATCH] neutrino: parallelized block header download. This commit distributes header download across peers leveraging checckpoints and the workmanager. Signed-off-by: Maureen Ononiwu --- blockmanager.go | 575 +++++++++++++++++++++-- blockmanager_test.go | 955 +++++++++++++++++++++++++++++++++++++- neutrino.go | 52 ++- query/interface.go | 7 + query/worker.go | 26 +- query/worker_test.go | 18 +- query/workmanager.go | 7 + query/workmanager_test.go | 110 +++++ 8 files changed, 1684 insertions(+), 66 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index f5f2d4da..492a916d 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -9,6 +9,7 @@ import ( "fmt" "math" "math/big" + "sort" "sync" "sync/atomic" "time" @@ -91,8 +92,13 @@ type blockManagerCfg struct { // the connected peers. TimeSource blockchain.MedianTimeSource - // QueryDispatcher is used to make queries to connected Bitcoin peers. - QueryDispatcher query.Dispatcher + // cfHeaderQueryDispatcher is used to make queries to connected Bitcoin peers to fetch + // CFHeaders + cfHeaderQueryDispatcher query.Dispatcher + + // cfHeaderQueryDispatcher is used to make queries to connected Bitcoin peers to fetch + // block Headers + blkHdrCheckptQueryDispatcher query.WorkManager // BanPeer bans and disconnects the given peer. BanPeer func(addr string, reason banman.Reason) error @@ -174,6 +180,10 @@ type blockManager struct { // nolint:maligned // time, newHeadersMtx should always be acquired first. newFilterHeadersMtx sync.RWMutex + // writeBatchMtx is the mutex used to hold reading and reading and writing into the + // hdrTipToResponse map. + writeBatchMtx sync.RWMutex + // newFilterHeadersSignal is condition variable which will be used to // notify any waiting callers (via Broadcast()) that the tip of the // current filter header chain has changed. This is useful when callers @@ -207,6 +217,18 @@ type blockManager struct { // nolint:maligned minRetargetTimespan int64 // target timespan / adjustment factor maxRetargetTimespan int64 // target timespan * adjustment factor blocksPerRetarget int32 // target timespan / target time per block + + // hdrTipToResponse is a map that holds the response gotten from querying peers + // using the workmanager, to fetch headers within the chain's checkpointed region. + // It is a map of the request's startheight to the fetch response. + hdrTipToResponse map[int32]*headersMsg + + // hdrTipSlice is a slice that holds request startHeight of the responses that have been + // fetched using the workmanager to fetch headers within the chain's checkpointed region. + // It is used to easily access this startheight in the case we have to delete these responses + // in the hdrTipResponse map during a reorg while fetching headers within the chain's checkpointed + // region. + hdrTipSlice []int32 } // newBlockManager returns a new bitcoin block manager. Use Start to begin @@ -236,6 +258,8 @@ func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) { blocksPerRetarget: int32(targetTimespan / targetTimePerBlock), minRetargetTimespan: targetTimespan / adjustmentFactor, maxRetargetTimespan: targetTimespan * adjustmentFactor, + hdrTipToResponse: make(map[int32]*headersMsg), + hdrTipSlice: make([]int32, 0), } // Next we'll create the two signals that goroutines will use to wait @@ -291,8 +315,21 @@ func (b *blockManager) Start() { } log.Trace("Starting block manager") - b.wg.Add(2) + b.wg.Add(3) go b.blockHandler() + go func() { + wm := b.cfg.blkHdrCheckptQueryDispatcher + + defer b.wg.Done() + defer func(wm query.WorkManager) { + err := wm.Stop() + if err != nil { + log.Errorf("Unable to stop block header workmanager: %v", err) + } + }(wm) + + b.processBlKHeaderInCheckPtRegionInOrder() + }() go func() { defer b.wg.Done() @@ -306,6 +343,12 @@ func (b *blockManager) Start() { return } + checkpoints := b.cfg.ChainParams.Checkpoints + numCheckpts := len(checkpoints) + if numCheckpts != 0 && b.nextCheckpoint != nil { + b.batchCheckpointedBlkHeaders() + } + log.Debug("Peer connected, starting cfHandler.") b.cfHandler() }() @@ -361,6 +404,19 @@ func (b *blockManager) NewPeer(sp *ServerPeer) { } } +// addNewPeerToList adds the peer to the peers list. +func (b *blockManager) addNewPeerToList(peers *list.List, sp *ServerPeer) { + // Ignore if in the process of shutting down. + if atomic.LoadInt32(&b.shutdown) != 0 { + return + } + + log.Infof("New valid peer %s (%s)", sp, sp.UserAgent()) + + // Add the peer as a candidate to sync from. + peers.PushBack(sp) +} + // handleNewPeerMsg deals with new peers that have signalled they may be // considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler @@ -374,12 +430,12 @@ func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *ServerPeer) { log.Infof("New valid peer %s (%s)", sp, sp.UserAgent()) // Ignore the peer if it's not a sync candidate. - if !b.isSyncCandidate(sp) { + if !sp.IsSyncCandidate() { return } // Add the peer as a candidate to sync from. - peers.PushBack(sp) + b.addNewPeerToList(peers, sp) // If we're current with our sync peer and the new peer is advertising // a higher block than the newest one we know of, request headers from @@ -419,11 +475,8 @@ func (b *blockManager) DonePeer(sp *ServerPeer) { } } -// handleDonePeerMsg deals with peers that have signalled they are done. It -// removes the peer as a candidate for syncing and in the case where it was the -// current sync peer, attempts to select a new best peer to sync from. It is -// invoked from the syncHandler goroutine. -func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *ServerPeer) { +// removeDonePeerFromList removes the peer from the peers list. +func (b *blockManager) removeDonePeerFromList(peers *list.List, sp *ServerPeer) { // Remove the peer from the list of candidate peers. for e := peers.Front(); e != nil; e = e.Next() { if e.Value == sp { @@ -433,6 +486,17 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *ServerPeer) { } log.Infof("Lost peer %s", sp) +} + +// handleDonePeerMsg deals with peers that have signalled they are done. It +// removes the peer as a candidate for syncing and in the case where it was the +// current sync peer, attempts to select a new best peer to sync from. It is +// invoked from the syncHandler goroutine. +func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *ServerPeer) { + // Remove the peer from the list of candidate peers. + b.removeDonePeerFromList(peers, sp) + + log.Infof("Lost peer %s", sp) // Attempt to find a new peer to sync from if the quitting peer is the // sync peer. Also, reset the header state. @@ -1111,7 +1175,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash, // Hand the queries to the work manager, and consume the verified // responses as they come back. - errChan := b.cfg.QueryDispatcher.Query( + errChan := b.cfg.cfHeaderQueryDispatcher.Query( q.requests(), query.Cancel(b.quit), query.NoRetryMax(), query.ErrChan(make(chan error, 1)), ) @@ -2050,7 +2114,38 @@ func (b *blockManager) blockHandler() { defer b.wg.Done() candidatePeers := list.New() -out: + checkpoints := b.cfg.ChainParams.Checkpoints + if len(checkpoints) == 0 || b.nextCheckpoint == nil { + goto unCheckPtLoop + } + + // Loop to fetch headers within the check pointed range + b.newHeadersMtx.RLock() + for b.headerTip <= uint32(checkpoints[len(checkpoints)-1].Height) { + b.newHeadersMtx.RUnlock() + select { + case m := <-b.peerChan: + switch msg := m.(type) { + case *newPeerMsg: + b.addNewPeerToList(candidatePeers, msg.peer) + case *donePeerMsg: + b.removeDonePeerFromList(candidatePeers, msg.peer) + default: + log.Tracef("Invalid message type in block "+ + "handler: %T", msg) + } + + case <-b.quit: + return + } + b.newHeadersMtx.RLock() + } + b.newHeadersMtx.RUnlock() + + log.Infof("Fetching uncheckpointed block headers from %v", b.headerTip) + b.startSync(candidatePeers) + +unCheckPtLoop: for { // Now check peer messages and quit channels. select { @@ -2074,13 +2169,397 @@ out: } case <-b.quit: - break out + break unCheckPtLoop } } log.Trace("Block handler done") } +// processBlKHeaderInCheckPtRegionInOrder handles and writes the block headers received from querying the +// workmanager while fetching headers within the block header checkpoint region. This process is carried out +// in order. +func (b *blockManager) processBlKHeaderInCheckPtRegionInOrder() { + lenCheckPts := len(b.cfg.ChainParams.Checkpoints) + + // Loop should run as long as we are in the block header checkpointed region. + b.newHeadersMtx.RLock() + for int32(b.headerTip) <= b.cfg.ChainParams.Checkpoints[lenCheckPts-1].Height { + hdrTip := b.headerTip + b.newHeadersMtx.RUnlock() + + select { + // return quickly if the blockmanager quits. + case <-b.quit: + return + default: + } + + // do not go further if we have not received the response mapped to our header tip. + b.writeBatchMtx.RLock() + msg, ok := b.hdrTipToResponse[int32(hdrTip)] + b.writeBatchMtx.RUnlock() + + if !ok { + b.newHeadersMtx.RLock() + continue + } + + b.syncPeerMutex.Lock() + b.syncPeer = msg.peer + b.syncPeerMutex.Unlock() + + b.handleHeadersMsg(msg) + err := b.resetHeaderListToChainTip() + if err != nil { + log.Errorf(err.Error()) + } + + finalNode := b.headerList.Back() + newHdrTip := finalNode.Height + newHdrTipHash := finalNode.Header.BlockHash() + prevCheckPt := b.findPreviousHeaderCheckpoint(newHdrTip) + + log.Tracef("New headertip %v", newHdrTip) + log.Debugf("New headertip %v", newHdrTip) + + // If our header tip has not increased, there is a problem with the headers we received and so we + // delete all the header response within our previous header tip and our new header tip, then send + // another query to the workmanager. + if uint32(newHdrTip) <= hdrTip { + b.deleteAllHeaderTipRespAfterTip(newHdrTip, int32(hdrTip)) + + log.Tracef("while fetching checkpointed headers received invalid headers") + + q := CheckpointedBlockHeadersQuery{ + blockMgr: b, + msgs: []*headerQuery{ + + { + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{&newHdrTipHash}, + HashStop: *b.nextCheckpoint.Hash, + }, + startHeight: newHdrTip, + initialHeight: prevCheckPt.Height, + startHash: newHdrTipHash, + endHeight: b.nextCheckpoint.Height, + initialHash: newHdrTipHash, + // Set it as high priority so that workmanager can schedule before any other queries. + index: 0, + }, + }, + } + + b.cfg.blkHdrCheckptQueryDispatcher.Query( + q.requests(), query.Cancel(b.quit), query.Timeout(1*time.Hour), query.NoRetryMax(), + ) + + log.Tracef("Sending query to workmanager from processBlKHeaderInCheckPtRegionInOrder loop") + } + b.newHeadersMtx.RLock() + } + b.newHeadersMtx.RUnlock() + + b.syncPeerMutex.Lock() + b.syncPeer = nil + b.syncPeerMutex.Unlock() + + log.Infof("Successfully completed fetching checkpointed block headers") +} + +// batchCheckpointedBlkHeaders creates headerQuery to fetch block headers +// within the chain's checkpointed region. +func (b *blockManager) batchCheckpointedBlkHeaders() { + var queryMsgs []*headerQuery + curHeight := b.headerTip + curHash := b.headerTipHash + nextCheckpoint := b.nextCheckpoint + nextCheckptHash := nextCheckpoint.Hash + nextCheckptHeight := nextCheckpoint.Height + + log.Infof("Fetching set of checkpointed blockheaders from "+ + "height=%v, hash=%v\n", curHeight, curHash) + + for nextCheckpoint != nil { + endHash := nextCheckptHash + endHeight := nextCheckptHeight + tmpCurHash := curHash + + msg := &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{&tmpCurHash}), + HashStop: *endHash, + }, + startHeight: int32(curHeight), + initialHeight: int32(curHeight), + startHash: curHash, + endHeight: endHeight, + initialHash: tmpCurHash, + } + + log.Debugf("Fetching set of checkpointed blockheaders from "+ + "start_height=%v to end-height=%v", curHeight, endHash) + + queryMsgs = append(queryMsgs, msg) + curHeight = uint32(endHeight) + curHash = *endHash + + nextCheckpoint := b.findNextHeaderCheckpoint(int32(curHeight)) + if nextCheckpoint == nil { + break + } + + nextCheckptHeight = nextCheckpoint.Height + nextCheckptHash = nextCheckpoint.Hash + } + + msg := &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{nextCheckptHash}), + HashStop: zeroHash, + }, + startHeight: nextCheckptHeight, + initialHeight: nextCheckptHeight, + startHash: *nextCheckptHash, + endHeight: nextCheckptHeight + wire.MaxBlockHeadersPerMsg, + initialHash: *nextCheckptHash, + } + + log.Debugf("Fetching set of checkpointed blockheaders from "+ + "start_height=%v to end-height=%v", curHeight, zeroHash) + + queryMsgs = append(queryMsgs, msg) + + log.Debugf("Attempting to query for %v blockheader batches", len(queryMsgs)) + + q := CheckpointedBlockHeadersQuery{ + blockMgr: b, + msgs: queryMsgs, + } + + b.cfg.blkHdrCheckptQueryDispatcher.Query( + q.requests(), query.Cancel(b.quit), query.Timeout(1*time.Hour), query.NoRetryMax(), + ) +} + +// CheckpointedBlockHeadersQuery holds all information necessary to perform and +// // handle a query for checkpointed block headers. +type CheckpointedBlockHeadersQuery struct { + blockMgr *blockManager + msgs []*headerQuery +} + +// requests creates the query.Requests for this block headers query. +func (c *CheckpointedBlockHeadersQuery) requests() []*query.Request { + reqs := make([]*query.Request, len(c.msgs)) + for idx, m := range c.msgs { + reqs[idx] = &query.Request{ + Req: m, + SendQuery: c.PushHeadersMsg, + HandleResp: c.handleResponse, + CloneReq: cloneHeaderQuery, + } + } + + return reqs +} + +// cloneHeaderQuery clones the query.ReqMessage containing the headerQuery Struct. +func cloneHeaderQuery(req query.ReqMessage) query.ReqMessage { + oldReq, ok := req.(*headerQuery) + if !ok { + log.Errorf("request not of type *wire.MsgGetHeaders") + } + oldReqMessage := req.Message().(*wire.MsgGetHeaders) + message := &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: oldReqMessage.BlockLocatorHashes, + HashStop: oldReqMessage.HashStop, + }, + startHeight: oldReq.startHeight, + initialHeight: oldReq.initialHeight, + startHash: oldReq.startHash, + endHeight: oldReq.endHeight, + } + + return message +} + +// PushHeadersMsg is the internal response handler used for requests for this +// block Headers query. +func (c *CheckpointedBlockHeadersQuery) PushHeadersMsg(peer query.Peer, + task query.ReqMessage) error { + + request, _ := task.Message().(*wire.MsgGetHeaders) + + requestMsg := task.(*headerQuery) + + // check if we have response for the query already. If we do return an error. + c.blockMgr.writeBatchMtx.RLock() + _, ok := c.blockMgr.hdrTipToResponse[requestMsg.startHeight] + c.blockMgr.writeBatchMtx.RUnlock() + if ok { + log.Debugf("Response already received PushHeadersMessage, peer=%v, "+ + "start_height=%v, end_height=%v, index=%v", peer.Addr(), + requestMsg.startHeight, requestMsg.endHeight) + return query.ErrResponseExistForQuery + } + + sp := peer.(*ServerPeer) + err := sp.PushGetHeadersMsg(request.BlockLocatorHashes, &request.HashStop) + if err != nil { + log.Errorf(err.Error()) + return err + } + + return nil +} + +// handleResponse is the internal response handler used for requests for this +// block header query. +func (c *CheckpointedBlockHeadersQuery) handleResponse(req query.ReqMessage, resp wire.Message, + peer query.Peer, jobErr *error) query.Progress { + + sp := peer.(*ServerPeer) + if peer == nil { + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + msg, ok := resp.(*wire.MsgHeaders) + if !ok { + // We are only looking for msgHeaders messages. + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + request, ok := req.(*headerQuery) + if !ok { + // request should only be of type headerQuery. + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + // Check if we already have a response for this request startHeight, if we do modify our jobErr variable + // so that worker can send appropriate error to workmanager. + c.blockMgr.writeBatchMtx.RLock() + _, ok = c.blockMgr.hdrTipToResponse[request.startHeight] + c.blockMgr.writeBatchMtx.RUnlock() + if ok { + *jobErr = query.ErrResponseExistForQuery + return query.Progress{ + Finished: true, + Progressed: true, + } + } + + // If we received an empty response from peer, return with an error to break worker's + // feed back loop. + hdrLength := len(msg.Headers) + if hdrLength == 0 { + *jobErr = query.ErrResponseErr + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + // The initialHash represents the lower bound checkpoint for this checkpoint region. + // We verify, if the header received at that checkpoint height has the same hash as the + // checkpoint's hash. If it does not, mimicking the handleheaders function behaviour, we + // disconnect the peer and return a failed progress to reschedule the query. + if msg.Headers[0].PrevBlock != request.startHash && + request.startHash == request.initialHash { + + sp.Disconnect() + + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + // If the peer sends us more headers than we need, it is probably not aligned with our chain, so we disconnect + // peer and return a failed progress. + reqMessage := request.Message().(*wire.MsgGetHeaders) + + if hdrLength > int(request.endHeight-request.startHeight) { + sp.Disconnect() + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + // Write header into hdrTipResponse map, add the request's startHeight to the hdrTipSlice, for tracking + // and handling by the processBlKHeaderInCheckPtRegionInOrder loop. + c.blockMgr.writeBatchMtx.Lock() + c.blockMgr.hdrTipToResponse[request.startHeight] = &headersMsg{ + headers: msg, + peer: sp, + } + i := sort.Search(len(c.blockMgr.hdrTipSlice), func(i int) bool { + return c.blockMgr.hdrTipSlice[i] >= request.startHeight + }) + + c.blockMgr.hdrTipSlice = append(c.blockMgr.hdrTipSlice[:i], append([]int32{request.startHeight}, c.blockMgr.hdrTipSlice[i:]...)...) + c.blockMgr.writeBatchMtx.Unlock() + + // Check if job is unfinished, if it is, we modify the job accordingly and send back to the workmanager to be rescheduled. + if msg.Headers[hdrLength-1].BlockHash() != reqMessage.HashStop && reqMessage.HashStop != zeroHash { + // set new startHash, startHeight and blocklocator to set the next set of header for this job. + newStartHash := msg.Headers[hdrLength-1].BlockHash() + request.startHeight += int32(hdrLength) + request.startHash = newStartHash + reqMessage.BlockLocatorHashes = []*chainhash.Hash{&newStartHash} + + // Incase there is a rollback after handling reqMessage + // This ensures the job created by writecheckpt does not exceed that which we have fetched already. + c.blockMgr.writeBatchMtx.RLock() + _, ok = c.blockMgr.hdrTipToResponse[request.startHeight] + c.blockMgr.writeBatchMtx.RUnlock() + + if !ok { + return query.Progress{ + Finished: true, + Progressed: false, + } + } + *jobErr = query.ErrResponseExistForQuery + } + + return query.Progress{ + Finished: true, + Progressed: true, + } +} + +// headerQuery implements ReqMessage interface for fetching block headers. +type headerQuery struct { + message wire.Message + startHeight int32 + initialHeight int32 + startHash chainhash.Hash + endHeight int32 + initialHash chainhash.Hash + index float64 +} + +func (h *headerQuery) Message() wire.Message { + return h.message +} + +func (h *headerQuery) PriorityIndex() float64 { + return h.index +} + // SyncPeer returns the current sync peer. func (b *blockManager) SyncPeer() *ServerPeer { b.syncPeerMutex.Lock() @@ -2089,11 +2568,48 @@ func (b *blockManager) SyncPeer() *ServerPeer { return b.syncPeer } -// isSyncCandidate returns whether or not the peer is a candidate to consider -// syncing from. -func (b *blockManager) isSyncCandidate(sp *ServerPeer) bool { - // The peer is not a candidate for sync if it's not a full node. - return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork +// deleteAllHeaderTipRespAfterTip deletes all responses from newTip to prevTip. +func (b *blockManager) deleteAllHeaderTipRespAfterTip(newTip, prevTip int32) { + b.writeBatchMtx.Lock() + defer b.writeBatchMtx.Unlock() + + var ( + finalIdx int + initialIdx int + ) + + for i := 0; i < len(b.hdrTipSlice) && b.hdrTipSlice[i] <= newTip; i++ { + if b.hdrTipSlice[i] < prevTip { + continue + } + + if b.hdrTipSlice[i] == prevTip { + initialIdx = i + } + + tip := b.hdrTipSlice[i] + + delete(b.hdrTipToResponse, tip) + + finalIdx = i + } + + b.hdrTipSlice = append(b.hdrTipSlice[:initialIdx], b.hdrTipSlice[finalIdx+1:]...) +} + +// resetHeaderListToChainTip resets the headerList to the chain tip. +func (b *blockManager) resetHeaderListToChainTip() error { + header, height, err := b.cfg.BlockHeaders.ChainTip() + if err != nil { + return err + } + b.headerList.ResetHeaderState(headerlist.Node{ + Header: *header, + Height: int32(height), + }) + log.Debugf("Resetting header list to chain tip %v ", b.headerTip) + + return nil } // findNextHeaderCheckpoint returns the next checkpoint after the passed height. @@ -2768,20 +3284,16 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } } - // When this header is a checkpoint, find the next checkpoint. - if receivedCheckpoint { - b.nextCheckpoint = b.findNextHeaderCheckpoint(finalHeight) - } - // If not current, request the next batch of headers starting from the // latest known header and ending with the next checkpoint. - if b.cfg.ChainParams.Net == chaincfg.SimNetParams.Net || !b.BlockHeadersSynced() { + // Note this must come before reassigning a new b.nextCheckpoint, so that we push headers + // only when the current headers before this takes us past the checkpointed region. + if b.cfg.ChainParams.Net == chaincfg.SimNetParams.Net || !b.BlockHeadersSynced() && + b.nextCheckpoint == nil { + locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) - nextHash := zeroHash - if b.nextCheckpoint != nil { - nextHash = *b.nextCheckpoint.Hash - } - err := hmsg.peer.PushGetHeadersMsg(locator, &nextHash) + + err := hmsg.peer.PushGetHeadersMsg(locator, &zeroHash) if err != nil { log.Warnf("Failed to send getheaders message to "+ "peer %s: %s", hmsg.peer.Addr(), err) @@ -2789,6 +3301,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } } + // When this header is a checkpoint, find the next checkpoint. + if receivedCheckpoint { + b.nextCheckpoint = b.findNextHeaderCheckpoint(finalHeight) + } + // Since we have a new set of headers written to disk, we'll send out a // new signal to notify any waiting sub-systems that they can now maybe // proceed do to us extending the header chain. diff --git a/blockmanager_test.go b/blockmanager_test.go index 3bce9475..dca652b5 100644 --- a/blockmanager_test.go +++ b/blockmanager_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "reflect" + "sort" "strings" "testing" "time" @@ -89,11 +90,12 @@ func setupBlockManager(t *testing.T) (*blockManager, headerfs.BlockHeaderStore, // Set up a blockManager with the chain service we defined. bm, err := newBlockManager(&blockManagerCfg{ - ChainParams: chaincfg.SimNetParams, - BlockHeaders: hdrStore, - RegFilterHeaders: cfStore, - QueryDispatcher: &mockDispatcher{}, - TimeSource: blockchain.NewMedianTime(), + ChainParams: chaincfg.SimNetParams, + BlockHeaders: hdrStore, + RegFilterHeaders: cfStore, + cfHeaderQueryDispatcher: &mockDispatcher{}, + blkHdrCheckptQueryDispatcher: &mockDispatcher{}, + TimeSource: blockchain.NewMedianTime(), BanPeer: func(string, banman.Reason) error { return nil }, @@ -346,7 +348,7 @@ func TestBlockManagerInitialInterval(t *testing.T) { // We set up a custom query batch method for this test, as we // will use this to feed the blockmanager with our crafted // responses. - bm.cfg.QueryDispatcher.(*mockDispatcher).query = func( + bm.cfg.cfHeaderQueryDispatcher.(*mockDispatcher).query = func( requests []*query.Request, options ...query.QueryOption) chan error { @@ -577,7 +579,7 @@ func TestBlockManagerInvalidInterval(t *testing.T) { require.NoError(t, err) } - bm.cfg.QueryDispatcher.(*mockDispatcher).query = func( + bm.cfg.cfHeaderQueryDispatcher.(*mockDispatcher).query = func( requests []*query.Request, options ...query.QueryOption) chan error { @@ -886,20 +888,6 @@ func TestHandleHeaders(t *testing.T) { fakePeer, err := peer.NewOutboundPeer(&peer.Config{}, "fake:123") require.NoError(t, err) - assertPeerDisconnected := func(shouldBeDisconnected bool) { - // This is quite hacky but works: We expect the peer to be - // disconnected, which sets the unexported "disconnected" field - // to 1. - refValue := reflect.ValueOf(fakePeer).Elem() - foo := refValue.FieldByName("disconnect").Int() - - if shouldBeDisconnected { - require.EqualValues(t, 1, foo) - } else { - require.EqualValues(t, 0, foo) - } - } - // We'll want to use actual, real blocks, so we take a miner harness // that we can use to generate some. harness, err := rpctest.New( @@ -936,7 +924,7 @@ func TestHandleHeaders(t *testing.T) { // Let's feed in the correct headers. This should work fine and the peer // should not be disconnected. bm.handleHeadersMsg(hmsg) - assertPeerDisconnected(false) + assertPeerDisconnected(false, fakePeer, t) // Now scramble the headers and feed them in again. This should cause // the peer to be disconnected. @@ -945,5 +933,926 @@ func TestHandleHeaders(t *testing.T) { hmsg.headers.Headers[j], hmsg.headers.Headers[i] }) bm.handleHeadersMsg(hmsg) - assertPeerDisconnected(true) + assertPeerDisconnected(true, fakePeer, t) +} + +// assertPeerDisconnected asserts that the peer supplied as an argument is disconnected. +func assertPeerDisconnected(shouldBeDisconnected bool, sp *peer.Peer, t *testing.T) { + // This is quite hacky but works: We expect the peer to be + // disconnected, which sets the unexported "disconnected" field + // to 1. + refValue := reflect.ValueOf(sp).Elem() + foo := refValue.FieldByName("disconnect").Int() + + if shouldBeDisconnected { + require.EqualValues(t, 1, foo) + } else { + require.EqualValues(t, 0, foo) + } +} + +// TestBatchCheckpointedBlkHeaders tests the batch checkpointed headers function. +func TestBatchCheckpointedBlkHeaders(t *testing.T) { + t.Parallel() + + // First, we set up a block manager and a fake peer that will act as the + // test's remote peer. + bm, _, _, err := setupBlockManager(t) + require.NoError(t, err) + + // Created checkpoints for our simulated network. + checkpoints := []chaincfg.Checkpoint{ + + { + Hash: &chainhash.Hash{1}, + Height: int32(1), + }, + + { + Hash: &chainhash.Hash{2}, + Height: int32(2), + }, + + { + Hash: &chainhash.Hash{3}, + Height: int32(3), + }, + } + + modParams := chaincfg.SimNetParams + modParams.Checkpoints = append(modParams.Checkpoints, checkpoints...) + bm.cfg.ChainParams = modParams + + // set checkpoint and header tip. + bm.nextCheckpoint = &checkpoints[0] + + bm.newHeadersMtx.Lock() + bm.headerTip = 0 + bm.headerTipHash = chainhash.Hash{0} + bm.newHeadersMtx.Unlock() + + // This is the query we assert to obtain if the function works accordingly. + expectedQuery := CheckpointedBlockHeadersQuery{ + blockMgr: bm, + msgs: []*headerQuery{ + + { + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{{0}}), + HashStop: *checkpoints[0].Hash, + }, + startHeight: int32(0), + initialHeight: int32(0), + startHash: chainhash.Hash{0}, + endHeight: checkpoints[0].Height, + initialHash: chainhash.Hash{0}, + }, + + { + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{checkpoints[0].Hash}), + HashStop: *checkpoints[1].Hash, + }, + startHeight: checkpoints[0].Height, + initialHeight: checkpoints[0].Height, + startHash: *checkpoints[0].Hash, + endHeight: checkpoints[1].Height, + initialHash: *checkpoints[0].Hash, + }, + + { + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{checkpoints[1].Hash}), + HashStop: *checkpoints[2].Hash, + }, + startHeight: checkpoints[1].Height, + initialHeight: checkpoints[1].Height, + startHash: *checkpoints[1].Hash, + endHeight: checkpoints[2].Height, + initialHash: *checkpoints[1].Hash, + }, + + { + + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{checkpoints[2].Hash}), + HashStop: zeroHash, + }, + startHeight: checkpoints[2].Height, + initialHeight: checkpoints[2].Height, + startHash: *checkpoints[2].Hash, + endHeight: checkpoints[2].Height + wire.MaxBlockHeadersPerMsg, + initialHash: *checkpoints[2].Hash, + }, + }, + } + + // create request. + expectedRequest := expectedQuery.requests() + + bm.cfg.blkHdrCheckptQueryDispatcher.(*mockDispatcher).query = func(requests []*query.Request, + options ...query.QueryOption) chan error { + + // assert that the requests obtained has same length as that of our expected query. + if len(requests) != len(expectedRequest) { + t.Fatalf("unequal length") + } + + for i, req := range requests { + testEqualReqMessage(req, expectedRequest[i], t) + } + + // Ensure the query options sent by query is four. This is the number of query option supplied as args while + // querying the workmanager. + if len(options) != 3 { + t.Fatalf("expected five option parameter for query but got, %v\n", len(options)) + } + return nil + } + + // call the function that we are testing. + bm.batchCheckpointedBlkHeaders() +} + +// This function tests the ProcessBlKHeaderInCheckPtRegionInOrder function. +func TestProcessBlKHeaderInCheckPtRegionInOrder(t *testing.T) { + t.Parallel() + + // First, we set up a block manager and a fake peer that will act as the + // test's remote peer. + bm, _, _, err := setupBlockManager(t) + require.NoError(t, err) + + fakePeer, err := peer.NewOutboundPeer(&peer.Config{}, "fake:123") + require.NoError(t, err) + + // We'll want to use actual, real blocks, so we take a miner harness + // that we can use to generate some. + harness, err := rpctest.New( + &chaincfg.SimNetParams, nil, []string{"--txindex"}, "", + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, harness.TearDown()) + }) + + err = harness.SetUp(false, 0) + require.NoError(t, err) + + // Generate 10 valid blocks that we then feed to the block manager. + blockHashes, err := harness.Client.Generate(30) + require.NoError(t, err) + + // This is the headerMessage containing 10 headers starting at height 0. + hmsgTip0 := &headersMsg{ + headers: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 10), + }, + peer: &ServerPeer{ + Peer: fakePeer, + }, + } + + // This is the headerMessage containing 10 headers starting at height 10. + hmsgTip10 := &headersMsg{ + headers: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 10), + }, + peer: &ServerPeer{ + Peer: fakePeer, + }, + } + + // This is the headerMessage containing 10 headers starting at height 20. + hmsgTip20 := &headersMsg{ + headers: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 10), + }, + peer: &ServerPeer{ + Peer: fakePeer, + }, + } + + // Loop through the generated blockHashes and add headers to their appropriate slices. + for i := range blockHashes { + header, err := harness.Client.GetBlockHeader(blockHashes[i]) + require.NoError(t, err) + + if i < 10 { + hmsgTip0.headers.Headers[i] = header + } + + if i >= 10 && i < 20 { + hmsgTip10.headers.Headers[i-10] = header + } + + if i >= 20 { + hmsgTip20.headers.Headers[i-20] = header + } + } + + // initialize the hdrTipSlice. + bm.hdrTipSlice = make([]int32, 0) + + // Create checkpoint for our test chain. + checkpoint := chaincfg.Checkpoint{ + Hash: blockHashes[29], + Height: int32(30), + } + bm.cfg.ChainParams.Checkpoints = append(bm.cfg.ChainParams.Checkpoints, []chaincfg.Checkpoint{ + checkpoint, + }...) + bm.nextCheckpoint = &checkpoint + + // If ProcessBlKHeaderInCheckPtRegionInOrder loop receives invalid headers assert the query parameters being sent + // to the workmanager is expected. + bm.cfg.blkHdrCheckptQueryDispatcher.(*mockDispatcher).query = func(requests []*query.Request, + options ...query.QueryOption) chan error { + + // The function should send only one request. + if len(requests) != 1 { + t.Fatalf("expected only one request") + } + + finalNode := bm.headerList.Back() + newHdrTip := finalNode.Height + newHdrTipHash := finalNode.Header.BlockHash() + prevCheckPt := bm.findPreviousHeaderCheckpoint(newHdrTip) + + testEqualReqMessage(requests[0], &query.Request{ + + Req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{&newHdrTipHash}, + HashStop: *bm.nextCheckpoint.Hash, + }, + startHeight: newHdrTip, + initialHeight: prevCheckPt.Height, + startHash: newHdrTipHash, + endHeight: bm.nextCheckpoint.Height, + initialHash: newHdrTipHash, + index: 0, + }, + }, t) + + // The function should include only four query options while querying. + if len(options) != 3 { + t.Fatalf("expected three option parameter for query but got, %v\n", len(options)) + } + return nil + } + + // Call the function in a goroutine. + go bm.processBlKHeaderInCheckPtRegionInOrder() + + // At this point syncPeer should be nil. + bm.syncPeerMutex.RLock() + if bm.syncPeer != nil { + bm.syncPeerMutex.RUnlock() + t.Fatalf("syncPeer should be nil initially") + } + bm.syncPeerMutex.RUnlock() + + // Set header tip to zero and write a response at height 10, ensure the ProcessBlKHeaderInCheckPtRegionInOrder loop + // does not handle the response as it does not correspond to the current header tip. + bm.newHeadersMtx.Lock() + bm.headerTip = 0 + bm.newHeadersMtx.Unlock() + + bm.writeBatchMtx.Lock() + newTipWrite := int32(10) + bm.hdrTipToResponse[newTipWrite] = hmsgTip10 + i := sort.Search(len(bm.hdrTipSlice), func(i int) bool { + return bm.hdrTipSlice[i] >= newTipWrite + }) + + bm.hdrTipSlice = append(bm.hdrTipSlice[:i], append([]int32{newTipWrite}, bm.hdrTipSlice[i:]...)...) + bm.writeBatchMtx.Unlock() + + // SyncPeer should still be nil to indicate that the loop did not handle the response. + bm.syncPeerMutex.RLock() + if bm.syncPeer != nil { + bm.syncPeerMutex.RUnlock() + t.Fatalf("syncPeer should be nil") + } + bm.syncPeerMutex.RUnlock() + + // Set header tip to 20 to indicate that even when the chain's tip is higher that the available tips in the + // hdrTipToResponse map, the loop does not still handle it. + bm.newHeadersMtx.Lock() + bm.headerTip = 20 + bm.newHeadersMtx.Unlock() + + // SyncPeer should still be nil to indicate that the loop did not handle the response. + bm.syncPeerMutex.RLock() + if bm.syncPeer != nil { + bm.syncPeerMutex.RUnlock() + t.Fatalf("syncPeer should be nil") + } + bm.syncPeerMutex.RUnlock() + + // Set headerTip to zero and write a response at height 0 to the hdrTipToResponse map. The loop should handle this + // response now and the following response that would correspond to its new tip after this. + bm.newHeadersMtx.Lock() + bm.headerTip = 0 + bm.newHeadersMtx.Unlock() + + bm.writeBatchMtx.Lock() + newTipWrite = int32(0) + i = sort.Search(len(bm.hdrTipSlice), func(i int) bool { + return bm.hdrTipSlice[i] >= newTipWrite + }) + + bm.hdrTipSlice = append(bm.hdrTipSlice[:i], append([]int32{newTipWrite}, bm.hdrTipSlice[i:]...)...) + + bm.hdrTipToResponse[newTipWrite] = hmsgTip0 + bm.writeBatchMtx.Unlock() + + // Allow time for handling the response. + time.Sleep(1 * time.Second) + bm.syncPeerMutex.RLock() + if bm.syncPeer == nil { + bm.syncPeerMutex.RUnlock() + t.Fatalf("syncPeer should not be nil") + } + bm.syncPeerMutex.RUnlock() + + // Header tip should be 20 as th the loop would handle response at height 0 then the previously written + // height 10. + bm.newHeadersMtx.RLock() + if bm.headerTip != 20 { + hdrTip := bm.headerTip + bm.newHeadersMtx.RUnlock() + t.Fatalf("expected header tip at 10 but got %v\n", hdrTip) + } + bm.newHeadersMtx.RUnlock() + + // Now scramble the headers and feed them in again. This should cause + // the loop to delete this response from the map and re-request for this header from + // the workmanager. + rand.Shuffle(len(hmsgTip20.headers.Headers), func(i, j int) { + hmsgTip20.headers.Headers[i], hmsgTip20.headers.Headers[j] = + hmsgTip20.headers.Headers[j], hmsgTip20.headers.Headers[i] + }) + + // Write this header at height 20, this would cause the loop to handle it. + bm.writeBatchMtx.Lock() + newTipWrite = int32(20) + bm.hdrTipToResponse[newTipWrite] = hmsgTip20 + i = sort.Search(len(bm.hdrTipSlice), func(i int) bool { + return bm.hdrTipSlice[i] >= newTipWrite + }) + + bm.hdrTipSlice = append(bm.hdrTipSlice[:i], append([]int32{newTipWrite}, bm.hdrTipSlice[i:]...)...) + + bm.writeBatchMtx.Unlock() + + // Allow time for handling. + time.Sleep(1 * time.Second) + + // HeadrTip should not advance as headers are invalid. + bm.newHeadersMtx.RLock() + if bm.headerTip != 20 { + hdrTip := bm.headerTip + bm.newHeadersMtx.RUnlock() + t.Fatalf("expected header tip at 20 but got %v\n", hdrTip) + } + bm.newHeadersMtx.RUnlock() + + // Syncpeer should not be nil as we are still in the loop. + bm.syncPeerMutex.RLock() + if bm.syncPeer == nil { + bm.syncPeerMutex.RUnlock() + t.Fatalf("syncPeer should not be nil") + } + bm.syncPeerMutex.RUnlock() + + // The response at header tip 20 should be deleted. + bm.writeBatchMtx.RLock() + _, ok := bm.hdrTipToResponse[int32(20)] + bm.writeBatchMtx.RUnlock() + + if ok { + t.Fatalf("expected response to header tip deleted") + } +} + +// TestCheckpointedBlockHeadersQuery_handleResponse tests the handleResponse method +// of the CheckpointedBlockHeadersQuery. +func TestCheckpointedBlockHeadersQuery_handleResponse(t *testing.T) { + t.Parallel() + + finalResp := query.Progress{ + Finished: true, + Progressed: true, + } + + finalRespNoProgress := query.Progress{ + Finished: true, + Progressed: false, + } + + NoProgressNoFinalResp := query.Progress{ + Finished: false, + Progressed: false, + } + + // handleRespTestCase holds all the information required to test different scenarios while + // using the function. + type handleRespTestCase struct { + + // name of the testcase. + name string + + // resp is the response argument to be sent to the handleResp method as an arg. + resp wire.Message + + // req is the request method to be sent to the handleResp method as an arg. + req query.ReqMessage + + // jobErr is the value of the jobErr arg after the handleResp function is done. + jobErr *error + + // progress is the expected progress to be returned by the handleResp method. + progress query.Progress + + // lastblock is the block with which we obtain its hash to be used as the request's hashStop. + lastBlock wire.BlockHeader + + // peerDisconnected indicates if the peer would be disconnected after the handleResp method is done. + peerDisconnected bool + } + + testCases := []handleRespTestCase{ + + { + // Scenario in which we have a request type that is not the same as the expected headerQuery type.It should + // return no error and NoProgressNoFinalResp query.Progress. + name: "invalid request type", + resp: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 0, 5), + }, + req: &encodedQuery{}, + jobErr: nil, + progress: NoProgressNoFinalResp, + }, + + { + // Scenario in which we have a response type that is not same as the expected wire.MsgHeaders. It should + // return no error and NoProgressNoFinalResp query.Progress. + name: "invalid response type", + resp: &wire.MsgCFHeaders{}, + req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 6, + initialHash: chainhash.Hash{1}, + }, + jobErr: nil, + progress: NoProgressNoFinalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{5}, + }, + }, + + { + // Scenario in which we have the response in the hdrTipResponseMap. While calling these testcases, we + // initialize the hdrTipToResponse map to contain a response at height 0 and 6. Since this request ahs a + // startheight of 0, its response would be in the map already, aligning with this scenario. This scenario + // should return the query.ErrResponseExistForQuery error and return the finalResp query.progress, + name: "response start Height in hdrTipResponse map", + resp: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 0, 4), + }, + req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {0}, + }, + }, + startHeight: 0, + initialHeight: 0, + startHash: chainhash.Hash{0}, + endHeight: 5, + initialHash: chainhash.Hash{0}, + }, + jobErr: &query.ErrResponseExistForQuery, + progress: finalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{4}, + }, + }, + + { + // Scenario in which the response we receive from the is of length zero. We should return an error and return + // NoProgressNoFinalResp query.Progress. + name: "response header length 0", + resp: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 0), + }, + req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 5, + initialHash: chainhash.Hash{1}, + }, + jobErr: &query.ErrResponseErr, + progress: NoProgressNoFinalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{4}, + }, + }, + + { + // Scenario in which the response received is at the request's initialHeight (lower bound height in + // checkpoint request) but its first block's previous hash is not same as the checkpoint hash. Its + // jobErr should be nil and the function should return NoProgressNoFinalResp query.progress. + name: "response at initialHash has disconnected start Hash", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + { + PrevBlock: chainhash.Hash{4}, + }, + }, + }, + req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 5, + initialHash: chainhash.Hash{1}, + }, + jobErr: nil, + progress: NoProgressNoFinalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{4}, + }, + peerDisconnected: true, + }, + + { + // Scenario in which the response is not at the initial Hash (lower bound hash in the + // checkpoint request) but the response is complete and valid. The jobErr should be nil and + // return finalRespNoProgress query.Progress. + name: "response not at initialHash, valid complete headers", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + { + PrevBlock: chainhash.Hash{4}, + }, + }, + }, + req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {2}, + }, + }, + startHeight: 2, + initialHeight: 1, + startHash: chainhash.Hash{2}, + endHeight: 5, + initialHash: chainhash.Hash{2}, + }, + jobErr: nil, + progress: finalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{4}, + }, + }, + + { + // Scenario in which the response is not at initial hash (lower bound height in + // checkpoint request) and the response is unfinished. The jobErr should be nil and return + // finalRespNoProgress query.progress. + name: "response not at initial Hash, unfinished response", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + { + PrevBlock: chainhash.Hash{4}, + }, + }, + }, + req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {2}, + }, + }, + startHeight: 2, + initialHeight: 1, + startHash: chainhash.Hash{2}, + endHeight: 6, + initialHash: chainhash.Hash{2}, + }, + jobErr: nil, + progress: finalRespNoProgress, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{5}, + }, + }, + + { + // Scenario in which the response length is greater than expected. JobErr should be nil, peer + // should be disconnected and the method should return NoProgressNoFinalResp query.progress. + name: "response header length more than expected", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{1}, + }, + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + { + PrevBlock: chainhash.Hash{4}, + }, + { + PrevBlock: chainhash.Hash{5}, + }, + { + PrevBlock: chainhash.Hash{6}, + }, + }, + }, + req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 6, + initialHash: chainhash.Hash{1}, + }, + jobErr: nil, + progress: NoProgressNoFinalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{5}, + }, + peerDisconnected: true, + }, + + { + // Scenario in which response is complete and a valid header. Its start height is at the initial height. + // jobErr should be nil and progress should be finalResp. + name: "complete response valid headers", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{1}, + }, + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + }, + }, + req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 4, + initialHash: chainhash.Hash{1}, + }, + jobErr: nil, + progress: finalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{3}, + }, + }, + + { + // Scenario in which response is at initialHash and the response is incomplete. The joberr should be nil, + // progress, finalRespNoProgress. + name: "response at initial hash, incomplete response, valid headers", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{1}, + }, + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + }, + }, + req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 6, + initialHash: chainhash.Hash{1}, + }, + jobErr: nil, + progress: finalRespNoProgress, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{5}, + }, + }, + + { + // Scenario in which response is incomplete but valid. The new response's start height created in this + // scenario is present in the hdrTipResponseMap. The startHeight is 6 and response at height 6 has been + // preveiously written in to the hdrTipResponse map for the sake of this test. + name: "incomplete response, valid headers, new resp in hdrTipToResponse map", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{1}, + }, + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + { + PrevBlock: chainhash.Hash{4}, + }, + { + PrevBlock: chainhash.Hash{5}, + }, + }, + }, + req: &headerQuery{ + message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 10, + initialHash: chainhash.Hash{1}, + }, + jobErr: &query.ErrResponseExistForQuery, + progress: finalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{9}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // set up block manager. + bm, _, _, err := setupBlockManager(t) + require.NoError(t, err) + + var oldReqStartHeight int32 + + bm.hdrTipToResponse[0] = &headersMsg{ + headers: &wire.MsgHeaders{}, + } + bm.hdrTipToResponse[6] = &headersMsg{ + headers: &wire.MsgHeaders{}, + } + + fakePeer, err := peer.NewOutboundPeer(&peer.Config{}, "fake:123") + require.NoError(t, err) + + query := &CheckpointedBlockHeadersQuery{ + blockMgr: bm, + } + req := tc.req + r, ok := tc.req.(*headerQuery) + if ok { + reqMessage, ok := req.Message().(*wire.MsgGetHeaders) + if !ok { + t.Fatalf("request message not of type wire.MsgGetHeaders") + } + reqMessage.HashStop = tc.lastBlock.BlockHash() + req = r + oldReqStartHeight = r.startHeight + } + var jobErr error + + actualProgress := query.handleResponse(req, tc.resp, &ServerPeer{ + Peer: fakePeer, + }, &jobErr) + if actualProgress != tc.progress { + t.Fatalf("unexpected progress.Expected:"+ + "Finished:%v and Progressed: %v but got,"+ + "Finished: %v and Progressed: %v", tc.progress.Finished, tc.progress.Progressed, + actualProgress.Finished, actualProgress.Progressed) + } + + if actualProgress.Finished && !actualProgress.Progressed { + resp := tc.resp.(*wire.MsgHeaders) + request := req.(*headerQuery) + if request.startHash != resp.Headers[len(resp.Headers)-1].BlockHash() { + t.Fatalf("unexpected new startHash") + } + + if request.startHeight != oldReqStartHeight+int32(len(resp.Headers)) { + t.Fatalf("unexpected new start height") + } + + requestMessage := req.Message().(*wire.MsgGetHeaders) + + if *requestMessage.BlockLocatorHashes[0] != request.startHash { + t.Fatalf("unexpected new blockLocator") + } + } + + if tc.jobErr == nil && jobErr != nil { + t.Fatalf("unexpected error: %v\n", jobErr) + } + + if tc.jobErr != nil && jobErr != *tc.jobErr { + t.Fatalf("expected error, %v but got %v\n", *tc.jobErr, jobErr) + } + + assertPeerDisconnected(tc.peerDisconnected, fakePeer, t) + }) + } +} + +// testEqualReqMessage tests if two query.Request are same. +func testEqualReqMessage(a, b *query.Request, t *testing.T) { + aMessage := a.Req.(*headerQuery) + bMessage := b.Req.(*headerQuery) + + if aMessage.startHeight != bMessage.startHeight { + t.Fatalf("dissimilar startHeight") + } + if aMessage.startHash != bMessage.startHash { + t.Fatalf("dissimilar startHash") + } + if aMessage.endHeight != bMessage.endHeight { + t.Fatalf("dissimilar endHash") + } + if aMessage.initialHash != bMessage.initialHash { + t.Fatalf("dissimilar initialHash") + } + + aMessageGetHeaders := aMessage.Message().(*wire.MsgGetHeaders) + bMessageGetHeaders := bMessage.Message().(*wire.MsgGetHeaders) + + if !reflect.DeepEqual(aMessageGetHeaders.BlockLocatorHashes, bMessageGetHeaders.BlockLocatorHashes) { + t.Fatalf("dissimilar blocklocator hash") + } + + if aMessageGetHeaders.HashStop != bMessageGetHeaders.HashStop { + t.Fatalf("dissimilar hashstop") + } + if a.Req.PriorityIndex() != b.Req.PriorityIndex() { + t.Fatalf("dissimilar priority index") + } } diff --git a/neutrino.go b/neutrino.go index 7ee45edd..5179e0d9 100644 --- a/neutrino.go +++ b/neutrino.go @@ -195,6 +195,31 @@ func NewServerPeer(s *ChainService, isPersistent bool) *ServerPeer { } } +// IsSyncCandidate returns whether or not the peer is a candidate to consider +// syncing from. +func (sp *ServerPeer) IsSyncCandidate() bool { + // The peer is not a candidate for sync if it's not a full node. + return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork +} + +// IsPeerBehindStartHeight returns a boolean indicating if the peer's last block height +// is behind the start height of the request. If the peer is not behind the request start +// height false is returned, otherwise, true is. +func (sp *ServerPeer) IsPeerBehindStartHeight(req query.ReqMessage) bool { + queryGetHeaders, ok := req.(*headerQuery) + + if !ok { + log.Debugf("request is not type headerQuery") + + return true + } + + if sp.LastBlock() < queryGetHeaders.startHeight { + return true + } + return false +} + // newestBlock returns the current best block hash and height using the format // required by the configuration for the peer package. func (sp *ServerPeer) newestBlock() (*chainhash.Hash, int32, error) { @@ -800,15 +825,21 @@ func NewChainService(cfg Config) (*ChainService, error) { } bm, err := newBlockManager(&blockManagerCfg{ - ChainParams: s.chainParams, - BlockHeaders: s.BlockHeaders, - RegFilterHeaders: s.RegFilterHeaders, - TimeSource: s.timeSource, - QueryDispatcher: s.workManager, - BanPeer: s.BanPeer, - GetBlock: s.GetBlock, - firstPeerSignal: s.firstPeerConnect, - queryAllPeers: s.queryAllPeers, + ChainParams: s.chainParams, + BlockHeaders: s.BlockHeaders, + RegFilterHeaders: s.RegFilterHeaders, + TimeSource: s.timeSource, + cfHeaderQueryDispatcher: s.workManager, + BanPeer: s.BanPeer, + GetBlock: s.GetBlock, + firstPeerSignal: s.firstPeerConnect, + queryAllPeers: s.queryAllPeers, + blkHdrCheckptQueryDispatcher: query.NewWorkManager(&query.Config{ + ConnectedPeers: s.ConnectedPeers, + NewWorker: query.NewWorker, + Ranking: query.NewPeerRanking(), + IsEligibleWorkerFunc: query.IsWorkerEligibleForBlkHdrFetch, + }), }) if err != nil { return nil, err @@ -1610,6 +1641,9 @@ func (s *ChainService) Start() error { s.addrManager.Start() s.blockManager.Start() s.blockSubscriptionMgr.Start() + if err := s.blockManager.cfg.blkHdrCheckptQueryDispatcher.Start(); err != nil { + return fmt.Errorf("unable to start block header work manager: %v", err) + } if err := s.workManager.Start(); err != nil { return fmt.Errorf("unable to start work manager: %v", err) } diff --git a/query/interface.go b/query/interface.go index 52ab30dd..686a6cbc 100644 --- a/query/interface.go +++ b/query/interface.go @@ -202,4 +202,11 @@ type Peer interface { // OnDisconnect returns a channel that will be closed when this peer is // disconnected. OnDisconnect() <-chan struct{} + + // IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind + // the request's start Height which it receives as an argument. + IsPeerBehindStartHeight(req ReqMessage) bool + + // IsSyncCandidate returns if the peer is a sync candidate. + IsSyncCandidate() bool } diff --git a/query/worker.go b/query/worker.go index f7c45772..6f8d66e7 100644 --- a/query/worker.go +++ b/query/worker.go @@ -94,7 +94,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { msgChan, cancel := peer.SubscribeRecvMsg() defer cancel() -nexJobLoop: +nextJobLoop: for { log.Tracef("Worker %v waiting for more work", peer.Addr()) @@ -153,7 +153,7 @@ nexJobLoop: case <-quit: return } - goto nexJobLoop + goto nextJobLoop } } @@ -302,6 +302,28 @@ nexJobLoop: } } +func (w *worker) IsSyncCandidate() bool { + return w.peer.IsSyncCandidate() +} + +func (w *worker) IsPeerBehindStartHeight(req ReqMessage) bool { + return w.peer.IsPeerBehindStartHeight(req) +} + +// IsWorkerEligibleForBlkHdrFetch is the eligibility function used for the BlockHdrWorkManager to determine workers +// eligible to receive jobs (the job is to fetch headers). If the peer is not a sync candidate or if its last known +// block height is behind the job query's start height, it returns false. Otherwise, it returns true. +func IsWorkerEligibleForBlkHdrFetch(r *activeWorker, next *queryJob) bool { + if !r.w.IsSyncCandidate() { + return false + } + + if r.w.IsPeerBehindStartHeight(next.Req) { + return false + } + return true +} + // NewJob returns a channel where work that is to be handled by the worker can // be sent. If the worker reads a queryJob from this channel, it is guaranteed // that a response will eventually be deliverd on the results channel (except diff --git a/query/worker_test.go b/query/worker_test.go index c036426b..ad3e8aec 100644 --- a/query/worker_test.go +++ b/query/worker_test.go @@ -10,9 +10,10 @@ import ( ) type mockQueryEncoded struct { - message *wire.MsgGetData - encoding wire.MessageEncoding - index float64 + message *wire.MsgGetData + encoding wire.MessageEncoding + index float64 + startHeight int } func (m *mockQueryEncoded) Message() wire.Message { @@ -49,6 +50,8 @@ type mockPeer struct { responses chan<- wire.Message subscriptions chan chan wire.Message quit chan struct{} + bestHeight int + fullNode bool err error } @@ -69,6 +72,15 @@ func (m *mockPeer) Addr() string { return m.addr } +func (m *mockPeer) IsPeerBehindStartHeight(request ReqMessage) bool { + r := request.(*mockQueryEncoded) + return m.bestHeight < r.startHeight +} + +func (m *mockPeer) IsSyncCandidate() bool { + return m.fullNode +} + // makeJob returns a new query job that will be done when it is given the // finalResp message. Similarly ot will progress on being given the // progressResp message, while any other message will be ignored. diff --git a/query/workmanager.go b/query/workmanager.go index 2c95e1f5..aed55690 100644 --- a/query/workmanager.go +++ b/query/workmanager.go @@ -46,6 +46,13 @@ type Worker interface { // delivered on the results channel (except when the quit channel has // been closed). NewJob() chan<- *queryJob + + // IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind + // the request's start Height which it receives as an argument. + IsPeerBehindStartHeight(req ReqMessage) bool + + // IsSyncCandidate returns if the peer is a sync candidate. + IsSyncCandidate() bool } // PeerRanking is an interface that must be satisfied by the underlying module diff --git a/query/workmanager_test.go b/query/workmanager_test.go index 33116ac0..5b8bb195 100644 --- a/query/workmanager_test.go +++ b/query/workmanager_test.go @@ -15,6 +15,14 @@ type mockWorker struct { results chan *jobResult } +func (m *mockWorker) IsPeerBehindStartHeight(req ReqMessage) bool { + return m.peer.IsPeerBehindStartHeight(req) +} + +func (m *mockWorker) IsSyncCandidate() bool { + return m.peer.IsSyncCandidate() +} + var _ Worker = (*mockWorker)(nil) func (m *mockWorker) NewJob() chan<- *queryJob { @@ -977,3 +985,105 @@ func TestWorkManagerResultUnfinished(t *testing.T) { t.Fatalf("nothing received on errChan") } } + +// TestIsWorkerEligibleForBlkHdrFetch tests the IsWorkerEligibleForBlkHdrFetch function. +func TestIsWorkerEligibleForBlkHdrFetch(t *testing.T) { + type testArgs struct { + name string + activeWorker *activeWorker + job *queryJob + expectedEligibility bool + } + + testCases := []testArgs{ + { + name: "peer sync candidate, best height behind job start Height", + activeWorker: &activeWorker{ + w: &mockWorker{ + peer: &mockPeer{ + bestHeight: 5, + fullNode: true, + }, + }, + }, + job: &queryJob{ + Request: &Request{ + Req: &mockQueryEncoded{ + startHeight: 10, + }, + }, + }, + expectedEligibility: false, + }, + + { + name: "peer sync candidate, best height ahead job start Height", + activeWorker: &activeWorker{ + w: &mockWorker{ + peer: &mockPeer{ + bestHeight: 10, + fullNode: true, + }, + }, + }, + job: &queryJob{ + Request: &Request{ + Req: &mockQueryEncoded{ + startHeight: 5, + }, + }, + }, + expectedEligibility: true, + }, + + { + name: "peer not sync candidate, best height behind job start Height", + activeWorker: &activeWorker{ + w: &mockWorker{ + peer: &mockPeer{ + bestHeight: 5, + fullNode: false, + }, + }, + }, + job: &queryJob{ + Request: &Request{ + Req: &mockQueryEncoded{ + startHeight: 10, + }, + }, + }, + expectedEligibility: false, + }, + + { + name: "peer not sync candidate, best height ahead job start Height", + activeWorker: &activeWorker{ + w: &mockWorker{ + peer: &mockPeer{ + bestHeight: 10, + fullNode: false, + }, + }, + }, + job: &queryJob{ + Request: &Request{ + Req: &mockQueryEncoded{ + startHeight: 5, + }, + }, + }, + expectedEligibility: false, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + isEligible := IsWorkerEligibleForBlkHdrFetch(test.activeWorker, test.job) + if isEligible != test.expectedEligibility { + t.Fatalf("Expected '%v'for eligibility check but got"+ + "'%v'\n", test.expectedEligibility, isEligible) + } + }) + } +}