Skip to content

Commit

Permalink
Merge pull request #310 from koinos/305-gossip-timeout
Browse files Browse the repository at this point in the history
Gossip Timeout
  • Loading branch information
sgerbino authored Dec 4, 2024
2 parents ceae8a7 + 310788d commit f3b5003
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 5 deletions.
3 changes: 2 additions & 1 deletion internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func NewKoinosP2PNode(ctx context.Context, listenAddr string, localRPC rpc.Local
node.PeerErrorChan,
node.Host.ID(),
node,
node.Applicator)
node.Applicator,
config.GossipOptions)

node.ConnectionManager = p2p.NewConnectionManager(
node.Host,
Expand Down
2 changes: 2 additions & 0 deletions internal/options/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Config struct {
GossipToggleOptions GossipToggleOptions
ApplicatorOptions ApplicatorOptions
ConnectionManagerOptions ConnectionManagerOptions
GossipOptions GossipOptions
}

// NewConfig creates a new Config
Expand All @@ -19,6 +20,7 @@ func NewConfig() *Config {
GossipToggleOptions: *NewGossipToggleOptions(),
ApplicatorOptions: *NewApplicatorOptions(),
ConnectionManagerOptions: *NewConnectionManagerOptions(),
GossipOptions: *NewGossipOptions(),
}
return &config
}
22 changes: 22 additions & 0 deletions internal/options/gossip_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package options

import "time"

const (
blockTimeoutDefault = 3 * time.Second
transactionTimeoutDefault = 1 * time.Second
)

// GossipOptions are options for Gossip
type GossipOptions struct {
BlockTimeout time.Duration
TransactionTimeout time.Duration
}

// New GossipOptions returns default initialized GossipOptions
func NewGossipOptions() *GossipOptions {
return &GossipOptions{
BlockTimeout: blockTimeoutDefault,
TransactionTimeout: transactionTimeoutDefault,
}
}
5 changes: 5 additions & 0 deletions internal/p2p/applicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ func (a *Applicator) handleBlockStatus(ctx context.Context, status *blockApplica
if status.err != nil && (errors.Is(status.err, p2perrors.ErrBlockState)) {
a.tryBlockApplication(ctx, status.block, false)
} else if status.err == nil || !errors.Is(status.err, p2perrors.ErrUnknownPreviousBlock) {

if status.err == nil {
a.checkBlockChildren(ctx, string(status.block.Id))
}

a.removeBlockEntry(ctx, string(status.block.Id), status.err)
}
}
Expand Down
17 changes: 13 additions & 4 deletions internal/p2p/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

log "github.com/koinos/koinos-log-golang/v2"
"github.com/koinos/koinos-p2p/internal/options"
"github.com/koinos/koinos-p2p/internal/p2perrors"
"github.com/koinos/koinos-p2p/internal/rpc"
"github.com/koinos/koinos-proto-golang/v2/koinos/canonical"
Expand Down Expand Up @@ -159,6 +160,7 @@ type KoinosGossip struct {
myPeerID peer.ID
libProvider LastIrreversibleBlockProvider
applicator *Applicator
opts options.GossipOptions
gossipCancel *context.CancelFunc
recentBlocks uint32
recentTrxs uint32
Expand All @@ -172,7 +174,8 @@ func NewKoinosGossip(
peerErrorChan chan<- PeerError,
id peer.ID,
libProvider LastIrreversibleBlockProvider,
applicator *Applicator) *KoinosGossip {
applicator *Applicator,
opts options.GossipOptions) *KoinosGossip {

block := NewGossipManager(ps, peerErrorChan, BlockTopicName)
transaction := NewGossipManager(ps, peerErrorChan, TransactionTopicName)
Expand All @@ -185,6 +188,7 @@ func NewKoinosGossip(
myPeerID: id,
libProvider: libProvider,
applicator: applicator,
opts: opts,
}

return &kg
Expand Down Expand Up @@ -361,8 +365,10 @@ func (kg *KoinosGossip) applyBlock(ctx context.Context, pid peer.ID, msg *pubsub

log.Debugf("Pushing gossip block - %s from peer %v", util.BlockString(block), msg.ReceivedFrom)

// TODO: Fix nil argument
if err := kg.applicator.ApplyBlock(ctx, block); err != nil {
applyBlockContext, cancelApplyBlock := context.WithTimeout(ctx, kg.opts.BlockTimeout)
defer cancelApplyBlock()

if err := kg.applicator.ApplyBlock(applyBlockContext, block); err != nil {
return fmt.Errorf("%w - %s, %v", p2perrors.ErrBlockApplication, util.BlockString(block), err.Error())
}

Expand Down Expand Up @@ -426,7 +432,10 @@ func (kg *KoinosGossip) applyTransaction(ctx context.Context, pid peer.ID, msg *
return fmt.Errorf("%w, gossiped transaction missing id", p2perrors.ErrDeserialization)
}

if err := kg.applicator.ApplyTransaction(ctx, transaction); err != nil {
applyTransactionContext, cancelApplyTransaction := context.WithTimeout(ctx, kg.opts.TransactionTimeout)
defer cancelApplyTransaction()

if err := kg.applicator.ApplyTransaction(applyTransactionContext, transaction); err != nil {
if errors.Is(err, p2perrors.ErrInvalidNonce) {
return fmt.Errorf("%w - %s, %v", p2perrors.ErrInvalidNonce, util.TransactionString(transaction), err.Error())
}
Expand Down

0 comments on commit f3b5003

Please sign in to comment.