From a9b90db28ed1fe1debefdb70ccae0be80ca07f72 Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Mon, 28 Jun 2021 11:58:20 -0700 Subject: [PATCH] Add retries to deregister and panic for mailbox (#168) * Add retries to deregister and panic for mailbox * Make sure we panic when the actor fails to close its mailbox. Co-authored-by: Matt Braymer-Hayes --- mailbox.go | 34 +++++++++++++++++++++++++--------- server.go | 10 +++++++--- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/mailbox.go b/mailbox.go index 0eb19e2..cad01a7 100644 --- a/mailbox.go +++ b/mailbox.go @@ -2,9 +2,19 @@ package grid import ( "context" + "errors" "fmt" "strings" "sync" + "time" + + "github.com/lytics/retry" +) + +var ( + // errDeregisteredFailed is used internally when we can't deregister a key from etcd. + // It's used by Server.startActorC() to ensure we panic. + errDeregisteredFailed = errors.New("grid: deregistered failed") ) // Mailbox for receiving messages. @@ -15,7 +25,7 @@ type Mailbox struct { C <-chan Request c chan Request closed bool - cleanup func() error + cleanup func() } // Close the mailbox. @@ -27,8 +37,9 @@ func (box *Mailbox) Close() error { box.closed = true close(box.c) + box.cleanup() // Run server provided clean up. - return box.cleanup() + return nil } // Name of mailbox, without namespace. @@ -136,7 +147,7 @@ func newMailbox(s *Server, name, nsName string, size int) (*Mailbox, error) { } boxC := make(chan Request, size) - cleanup := func() error { + cleanup := func() { s.mumb.Lock() defer s.mumb.Unlock() @@ -145,12 +156,17 @@ func newMailbox(s *Server, name, nsName string, size int) (*Mailbox, error) { delete(s.mailboxes, nsName) // Deregister the name. - timeout, cancel := context.WithTimeout(context.Background(), s.cfg.Timeout) - defer cancel() - err := s.registry.Deregister(timeout, nsName) - - // Return any error from the deregister call. - return err + var err error + retry.X(3, 3*time.Second, + func() bool { + timeout, cancel := context.WithTimeout(context.Background(), s.cfg.Timeout) + err = s.registry.Deregister(timeout, nsName) + cancel() + return err != nil + }) + if err != nil { + panic(fmt.Errorf("%w: unable to deregister mailbox: %v, error: %v", errDeregisteredFailed, nsName, err)) + } } box := &Mailbox{ name: name, diff --git a/server.go b/server.go index 4442978..970417d 100644 --- a/server.go +++ b/server.go @@ -444,10 +444,15 @@ func (s *Server) startActorC(c context.Context, start *ActorStart) error { go func() { defer s.deregisterActor(nsName) defer func() { - if err := recover(); err != nil { + if r := recover(); r != nil { + if err, ok := r.(error); ok && errors.Is(err, errDeregisteredFailed) { + // NOTE (2021-06) (mh): We need to panic here + panic(err) + } + stack := niceStack(debug.Stack()) s.logf("panic in namespace: %v, actor: %v, recovered from: %v, stack trace: %v", - s.cfg.Namespace, start.Name, err, stack) + s.cfg.Namespace, start.Name, r, stack) } }() actor.Act(actorCtx) @@ -466,7 +471,6 @@ func (s *Server) deregisterActor(nsName string) { return err != nil }) if err != nil { - s.logf("failed to deregister actor: %v, error: %v", nsName, err) panic(fmt.Sprintf("unable to deregister actor: %v, error: %v", nsName, err)) } }