Skip to content

Commit

Permalink
AppendService should retry on JOURNAL_NOT_FOUND
Browse files Browse the repository at this point in the history
Partially roll back a prior commit which began erroring on
JOURNAL_NOT_FOUND. There are allowable data races where a created
journal is assigned to a primary broker, whose Etcd watch is currently
still unaware that the journal even exists.
  • Loading branch information
jgraettinger committed Dec 14, 2023
1 parent 7985348 commit b3bd8e9
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 23 deletions.
8 changes: 3 additions & 5 deletions broker/client/append_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import (
// pipelined and batched to amortize the cost of broker Append RPCs. It may
// also simplify implementations for clients who would prefer to simply have
// writes block until successfully committed, as opposed to handling errors
// and retries themselves. AppendService will retry all errors except for
// context cancellation and ErrJournalNotFound.
// and retries themselves.
//
// For each journal, AppendService manages an ordered list of AsyncAppends,
// each having buffered content to be appended. The list is dispatched in
Expand Down Expand Up @@ -279,8 +278,7 @@ func (p *AsyncAppend) Writer() *bufio.Writer { return p.fb.buf }
// also roll back any writes queued by the caller, aborting the append
// transaction. Require is valid for use only until Release is called.
// Require returns itself, allowing uses like:
//
// Require(maybeErrors()).Release()
// Require(maybeErrors()).Release()
func (p *AsyncAppend) Require(err error) *AsyncAppend {
if err != nil && p.op.err == nil {
p.op.err = err
Expand Down Expand Up @@ -394,7 +392,7 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) {
err2 = aa.app.Close()
}

if err2 == context.Canceled || err2 == context.DeadlineExceeded || err2 == ErrJournalNotFound {
if err2 == context.Canceled || err2 == context.DeadlineExceeded {
err = err2
return nil // Break retry loop.
} else if err2 != nil {
Expand Down
18 changes: 0 additions & 18 deletions broker/client/append_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,6 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) {
aaNext.mu.Unlock()

c.Check(as.PendingExcept(""), gc.HasLen, 0)

// Case: broker responds with a terminal JOURNAL_NOT_FOUND error.
aa = as.StartAppend(pb.AppendRequest{Journal: "a/journal"}, nil)
_, _ = aa.Writer().WriteString("hello, world")
c.Assert(aa.Release(), gc.IsNil)

readHelloWorldAppendRequest(c, broker) // RPC is dispatched to broker.
broker.AppendRespCh <- buildNotFoundFixture(broker)

c.Check(aa.Err(), gc.Equals, ErrJournalNotFound)
c.Check(aa.Response().Status, gc.DeepEquals, pb.Status_JOURNAL_NOT_FOUND)
}

func (s *AppendServiceSuite) TestAppendPipelineWithAborts(c *gc.C) {
Expand Down Expand Up @@ -622,11 +611,4 @@ func buildAppendResponseFixture(ep interface{ Endpoint() pb.Endpoint }) pb.Appen
}
}

func buildNotFoundFixture(ep interface{ Endpoint() pb.Endpoint }) pb.AppendResponse {
return pb.AppendResponse{
Status: pb.Status_JOURNAL_NOT_FOUND,
Header: *buildHeaderFixture(ep),
}
}

var _ = gc.Suite(&AppendServiceSuite{})

0 comments on commit b3bd8e9

Please sign in to comment.