Skip to content

Commit

Permalink
Add retries to deregister and panic for mailbox (#168)
Browse files Browse the repository at this point in the history
* Add retries to deregister and panic for mailbox

* Make sure we panic when the actor fails to close its mailbox.

Co-authored-by: Matt Braymer-Hayes <[email protected]>
  • Loading branch information
ajroetker and mattayes authored Jun 28, 2021
1 parent 7909a4c commit a9b90db
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
34 changes: 25 additions & 9 deletions mailbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,19 @@ package grid

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/lytics/retry"
)

var (
// errDeregisteredFailed is used internally when we can't deregister a key from etcd.
// It's used by Server.startActorC() to ensure we panic.
errDeregisteredFailed = errors.New("grid: deregistered failed")
)

// Mailbox for receiving messages.
Expand All @@ -15,7 +25,7 @@ type Mailbox struct {
C <-chan Request
c chan Request
closed bool
cleanup func() error
cleanup func()
}

// Close the mailbox.
Expand All @@ -27,8 +37,9 @@ func (box *Mailbox) Close() error {
box.closed = true
close(box.c)

box.cleanup()
// Run server provided clean up.
return box.cleanup()
return nil
}

// Name of mailbox, without namespace.
Expand Down Expand Up @@ -136,7 +147,7 @@ func newMailbox(s *Server, name, nsName string, size int) (*Mailbox, error) {
}

boxC := make(chan Request, size)
cleanup := func() error {
cleanup := func() {
s.mumb.Lock()
defer s.mumb.Unlock()

Expand All @@ -145,12 +156,17 @@ func newMailbox(s *Server, name, nsName string, size int) (*Mailbox, error) {
delete(s.mailboxes, nsName)

// Deregister the name.
timeout, cancel := context.WithTimeout(context.Background(), s.cfg.Timeout)
defer cancel()
err := s.registry.Deregister(timeout, nsName)

// Return any error from the deregister call.
return err
var err error
retry.X(3, 3*time.Second,
func() bool {
timeout, cancel := context.WithTimeout(context.Background(), s.cfg.Timeout)
err = s.registry.Deregister(timeout, nsName)
cancel()
return err != nil
})
if err != nil {
panic(fmt.Errorf("%w: unable to deregister mailbox: %v, error: %v", errDeregisteredFailed, nsName, err))
}
}
box := &Mailbox{
name: name,
Expand Down
10 changes: 7 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,15 @@ func (s *Server) startActorC(c context.Context, start *ActorStart) error {
go func() {
defer s.deregisterActor(nsName)
defer func() {
if err := recover(); err != nil {
if r := recover(); r != nil {
if err, ok := r.(error); ok && errors.Is(err, errDeregisteredFailed) {
// NOTE (2021-06) (mh): We need to panic here
panic(err)
}

stack := niceStack(debug.Stack())
s.logf("panic in namespace: %v, actor: %v, recovered from: %v, stack trace: %v",
s.cfg.Namespace, start.Name, err, stack)
s.cfg.Namespace, start.Name, r, stack)
}
}()
actor.Act(actorCtx)
Expand All @@ -466,7 +471,6 @@ func (s *Server) deregisterActor(nsName string) {
return err != nil
})
if err != nil {
s.logf("failed to deregister actor: %v, error: %v", nsName, err)
panic(fmt.Sprintf("unable to deregister actor: %v, error: %v", nsName, err))
}
}
Expand Down

0 comments on commit a9b90db

Please sign in to comment.