Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume() timeout in between pulls #1703

Open
brettinternet opened this issue Aug 23, 2024 · 3 comments
Open

Consume() timeout in between pulls #1703

brettinternet opened this issue Aug 23, 2024 · 3 comments
Assignees
Labels
defect Suspected defect such as a bug or regression

Comments

@brettinternet
Copy link

brettinternet commented Aug 23, 2024

Observed behavior

I'm encountering an issue with the consumer's Consume method where each incoming message is throttled to exactly 30 second waits in between messages.

I'm using a JetStream shared between two accounts. I use default server and client options. I am running an embedded server in Go, and I have a "central account" that publishes to the remote client which imports the service.

// Central account
ac.Exports.Add(
	&jwt.Export{
		Subject: "$JS.API.>",
		Type:    jwt.Service,
	},
	&jwt.Export{
		Subject: "$JS.ACK.>",
		Type:    jwt.Service,
	},
)

go func() {
	for {
		js.PublishAsync("my.subject", []byte("hello, I'm the central server account"))
		time.Sleep(5 * time.Second)
	}
}()
// Remote account
ac.Imports.Add(
	&jwt.Import{
		Name:    "server",
		Type:    jwt.Service,
		Account: serverAcctPubKey,
		Subject: "$JS.API.>",
	},
	&jwt.Import{
		Type:    jwt.Service,
		Account: serverAcctPubKey,
		Subject: "$JS.ACK.>", // not sure if this is necessary for single ack, but it appears to be otherwise messages are redelivered
	},
)

// ...

consumer, err := c.js.CreateOrUpdateConsumer(ctx, "mystream", jetstream.ConsumerConfig{
  Name:          "myconsumer",
  Durable:       "myconsumer",
  FilterSubject: "my.subject"
})

cons.Consume(func(m *nats.Msg) {
  fmt.Println(time.Now().Format(time.RFC3339))
	m.Ack()
})

While the consumer slowly processes them in 30 second intervals, it appears the messages are stuck in ack pending (info.NumAckPending == info.NumRedelivered) and go down with each consumer activity.

The only error I see in ConsumeErrHandler is nats: no heartbeat received. I'm not sure if this is related to the issue I'm facing. I'm using the latest version of the NATS server and the Go client.

This issue occurs except when I add the option jetstream.PullMaxMessages(1) to the Consume() function.

I also have no issues with the alternative method to pull messages:

go func() {
  for {
    msg, err := consumer.Next()
    if err != nil {
      fmt.Println("err", err)
    } else {
      fmt.Println("msg", string(msg.Data()))
      msg.Ack()
    }
  }
}()

Could there be something wrong with consumer.Consume()?

Expected behavior

I'm expecting the callback in consumer.Consume to be called as soon as each message is published by the other account.

Server and client version

github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/nats-server/v2 v2.10.18
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7

Host environment

Darwin 23.6.0 arm64

Steps to reproduce

No response

@brettinternet brettinternet added the defect Suspected defect such as a bug or regression label Aug 23, 2024
@Jarema
Copy link
Member

Jarema commented Aug 25, 2024

Hey.

I think you are missing the _INBOX.> import, as that is where the client subscribes for messages.
You can customize it with CustomInboxPrefix().

Example of custom inboxes: https://natsbyexample.com/examples/auth/private-inbox/cli

@brettinternet
Copy link
Author

Hi @Jarema, thank you for your reply.

I'm struggling to find an example in the test files for nats-server or nats.go that demonstrates this. I'm not seeing _INBOX.> imported anywhere. As for permissions, I'm running into this issue even when I provide blanket permissions to everything in both accounts in my example above:

accountClaims.DefaultPermissions.Pub.Allow.Add(">")
accountClaims.DefaultPermissions.Sub.Allow.Add(">")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

No branches or pull requests

3 participants