Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Page over stream #159

Merged
merged 5 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/gnat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@ defmodule Gnat do
* `sid` - The subscription ID corresponding to this message. You generally won't need to use this value directly.
* `reply_to` - A topic supplied for expected replies
* `headers` - A set of NATS message headers on the message
* `status` - Similar to an HTTP status, this is present for messages with headers and can indicate the specific purpose of a message. Example `status: "408"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we indicate whether this will always be a number , or a string, etc? Or maybe a link to the NATS reference with a list of status replies, etc?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmm I can find several examples in the ADRs, but I don't think we can provide an exhaustive list since clients can publish their own messages with headers and provide their own status codes

* `description` - A string description of the `status`
"""
@type message :: %{
required(:gnat) => t(),
required(:topic) => binary(),
required(:body) => iodata(),
required(:sid) => non_neg_integer(),
optional(:reply_to) => binary(),
optional(:headers) => headers()
optional(:headers) => headers(),
optional(:status) => String.t(),
optional(:description) => String.t()
}
@type sent_message :: {:msg, message()}

Expand Down
20 changes: 15 additions & 5 deletions lib/gnat/jetstream/jetstream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ Sends `AckAck` acknowledgement to the server.
Acknowledges a message was completely handled.
"""
@spec ack(message :: Gnat.message()) :: :ok
def ack(message)
def ack(%{reply_to: nil}) do
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran into a case where I got a %{status: "408", reply_to: nil} message at the end of the stream and my code tried to ack the message which resulted in a TCP error and it killed my gnat connection. So I thought it would be good to validate the reply_to here with a more useful error message

{:error, "Cannot ack message with no reply-to"}
end

def ack(%{gnat: gnat, reply_to: reply_to}) do
Gnat.pub(gnat, reply_to, "")
Expand All @@ -23,7 +25,9 @@ Acknowledges the message was handled and requests delivery of the next message t
subject. Only applies to Pull-mode.
"""
@spec ack_next(message :: Gnat.message(), consumer_subject :: binary()) :: :ok
def ack_next(message, consumer_subject)
def ack_next(%{reply_to: nil}, _consumer_subject) do
{:error, "Cannot ack message with no reply-to"}
end

def ack_next(%{gnat: gnat, reply_to: reply_to}, consumer_subject) do
Gnat.pub(gnat, reply_to, "+NXT", reply_to: consumer_subject)
Expand All @@ -36,7 +40,9 @@ Signals that the message will not be processed now and processing can move onto
NAK'd message will be retried.
"""
@spec nack(message :: Gnat.message()) :: :ok
def nack(message)
def nack(%{reply_to: nil}) do
{:error, "Cannot ack message with no reply-to"}
end

def nack(%{gnat: gnat, reply_to: reply_to}) do
Gnat.pub(gnat, reply_to, "-NAK")
Expand All @@ -49,7 +55,9 @@ When sent before the `AckWait` period indicates that work is ongoing and the per
extended by another equal to `AckWait`.
"""
@spec ack_progress(message :: Gnat.message()) :: :ok
def ack_progress(message)
def ack_progress(%{reply_to: nil}) do
{:error, "Cannot ack message with no reply-to"}
end

def ack_progress(%{gnat: gnat, reply_to: reply_to}) do
Gnat.pub(gnat, reply_to, "+WPI")
Expand All @@ -62,7 +70,9 @@ Instructs the server to stop redelivery of a message without acknowledging it as
processed.
"""
@spec ack_term(message :: Gnat.message()) :: :ok
def ack_term(message)
def ack_term(%{reply_to: nil}) do
{:error, "Cannot ack message with no reply-to"}
end

def ack_term(%{gnat: gnat, reply_to: reply_to}) do
Gnat.pub(gnat, reply_to, "+TERM")
Expand Down
75 changes: 75 additions & 0 deletions lib/gnat/jetstream/pager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
defmodule Gnat.Jetstream.Pager do
@moduledoc false
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping this undocumented until we feel comfortable with the API


alias Gnat.Jetstream
alias Gnat.Jetstream.API.{Consumer, Util}

@opaque pager :: map()
@type message :: Gnat.message()

def init(conn, stream_name, opts) do
name = "gnat_stream_pager_#{Util.nuid()}"

first_seq = Keyword.fetch!(opts, :from_seq)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine that we would end up allowing other options like since: utc_datetime or limit: 1000 in these opts


consumer = %Consumer{
stream_name: stream_name,
durable_name: name,
ack_policy: :explicit,
ack_wait: 30_000_000_000,
deliver_policy: :by_start_sequence,
description: "Gnat Stream Pager",
opt_start_seq: first_seq,
replay_policy: :instant,
inactive_threshold: 30_000_000_000
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jetstream will auto-cleanup our consumer after 30sec of inactivity, so the final cleanup function call is not crucial

}
inbox = Util.reply_inbox()

with {:ok, _config} <- Consumer.create(conn, consumer),
{:ok, sub} <- Gnat.sub(conn, self(), inbox) do
state =
%{
conn: conn,
stream_name: stream_name,
consumer_name: name,
domain: nil,
inbox: inbox,
batch: 10,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also allow for domain and batch to be passed in via opts

sub: sub
}

{:ok, state}
end
end

@spec page(pager()) :: {:page, list(message())} | {:done, list(message())} | {:error, term()}
def page(%{conn: conn, batch: batch} = state) do
opts = [batch: batch, no_wait: true]
with :ok <- Consumer.request_next_message(conn, state.stream_name, state.consumer_name, state.inbox, state.domain, opts) do
receive_messages(state, [])
end
end

def cleanup(%{conn: conn} = state) do
with :ok <- Gnat.unsub(conn, state.sub),
:ok <- Consumer.delete(conn, state.stream_name, state.consumer_name, state.domain) do
:ok
end
end

defp receive_messages(%{batch: batch}, messages) when length(messages) == batch do
{:page, Enum.reverse(messages)}
end

defp receive_messages(%{sub: sid} = state, messages) do
receive do
{:msg, %{sid: ^sid, status: "408"}} ->
{:done, Enum.reverse(messages)}

{:msg, %{sid: ^sid} = message} ->
with :ok <- Jetstream.ack(message) do
receive_messages(state, [message | messages ])
end
end
end
end
Loading