diff --git a/consumer/transaction.go b/consumer/transaction.go index 202389e9..5d2ab8b6 100644 --- a/consumer/transaction.go +++ b/consumer/transaction.go @@ -1,6 +1,7 @@ package consumer import ( + "bytes" "fmt" "io" "runtime/trace" @@ -43,9 +44,24 @@ func runTransactions(s *shard, cp pc.Checkpoint, readCh <-chan EnvelopeOrError, <-realTimer.C // Timer starts as idle. // Begin by acknowledging (or re-acknowledging) messages published as part - // of the most-recent recovered transaction checkpoint. - if err := txnAcknowledge(s, &prev, cp); err != nil { - return fmt.Errorf("txnAcknowledge(recovered Checkpoint): %w", err) + // of the most-recent recovered transaction checkpoint. This is a relaxed + // form of txnAcknowledge(), as we allow recovered intents to name journals + // that don't actually exist (presumably they were deleted in the meantime). + for journal, ack := range cp.AckIntents { + var op = client.NewAsyncOperation() + prev.acks[op] = struct{}{} + + go func(req pb.AppendRequest, r *bytes.Reader) (_err error) { + defer func() { op.Resolve(_err) }() + + if _, err := client.Append(s.ctx, s.ajc, req, r); err == client.ErrJournalNotFound { + log.WithField("journal", req.Journal). + Warn("discarding recovered ACK-intent of non-existent journal") + } else if err != nil { + return fmt.Errorf("writing recovered ACK intent for journal %s: %w", req.Journal, err) + } + return nil + }(pb.AppendRequest{Journal: journal}, bytes.NewReader(ack)) } for { diff --git a/consumer/transaction_test.go b/consumer/transaction_test.go index be5e2556..13823abb 100644 --- a/consumer/transaction_test.go +++ b/consumer/transaction_test.go @@ -651,6 +651,8 @@ func TestRunTxnsACKsRecoveredCheckpoint(t *testing.T) { var cp = playAndComplete(t, shard) cp.AckIntents = map[pb.Journal][]byte{ echoOut.Name: []byte(`{"Key": "recovered fixture"}` + "\n"), + // Recovered ACK intents may included journals which do not exist. + "does/not/exist": []byte(`{"Key": "discarded fixture"}` + "\n"), } // Use a read channel fixture which immediately closes.