diff --git a/cmd/retransmitter/retransmit/network/incoming_peer.go b/cmd/retransmitter/retransmit/network/incoming_peer.go index d7e9aa664..b5ebeee37 100644 --- a/cmd/retransmitter/retransmit/network/incoming_peer.go +++ b/cmd/retransmitter/retransmit/network/incoming_peer.go @@ -3,10 +3,11 @@ package network import ( "context" "fmt" - "github.com/wavesplatform/gowaves/pkg/p2p/peer" "net" "time" + "github.com/wavesplatform/gowaves/pkg/p2p/peer" + "github.com/wavesplatform/gowaves/pkg/libs/bytespool" "github.com/wavesplatform/gowaves/pkg/p2p/conn" "github.com/wavesplatform/gowaves/pkg/proto" @@ -37,13 +38,13 @@ func RunIncomingPeer(ctx context.Context, params IncomingPeerParams) { _, err := readHandshake.ReadFrom(c) if err != nil { zap.S().Error("failed to read handshake: ", err) - c.Close() + _ = c.Close() return } select { case <-ctx.Done(): - c.Close() + _ = c.Close() return default: } @@ -64,13 +65,13 @@ func RunIncomingPeer(ctx context.Context, params IncomingPeerParams) { _, err = writeHandshake.WriteTo(c) if err != nil { zap.S().Error("failed to write handshake: ", err) - c.Close() + _ = c.Close() return } select { case <-ctx.Done(): - c.Close() + _ = c.Close() return default: } diff --git a/cmd/retransmitter/retransmit/network/outgoing_peer.go b/cmd/retransmitter/retransmit/network/outgoing_peer.go index d985a3239..85012c4d9 100644 --- a/cmd/retransmitter/retransmit/network/outgoing_peer.go +++ b/cmd/retransmitter/retransmit/network/outgoing_peer.go @@ -112,7 +112,7 @@ func (a *OutgoingPeer) connect(ctx context.Context, wavesNetwork string, remote select { case <-ctx.Done(): - c.Close() + _ = c.Close() return nil, nil, errors.Wrap(ctx.Err(), "OutgoingPeer.connect") default: } diff --git a/pkg/node/blocks_applier/blocks_applier.go b/pkg/node/blocks_applier/blocks_applier.go index 6ae411b8c..be999f767 100644 --- a/pkg/node/blocks_applier/blocks_applier.go +++ b/pkg/node/blocks_applier/blocks_applier.go @@ -21,6 +21,17 @@ type innerState interface { RollbackToHeight(height proto.Height) error } +func (a *innerBlocksApplier) exists(storage innerState, block *proto.Block) (bool, error) { + _, err := storage.Block(block.BlockID()) + if err == nil { + return true, nil + } + if state.IsNotFound(err) { + return false, nil + } + return false, err +} + func (a *innerBlocksApplier) apply(storage innerState, blocks []*proto.Block) (proto.Height, error) { if len(blocks) == 0 { return 0, errors.New("empty blocks") @@ -163,6 +174,10 @@ func NewBlocksApplier() *BlocksApplier { } } +func (a *BlocksApplier) BlockExists(state state.State, block *proto.Block) (bool, error) { + return a.inner.exists(state, block) +} + func (a *BlocksApplier) Apply(state state.State, blocks []*proto.Block) (proto.Height, error) { return a.inner.apply(state, blocks) } diff --git a/pkg/node/peer_manager/peer_manager.go b/pkg/node/peer_manager/peer_manager.go index 14af74891..b76fed81b 100644 --- a/pkg/node/peer_manager/peer_manager.go +++ b/pkg/node/peer_manager/peer_manager.go @@ -170,11 +170,11 @@ func (a *PeerManagerImpl) connectedCount() int { func (a *PeerManagerImpl) NewConnection(p peer.Peer) error { _, connected := a.Connected(p) if connected { - p.Close() + _ = p.Close() return errors.New("already connected") } if a.IsSuspended(p) { - p.Close() + _ = p.Close() return errors.New("peer is suspended") } if p.Handshake().Version.CmpMinor(a.version) >= 2 { @@ -184,7 +184,7 @@ func (a *PeerManagerImpl) NewConnection(p peer.Peer) error { p.Handshake().Version.String(), ) a.Suspend(p, err.Error()) - p.Close() + _ = p.Close() return err } @@ -239,7 +239,7 @@ func (a *PeerManagerImpl) PeerWithHighestScore() (peer.Peer, *big.Int, bool) { return nil, nil, false } - var peers []peerInfo + peers := make([]peerInfo, 0) for _, p := range a.active { peers = append(peers, p) } @@ -323,7 +323,7 @@ func (a *PeerManagerImpl) KnownPeers() ([]proto.TCPAddr, error) { func (a *PeerManagerImpl) Close() { a.mu.Lock() for _, v := range a.active { - v.peer.Close() + _ = v.peer.Close() } a.mu.Unlock() } diff --git a/pkg/node/state_fsm/fsm.go b/pkg/node/state_fsm/fsm.go index 4f178c29c..ccca39a1c 100644 --- a/pkg/node/state_fsm/fsm.go +++ b/pkg/node/state_fsm/fsm.go @@ -20,6 +20,7 @@ import ( type Async []Task type BlocksApplier interface { + BlockExists(state storage.State, block *proto.Block) (bool, error) Apply(state storage.State, block []*proto.Block) (proto.Height, error) ApplyMicro(state storage.State, block *proto.Block) (proto.Height, error) } diff --git a/pkg/node/state_fsm/fsm_ng.go b/pkg/node/state_fsm/fsm_ng.go index d22dd3433..ba5dd44a3 100644 --- a/pkg/node/state_fsm/fsm_ng.go +++ b/pkg/node/state_fsm/fsm_ng.go @@ -15,6 +15,7 @@ import ( type NGFsm struct { BaseInfo + blocksCache blockStatesCache } func (a *NGFsm) Transaction(p peer.Peer, t proto.Transaction) (FSM, Async, error) { @@ -46,7 +47,8 @@ func (a *NGFsm) Halt() (FSM, Async, error) { func NewNGFsm12(info BaseInfo) *NGFsm { return &NGFsm{ - BaseInfo: info, + BaseInfo: info, + blocksCache: blockStatesCache{blockStates: map[proto.BlockID]proto.Block{}}, } } @@ -68,9 +70,46 @@ func (a *NGFsm) Score(p peer.Peer, score *proto.Score) (FSM, Async, error) { return handleScore(a, a.BaseInfo, p, score) } +func (a *NGFsm) rollbackToStateFromCache(blockFromCache *proto.Block) error { + previousBlockID := blockFromCache.Parent + err := a.storage.RollbackTo(previousBlockID) + if err != nil { + return errors.Wrapf(err, "failed to rollback to height %d", blockFromCache.Height) + } + _, err = a.blocksApplier.Apply(a.storage, []*proto.Block{blockFromCache}) + if err != nil { + return err + } + return nil +} + func (a *NGFsm) Block(peer peer.Peer, block *proto.Block) (FSM, Async, error) { + ok, err := a.blocksApplier.BlockExists(a.storage, block) + if err != nil { + return a, nil, err + } + if ok { + return a, nil, proto.NewInfoMsg(errors.Errorf("Block '%s' already exists", block.BlockID().String())) + } + metrics.FSMKeyBlockReceived("ng", block, peer.Handshake().NodeName) - _, err := a.blocksApplier.Apply(a.storage, []*proto.Block{block}) + + top := a.storage.TopBlock() + if top.BlockID() != block.Parent { // does block refer to last block + zap.S().Debugf("Key-block '%s' has parent '%s' which is not the top block '%s'", + block.ID.String(), block.Parent.String(), top.ID.String()) + if blockFromCache, ok := a.blocksCache.Get(block.Parent); ok { + zap.S().Debugf("Re-applying block '%s' from cache", blockFromCache.ID.String()) + err := a.rollbackToStateFromCache(blockFromCache) + if err != nil { + return a, nil, err + } + } + } + a.blocksCache.Clear() + a.blocksCache.AddBlockState(block) + + _, err = a.blocksApplier.Apply(a.storage, []*proto.Block{block}) if err != nil { metrics.FSMKeyBlockDeclined("ng", block, err) return a, nil, err @@ -79,6 +118,7 @@ func (a *NGFsm) Block(peer peer.Peer, block *proto.Block) (FSM, Async, error) { a.Scheduler.Reschedule() a.actions.SendScore(a.storage) a.CleanUtx() + return NewNGFsm12(a.BaseInfo), nil, nil } @@ -90,15 +130,20 @@ func (a *NGFsm) MinedBlock(block *proto.Block, limits proto.MiningLimits, keyPai return err }) if err != nil { - zap.S().Info("NGFsm MinedBlock err ", err) + zap.S().Warnf("Failed to apply mined block '%s': %v", block.ID.String(), err) metrics.FSMKeyBlockDeclined("ng", block, err) return a, nil, err } metrics.FSMKeyBlockApplied("ng", block) + + a.blocksCache.Clear() + a.blocksCache.AddBlockState(block) + a.Reschedule() a.actions.SendBlock(block) a.actions.SendScore(a.storage) a.CleanUtx() + return NewNGFsm12(a.BaseInfo), Tasks(NewMineMicroTask(1*time.Second, block, limits, keyPair, vrf)), nil } @@ -109,16 +154,17 @@ func (a *NGFsm) BlockIDs(_ peer.Peer, _ []proto.BlockID) (FSM, Async, error) { // New microblock received from the network func (a *NGFsm) MicroBlock(p peer.Peer, micro *proto.MicroBlock) (FSM, Async, error) { metrics.FSMMicroBlockReceived("ng", micro, p.Handshake().NodeName) - id, err := a.checkAndAppendMicroblock(micro) // the TopBlock() is used here + block, err := a.checkAndAppendMicroblock(micro) // the TopBlock() is used here if err != nil { metrics.FSMMicroBlockDeclined("ng", micro, err) return a, nil, err } - a.MicroBlockCache.Add(id, micro) + a.MicroBlockCache.Add(block.BlockID(), micro) + a.blocksCache.AddBlockState(block) a.BaseInfo.Reschedule() // Notify all connected peers about new microblock, send them microblock inv network message - inv, ok := a.MicroBlockInvCache.Get(id) + inv, ok := a.MicroBlockInvCache.Get(block.BlockID()) if ok { invBts, err := inv.MarshalBinary() if err == nil { @@ -146,6 +192,7 @@ func (a *NGFsm) mineMicro(minedBlock *proto.Block, rest proto.MiningLimits, keyP if err != nil { return a, nil, errors.Wrap(err, "NGFsm.mineMicro") } + a.blocksCache.AddBlockState(block) metrics.FSMMicroBlockGenerated("ng", micro) err = a.storage.Map(func(s state.NonThreadSafeState) error { _, err := a.blocksApplier.ApplyMicro(s, block) @@ -168,6 +215,7 @@ func (a *NGFsm) mineMicro(minedBlock *proto.Block, rest proto.MiningLimits, keyP if err != nil { return a, nil, err } + a.MicroBlockCache.Add(block.BlockID(), micro) a.MicroBlockInvCache.Add(block.BlockID(), inv) // TODO wrap @@ -178,40 +226,41 @@ func (a *NGFsm) mineMicro(minedBlock *proto.Block, rest proto.MiningLimits, keyP }, ) }) + return a, Tasks(NewMineMicroTask(5*time.Second, block, rest, keyPair, vrf)), nil } // Check than microblock is appendable and append it -func (a *NGFsm) checkAndAppendMicroblock(micro *proto.MicroBlock) (proto.BlockID, error) { +func (a *NGFsm) checkAndAppendMicroblock(micro *proto.MicroBlock) (*proto.Block, error) { top := a.storage.TopBlock() // Get the last block if top.BlockID() != micro.Reference { // Microblock doesn't refer to last block err := errors.Errorf("microblock TBID '%s' refer to block ID '%s' but last block ID is '%s'", micro.TotalBlockID.String(), micro.Reference.String(), top.BlockID().String()) metrics.FSMMicroBlockDeclined("ng", micro, err) - return proto.BlockID{}, proto.NewInfoMsg(err) + return &proto.Block{}, proto.NewInfoMsg(err) } ok, err := micro.VerifySignature(a.scheme) if err != nil { - return proto.BlockID{}, err + return nil, err } if !ok { - return proto.BlockID{}, errors.Errorf("microblock '%s' has invalid signature", micro.TotalBlockID.String()) + return nil, errors.Errorf("microblock '%s' has invalid signature", micro.TotalBlockID.String()) } newTrs := top.Transactions.Join(micro.Transactions) newBlock, err := proto.CreateBlock(newTrs, top.Timestamp, top.Parent, top.GenPublicKey, top.NxtConsensus, top.Version, top.Features, top.RewardVote, a.scheme) if err != nil { - return proto.BlockID{}, err + return nil, err } newBlock.BlockSignature = micro.TotalResBlockSigField ok, err = newBlock.VerifySignature(a.scheme) if err != nil { - return proto.BlockID{}, err + return nil, err } if !ok { - return proto.BlockID{}, errors.New("incorrect signature for applied microblock") + return nil, errors.New("incorrect signature for applied microblock") } err = newBlock.GenerateBlockID(a.scheme) if err != nil { - return proto.BlockID{}, errors.Wrap(err, "NGFsm microBlockByID: failed generate block id") + return nil, errors.Wrap(err, "NGFsm microBlockByID: failed generate block id") } err = a.storage.Map(func(state state.State) error { _, err := a.blocksApplier.ApplyMicro(state, newBlock) @@ -219,10 +268,10 @@ func (a *NGFsm) checkAndAppendMicroblock(micro *proto.MicroBlock) (proto.BlockID }) if err != nil { metrics.FSMMicroBlockDeclined("ng", micro, err) - return proto.BlockID{}, errors.Wrap(err, "failed to apply created from micro block") + return nil, errors.Wrap(err, "failed to apply created from micro block") } metrics.FSMMicroBlockApplied("ng", micro) - return newBlock.BlockID(), nil + return newBlock, nil } func (a *NGFsm) MicroBlockInv(p peer.Peer, inv *proto.MicroBlockInv) (FSM, Async, error) { @@ -235,3 +284,25 @@ func (a *NGFsm) MicroBlockInv(p peer.Peer, inv *proto.MicroBlockInv) (FSM, Async func MinedBlockNgTransition(info BaseInfo, block *proto.Block, limits proto.MiningLimits, keyPair proto.KeyPair, vrf []byte) (FSM, Async, error) { return NewNGFsm12(info).MinedBlock(block, limits, keyPair, vrf) } + +type blockStatesCache struct { + blockStates map[proto.BlockID]proto.Block +} + +func (c *blockStatesCache) AddBlockState(block *proto.Block) { + c.blockStates[block.ID] = *block + zap.S().Debugf("Block '%s' added to cache, total blocks in cache: %d", block.ID.String(), len(c.blockStates)) +} + +func (c *blockStatesCache) Clear() { + c.blockStates = map[proto.BlockID]proto.Block{} + zap.S().Debug("Block cache is empty") +} + +func (c *blockStatesCache) Get(blockID proto.BlockID) (*proto.Block, bool) { + block, ok := c.blockStates[blockID] + if !ok { + return nil, false + } + return &block, true +} diff --git a/pkg/p2p/incoming/Incoming.go b/pkg/p2p/incoming/Incoming.go index 6dcb7255f..be864ff0c 100644 --- a/pkg/p2p/incoming/Incoming.go +++ b/pkg/p2p/incoming/Incoming.go @@ -42,13 +42,13 @@ func runIncomingPeer(ctx context.Context, cancel context.CancelFunc, params Inco _, err := readHandshake.ReadFrom(c) if err != nil { zap.S().Error("failed to read handshake: ", err) - c.Close() + _ = c.Close() return err } select { case <-ctx.Done(): - c.Close() + _ = c.Close() return errors.Wrap(ctx.Err(), "RunIncomingPeer") default: } @@ -65,13 +65,13 @@ func runIncomingPeer(ctx context.Context, cancel context.CancelFunc, params Inco _, err = writeHandshake.WriteTo(c) if err != nil { zap.S().Error("failed to write handshake: ", err) - c.Close() + _ = c.Close() return err } select { case <-ctx.Done(): - c.Close() + _ = c.Close() return errors.Wrap(ctx.Err(), "RunIncomingPeer") default: } diff --git a/pkg/proto/block_test.go b/pkg/proto/block_test.go index d755e5783..ee8856b50 100644 --- a/pkg/proto/block_test.go +++ b/pkg/proto/block_test.go @@ -83,9 +83,9 @@ func blockFromBinaryToBinary(t *testing.T, hexStr, jsonStr string) { var b Block err = b.UnmarshalBinary(decoded, MainNetScheme) assert.NoError(t, err, "UnmarshalBinary() for block failed") - bytes, err := BlockEncodeJson(&b) + bts, err := BlockEncodeJson(&b) assert.NoError(t, err, "json.Marshal() for block failed") - str := string(bytes) + str := string(bts) assert.Equalf(t, jsonStr, str, "block marshaled to wrong json:\nhave: %s\nwant: %s", str, jsonStr) bin, err := b.MarshalBinary() assert.NoError(t, err, "MarshalBinary() for block failed") @@ -96,9 +96,9 @@ func blockFromJSONToJSON(t *testing.T, jsonStr string) { var b Block err := json.Unmarshal([]byte(jsonStr), &b) assert.NoError(t, err, "json.Unmarshal() for block failed") - bytes, err := BlockEncodeJson(&b) + bts, err := BlockEncodeJson(&b) assert.NoError(t, err, "json.Marshal() for block failed") - str := string(bytes) + str := string(bts) assert.JSONEqf(t, jsonStr, str, "block marshaled to wrong json:\nhave: %s\nwant: %s", str, jsonStr) } @@ -130,9 +130,9 @@ func headerFromBinaryToBinary(t *testing.T, hexStr, jsonStr string) { var header BlockHeader err = header.UnmarshalHeaderFromBinary(decoded, MainNetScheme) assert.NoError(t, err, "UnmarshalHeaderFromBinary() failed") - bytes, err := json.Marshal(header) + bts, err := json.Marshal(header) assert.NoError(t, err, "json.Marshal() for header failed") - str := string(bytes) + str := string(bts) assert.Equalf(t, jsonStr, str, "header marshaled to wrong json:\nhave: %s\nwant: %s", str, jsonStr) bin, err := header.MarshalHeaderToBinary() assert.NoError(t, err, "MarshalHeaderToBinary() failed") @@ -143,9 +143,9 @@ func headerFromJSONToJSON(t *testing.T, jsonStr string) { var header BlockHeader err := json.Unmarshal([]byte(jsonStr), &header) assert.NoError(t, err, "json.Unmarshal() for header failed") - bytes, err := json.Marshal(header) + bts, err := json.Marshal(header) assert.NoError(t, err, "json.Marshal() for header failed") - str := string(bytes) + str := string(bts) assert.JSONEqf(t, jsonStr, str, "header marshaled to wrong json:\nhave: %s\nwant: %s", str, jsonStr) } diff --git a/pkg/services/services.go b/pkg/services/services.go index 11104a10f..928180488 100644 --- a/pkg/services/services.go +++ b/pkg/services/services.go @@ -10,6 +10,7 @@ import ( ) type BlocksApplier interface { + BlockExists(state state.State, block *proto.Block) (bool, error) Apply(state state.State, block []*proto.Block) (proto.Height, error) ApplyMicro(state state.State, block *proto.Block) (proto.Height, error) }