Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] JetStream Publish Option ExpectLastSequencePerSubjectForSubject #1606

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions jetstream/jetstream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,17 @@ func WithExpectLastSequencePerSubject(seq uint64) PublishOpt {
}
}

// WithExpectLastSequencePerSubjectForSubject sets the subject to use when
// ExpectLastSequencePerSubject is set. If the last message for the specified
// subject has a different sequence number server will reject the message and
// publish will fail.
func WithExpectLastSequencePerSubjectForSubject(subj string) PublishOpt {
return func(opts *pubOpts) error {
opts.lastSubjectSeqSubject = subj
return nil
}
}

// WithExpectLastMsgID sets the expected message ID the last message on a stream
// should have. If the last message has a different message ID server will
// reject the message and publish will fail.
Expand Down
7 changes: 7 additions & 0 deletions jetstream/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ const (
// [WithExpectLastSequencePerSubject] option.
ExpectedLastSubjSeqHeader = "Nats-Expected-Last-Subject-Sequence"

// ExpectedLastSubjSeqSubjHeader contains the subject to be used when
// the subject expected last sequence number is set in ExpectedLastSubjSeqHeader.
//
// This can be set when publishing messages using
// [WithExpectLastSequencePerSubjectForSubject] option.
ExpectedLastSubjSeqSubjHeader = "Nats-Expected-Last-Subject-Sequence-Subject"

// ExpectedLastMsgIDHeader contains the expected last message ID on the
// subject and can be used to apply optimistic concurrency control at
// stream level. Server will reject the message if it is not the case.
Expand Down
17 changes: 12 additions & 5 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ type (
PublishOpt func(*pubOpts) error

pubOpts struct {
id string
lastMsgID string // Expected last msgId
stream string // Expected stream name
lastSeq *uint64 // Expected last sequence
lastSubjectSeq *uint64 // Expected last sequence per subject
id string
lastMsgID string // Expected last msgId
stream string // Expected stream name
lastSeq *uint64 // Expected last sequence
lastSubjectSeq *uint64 // Expected last sequence per subject
lastSubjectSeqSubject string // Expected last sequence per subject for this subject

// Publish retries for NoResponders err.
retryWait time.Duration // Retry wait between attempts
Expand Down Expand Up @@ -190,6 +191,9 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis
if o.lastSubjectSeq != nil {
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}
if o.lastSubjectSeqSubject != "" {
m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubjectSeqSubject)
}

var resp *nats.Msg
var err error
Expand Down Expand Up @@ -273,6 +277,9 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
if o.lastSubjectSeq != nil {
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}
if o.lastSubjectSeqSubject != "" {
m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubjectSeqSubject)
}

paf := o.pafRetry
if paf == nil && m.Reply != "" {
Expand Down
202 changes: 201 additions & 1 deletion jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,105 @@ func TestPublishMsg(t *testing.T) {
},
},
},
{
name: "expect last sequence per subject for subject",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1",
},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 1,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 2"),
Subject: "FOO.2",
},
opts: []jetstream.PublishOpt{},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 2,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 3"),
Subject: "FOO.1",
},
opts: []jetstream.PublishOpt{
jetstream.WithExpectLastSequencePerSubject(1),
jetstream.WithExpectLastSequencePerSubjectForSubject("FOO.1"),
},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 3,
},
expectedHeaders: nats.Header{
"Nats-Expected-Last-Subject-Sequence": []string{"1"},
"Nats-Expected-Last-Subject-Sequence-Subject": []string{"FOO.1"},
},
},
},
},
{
name: "last sequence per subject for different subject",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1.bar",
},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 1,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 2"),
Subject: "FOO.1.baz",
},
opts: []jetstream.PublishOpt{},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 2,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 3"),
Subject: "FOO.2.baz",
},
opts: []jetstream.PublishOpt{},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 3,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 4"),
Subject: "FOO.1.bax",
},
opts: []jetstream.PublishOpt{
jetstream.WithExpectLastSequencePerSubject(2),
jetstream.WithExpectLastSequencePerSubjectForSubject("FOO.1.*"),
},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 4,
},
expectedHeaders: nats.Header{
"Nats-Expected-Last-Subject-Sequence": []string{"2"},
"Nats-Expected-Last-Subject-Sequence-Subject": []string{"FOO.1.*"},
},
},
},
},
{
name: "invalid last sequence per subject",
msgs: []publishConfig{
Expand Down Expand Up @@ -364,6 +463,107 @@ func TestPublishMsg(t *testing.T) {
},
},
},
{
name: "invalid last sequence per subject for subject",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1",
},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 1,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 2"),
Subject: "FOO.2",
},
opts: []jetstream.PublishOpt{},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 2,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 3"),
Subject: "FOO.1",
},
opts: []jetstream.PublishOpt{
jetstream.WithExpectLastSequencePerSubject(123),
jetstream.WithExpectLastSequencePerSubjectForSubject("FOO.1"),
},
withError: func(t *testing.T, err error) {
var apiErr *jetstream.APIError
if ok := errors.As(err, &apiErr); !ok {
t.Fatalf("Expected API error; got: %v", err)
}
if apiErr.ErrorCode != 10071 {
t.Fatalf("Expected error code: 10071; got: %d", apiErr.ErrorCode)
}
},
},
},
},
{
name: "invalid last sequence per subject for different subject",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1.bar",
},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 1,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 2"),
Subject: "FOO.1.baz",
},
opts: []jetstream.PublishOpt{},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 2,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 3"),
Subject: "FOO.2.baz",
},
opts: []jetstream.PublishOpt{},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 3,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 4"),
Subject: "FOO.1.bax",
},
opts: []jetstream.PublishOpt{
jetstream.WithExpectLastSequencePerSubject(123),
jetstream.WithExpectLastSequencePerSubjectForSubject("FOO.1.*"),
},
withError: func(t *testing.T, err error) {
var apiErr *jetstream.APIError
if ok := errors.As(err, &apiErr); !ok {
t.Fatalf("Expected API error; got: %v", err)
}
if apiErr.ErrorCode != 10071 {
t.Fatalf("Expected error code: 10071; got: %d", apiErr.ErrorCode)
}
},
},
},
},
{
name: "expect stream header",
msgs: []publishConfig{
Expand Down Expand Up @@ -548,7 +748,7 @@ func TestPublishMsg(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, MaxMsgSize: 64})
_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.>"}, MaxMsgSize: 128})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
43 changes: 30 additions & 13 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,14 @@ func (opt pubOptFn) configurePublish(opts *pubOpts) error {
}

type pubOpts struct {
ctx context.Context
ttl time.Duration
id string
lid string // Expected last msgId
str string // Expected stream name
seq *uint64 // Expected last sequence
lss *uint64 // Expected last sequence per subject
ctx context.Context
ttl time.Duration
id string
lid string // Expected last msgId
str string // Expected stream name
seq *uint64 // Expected last sequence
lss *uint64 // Expected last sequence per subject
lsss string // Expected subject to be used for last sequence per subject

// Publish retries for NoResponders err.
rwait time.Duration // Retry wait between attempts
Expand All @@ -484,12 +485,13 @@ type PubAck struct {

// Headers for published messages.
const (
MsgIdHdr = "Nats-Msg-Id"
ExpectedStreamHdr = "Nats-Expected-Stream"
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgRollup = "Nats-Rollup"
MsgIdHdr = "Nats-Msg-Id"
ExpectedStreamHdr = "Nats-Expected-Stream"
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastSubjSeqSubjHdr = "Nats-Expected-Last-Subject-Sequence-Subject"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgRollup = "Nats-Rollup"
)

// Headers for republished messages and direct gets.
Expand Down Expand Up @@ -549,6 +551,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}
if o.lsss != "" {
m.Header.Set(ExpectedLastSubjSeqSubjHdr, o.lsss)
}

var resp *Msg
var err error
Expand Down Expand Up @@ -931,6 +936,9 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}
if o.lsss != "" {
m.Header.Set(ExpectedLastSubjSeqSubjHdr, o.lsss)
}

// Reply
if m.Reply != _EMPTY_ {
Expand Down Expand Up @@ -1011,6 +1019,15 @@ func ExpectLastSequencePerSubject(seq uint64) PubOpt {
})
}

// ExpectLastSequencePerSubjectForSubject sets subject to be used for the expected sequence per subject in the
// response from the publish.
func ExpectLastSequencePerSubjectForSubject(subj string) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.lsss = subj
return nil
})
}

// ExpectLastMsgId sets the expected last msgId in the response from the publish.
func ExpectLastMsgId(id string) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
Expand Down
Loading