Skip to content

Commit

Permalink
merge master containing conflicting changes to azure_fragment
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelschiff committed Jan 23, 2024
1 parent 3ba90ac commit 79e7fc6
Show file tree
Hide file tree
Showing 13 changed files with 416 additions and 311 deletions.
4 changes: 3 additions & 1 deletion broker/client/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func (a *Appender) Close() (err error) {
switch a.Response.Status {
case pb.Status_OK:
// Pass.
case pb.Status_JOURNAL_NOT_FOUND:
err = ErrJournalNotFound
case pb.Status_NOT_JOURNAL_PRIMARY_BROKER:
err = ErrNotJournalPrimaryBroker
case pb.Status_WRONG_APPEND_OFFSET:
Expand Down Expand Up @@ -178,7 +180,7 @@ func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendReques
return a.Response, nil
} else if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
// Fallthrough to retry
} else if err == ErrNotJournalPrimaryBroker {
} else if err == ErrNotJournalPrimaryBroker || err == ErrInsufficientJournalBrokers {
// Fallthrough.
} else {
return a.Response, err
Expand Down
17 changes: 14 additions & 3 deletions broker/client/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"strings"
"time"

gc "gopkg.in/check.v1"
pb "go.gazette.dev/core/broker/protocol"
"go.gazette.dev/core/broker/teststub"
gc "gopkg.in/check.v1"
)

type AppenderSuite struct{}
Expand Down Expand Up @@ -125,6 +125,17 @@ func (s *AppenderSuite) TestBrokerCommitError(c *gc.C) {
errRe: `validating broker response: Commit.Journal: invalid length .*`,
cachedRoute: 0,
},
// Case: known error status (journal not found).
{
finish: func() {
broker.AppendRespCh <- pb.AppendResponse{
Status: pb.Status_JOURNAL_NOT_FOUND,
Header: *buildHeaderFixture(broker),
}
},
errVal: ErrJournalNotFound,
cachedRoute: 1,
},
// Case: known error status (not primary broker).
{
finish: func() {
Expand Down Expand Up @@ -211,7 +222,7 @@ func (s *AppenderSuite) TestAppendCases(c *gc.C) {
{status: pb.Status_NOT_JOURNAL_PRIMARY_BROKER},
{status: pb.Status_OK},
// Case 2: Unexpected status is surfaced.
{status: pb.Status_INSUFFICIENT_JOURNAL_BROKERS},
{status: pb.Status_WRONG_APPEND_OFFSET},
// Case 3: As are errors.
{err: errors.New("an error")},
}
Expand Down Expand Up @@ -262,7 +273,7 @@ func (s *AppenderSuite) TestAppendCases(c *gc.C) {

// Case 2: Unexpected status is surfaced.
_, err = Append(ctx, rjc, pb.AppendRequest{Journal: "a/journal"}, con, tent)
c.Check(err, gc.ErrorMatches, "INSUFFICIENT_JOURNAL_BROKERS")
c.Check(err, gc.ErrorMatches, "WRONG_APPEND_OFFSET")

// Case 3: As are errors.
_, err = Append(ctx, rjc, pb.AppendRequest{Journal: "a/journal"}, con, tent)
Expand Down
Loading

0 comments on commit 79e7fc6

Please sign in to comment.