From b3bd8e97cfbcd43e08343264c4eda6eb0b3a98af Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Thu, 14 Dec 2023 21:59:40 +0000 Subject: [PATCH] AppendService should retry on JOURNAL_NOT_FOUND Partially roll back a prior commit which began erroring on JOURNAL_NOT_FOUND. There are allowable data races where a created journal is assigned to a primary broker, whose Etcd watch is currently still unaware that the journal even exists. --- broker/client/append_service.go | 8 +++----- broker/client/append_service_test.go | 18 ------------------ 2 files changed, 3 insertions(+), 23 deletions(-) diff --git a/broker/client/append_service.go b/broker/client/append_service.go index 66b03799..642691bf 100644 --- a/broker/client/append_service.go +++ b/broker/client/append_service.go @@ -19,8 +19,7 @@ import ( // pipelined and batched to amortize the cost of broker Append RPCs. It may // also simplify implementations for clients who would prefer to simply have // writes block until successfully committed, as opposed to handling errors -// and retries themselves. AppendService will retry all errors except for -// context cancellation and ErrJournalNotFound. +// and retries themselves. // // For each journal, AppendService manages an ordered list of AsyncAppends, // each having buffered content to be appended. The list is dispatched in @@ -279,8 +278,7 @@ func (p *AsyncAppend) Writer() *bufio.Writer { return p.fb.buf } // also roll back any writes queued by the caller, aborting the append // transaction. Require is valid for use only until Release is called. // Require returns itself, allowing uses like: -// -// Require(maybeErrors()).Release() +// Require(maybeErrors()).Release() func (p *AsyncAppend) Require(err error) *AsyncAppend { if err != nil && p.op.err == nil { p.op.err = err @@ -394,7 +392,7 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) { err2 = aa.app.Close() } - if err2 == context.Canceled || err2 == context.DeadlineExceeded || err2 == ErrJournalNotFound { + if err2 == context.Canceled || err2 == context.DeadlineExceeded { err = err2 return nil // Break retry loop. } else if err2 != nil { diff --git a/broker/client/append_service_test.go b/broker/client/append_service_test.go index 3b1271cb..e61a24c2 100644 --- a/broker/client/append_service_test.go +++ b/broker/client/append_service_test.go @@ -66,17 +66,6 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) { aaNext.mu.Unlock() c.Check(as.PendingExcept(""), gc.HasLen, 0) - - // Case: broker responds with a terminal JOURNAL_NOT_FOUND error. - aa = as.StartAppend(pb.AppendRequest{Journal: "a/journal"}, nil) - _, _ = aa.Writer().WriteString("hello, world") - c.Assert(aa.Release(), gc.IsNil) - - readHelloWorldAppendRequest(c, broker) // RPC is dispatched to broker. - broker.AppendRespCh <- buildNotFoundFixture(broker) - - c.Check(aa.Err(), gc.Equals, ErrJournalNotFound) - c.Check(aa.Response().Status, gc.DeepEquals, pb.Status_JOURNAL_NOT_FOUND) } func (s *AppendServiceSuite) TestAppendPipelineWithAborts(c *gc.C) { @@ -622,11 +611,4 @@ func buildAppendResponseFixture(ep interface{ Endpoint() pb.Endpoint }) pb.Appen } } -func buildNotFoundFixture(ep interface{ Endpoint() pb.Endpoint }) pb.AppendResponse { - return pb.AppendResponse{ - Status: pb.Status_JOURNAL_NOT_FOUND, - Header: *buildHeaderFixture(ep), - } -} - var _ = gc.Suite(&AppendServiceSuite{})