Added block state cache (#434)
* Rescheduling on new microblock was removed

* Metrics for blocks and microblocks replaced with new ones

* Metrics improved.

* Fixed an issue with invalid microblock signatures when transactions are protobuf-encoded. Added handling of protobuf transaction broadcast messages.

* RIDE function takeString now treats its argument as a UTF-16 string

* Proofs validation added to protobuf and JSON unmarshaling

* Dependencies updated

* Added block state cache (see the sketch after this list)

* Changed the rollback condition

* Moved the rollback condition check

* Add block states when receiving microblocks

* Reduced logging, especially of context cancellation. Fixed naming.

* Reverted peer error handling on context cancellation, but updated logging.

* Block cache cleaning moved to key block application functions.

* Added functions to the FSM's block applier to check whether a block already exists.

* Added cache size debug messages. Fixed which block ID is fetched from the cache.
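
As a rough sketch of how the new cache is meant to be used (condensed from the Block handler in pkg/node/state_fsm/fsm_ng.go below; the wrapper name handleKeyBlock is illustrative, not part of the commit): a key block that does not build on the current top block triggers a rollback to the cached parent state before the new block is applied, and the cache is then reset to start a fresh lineage from that key block.

func (a *NGFsm) handleKeyBlock(block *proto.Block) error {
	// Ignore key blocks that are already in state.
	if ok, err := a.blocksApplier.BlockExists(a.storage, block); err != nil || ok {
		return err
	}
	// If the block does not reference the current top block, roll back to the
	// cached state of its parent (if we have it) and re-apply that state.
	if a.storage.TopBlock().BlockID() != block.Parent {
		if cached, ok := a.blocksCache.Get(block.Parent); ok {
			if err := a.rollbackToStateFromCache(cached); err != nil {
				return err
			}
		}
	}
	// Start a fresh cache lineage from the new key block, then apply it.
	a.blocksCache.Clear()
	a.blocksCache.AddBlockState(block)
	_, err := a.blocksApplier.Apply(a.storage, []*proto.Block{block})
	return err
}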

Co-authored-by: Alexey Kiselev <[email protected]>
esuwu and alexeykiselev authored Mar 12, 2021
1 parent 0db805c commit e2fb049
Showing 9 changed files with 128 additions and 39 deletions.
11 changes: 6 additions & 5 deletions cmd/retransmitter/retransmit/network/incoming_peer.go
@@ -3,10 +3,11 @@ package network
import (
"context"
"fmt"
"github.com/wavesplatform/gowaves/pkg/p2p/peer"
"net"
"time"

"github.com/wavesplatform/gowaves/pkg/p2p/peer"

"github.com/wavesplatform/gowaves/pkg/libs/bytespool"
"github.com/wavesplatform/gowaves/pkg/p2p/conn"
"github.com/wavesplatform/gowaves/pkg/proto"
@@ -37,13 +38,13 @@ func RunIncomingPeer(ctx context.Context, params IncomingPeerParams) {
_, err := readHandshake.ReadFrom(c)
if err != nil {
zap.S().Error("failed to read handshake: ", err)
c.Close()
_ = c.Close()
return
}

select {
case <-ctx.Done():
c.Close()
_ = c.Close()
return
default:
}
@@ -64,13 +65,13 @@ func RunIncomingPeer(ctx context.Context, params IncomingPeerParams) {
_, err = writeHandshake.WriteTo(c)
if err != nil {
zap.S().Error("failed to write handshake: ", err)
c.Close()
_ = c.Close()
return
}

select {
case <-ctx.Done():
c.Close()
_ = c.Close()
return
default:
}
2 changes: 1 addition & 1 deletion cmd/retransmitter/retransmit/network/outgoing_peer.go
@@ -112,7 +112,7 @@ func (a *OutgoingPeer) connect(ctx context.Context, wavesNetwork string, remote

select {
case <-ctx.Done():
c.Close()
_ = c.Close()
return nil, nil, errors.Wrap(ctx.Err(), "OutgoingPeer.connect")
default:
}
15 changes: 15 additions & 0 deletions pkg/node/blocks_applier/blocks_applier.go
@@ -21,6 +21,17 @@ type innerState interface {
RollbackToHeight(height proto.Height) error
}

func (a *innerBlocksApplier) exists(storage innerState, block *proto.Block) (bool, error) {
_, err := storage.Block(block.BlockID())
if err == nil {
return true, nil
}
if state.IsNotFound(err) {
return false, nil
}
return false, err
}

func (a *innerBlocksApplier) apply(storage innerState, blocks []*proto.Block) (proto.Height, error) {
if len(blocks) == 0 {
return 0, errors.New("empty blocks")
@@ -163,6 +174,10 @@ func NewBlocksApplier() *BlocksApplier {
}
}

func (a *BlocksApplier) BlockExists(state state.State, block *proto.Block) (bool, error) {
return a.inner.exists(state, block)
}

func (a *BlocksApplier) Apply(state state.State, blocks []*proto.Block) (proto.Height, error) {
return a.inner.apply(state, blocks)
}
10 changes: 5 additions & 5 deletions pkg/node/peer_manager/peer_manager.go
@@ -170,11 +170,11 @@ func (a *PeerManagerImpl) connectedCount() int {
func (a *PeerManagerImpl) NewConnection(p peer.Peer) error {
_, connected := a.Connected(p)
if connected {
p.Close()
_ = p.Close()
return errors.New("already connected")
}
if a.IsSuspended(p) {
p.Close()
_ = p.Close()
return errors.New("peer is suspended")
}
if p.Handshake().Version.CmpMinor(a.version) >= 2 {
@@ -184,7 +184,7 @@ func (a *PeerManagerImpl) NewConnection(p peer.Peer) error {
p.Handshake().Version.String(),
)
a.Suspend(p, err.Error())
p.Close()
_ = p.Close()
return err
}

@@ -239,7 +239,7 @@ func (a *PeerManagerImpl) PeerWithHighestScore() (peer.Peer, *big.Int, bool) {
return nil, nil, false
}

var peers []peerInfo
peers := make([]peerInfo, 0)
for _, p := range a.active {
peers = append(peers, p)
}
@@ -323,7 +323,7 @@ func (a *PeerManagerImpl) KnownPeers() ([]proto.TCPAddr, error) {
func (a *PeerManagerImpl) Close() {
a.mu.Lock()
for _, v := range a.active {
v.peer.Close()
_ = v.peer.Close()
}
a.mu.Unlock()
}
1 change: 1 addition & 0 deletions pkg/node/state_fsm/fsm.go
@@ -20,6 +20,7 @@ import (
type Async []Task

type BlocksApplier interface {
BlockExists(state storage.State, block *proto.Block) (bool, error)
Apply(state storage.State, block []*proto.Block) (proto.Height, error)
ApplyMicro(state storage.State, block *proto.Block) (proto.Height, error)
}
103 changes: 87 additions & 16 deletions pkg/node/state_fsm/fsm_ng.go
@@ -15,6 +15,7 @@ import (

type NGFsm struct {
BaseInfo
blocksCache blockStatesCache
}

func (a *NGFsm) Transaction(p peer.Peer, t proto.Transaction) (FSM, Async, error) {
@@ -46,7 +47,8 @@ func (a *NGFsm) Halt() (FSM, Async, error) {

func NewNGFsm12(info BaseInfo) *NGFsm {
return &NGFsm{
BaseInfo: info,
BaseInfo: info,
blocksCache: blockStatesCache{blockStates: map[proto.BlockID]proto.Block{}},
}
}

@@ -68,9 +70,46 @@ func (a *NGFsm) Score(p peer.Peer, score *proto.Score) (FSM, Async, error) {
return handleScore(a, a.BaseInfo, p, score)
}

func (a *NGFsm) rollbackToStateFromCache(blockFromCache *proto.Block) error {
previousBlockID := blockFromCache.Parent
err := a.storage.RollbackTo(previousBlockID)
if err != nil {
return errors.Wrapf(err, "failed to rollback to height %d", blockFromCache.Height)
}
_, err = a.blocksApplier.Apply(a.storage, []*proto.Block{blockFromCache})
if err != nil {
return err
}
return nil
}

func (a *NGFsm) Block(peer peer.Peer, block *proto.Block) (FSM, Async, error) {
ok, err := a.blocksApplier.BlockExists(a.storage, block)
if err != nil {
return a, nil, err
}
if ok {
return a, nil, proto.NewInfoMsg(errors.Errorf("Block '%s' already exists", block.BlockID().String()))
}

metrics.FSMKeyBlockReceived("ng", block, peer.Handshake().NodeName)
_, err := a.blocksApplier.Apply(a.storage, []*proto.Block{block})

top := a.storage.TopBlock()
if top.BlockID() != block.Parent { // does block refer to last block
zap.S().Debugf("Key-block '%s' has parent '%s' which is not the top block '%s'",
block.ID.String(), block.Parent.String(), top.ID.String())
if blockFromCache, ok := a.blocksCache.Get(block.Parent); ok {
zap.S().Debugf("Re-applying block '%s' from cache", blockFromCache.ID.String())
err := a.rollbackToStateFromCache(blockFromCache)
if err != nil {
return a, nil, err
}
}
}
a.blocksCache.Clear()
a.blocksCache.AddBlockState(block)

_, err = a.blocksApplier.Apply(a.storage, []*proto.Block{block})
if err != nil {
metrics.FSMKeyBlockDeclined("ng", block, err)
return a, nil, err
@@ -79,6 +118,7 @@ func (a *NGFsm) Block(peer peer.Peer, block *proto.Block) (FSM, Async, error) {
a.Scheduler.Reschedule()
a.actions.SendScore(a.storage)
a.CleanUtx()

return NewNGFsm12(a.BaseInfo), nil, nil
}

@@ -90,15 +130,20 @@ func (a *NGFsm) MinedBlock(block *proto.Block, limits proto.MiningLimits, keyPai
return err
})
if err != nil {
zap.S().Info("NGFsm MinedBlock err ", err)
zap.S().Warnf("Failed to apply mined block '%s': %v", block.ID.String(), err)
metrics.FSMKeyBlockDeclined("ng", block, err)
return a, nil, err
}
metrics.FSMKeyBlockApplied("ng", block)

a.blocksCache.Clear()
a.blocksCache.AddBlockState(block)

a.Reschedule()
a.actions.SendBlock(block)
a.actions.SendScore(a.storage)
a.CleanUtx()

return NewNGFsm12(a.BaseInfo), Tasks(NewMineMicroTask(1*time.Second, block, limits, keyPair, vrf)), nil
}

@@ -109,16 +154,17 @@ func (a *NGFsm) BlockIDs(_ peer.Peer, _ []proto.BlockID) (FSM, Async, error) {
// New microblock received from the network
func (a *NGFsm) MicroBlock(p peer.Peer, micro *proto.MicroBlock) (FSM, Async, error) {
metrics.FSMMicroBlockReceived("ng", micro, p.Handshake().NodeName)
id, err := a.checkAndAppendMicroblock(micro) // the TopBlock() is used here
block, err := a.checkAndAppendMicroblock(micro) // the TopBlock() is used here
if err != nil {
metrics.FSMMicroBlockDeclined("ng", micro, err)
return a, nil, err
}
a.MicroBlockCache.Add(id, micro)
a.MicroBlockCache.Add(block.BlockID(), micro)
a.blocksCache.AddBlockState(block)
a.BaseInfo.Reschedule()

// Notify all connected peers about new microblock, send them microblock inv network message
inv, ok := a.MicroBlockInvCache.Get(id)
inv, ok := a.MicroBlockInvCache.Get(block.BlockID())
if ok {
invBts, err := inv.MarshalBinary()
if err == nil {
Expand Down Expand Up @@ -146,6 +192,7 @@ func (a *NGFsm) mineMicro(minedBlock *proto.Block, rest proto.MiningLimits, keyP
if err != nil {
return a, nil, errors.Wrap(err, "NGFsm.mineMicro")
}
a.blocksCache.AddBlockState(block)
metrics.FSMMicroBlockGenerated("ng", micro)
err = a.storage.Map(func(s state.NonThreadSafeState) error {
_, err := a.blocksApplier.ApplyMicro(s, block)
@@ -168,6 +215,7 @@ func (a *NGFsm) mineMicro(minedBlock *proto.Block, rest proto.MiningLimits, keyP
if err != nil {
return a, nil, err
}

a.MicroBlockCache.Add(block.BlockID(), micro)
a.MicroBlockInvCache.Add(block.BlockID(), inv)
// TODO wrap
@@ -178,51 +226,52 @@ func (a *NGFsm) mineMicro(minedBlock *proto.Block, rest proto.MiningLimits, keyP
},
)
})

return a, Tasks(NewMineMicroTask(5*time.Second, block, rest, keyPair, vrf)), nil
}

// Check that microblock is appendable and append it
func (a *NGFsm) checkAndAppendMicroblock(micro *proto.MicroBlock) (proto.BlockID, error) {
func (a *NGFsm) checkAndAppendMicroblock(micro *proto.MicroBlock) (*proto.Block, error) {
top := a.storage.TopBlock() // Get the last block
if top.BlockID() != micro.Reference { // Microblock doesn't refer to last block
err := errors.Errorf("microblock TBID '%s' refer to block ID '%s' but last block ID is '%s'", micro.TotalBlockID.String(), micro.Reference.String(), top.BlockID().String())
metrics.FSMMicroBlockDeclined("ng", micro, err)
return proto.BlockID{}, proto.NewInfoMsg(err)
return &proto.Block{}, proto.NewInfoMsg(err)
}
ok, err := micro.VerifySignature(a.scheme)
if err != nil {
return proto.BlockID{}, err
return nil, err
}
if !ok {
return proto.BlockID{}, errors.Errorf("microblock '%s' has invalid signature", micro.TotalBlockID.String())
return nil, errors.Errorf("microblock '%s' has invalid signature", micro.TotalBlockID.String())
}
newTrs := top.Transactions.Join(micro.Transactions)
newBlock, err := proto.CreateBlock(newTrs, top.Timestamp, top.Parent, top.GenPublicKey, top.NxtConsensus, top.Version, top.Features, top.RewardVote, a.scheme)
if err != nil {
return proto.BlockID{}, err
return nil, err
}
newBlock.BlockSignature = micro.TotalResBlockSigField
ok, err = newBlock.VerifySignature(a.scheme)
if err != nil {
return proto.BlockID{}, err
return nil, err
}
if !ok {
return proto.BlockID{}, errors.New("incorrect signature for applied microblock")
return nil, errors.New("incorrect signature for applied microblock")
}
err = newBlock.GenerateBlockID(a.scheme)
if err != nil {
return proto.BlockID{}, errors.Wrap(err, "NGFsm microBlockByID: failed generate block id")
return nil, errors.Wrap(err, "NGFsm microBlockByID: failed generate block id")
}
err = a.storage.Map(func(state state.State) error {
_, err := a.blocksApplier.ApplyMicro(state, newBlock)
return err
})
if err != nil {
metrics.FSMMicroBlockDeclined("ng", micro, err)
return proto.BlockID{}, errors.Wrap(err, "failed to apply created from micro block")
return nil, errors.Wrap(err, "failed to apply created from micro block")
}
metrics.FSMMicroBlockApplied("ng", micro)
return newBlock.BlockID(), nil
return newBlock, nil
}

func (a *NGFsm) MicroBlockInv(p peer.Peer, inv *proto.MicroBlockInv) (FSM, Async, error) {
@@ -235,3 +284,25 @@ func (a *NGFsm) MicroBlockInv(p peer.Peer, inv *proto.MicroBlockInv) (FSM, Async
func MinedBlockNgTransition(info BaseInfo, block *proto.Block, limits proto.MiningLimits, keyPair proto.KeyPair, vrf []byte) (FSM, Async, error) {
return NewNGFsm12(info).MinedBlock(block, limits, keyPair, vrf)
}

type blockStatesCache struct {
blockStates map[proto.BlockID]proto.Block
}

func (c *blockStatesCache) AddBlockState(block *proto.Block) {
c.blockStates[block.ID] = *block
zap.S().Debugf("Block '%s' added to cache, total blocks in cache: %d", block.ID.String(), len(c.blockStates))
}

func (c *blockStatesCache) Clear() {
c.blockStates = map[proto.BlockID]proto.Block{}
zap.S().Debug("Block cache is empty")
}

func (c *blockStatesCache) Get(blockID proto.BlockID) (*proto.Block, bool) {
block, ok := c.blockStates[blockID]
if !ok {
return nil, false
}
return &block, true
}
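
To summarise the cache lifecycle that the fsm_ng.go changes above establish (the helper names onKeyBlockApplied and onMicroBlockApplied are illustrative only; the committed code makes these calls inline in Block, MinedBlock, MicroBlock and mineMicro): applying a key block clears the cache and stores that block as the first entry, and every applied microblock adds the resulting liquid block state on top, so Get(parentID) can later restore an intermediate state.

func onKeyBlockApplied(c *blockStatesCache, keyBlock *proto.Block) {
	c.Clear()                 // drop states that belonged to the previous key block
	c.AddBlockState(keyBlock) // the new key block starts a fresh lineage
}

func onMicroBlockApplied(c *blockStatesCache, liquidBlock *proto.Block) {
	c.AddBlockState(liquidBlock) // remember the state after each applied microblock
}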
8 changes: 4 additions & 4 deletions pkg/p2p/incoming/Incoming.go
@@ -42,13 +42,13 @@ func runIncomingPeer(ctx context.Context, cancel context.CancelFunc, params Inco
_, err := readHandshake.ReadFrom(c)
if err != nil {
zap.S().Error("failed to read handshake: ", err)
c.Close()
_ = c.Close()
return err
}

select {
case <-ctx.Done():
c.Close()
_ = c.Close()
return errors.Wrap(ctx.Err(), "RunIncomingPeer")
default:
}
@@ -65,13 +65,13 @@ func runIncomingPeer(ctx context.Context, cancel context.CancelFunc, params Inco
_, err = writeHandshake.WriteTo(c)
if err != nil {
zap.S().Error("failed to write handshake: ", err)
c.Close()
_ = c.Close()
return err
}

select {
case <-ctx.Done():
c.Close()
_ = c.Close()
return errors.Wrap(ctx.Err(), "RunIncomingPeer")
default:
}