Skip to content

Commit

Permalink
[FIXED] Respect consumer's starting seq, even if in the future
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen committed Dec 23, 2024
1 parent 87e32fe commit 31b599f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
12 changes: 9 additions & 3 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5366,12 +5366,18 @@ func (o *consumer) selectStartingSeqNo() {
o.sseq = o.cfg.OptStartSeq
}

if state.FirstSeq == 0 {
if state.FirstSeq == 0 && (o.cfg.Direct || o.cfg.OptStartSeq == 0) {
// If the stream is empty, deliver only new.
// But only if mirroring/sourcing, or start seq is unset, otherwise need to respect provided value.
o.sseq = 1
} else if o.sseq > state.LastSeq && (o.cfg.Direct || o.cfg.OptStartSeq == 0) {
// If selected sequence is in the future, clamp back down.
// But only if mirroring/sourcing, or start seq is unset, otherwise need to respect provided value.
o.sseq = state.LastSeq + 1
} else if o.sseq < state.FirstSeq {
// If the first sequence is further ahead than the starting sequence,
// there are no messages there anymore, so move the sequence up.
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
}
}

Expand Down
45 changes: 45 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6984,6 +6984,51 @@ func TestJetStreamClusterDontSnapshotTooOften(t *testing.T) {
}
}

func TestJetStreamClusterRespectConsumerStartSeq(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Create replicated stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

// We could have published messages into the stream that have not yet been applied on the follower.
// If we create a consumer with a starting sequence in the future, we must respect it.
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
DeliverPolicy: nats.DeliverByStartSequencePolicy,
OptStartSeq: 20,
})
require_NoError(t, err)
require_Equal(t, ci.Delivered.Stream, 19)

// Same thing if the first sequence is not 0.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 10})
require_NoError(t, err)

ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
DeliverPolicy: nats.DeliverByStartSequencePolicy,
OptStartSeq: 20,
})
require_NoError(t, err)
require_Equal(t, ci.Delivered.Stream, 19)

// Only if we're requested to start at a sequence that's not available anymore
// can we safely move it up. That data is gone already, so can't do anything else.
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
DeliverPolicy: nats.DeliverByStartSequencePolicy,
OptStartSeq: 5,
})
require_NoError(t, err)
require_Equal(t, ci.Delivered.Stream, 9)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down

0 comments on commit 31b599f

Please sign in to comment.