From 6ae4ae41c1cea8017e6b01ece3f706b467a7ad7d Mon Sep 17 00:00:00 2001 From: Wondertan Date: Thu, 12 Dec 2024 19:05:30 +0100 Subject: [PATCH] ensure correct order of Stop --- p2p/subscriber.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/p2p/subscriber.go b/p2p/subscriber.go index 52aa86c..7ecfde1 100644 --- a/p2p/subscriber.go +++ b/p2p/subscriber.go @@ -83,27 +83,29 @@ func NewSubscriber[H header.Header[H]]( // Start starts the Subscriber and joins the instance's topic. SetVerifier must // be called separately to ensure a validator is mounted on the topic. func (s *Subscriber[H]) Start(context.Context) (err error) { - log.Infow("joining topic", "topic ID", s.pubsubTopicID) + log.Debugf("joining topic", "topic ID", s.pubsubTopicID) err = s.pubsub.RegisterTopicValidator(s.pubsubTopicID, s.verifyMessage) if err != nil { return err } - s.topic, err = s.pubsub.Join(s.pubsubTopicID, pubsub.WithTopicMessageIdFn(s.msgID)) + topic, err := s.pubsub.Join(s.pubsubTopicID, pubsub.WithTopicMessageIdFn(s.msgID)) + if err != nil { + return err + } + + s.topic = topic return err } // Stop closes the topic and unregisters its validator. -func (s *Subscriber[H]) Stop(context.Context) error { - regErr := s.pubsub.UnregisterTopicValidator(s.pubsubTopicID) - if regErr != nil { - // do not return this error as it is non-critical and usually - // means that a validator was not mounted. - log.Warnf("unregistering validator: %s", regErr) - } - - err := s.topic.Close() - return errors.Join(err, s.metrics.Close()) +func (s *Subscriber[H]) Stop(context.Context) (err error) { + err = errors.Join(err, s.metrics.Close()) + // we must close the topic first and then unregister the validator + // this ensures we never get a message after the validator is unregistered + err = errors.Join(err, s.topic.Close()) + err = errors.Join(err, s.pubsub.UnregisterTopicValidator(s.pubsubTopicID)) + return err } // SetVerifier set given verification func as Header PubSub topic validator.