diff --git a/lib/jackalope.ex b/lib/jackalope.ex index 48d0798..3bc3ebe 100644 --- a/lib/jackalope.ex +++ b/lib/jackalope.ex @@ -131,7 +131,8 @@ defmodule Jackalope do [ handler: jackalope_handler, max_work_list_size: max_work_list_size, - work_list_mod: work_list_mod + work_list_mod: work_list_mod, + data_dir: Keyword.get(opts, :data_dir, "/data/jackalope") ]}, {Jackalope.Supervisor, [ diff --git a/lib/jackalope/persistent_work_list.ex b/lib/jackalope/persistent_work_list.ex new file mode 100644 index 0000000..ebc23c4 --- /dev/null +++ b/lib/jackalope/persistent_work_list.ex @@ -0,0 +1,520 @@ +defmodule Jackalope.PersistentWorkList do + @moduledoc """ + A work list whose work items are persisted as individual files. + """ + + use GenServer + + alias Jackalope.WorkList.Expiration + + require Logger + + @tick_delay 10 * 60 * 1_000 + + defmodule State do + @moduledoc false + + @type t :: %__MODULE__{ + # The maximum number of items that can be persisted as files + max_size: non_neg_integer(), + # The lowest index of an unexpired, not-pending item. No pending item has an index >= bottom_index. + bottom_index: non_neg_integer(), + # The index at which the next item will be pushed + next_index: non_neg_integer(), + # Indices greater than bottom_index of (maybe prematurely and forcibly) expired items. + # Used to get a correct count of work items (i.e. not pending). + expired: [], + # Cache of item expiration times for all persisted items (pending and not) + expirations: %{required(non_neg_integer()) => integer}, + # Indices of pending items mapped by their references. + pending: %{required(reference()) => non_neg_integer()}, + # The file directory persists items waiting execution or pending confirmation of execution. + data_dir: String.t(), + # The function to use to get an expiration given the item + expiration_fn: fun(), + # The function to use to update an item's expiration + update_expiration_fn: fun() + } + + defstruct bottom_index: 0, + next_index: 0, + expired: [], + expirations: %{}, + pending: %{}, + data_dir: nil, + max_size: nil, + expiration_fn: nil, + update_expiration_fn: nil + end + + @doc "Create a new work list" + @spec new(Keyword.t()) :: pid() + def new(opts \\ []) do + Logger.info("[Jackalope] Starting #{__MODULE__} with #{inspect(opts)}") + {:ok, pid} = GenServer.start_link(__MODULE__, opts) + pid + end + + @impl GenServer + def init(opts) do + send(self(), :tick) + + initial_state = + %State{ + max_size: Keyword.get(opts, :max_size), + data_dir: Keyword.fetch!(opts, :data_dir), + expiration_fn: Keyword.fetch!(opts, :expiration_fn), + update_expiration_fn: Keyword.fetch!(opts, :update_expiration_fn) + } + |> recover() + + {:ok, initial_state} + end + + @impl GenServer + def handle_info(:tick, state) do + :ok = record_time_now(state) + Process.send_after(self(), :tick, @tick_delay) + {:noreply, state} + end + + @impl GenServer + def handle_call(:count, _from, state) do + {:reply, count(state), state} + end + + def handle_call(:count_pending, _from, state) do + {:reply, count_pending(state), state} + end + + def handle_call(:peek, _from, state) do + {:reply, peek_oldest(state), state} + end + + def handle_call({:push, item}, _from, state) do + updated_state = add_item(item, state) + {:reply, :ok, bound_items(updated_state)} + end + + def handle_call(:pop, _from, state) do + updated_state = remove_oldest(state) + {:reply, :ok, updated_state} + end + + # The item becoming pending is always the one at bottom index + def handle_call({:pending, ref}, _from, state) do + updated_state = + %State{state | pending: Map.put(state.pending, ref, state.bottom_index)} + |> move_bottom_index() + + {:reply, :ok, updated_state} + end + + def handle_call({:done, ref}, _from, state) do + case Map.get(state.pending, ref) do + nil -> + Logger.warn( + "[Jackalope] Unknown pending work list item reference #{inspect(ref)}. Ignored." + ) + + {:reply, nil, state} + + index -> + {:ok, item} = stored_item_at(index, state, remove: true) + + updated_state = + %State{state | pending: Map.delete(state.pending, ref)} + |> clean_up() + + {:reply, item, updated_state} + end + end + + def handle_call(:remove_all, _from, state) do + {:ok, _} = File.rm_rf(state.data_dir) + :ok = File.mkdir_p!(state.data_dir) + record_time_now(state) + + {:reply, :ok, reset_state(state)} + end + + def handle_call(:reset_pending, _from, state) do + bottom_index = + case bottom_pending_index(state) do + nil -> state.bottom_index + index -> index + end + + {:reply, :ok, %State{state | bottom_index: bottom_index, pending: %{}}} + end + + @impl GenServer + def terminate(_reason, state) do + record_time_now(state) + end + + ## PRIVATE + + defp count(state) do + (state.next_index - state.bottom_index - length(state.expired)) + |> max(0) + end + + defp count_pending(state), do: Enum.count(state.pending) + + # Ought to be the same as Enum.count(state.expirations) + defp persisted_count(state), do: count(state) + count_pending(state) + + defp record_time_now(state) do + time = Expiration.now() |> Integer.to_string() + new_time_path = Path.join(state.data_dir, "new_time") + time_path = Path.join(state.data_dir, "time") + :ok = File.write!(new_time_path, time, [:write]) + :ok = File.rename!(new_time_path, time_path) + end + + # Peek at oldest non-pending work item + defp peek_oldest(state) do + cond do + empty?(state) -> + nil + + true -> + # If this fails, let it crash + {:ok, item} = stored_item_at(state.bottom_index, state) + item + end + end + + defp add_item(item, state) do + index = state.next_index + :ok = store_item(item, index, state) + expiration = state.expiration_fn.(item) + + %State{ + state + | next_index: index + 1, + expirations: Map.put(state.expirations, index, expiration) + } + end + + defp remove_oldest(state) do + index = state.bottom_index + path = item_file_path(index, state) + # If this fails, let it crash + :ok = File.rm!(path) + + %State{state | expirations: Map.delete(state.expirations, index)} + |> move_bottom_index() + |> clean_up() + end + + # Move bottom index up until it is not an expired + defp move_bottom_index(state) do + next_bottom_index = state.bottom_index + 1 + + cond do + next_bottom_index > state.next_index -> + state + + next_bottom_index in state.expired -> + %State{ + state + | bottom_index: next_bottom_index + } + |> move_bottom_index() + + next_bottom_index <= state.next_index -> + %State{ + state + | bottom_index: next_bottom_index + } + end + end + + # No non-pending items? + defp empty?(state), do: state.bottom_index == state.next_index + + defp bottom_pending_index(state) do + if Enum.empty?(state.pending), do: nil, else: Enum.min(Map.values(state.pending)) + end + + # Maybe reset indices to initial values and cleanup expired list + defp clean_up(state) do + if empty?(state) and Enum.empty?(state.pending) do + %State{state | bottom_index: 0, next_index: 0, expired: []} + else + cleanup_expired(state) + end + end + + # Remove from expired all indices smaller than the smallest index of a persisted item + defp cleanup_expired(state) do + expired_index_min = + case bottom_pending_index(state) do + nil -> state.bottom_index + index -> min(index, state.bottom_index) + end + + updated_expired = + state.expired + |> Enum.reject(&(&1 < expired_index_min)) + + %State{state | expired: updated_expired} + end + + defp store_item(item, index, state) do + path = item_file_path(index, state) + if File.exists?(path), do: raise("Overwritting item file") + binary = item_to_binary(item) + File.write!(path, binary) + end + + # Returns {:ok, any()} | {:error, :not_found} + defp stored_item_at(index, state, opts \\ []) do + path = item_file_path(index, state) + + case File.read(path) do + {:ok, binary} -> + _ = if Keyword.get(opts, :remove, false), do: File.rm(path) + + item_from_binary(binary) + + {:error, :not_found} -> + Logger.warn("[Jackalope] File not found #{inspect(path)}}") + + {:error, :not_found} + end + end + + defp item_file_path(index, state) do + Path.join(state.data_dir, "#{index}.item") + end + + defp item_from_binary(binary) do + item = :erlang.binary_to_term(binary) + {:ok, item} + rescue + error -> + Logger.warn("[Jackalope] Failed to convert work item from binary: #{inspect(error)}") + {:error, :invalid} + end + + defp item_to_binary(item), do: :erlang.term_to_binary(item) + + defp bound_items(state) do + max = state.max_size + + if persisted_count(state) > max do + updated_state = remove_expired_items(state) + excess_count = persisted_count(updated_state) - max + + remove_excess(excess_count, updated_state) + else + state + end + end + + # Remove expired, persisted items, whether pending or not. + defp remove_expired_items(state) do + if empty?(state) do + state + else + Enum.reduce( + Map.keys(state.expirations), + state, + fn index, acc -> + maybe_expire(index, acc) + end + ) + end + end + + defp maybe_expire(index, state) do + if index in state.expired do + state + else + expiration = Map.fetch!(state.expirations, index) + + if Expiration.after?(expiration, Expiration.now()) do + state + else + Logger.info("[Jackalope] Expiring persistent work list item at #{index}") + forget_item(index, state) + end + end + end + + defp remove_excess(excess_count, state) when excess_count <= 0, do: state + + # Try removing excess_count persisted items but don't touch pending items. + # Remove items closest to expiration first + defp remove_excess(excess_count, state) do + if empty?(state) do + state + else + live_indices = + state.bottom_index..(state.next_index - 1) + |> Enum.reject(&(&1 in state.expired)) + |> Enum.sort(fn index1, index2 -> + Map.fetch!(state.expirations, index1) <= Map.fetch!(state.expirations, index2) + end) + |> Enum.take(excess_count) + + Enum.reduce( + live_indices, + state, + fn index, acc -> forget_item(index, acc) end + ) + end + end + + # Forget persisted item, whether pending or not + defp forget_item(index, state) do + path = item_file_path(index, state) + :ok = File.rm!(path) + + updated_state = + cond do + pending_item?(index, state) -> + %State{state | pending: Map.delete(state.pending, index)} + + index == state.bottom_index -> + move_bottom_index(state) + + true -> + %State{state | expired: [index | state.expired]} + end + + %State{updated_state | expirations: Map.delete(state.expirations, index)} + |> clean_up() + end + + defp pending_item?(index, state), do: Map.has_key?(state.pending, index) + + defp recover(state) do + :ok = File.mkdir_p!(state.data_dir) + now = Expiration.now() + + item_files = + File.ls!(state.data_dir) |> Enum.filter(&Regex.match?(~r/.*\.item/, &1)) |> Enum.sort() + + item_files + |> Enum.reduce( + reset_state(state), + fn file, acc -> + recover_file(Path.join(state.data_dir, file), now, acc) + end + ) + |> bound_items() + end + + defp reset_state(state) do + %State{state | bottom_index: 0, next_index: 0, expired: [], pending: %{}, expirations: %{}} + end + + defp recover_file(file, now, state) do + binary = File.read!(file) + :ok = File.rm!(file) + + updated_state = + case item_from_binary(binary) do + {:ok, item} -> + index = state.next_index + # TODO - do some version checking... + rebased_expiration = + Expiration.rebase_expiration(state.expiration_fn.(item), latest_time(state), now) + + rebased_item = state.update_expiration_fn.(item, rebased_expiration) + :ok = store_item(rebased_item, index, state) + + %State{ + state + | next_index: index + 1, + expirations: Map.put(state.expirations, index, rebased_expiration) + } + + {:error, :invalid} -> + # Ignore invalid item file + state + end + + updated_state + end + + defp latest_time(state) do + path = Path.join(state.data_dir, "time") + + if File.exists?(path) do + time_s = File.read!(path) + + case Integer.parse(time_s) do + {time, _} -> + time + + other -> + Logger.warn("[Jackalope] Invalid stored latest time: #{inspect(other)}") + Expiration.now() + end + else + Logger.info("[Jackalope] No latest time found for recovery. Using now.") + Expiration.now() + end + end +end + +defimpl Jackalope.WorkList, for: PID do + @impl Jackalope.WorkList + def peek(work_list) do + GenServer.call(work_list, :peek) + end + + @impl Jackalope.WorkList + def push(work_list, item) do + :ok = GenServer.call(work_list, {:push, item}) + work_list + end + + @impl Jackalope.WorkList + def pop(work_list) do + :ok = GenServer.call(work_list, :pop) + work_list + end + + @impl Jackalope.WorkList + def pending(work_list, ref) do + :ok = GenServer.call(work_list, {:pending, ref}) + work_list + end + + @impl Jackalope.WorkList + def reset_pending(work_list) do + :ok = GenServer.call(work_list, :reset_pending) + work_list + end + + @impl Jackalope.WorkList + def done(work_list, ref) do + item = GenServer.call(work_list, {:done, ref}) + {work_list, item} + end + + @impl Jackalope.WorkList + def count(work_list) do + GenServer.call(work_list, :count) + end + + @impl Jackalope.WorkList + def count_pending(work_list) do + GenServer.call(work_list, :count_pending) + end + + @impl Jackalope.WorkList + def empty?(work_list), do: peek(work_list) == nil + + @impl Jackalope.WorkList + def remove_all(work_list) do + :ok = GenServer.call(work_list, :remove_all) + work_list + end +end diff --git a/lib/jackalope/session.ex b/lib/jackalope/session.ex index 30b97fd..80731c7 100644 --- a/lib/jackalope/session.ex +++ b/lib/jackalope/session.ex @@ -77,12 +77,14 @@ defmodule Jackalope.Session do work_list_mod = Keyword.fetch!(opts, :work_list_mod) work_list = - work_list_mod.new( - fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, - fn {cmd, opts}, expiration -> {cmd, Keyword.put(opts, :expiration, expiration)} end, - max_work_list_size, - opts + Keyword.merge(opts, + expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, + update_expiration_fn: fn {cmd, opts}, expiration -> + {cmd, Keyword.put(opts, :expiration, expiration)} + end, + max_size: max_work_list_size ) + |> work_list_mod.new() initial_state = %State{ work_list: work_list, diff --git a/lib/jackalope/transient_work_list.ex b/lib/jackalope/transient_work_list.ex index 25a88aa..9638ad3 100644 --- a/lib/jackalope/transient_work_list.ex +++ b/lib/jackalope/transient_work_list.ex @@ -16,10 +16,12 @@ defmodule Jackalope.TransientWorkList do @type t() :: %__MODULE__{items: list(), max_size: non_neg_integer()} @doc "Create a new work list" - @spec new(function(), function(), non_neg_integer(), Keyword.t()) :: t() - def new(expiration_fn, _update_expiration_fn, max_size \\ @default_max_size, _opts \\ []) - when max_size > 0 do - %__MODULE__{max_size: max_size, expiration_fn: expiration_fn} + @spec new(Keyword.t()) :: t() + def new(opts) do + %__MODULE__{ + max_size: Keyword.get(opts, :max_size, @default_max_size), + expiration_fn: Keyword.fetch!(opts, :expiration_fn) + } end @doc false diff --git a/test/jackalope_test.exs b/test/jackalope_test.exs index bd8b6da..d4751e2 100644 --- a/test/jackalope_test.exs +++ b/test/jackalope_test.exs @@ -7,8 +7,6 @@ defmodule JackalopeTest do alias JackalopeTest.ScriptedMqttServer, as: MqttServer alias Tortoise311.Package - @work_list_mod Jackalope.TransientWorkList - setup context do {:ok, mqtt_server_pid} = start_supervised(MqttServer) Process.link(mqtt_server_pid) @@ -16,8 +14,8 @@ defmodule JackalopeTest do {:ok, [client_id: client_id, mqtt_server_pid: mqtt_server_pid]} end - describe "start_link/1" do - test "connect to a MQTT server (tcp)", context do + for work_list_mod <- [Jackalope.PersistentWorkList, Jackalope.TransientWorkList] do + test "connect to a MQTT server (tcp) #{work_list_mod}", context do transport = setup_server(context) assert {:ok, pid} = @@ -25,7 +23,8 @@ defmodule JackalopeTest do server: transport, client_id: context.client_id, handler: JackalopeTest.TestHandler, - work_list_mod: @work_list_mod + work_list_mod: Jackalope.TransientWorkList, + data_dir: "/tmp/jackalope" ) assert_receive {MqttServer, {:received, %Package.Connect{}}} @@ -35,11 +34,9 @@ defmodule JackalopeTest do assert_receive {MqttServer, :completed}, 200 end - end - describe "publish/3" do - test "publish with QoS=0", context do - connect(context) + test "publish with QoS=0 #{work_list_mod}", context do + connect(context, work_list_mod: unquote(work_list_mod)) flush = expect_publish( @@ -55,8 +52,8 @@ defmodule JackalopeTest do assert expected_payload == payload end - test "publish with QoS=1", context do - connect(context) + test "publish with QoS=1 #{work_list_mod}", context do + connect(context, work_list_mod: unquote(work_list_mod)) flush = expect_publish( @@ -72,15 +69,9 @@ defmodule JackalopeTest do assert %Package.Publish{topic: "foo", qos: 1} = received_publish assert expected_payload == received_publish.payload end - end - - defp get_session_work_list() do - :sys.get_state(Jackalope.Session).work_list - end - describe "work list" do - test "dropping work orders", context do - connect(context, max_work_list_size: 10) + test "dropping work orders #{work_list_mod}", context do + connect(context, max_work_list_size: 10, work_list_mod: unquote(work_list_mod)) pause_mqtt_server(context) work_list = get_session_work_list() @@ -97,8 +88,8 @@ defmodule JackalopeTest do assert WorkList.count(work_list) == 10 end - test "pending and done work items", context do - connect(context, max_work_list_size: 10) + test "pending and done work items #{work_list_mod}", context do + connect(context, max_work_list_size: 10, work_list_mod: unquote(work_list_mod)) pause_mqtt_server(context) work_list = get_session_work_list() @@ -124,8 +115,8 @@ defmodule JackalopeTest do assert WorkList.count(work_list) == 4 end - test "dropping pending work items", context do - connect(context, max_work_list_size: 10) + test "dropping work items #{work_list_mod}", context do + connect(context, max_work_list_size: 10, work_list_mod: unquote(work_list_mod)) pause_mqtt_server(context) work_list = get_session_work_list() @@ -137,14 +128,13 @@ defmodule JackalopeTest do {{:publish, "foo", "{\"msg\": \"hello #{i}\"}", [qos: 1]}, [expiration: Expiration.expiration(:infinity)]} ) - |> WorkList.pending(make_ref()) end) - assert WorkList.count_pending(work_list) == 10 + assert WorkList.count(work_list) == 10 end - test "reset_pending work items", context do - connect(context, max_work_list_size: 10) + test "reset_pending work items #{work_list_mod}", context do + connect(context, max_work_list_size: 10, work_list_mod: unquote(work_list_mod)) pause_mqtt_server(context) work_list = get_session_work_list() @@ -165,20 +155,63 @@ defmodule JackalopeTest do work_list = WorkList.reset_pending(work_list) assert WorkList.count(work_list) == 5 end + end - test "rebasing expiration" do - time = Expiration.now() - exp1 = Expiration.expiration(100) - exp2 = Expiration.expiration(200) - stop_time = time + 10 - assert Expiration.after?(exp2, exp1) - restart_time = Enum.random(-10_000..10_000) - ttl1 = Expiration.rebase_expiration(exp1, stop_time, restart_time) - ttl2 = Expiration.rebase_expiration(exp2, stop_time, restart_time) - assert Expiration.after?(exp2, exp1) - assert ttl1 == restart_time + 90 - assert ttl2 <= restart_time + 190 - end + test "rebasing expiration" do + time = Expiration.now() + exp1 = Expiration.expiration(100) + exp2 = Expiration.expiration(200) + stop_time = time + 10 + assert Expiration.after?(exp2, exp1) + restart_time = Enum.random(-10_000..10_000) + ttl1 = Expiration.rebase_expiration(exp1, stop_time, restart_time) + ttl2 = Expiration.rebase_expiration(exp2, stop_time, restart_time) + assert Expiration.after?(exp2, exp1) + assert ttl1 == restart_time + 90 + assert ttl2 <= restart_time + 190 + end + + test "recovering" do + work_list = + Jackalope.PersistentWorkList.new( + expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, + update_expiration_fn: fn {cmd, opts}, expiration -> + {cmd, Keyword.put(opts, :expiration, expiration)} + end, + max_size: 10, + data_dir: "/tmp/jackalope" + ) + + work_list = WorkList.remove_all(work_list) + + work_list = + Enum.reduce(1..15, work_list, fn i, acc -> + WorkList.push( + acc, + {{:publish, "foo", %{"msg" => "hello #{i}"}, [qos: 1]}, + [expiration: Expiration.expiration(1_000)]} + ) + end) + + ref = make_ref() + work_list = WorkList.pending(work_list, ref) + :ok = GenServer.stop(work_list, :normal) + + work_list = + Jackalope.PersistentWorkList.new( + expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, + update_expiration_fn: fn {cmd, opts}, expiration -> + {cmd, Keyword.put(opts, :expiration, expiration)} + end, + max_size: 5, + data_dir: "/tmp/jackalope" + ) + + assert WorkList.count(work_list) == 5 + end + + defp get_session_work_list() do + :sys.get_state(Jackalope.Session).work_list end # Apologies for the mess after this point; these are helpers that @@ -198,12 +231,14 @@ defmodule JackalopeTest do {Tortoise311.Transport.Tcp, [host: ip, port: port]} end - defp connect(%{client_id: client_id} = context, opts \\ []) do + defp connect(%{client_id: client_id} = context, opts) do transport = setup_server(context) handler = Keyword.get(opts, :handler, JackalopeTest.TestHandler) initial_topics = Keyword.get(opts, :initial_topics) + work_list_mod = Keyword.fetch!(opts, :work_list_mod) max_work_list_size = Keyword.get(opts, :max_work_list_size, 100) + reset? = Keyword.get(opts, :reset, true) start_supervised!( {Jackalope, @@ -213,7 +248,8 @@ defmodule JackalopeTest do handler: handler, initial_topics: initial_topics, max_work_list_size: max_work_list_size, - work_list_mod: @work_list_mod + work_list_mod: work_list_mod, + data_dir: "/tmp/jackalope" ]} ) @@ -221,8 +257,11 @@ defmodule JackalopeTest do assert_receive {MqttServer, :completed} work_list = get_session_work_list() - WorkList.remove_all(work_list) - assert WorkList.empty?(work_list) + + if reset? do + WorkList.remove_all(work_list) + assert WorkList.empty?(work_list) + end end defp expect_publish(context, %Package.Publish{qos: 0} = publish) do