Skip to content

Commit

Permalink
add reduce function and a unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
mmmries committed May 14, 2024
1 parent c0c0b45 commit 297ae7b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
29 changes: 28 additions & 1 deletion lib/gnat/jetstream/pager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,40 @@ defmodule Gnat.Jetstream.Pager do
end
end

def reduce(conn, stream_name, opts, initial_state, fun) do
with {:ok, pager} <- init(conn, stream_name, opts) do
page_through(pager, initial_state, fun)
end
end

defp page_through(pager, state, fun) do
case page(pager) do
{:page, messages} ->
new_state = Enum.reduce(messages, state, fun)
page_through(pager, new_state, fun)

{:done, messages} ->
new_state = Enum.reduce(messages, state, fun)
:ok = cleanup(pager)
{:ok, new_state}

{:error, error} ->
{:error, error}
end
end

defp receive_messages(%{batch: batch}, messages) when length(messages) == batch do
{:page, Enum.reverse(messages)}
end

@terminals ["404", "408"]
defp receive_messages(%{sub: sid} = state, messages) do
receive do
{:msg, %{sid: ^sid, status: "408"}} ->
{:msg, %{sid: ^sid, status: status}} when status in @terminals ->
{:done, Enum.reverse(messages)}

{:msg, %{sid: ^sid, reply_to: nil} = msg} ->
IO.inspect(msg)
{:done, Enum.reverse(messages)}

{:msg, %{sid: ^sid} = message} ->
Expand Down
34 changes: 34 additions & 0 deletions test/jetstream/pager_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Gnat.Jetstream.PagerTest do
use Gnat.Jetstream.ConnCase
alias Gnat.Jetstream.Pager
alias Gnat.Jetstream.API.Stream

@moduletag with_gnat: :gnat

test "paging over a simple stream" do
{:ok, _stream} = create_stream("pager_a")
Enum.each(1..100, fn i ->
:ok = Gnat.pub(:gnat, "input.pager_a", "#{i}")
end)

{:ok, res} = Pager.reduce(:gnat, "pager_a", [from_seq: 1], 0, fn msg, total ->
total + String.to_integer(msg.body)
end)
assert res == 5050

{:ok, res} = Pager.reduce(:gnat, "pager_a", [from_seq: 51], 0, fn msg, total ->
total + String.to_integer(msg.body)
end)
assert res == 3775

Stream.delete(:gnat, "pager_a")
end

defp create_stream(name) do
stream = %Stream{
name: name,
subjects: ["input.#{name}"]
}
Stream.create(:gnat, stream)
end
end

0 comments on commit 297ae7b

Please sign in to comment.