You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I'm encountering an issue with the consumer's Consume method where each incoming message is throttled to exactly 30 second waits in between messages.
I'm using a JetStream shared between two accounts. I use default server and client options. I am running an embedded server in Go, and I have a "central account" that publishes to the remote client which imports the service.
// Central accountac.Exports.Add(
&jwt.Export{
Subject: "$JS.API.>",
Type: jwt.Service,
},
&jwt.Export{
Subject: "$JS.ACK.>",
Type: jwt.Service,
},
)
gofunc() {
for {
js.PublishAsync("my.subject", []byte("hello, I'm the central server account"))
time.Sleep(5*time.Second)
}
}()
// Remote accountac.Imports.Add(
&jwt.Import{
Name: "server",
Type: jwt.Service,
Account: serverAcctPubKey,
Subject: "$JS.API.>",
},
&jwt.Import{
Type: jwt.Service,
Account: serverAcctPubKey,
Subject: "$JS.ACK.>", // not sure if this is necessary for single ack, but it appears to be otherwise messages are redelivered
},
)
// ...consumer, err:=c.js.CreateOrUpdateConsumer(ctx, "mystream", jetstream.ConsumerConfig{
Name: "myconsumer",
Durable: "myconsumer",
FilterSubject: "my.subject"
})
cons.Consume(func(m*nats.Msg) {
fmt.Println(time.Now().Format(time.RFC3339))
m.Ack()
})
While the consumer slowly processes them in 30 second intervals, it appears the messages are stuck in ack pending (info.NumAckPending == info.NumRedelivered) and go down with each consumer activity.
The only error I see in ConsumeErrHandler is nats: no heartbeat received. I'm not sure if this is related to the issue I'm facing. I'm using the latest version of the NATS server and the Go client.
This issue occurs except when I add the option jetstream.PullMaxMessages(1) to the Consume() function.
I also have no issues with the alternative method to pull messages:
I'm struggling to find an example in the test files for nats-server or nats.go that demonstrates this. I'm not seeing _INBOX.>imported anywhere. As for permissions, I'm running into this issue even when I provide blanket permissions to everything in both accounts in my example above:
Observed behavior
I'm encountering an issue with the consumer's
Consume
method where each incoming message is throttled to exactly 30 second waits in between messages.I'm using a JetStream shared between two accounts. I use default server and client options. I am running an embedded server in Go, and I have a "central account" that publishes to the remote client which imports the service.
While the consumer slowly processes them in 30 second intervals, it appears the messages are stuck in ack pending (
info.NumAckPending
==info.NumRedelivered
) and go down with each consumer activity.The only error I see in
ConsumeErrHandler
isnats: no heartbeat received
. I'm not sure if this is related to the issue I'm facing. I'm using the latest version of the NATS server and the Go client.This issue occurs except when I add the option
jetstream.PullMaxMessages(1)
to theConsume()
function.I also have no issues with the alternative method to pull messages:
Could there be something wrong with
consumer.Consume()
?Expected behavior
I'm expecting the callback in
consumer.Consume
to be called as soon as each message is published by the other account.Server and client version
Host environment
Darwin 23.6.0 arm64
Steps to reproduce
No response
The text was updated successfully, but these errors were encountered: