Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(p2p): catch rare nil Header #234

Merged
merged 2 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 91 additions & 61 deletions p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"context"
"errors"
"sync"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -29,6 +30,10 @@ type Subscriber[H header.Header[H]] struct {
pubsub *pubsub.PubSub
topic *pubsub.Topic
msgID pubsub.MsgIdFunction

verifierMu sync.Mutex
verifierSema chan struct{} // closed when verifier is set
verifier func(context.Context, H) error
cristaloleg marked this conversation as resolved.
Show resolved Hide resolved
}

// WithSubscriberMetrics enables metrics collection for the Subscriber.
Expand Down Expand Up @@ -71,81 +76,50 @@ func NewSubscriber[H header.Header[H]](
pubsubTopicID: PubsubTopicID(params.networkID),
pubsub: ps,
msgID: msgID,
verifierSema: make(chan struct{}),
}, nil
}

// Start starts the Subscriber and joins the instance's topic. SetVerifier must
// be called separately to ensure a validator is mounted on the topic.
func (s *Subscriber[H]) Start(context.Context) (err error) {
log.Infow("joining topic", "topic ID", s.pubsubTopicID)
s.topic, err = s.pubsub.Join(s.pubsubTopicID, pubsub.WithTopicMessageIdFn(s.msgID))
return err
}
log.Debugf("joining topic", "topic ID", s.pubsubTopicID)
err = s.pubsub.RegisterTopicValidator(s.pubsubTopicID, s.verifyMessage)
if err != nil {
return err
}

// Stop closes the topic and unregisters its validator.
func (s *Subscriber[H]) Stop(context.Context) error {
regErr := s.pubsub.UnregisterTopicValidator(s.pubsubTopicID)
if regErr != nil {
// do not return this error as it is non-critical and usually
// means that a validator was not mounted.
log.Warnf("unregistering validator: %s", regErr)
topic, err := s.pubsub.Join(s.pubsubTopicID, pubsub.WithTopicMessageIdFn(s.msgID))
if err != nil {
return err
}

err := s.topic.Close()
return errors.Join(err, s.metrics.Close())
s.topic = topic
return err
}

// SetVerifier set given verification func as Header PubSub topic validator
// Does not punish peers if *header.VerifyError is given with Uncertain set to true.
func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) (res pubsub.ValidationResult) {
defer func() {
err := recover()
if err != nil {
log.Errorf("PANIC while unmarshalling or verifying header: %s", err)
res = pubsub.ValidationReject
}
}()

hdr := header.New[H]()
err := hdr.UnmarshalBinary(msg.Data)
if err != nil {
log.Errorw("unmarshalling header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
}
// ensure header validity
err = hdr.Validate()
if err != nil {
log.Errorw("invalid header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
}

var verErr *header.VerifyError
err = val(ctx, hdr)
switch {
case errors.As(err, &verErr) && verErr.SoftFailure:
s.metrics.ignore(ctx)
return pubsub.ValidationIgnore
case err != nil:
s.metrics.reject(ctx)
return pubsub.ValidationReject
default:
}
// Stop closes the topic and unregisters its validator.
func (s *Subscriber[H]) Stop(context.Context) (err error) {
err = errors.Join(err, s.metrics.Close())
// we must close the topic first and then unregister the validator
// this ensures we never get a message after the validator is unregistered
err = errors.Join(err, s.topic.Close())
err = errors.Join(err, s.pubsub.UnregisterTopicValidator(s.pubsubTopicID))
return err
}

// keep the valid header in the msg so Subscriptions can access it without
// additional unmarshalling
msg.ValidatorData = hdr
s.metrics.accept(ctx, len(msg.Data))
return pubsub.ValidationAccept
// SetVerifier set given verification func as Header PubSub topic validator.
// Does not punish peers if *header.VerifyError is given with SoftFailure set to true.
func (s *Subscriber[H]) SetVerifier(verifier func(context.Context, H) error) error {
s.verifierMu.Lock()
defer s.verifierMu.Unlock()
if s.verifier != nil {
cristaloleg marked this conversation as resolved.
Show resolved Hide resolved
return errors.New("verifier already set")
}

return s.pubsub.RegisterTopicValidator(s.pubsubTopicID, pval)
s.verifier = verifier
close(s.verifierSema)
return nil
}

// Subscribe returns a new subscription to the Subscriber's
Expand All @@ -166,3 +140,59 @@ func (s *Subscriber[H]) Broadcast(ctx context.Context, header H, opts ...pubsub.
}
return s.topic.Publish(ctx, bin, opts...)
}

func (s *Subscriber[H]) verifyMessage(ctx context.Context, p peer.ID, msg *pubsub.Message) (res pubsub.ValidationResult) {
defer func() {
err := recover()
if err != nil {
log.Errorf("PANIC while unmarshalling or verifying header: %s", err)
res = pubsub.ValidationReject
}
}()

hdr := header.New[H]()
err := hdr.UnmarshalBinary(msg.Data)
if err != nil {
log.Errorw("unmarshalling header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
}
// ensure header validity
err = hdr.Validate()
if err != nil {
log.Errorw("invalid header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
}

// ensure we have a verifier set before verifying the message
select {
case <-s.verifierSema:
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
log.Errorw("verifier was not set before incoming header verification", "from", p.ShortString())
s.metrics.ignore(ctx)
return pubsub.ValidationIgnore
}

var verErr *header.VerifyError
err = s.verifier(ctx, hdr)
switch {
case errors.As(err, &verErr) && verErr.SoftFailure:
s.metrics.ignore(ctx)
return pubsub.ValidationIgnore
case err != nil:
s.metrics.reject(ctx)
return pubsub.ValidationReject
default:
}

// keep the valid header in the msg so Subscriptions can access it without
// additional unmarshalling
msg.ValidatorData = hdr
s.metrics.accept(ctx, len(msg.Data))
return pubsub.ValidationAccept
}
13 changes: 8 additions & 5 deletions p2p/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func TestSubscriber(t *testing.T) {
require.NoError(t, err)
err = p2pSub1.Start(context.Background())
require.NoError(t, err)
err = p2pSub1.SetVerifier(func(context.Context, *headertest.DummyHeader) error {
return nil
})
require.NoError(t, err)

// get mock host and create new gossipsub on it
pubsub2, err := pubsub.NewGossipSub(ctx, net.Hosts()[1],
Expand All @@ -45,6 +49,10 @@ func TestSubscriber(t *testing.T) {
require.NoError(t, err)
err = p2pSub2.Start(context.Background())
require.NoError(t, err)
err = p2pSub2.SetVerifier(func(context.Context, *headertest.DummyHeader) error {
return nil
})
require.NoError(t, err)

sub0, err := net.Hosts()[0].EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
require.NoError(t, err)
Expand All @@ -68,11 +76,6 @@ func TestSubscriber(t *testing.T) {
_, err = p2pSub2.Subscribe()
require.NoError(t, err)

err = p2pSub1.SetVerifier(func(context.Context, *headertest.DummyHeader) error {
return nil
})
require.NoError(t, err)

subscription, err := p2pSub1.Subscribe()
require.NoError(t, err)

Expand Down
Loading