Skip to content

Commit

Permalink
#316: Add applicator reset
Browse files Browse the repository at this point in the history
  • Loading branch information
mvandeberg committed Dec 11, 2024
1 parent f3b5003 commit e6c9081
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 0 deletions.
81 changes: 81 additions & 0 deletions internal/p2p/applicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package p2p

import (
"context"
"encoding/hex"
"errors"
"slices"
"sync/atomic"
"time"

log "github.com/koinos/koinos-log-golang/v2"
"github.com/koinos/koinos-p2p/internal/options"
"github.com/koinos/koinos-p2p/internal/p2perrors"
"github.com/koinos/koinos-p2p/internal/rpc"
Expand Down Expand Up @@ -60,6 +63,7 @@ type Applicator struct {
blockBroadcastChan chan *broadcast.BlockAccepted
blockStatusChan chan *blockApplicationStatus
tryBlockChan chan *tryBlockApplicationRequest
resetChan chan *void

applyBlockChan chan *applyBlockRequest
applyTransactionChan chan *applyTransactionRequest
Expand Down Expand Up @@ -89,6 +93,7 @@ func NewApplicator(ctx context.Context, rpc rpc.LocalRPC, cache *TransactionCach
blockBroadcastChan: make(chan *broadcast.BlockAccepted, 10),
blockStatusChan: make(chan *blockApplicationStatus, 10),
tryBlockChan: make(chan *tryBlockApplicationRequest, 10),
resetChan: make(chan *void, 1),
applyBlockChan: make(chan *applyBlockRequest, 10),
applyTransactionChan: make(chan *applyTransactionRequest, 10),
opts: opts,
Expand Down Expand Up @@ -129,6 +134,13 @@ func (a *Applicator) ApplyTransaction(ctx context.Context, trx *protocol.Transac
}
}

func (a *Applicator) Reset(ctx context.Context) {
select {
case a.resetChan <- &void{}:
case <-ctx.Done():
}
}

// HandleForkHeads handles a fork heads broadcast
func (a *Applicator) HandleForkHeads(forkHeads *broadcast.ForkHeads) {
a.forkHeadsChan <- forkHeads
Expand Down Expand Up @@ -367,6 +379,73 @@ func (a *Applicator) handleBlockBroadcast(ctx context.Context, blockAccept *broa
a.checkBlockChildren(ctx, string(blockAccept.Block.Id))
}

func (a *Applicator) handleReset(ctx context.Context) {
log.Warn("Resetting applicator...")

// Block IDs
log.Warn("Block IDs:")

for id, entry := range a.blocksById {
log.Warnf(" - 0x%s", hex.EncodeToString([]byte(id)))

for _, errChan := range entry.errChans {
defer close(errChan)
select {
case errChan <- p2perrors.ErrBlockApplicationTimeout:
case <-ctx.Done():
}
}
}

a.blocksById = make(map[string]*blockEntry)

// Blocks by Previous
log.Warn("Blocks by previous:")

for prevId, blocks := range a.blocksByPrevious {
log.Warnf(" - 0x%s:", hex.EncodeToString([]byte(prevId)))

for id, _ := range blocks {
log.Warnf(" - 0x%s", hex.EncodeToString([]byte(id)))
}
}

a.blocksByPrevious = make(map[string]map[string]void)

// Blocks by Height
log.Warnf("Blocks by height:")
heights := make([]uint64, 0)

for height := range a.blocksByHeight {
heights = append(heights, height)
}

slices.Sort(heights)

for _, height := range heights {
log.Warnf(" - %v:", height)

for id, _ := range a.blocksByHeight[height] {
log.Warnf(" - 0x%s", hex.EncodeToString([]byte(id)))
}
}

a.blocksByHeight = make(map[uint64]map[string]void)

// Pending Blocks
log.Warnf("Pending blocks:")

for id, _ := range a.pendingBlocks {
log.Warnf(" - 0x%s", hex.EncodeToString([]byte(id)))
}

a.pendingBlocks = make(map[string]void)

a.forkWatchdog = NewForkWatchdog()
// Transaction cache is sent to use during construction so we cannot reset it
// HighestBlock and LIB should not be reset because these values are still accurate from Chain
}

func (a *Applicator) handleApplyBlock(request *applyBlockRequest) {
var err error
if request.block.Header.Height <= atomic.LoadUint64(&a.lib) {
Expand Down Expand Up @@ -403,6 +482,8 @@ func (a *Applicator) Start(ctx context.Context) {
a.handleBlockBroadcast(ctx, blockBroadcast)
case tryApplyBlock := <-a.tryBlockChan:
a.handleTryBlockApplication(ctx, tryApplyBlock)
case <-a.resetChan:
a.handleReset(ctx)

case <-ctx.Done():
return
Expand Down
163 changes: 163 additions & 0 deletions internal/p2p/applicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,169 @@ func TestApplicator(t *testing.T) {
<-testChan4
}

func TestReset(t *testing.T) {
ctx := context.Background()
rpc := applicatorTestRPC{
blocksToFail: make(map[string]void),
unlinkableBlocks: make(map[string]void),
head: []byte{0x00},
invalidNonceTrxs: make(map[string]void),
}

applicator, err := NewApplicator(ctx, &rpc, NewTransactionCache(time.Minute), *options.NewApplicatorOptions())
if err != nil {
t.Error(err)
}

applicator.Start(ctx)

// Blocks will be applied using the following topology
//
// 0 -- 1 -- 2a -- 3a
// 2b \ 3b
//
// These blocks will be applied in the following order
//
// 2b (does not connect)
// 3a (block is in the future, passes)
// 3b (block is in the future, fails)
// 2a (success immediately, triggers 3)
// 1 (succeeds immediately)

block1 := &protocol.Block{
Id: []byte{0x01},
Header: &protocol.BlockHeader{
Height: 1,
Previous: []byte{0},
},
}

block2a := &protocol.Block{
Id: []byte{0x02, 0x0a},
Header: &protocol.BlockHeader{
Height: 2,
Previous: block1.Id,
},
}

block2b := &protocol.Block{
Id: []byte{0x02, 0x0b},
Header: &protocol.BlockHeader{
Height: 2,
Previous: []byte{0x00, 0x00},
},
}

block3a := &protocol.Block{
Id: []byte{0x03, 0x0a},
Header: &protocol.BlockHeader{
Height: 3,
Previous: block2a.Id,
},
}

block3b := &protocol.Block{
Id: []byte{0x03, 0x0b},
Header: &protocol.BlockHeader{
Height: 3,
Previous: block2a.Id,
},
}

testChan1 := make(chan struct{})
testChan2 := make(chan struct{})
testChan3 := make(chan struct{})
testChan4 := make(chan struct{})
testChan5 := make(chan struct{})

rpc.unlinkableBlocks[string(block1.Id)] = void{}

go func() {
err := applicator.ApplyBlock(ctx, block1)
if err != p2perrors.ErrBlockApplicationTimeout {
t.Errorf("block1 - ErrBlockApplicationTimeout expected but not returned, was: %v", err)
}
testChan1 <- struct{}{}
}()

go func() {
err := applicator.ApplyBlock(ctx, block2a)
if err != p2perrors.ErrBlockApplicationTimeout {
t.Errorf("block2a - ErrBlockApplicationTimeout expected but not returned, was: %v", err)
}
testChan2 <- struct{}{}
}()

go func() {
err := applicator.ApplyBlock(ctx, block2b)
if err != p2perrors.ErrBlockApplicationTimeout {
t.Errorf("block2b - ErrBlockApplicationTimeout expected but not returned, was: %v", err)
}
testChan3 <- struct{}{}
}()

go func() {
err := applicator.ApplyBlock(ctx, block3a)
if err != p2perrors.ErrBlockApplicationTimeout {
t.Errorf("block3a - ErrBlockApplicationTimeout expected but not returned, was: %v", err)
}
testChan4 <- struct{}{}
}()

go func() {
err := applicator.ApplyBlock(ctx, block3b)
if err != p2perrors.ErrBlockApplicationTimeout {
t.Errorf("block3b - ErrBlockApplicationTimeout expected but not returned, was: %v", err)
}
testChan5 <- struct{}{}
}()

time.Sleep(time.Second)

// Lengths are sufficient for this test as other tests check correctness by behavior
if len(applicator.blocksById) != 5 {
t.Errorf("Expected 5 blocks by ID, was: %v", len(applicator.blocksById))
}

if len(applicator.blocksByPrevious) != 4 {
t.Errorf("Expected 3 entries for blocks by previous, was: %v", len(applicator.blocksByPrevious))
}

if len(applicator.blocksByHeight) != 3 {
t.Errorf("Expected 3 entries for blocks by height, was: %v", len(applicator.blocksByHeight))
}

if len(applicator.pendingBlocks) != 0 {
t.Errorf("Expected 0 entries in pending blocks, was: %v", len(applicator.pendingBlocks))
}

applicator.Reset(ctx)

time.Sleep(time.Second)

if len(applicator.blocksById) != 0 {
t.Errorf("Expected 0 blocks by ID, was: %v", len(applicator.blocksById))
}

if len(applicator.blocksByPrevious) != 0 {
t.Errorf("Expected 0 entries for blocks by previous, was: %v", len(applicator.blocksByPrevious))
}

if len(applicator.blocksByHeight) != 0 {
t.Errorf("Expected 0 entries for blocks by height, was: %v", len(applicator.blocksByHeight))
}

if len(applicator.pendingBlocks) != 0 {
t.Errorf("Expected 0 entries in pending blocks, was: %v", len(applicator.pendingBlocks))
}

<-testChan1
<-testChan2
<-testChan3
<-testChan4
<-testChan5
}

func TestApplicatorLimits(t *testing.T) {
ctx := context.Background()
rpc := applicatorTestRPC{
Expand Down

0 comments on commit e6c9081

Please sign in to comment.