-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Page over stream #159
Page over stream #159
Changes from 4 commits
e790ea4
ad882fd
60526cb
9d22d8e
cd2216c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,9 @@ Sends `AckAck` acknowledgement to the server. | |
Acknowledges a message was completely handled. | ||
""" | ||
@spec ack(message :: Gnat.message()) :: :ok | ||
def ack(message) | ||
def ack(%{reply_to: nil}) do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I ran into a case where I got a |
||
{:error, "Cannot ack message with no reply-to"} | ||
end | ||
|
||
def ack(%{gnat: gnat, reply_to: reply_to}) do | ||
Gnat.pub(gnat, reply_to, "") | ||
|
@@ -23,7 +25,9 @@ Acknowledges the message was handled and requests delivery of the next message t | |
subject. Only applies to Pull-mode. | ||
""" | ||
@spec ack_next(message :: Gnat.message(), consumer_subject :: binary()) :: :ok | ||
def ack_next(message, consumer_subject) | ||
def ack_next(%{reply_to: nil}, _consumer_subject) do | ||
{:error, "Cannot ack message with no reply-to"} | ||
end | ||
|
||
def ack_next(%{gnat: gnat, reply_to: reply_to}, consumer_subject) do | ||
Gnat.pub(gnat, reply_to, "+NXT", reply_to: consumer_subject) | ||
|
@@ -36,7 +40,9 @@ Signals that the message will not be processed now and processing can move onto | |
NAK'd message will be retried. | ||
""" | ||
@spec nack(message :: Gnat.message()) :: :ok | ||
def nack(message) | ||
def nack(%{reply_to: nil}) do | ||
{:error, "Cannot ack message with no reply-to"} | ||
end | ||
|
||
def nack(%{gnat: gnat, reply_to: reply_to}) do | ||
Gnat.pub(gnat, reply_to, "-NAK") | ||
|
@@ -49,7 +55,9 @@ When sent before the `AckWait` period indicates that work is ongoing and the per | |
extended by another equal to `AckWait`. | ||
""" | ||
@spec ack_progress(message :: Gnat.message()) :: :ok | ||
def ack_progress(message) | ||
def ack_progress(%{reply_to: nil}) do | ||
{:error, "Cannot ack message with no reply-to"} | ||
end | ||
|
||
def ack_progress(%{gnat: gnat, reply_to: reply_to}) do | ||
Gnat.pub(gnat, reply_to, "+WPI") | ||
|
@@ -62,7 +70,9 @@ Instructs the server to stop redelivery of a message without acknowledging it as | |
processed. | ||
""" | ||
@spec ack_term(message :: Gnat.message()) :: :ok | ||
def ack_term(message) | ||
def ack_term(%{reply_to: nil}) do | ||
{:error, "Cannot ack message with no reply-to"} | ||
end | ||
|
||
def ack_term(%{gnat: gnat, reply_to: reply_to}) do | ||
Gnat.pub(gnat, reply_to, "+TERM") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
defmodule Gnat.Jetstream.Pager do | ||
@moduledoc false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keeping this undocumented until we feel comfortable with the API |
||
|
||
alias Gnat.Jetstream | ||
alias Gnat.Jetstream.API.{Consumer, Util} | ||
|
||
@opaque pager :: map() | ||
@type message :: Gnat.message() | ||
|
||
def init(conn, stream_name, opts) do | ||
name = "gnat_stream_pager_#{Util.nuid()}" | ||
|
||
first_seq = Keyword.fetch!(opts, :from_seq) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I imagine that we would end up allowing other options like |
||
|
||
consumer = %Consumer{ | ||
stream_name: stream_name, | ||
durable_name: name, | ||
ack_policy: :explicit, | ||
ack_wait: 30_000_000_000, | ||
deliver_policy: :by_start_sequence, | ||
description: "Gnat Stream Pager", | ||
opt_start_seq: first_seq, | ||
replay_policy: :instant, | ||
inactive_threshold: 30_000_000_000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Jetstream will auto-cleanup our consumer after 30sec of inactivity, so the final |
||
} | ||
inbox = Util.reply_inbox() | ||
|
||
with {:ok, _config} <- Consumer.create(conn, consumer), | ||
{:ok, sub} <- Gnat.sub(conn, self(), inbox) do | ||
state = | ||
%{ | ||
conn: conn, | ||
stream_name: stream_name, | ||
consumer_name: name, | ||
domain: nil, | ||
inbox: inbox, | ||
batch: 10, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should also allow for |
||
sub: sub | ||
} | ||
|
||
{:ok, state} | ||
end | ||
end | ||
|
||
@spec page(pager()) :: {:page, list(message())} | {:done, list(message())} | {:error, term()} | ||
def page(%{conn: conn, batch: batch} = state) do | ||
opts = [batch: batch, no_wait: true] | ||
with :ok <- Consumer.request_next_message(conn, state.stream_name, state.consumer_name, state.inbox, state.domain, opts) do | ||
receive_messages(state, []) | ||
end | ||
end | ||
|
||
def cleanup(%{conn: conn} = state) do | ||
with :ok <- Gnat.unsub(conn, state.sub), | ||
:ok <- Consumer.delete(conn, state.stream_name, state.consumer_name, state.domain) do | ||
:ok | ||
end | ||
end | ||
|
||
defp receive_messages(%{batch: batch}, messages) when length(messages) == batch do | ||
{:page, Enum.reverse(messages)} | ||
end | ||
|
||
defp receive_messages(%{sub: sid} = state, messages) do | ||
receive do | ||
{:msg, %{sid: ^sid, status: "408"}} -> | ||
{:done, Enum.reverse(messages)} | ||
|
||
{:msg, %{sid: ^sid} = message} -> | ||
with :ok <- Jetstream.ack(message) do | ||
receive_messages(state, [message | messages ]) | ||
end | ||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we indicate whether this will always be a number , or a string, etc? Or maybe a link to the NATS reference with a list of status replies, etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmmm I can find several examples in the ADRs, but I don't think we can provide an exhaustive list since clients can publish their own messages with headers and provide their own status codes