-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Page over stream #159
Page over stream #159
Conversation
def init(conn, stream_name, opts) do | ||
name = "gnat_stream_pager_#{Util.nuid()}" | ||
|
||
first_seq = Keyword.fetch!(opts, :from_seq) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I imagine that we would end up allowing other options like since: utc_datetime
or limit: 1000
in these opts
8ddb9e2
to
60526cb
Compare
description: "Gnat Stream Pager", | ||
opt_start_seq: first_seq, | ||
replay_policy: :instant, | ||
inactive_threshold: 30_000_000_000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jetstream will auto-cleanup our consumer after 30sec of inactivity, so the final cleanup
function call is not crucial
consumer_name: name, | ||
domain: nil, | ||
inbox: inbox, | ||
batch: 10, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also allow for domain
and batch
to be passed in via opts
@@ -0,0 +1,75 @@ | |||
defmodule Gnat.Jetstream.Pager do | |||
@moduledoc false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keeping this undocumented until we feel comfortable with the API
@@ -10,7 +10,9 @@ Sends `AckAck` acknowledgement to the server. | |||
Acknowledges a message was completely handled. | |||
""" | |||
@spec ack(message :: Gnat.message()) :: :ok | |||
def ack(message) | |||
def ack(%{reply_to: nil}) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran into a case where I got a %{status: "408", reply_to: nil}
message at the end of the stream and my code tried to ack the message which resulted in a TCP error and it killed my gnat connection. So I thought it would be good to validate the reply_to
here with a more useful error message
@@ -25,14 +25,18 @@ defmodule Gnat do | |||
* `sid` - The subscription ID corresponding to this message. You generally won't need to use this value directly. | |||
* `reply_to` - A topic supplied for expected replies | |||
* `headers` - A set of NATS message headers on the message | |||
* `status` - Similar to an HTTP status, this is present for messages with headers and can indicate the specific purpose of a message. Example `status: "408"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we indicate whether this will always be a number , or a string, etc? Or maybe a link to the NATS reference with a list of status replies, etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmmm I can find several examples in the ADRs, but I don't think we can provide an exhaustive list since clients can publish their own messages with headers and provide their own status codes
I did a bit of experimenting with having a functionality interface for a pull consumer that I think might be useful for people. The basic idea is that the caller calls
Pager.init(...)
and then repeatedly callsPager.page
to get the next "page" of messages. When thepage
function returns{:done, messages}
it means the consumer has reached the current end of the stream and the client should then callPager.cleanup
to immediately cleanup the subscription and consumer.