Skip to content

Commit

Permalink
consumer: when writing recovered ACKs, allow for missing journals
Browse files Browse the repository at this point in the history
It's not uncommon for recovered ACKs to contain journals which have
since been deleted. When this happens, log a warning and otherwise
discard the ACK intent.

Do this only for recovered intents: writen ACKs of the current session
must still exist.
  • Loading branch information
jgraettinger committed Dec 13, 2023
1 parent 0f225e4 commit 7985348
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
22 changes: 19 additions & 3 deletions consumer/transaction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consumer

import (
"bytes"
"fmt"
"io"
"runtime/trace"
Expand Down Expand Up @@ -43,9 +44,24 @@ func runTransactions(s *shard, cp pc.Checkpoint, readCh <-chan EnvelopeOrError,
<-realTimer.C // Timer starts as idle.

// Begin by acknowledging (or re-acknowledging) messages published as part
// of the most-recent recovered transaction checkpoint.
if err := txnAcknowledge(s, &prev, cp); err != nil {
return fmt.Errorf("txnAcknowledge(recovered Checkpoint): %w", err)
// of the most-recent recovered transaction checkpoint. This is a relaxed
// form of txnAcknowledge(), as we allow recovered intents to name journals
// that don't actually exist (presumably they were deleted in the meantime).
for journal, ack := range cp.AckIntents {
var op = client.NewAsyncOperation()
prev.acks[op] = struct{}{}

go func(req pb.AppendRequest, r *bytes.Reader) (_err error) {
defer func() { op.Resolve(_err) }()

if _, err := client.Append(s.ctx, s.ajc, req, r); err == client.ErrJournalNotFound {
log.WithField("journal", req.Journal).
Warn("discarding recovered ACK-intent of non-existent journal")
} else if err != nil {
return fmt.Errorf("writing recovered ACK intent for journal %s: %w", req.Journal, err)
}
return nil
}(pb.AppendRequest{Journal: journal}, bytes.NewReader(ack))
}

for {
Expand Down
2 changes: 2 additions & 0 deletions consumer/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,8 @@ func TestRunTxnsACKsRecoveredCheckpoint(t *testing.T) {
var cp = playAndComplete(t, shard)
cp.AckIntents = map[pb.Journal][]byte{
echoOut.Name: []byte(`{"Key": "recovered fixture"}` + "\n"),
// Recovered ACK intents may included journals which do not exist.
"does/not/exist": []byte(`{"Key": "discarded fixture"}` + "\n"),
}

// Use a read channel fixture which immediately closes.
Expand Down

0 comments on commit 7985348

Please sign in to comment.