Skip to content

Commit

Permalink
Merge pull request #247 from koinos/no-duplicate-requests
Browse files Browse the repository at this point in the history
Track pending block applications to prevent duplicate request applications
  • Loading branch information
sgerbino authored Oct 10, 2022
2 parents b239133 + 55c54e8 commit 5d9f4e7
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions internal/p2p/applicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Applicator struct {
blocksById map[string]*blockEntry
blocksByPrevious map[string]map[string]void
blocksByHeight map[uint64]map[string]void
pendingBlocks map[string]void

newBlockChan chan *blockEntry
forkHeadsChan chan *broadcast.ForkHeads
Expand All @@ -72,6 +73,7 @@ func NewApplicator(ctx context.Context, rpc rpc.LocalRPC, opts options.Applicato
blocksById: make(map[string]*blockEntry),
blocksByPrevious: make(map[string]map[string]void),
blocksByHeight: make(map[uint64]map[string]void),
pendingBlocks: make(map[string]void),
newBlockChan: make(chan *blockEntry, 10),
forkHeadsChan: make(chan *broadcast.ForkHeads, 10),
blockBroadcastChan: make(chan *broadcast.BlockAccepted, 10),
Expand Down Expand Up @@ -187,6 +189,13 @@ func (b *Applicator) removeEntry(ctx context.Context, id string, err error) {
}

func (b *Applicator) requestApplication(ctx context.Context, block *protocol.Block) {
// If there is already a pending application of the block, return
if _, ok := b.pendingBlocks[string(block.Id)]; ok {
return
}

b.pendingBlocks[string(block.Id)] = void{}

go func() {
errChan := make(chan error, 1)

Expand Down Expand Up @@ -226,6 +235,14 @@ func (b *Applicator) requestApplication(ctx context.Context, block *protocol.Blo
}()
}

func (b *Applicator) handleBlockStatus(ctx context.Context, status *blockApplicationStatus) {
delete(b.pendingBlocks, string(status.block.Id))

if status.err == nil || !errors.Is(status.err, p2perrors.ErrUnknownPreviousBlock) {
b.removeEntry(ctx, string(status.block.Id), status.err)
}
}

func (b *Applicator) handleNewBlock(ctx context.Context, entry *blockEntry) {
var err error

Expand Down Expand Up @@ -311,9 +328,7 @@ func (b *Applicator) Start(ctx context.Context) {
for {
select {
case status := <-b.blockStatusChan:
if status.err == nil || !errors.Is(status.err, p2perrors.ErrUnknownPreviousBlock) {
b.removeEntry(ctx, string(status.block.Id), status.err)
}
b.handleBlockStatus(ctx, status)
case entry := <-b.newBlockChan:
b.handleNewBlock(ctx, entry)
case forkHeads := <-b.forkHeadsChan:
Expand Down

0 comments on commit 5d9f4e7

Please sign in to comment.