diff --git a/lib/kafee/brod_retry_manager.ex b/lib/kafee/brod_retry_manager.ex
new file mode 100644
index 0000000..5ea2fc1
--- /dev/null
+++ b/lib/kafee/brod_retry_manager.ex
@@ -0,0 +1,70 @@
+defmodule Kafee.BrodRetryManager do
+  @moduledoc """
+  Starts the Kafka client of a module under a `Kafee.BrodSupervisor` and keeps
+  retrying in the background when the start fails or the client goes down.
+
+  The first attempt is made in `init/1`. A failed attempt does not fail the
+  manager: `init/1` still returns `{:ok, state}` and a `:retry` message is
+  scheduled instead.
+  """
+
+  use GenServer
+  require Logger
+
+  # Delay in milliseconds before the next start attempt.
+  @min_backoff 5_000
+
+  @doc """
+  Starts the retry manager for `module` and registers it under a name derived
+  from `module`.
+
+  `supervisor` and `options` are handed unchanged to
+  `Kafee.BrodSupervisor.start_child/3` on every start attempt.
+  """
+  def start_link(supervisor, module, options) do
+    GenServer.start_link(__MODULE__, {supervisor, module, options}, name: manager_name(module))
+  end
+
+  @impl GenServer
+  def init({supervisor, module, options} = state) do
+    case Kafee.BrodSupervisor.start_child(supervisor, module, options) do
+      {:ok, _pid} ->
+        {:ok, state}
+
+      _error ->
+        Logger.warning("Initial Kafka connection failed for #{inspect(module)}, will retry in background")
+        schedule_retry()
+        {:ok, state}
+    end
+  end
+
+  @impl GenServer
+  def handle_info(:retry, {supervisor, module, options} = state) do
+    case Kafee.BrodSupervisor.start_child(supervisor, module, options) do
+      {:ok, _pid} ->
+        Logger.info("Successfully connected to Kafka for #{inspect(module)}")
+        {:noreply, state}
+
+      _error ->
+        schedule_retry()
+        {:noreply, state}
+    end
+  end
+
+  # `Kafee.BrodSupervisor.start_child/3` monitors the client from the calling
+  # process, which is this manager, so the client's `:DOWN` message lands here.
+  def handle_info({:DOWN, _ref, :process, _pid, _reason}, {_supervisor, module, _options} = state) do
+    Logger.info("Kafka client for #{inspect(module)} went down, scheduling retry")
+    schedule_retry()
+    {:noreply, state}
+  end
+
+  defp schedule_retry do
+    Process.send_after(self(), :retry, @min_backoff)
+  end
+
+  defp manager_name(module) do
+    # credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom
+    :"#{module}.BrodRetryManager"
+  end
+end
diff --git a/lib/kafee/brod_supervisor.ex b/lib/kafee/brod_supervisor.ex
new file mode 100644
index 0000000..9b5bd78
--- /dev/null
+++ b/lib/kafee/brod_supervisor.ex
@@ -0,0 +1,108 @@
+defmodule Kafee.BrodSupervisor do
+  @moduledoc """
+  Dynamic supervisor that holds the Kafka client process of a module.
+
+  Clients are started as `:temporary` children, so the supervisor never restarts
+  them on its own. `start_link/2` also starts a `Kafee.BrodThrottleManager` and
+  a `Kafee.BrodRetryManager` for the module; the retry manager asks for new
+  start attempts and the throttle manager decides whether one is allowed.
+  """
+
+  use DynamicSupervisor
+  require Logger
+
+  @doc """
+  Starts the supervisor for `module` together with its throttle and retry managers.
+
+  The throttle backoff is `options[:restart_delay]`, or 1000ms when that is unset.
+  Both managers are started from, and linked to, the calling process.
+  """
+  def start_link(module, options) do
+    name = supervisor_name(module)
+
+    case DynamicSupervisor.start_link(__MODULE__, [], name: name) do
+      {:ok, pid} ->
+        backoff = options[:restart_delay] || 1_000
+        {:ok, _state_pid} = Kafee.BrodThrottleManager.start_link(module, backoff)
+        {:ok, _retry_pid} = Kafee.BrodRetryManager.start_link(pid, module, options)
+        {:ok, pid}
+
+      error ->
+        error
+    end
+  end
+
+  @impl DynamicSupervisor
+  def init([]) do
+    Process.flag(:trap_exit, true)
+
+    DynamicSupervisor.init(
+      strategy: :one_for_one,
+      max_restarts: 100_000,
+      max_seconds: 1
+    )
+  end
+
+  @doc """
+  Starts the Kafka client for `module` unless the throttle manager rejects the attempt.
+
+  Returns `{:ok, pid}` on success, `{:error, :throttled}` when the attempt is
+  rejected, `{:error, :no_adapter}` when `options[:adapter]` names no adapter, or
+  the error returned by `DynamicSupervisor.start_child/2`.
+
+  A newly started client is monitored by the calling process, which therefore
+  receives its `:DOWN` message.
+  """
+  def start_child(supervisor, module, options) do
+    if Kafee.BrodThrottleManager.can_restart?(module) do
+      do_start_child(supervisor, module, options)
+    else
+      {:error, :throttled}
+    end
+  end
+
+  defp do_start_child(supervisor, module, options) do
+    case adapter_module(options[:adapter]) do
+      nil ->
+        # Without an adapter there is no module to call `start_brod_client/2` on.
+        Logger.warning("Failed to start Kafka client for #{inspect(module)}: no adapter configured")
+        {:error, :no_adapter}
+
+      adapter ->
+        start_client(supervisor, module, adapter, options)
+    end
+  end
+
+  # The adapter option is either a bare module or a `{module, opts}` tuple.
+  defp adapter_module(nil), do: nil
+  defp adapter_module(adapter) when is_atom(adapter), do: adapter
+  defp adapter_module({adapter, _opts}), do: adapter
+
+  defp start_client(supervisor, module, adapter, options) do
+    child_spec = %{
+      id: module,
+      start: {adapter, :start_brod_client, [module, options]},
+      restart: :temporary,
+      shutdown: 5_000
+    }
+
+    case DynamicSupervisor.start_child(supervisor, child_spec) do
+      {:ok, pid} = ok ->
+        Process.monitor(pid)
+        Logger.info("Started Kafka client for #{inspect(module)} using #{inspect(adapter)}")
+        ok
+
+      {:error, {:already_started, pid}} ->
+        {:ok, pid}
+
+      error ->
+        Logger.warning("Failed to start Kafka client for #{inspect(module)} using #{inspect(adapter)}")
+        error
+    end
+  end
+
+  defp supervisor_name(module) do
+    # credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom
+    :"#{module}.BrodSupervisor"
+  end
+end
diff --git a/lib/kafee/brod_throttle_manager.ex b/lib/kafee/brod_throttle_manager.ex
new file mode 100644
index 0000000..57af963
--- /dev/null
+++ b/lib/kafee/brod_throttle_manager.ex
@@ -0,0 +1,117 @@
+defmodule Kafee.BrodThrottleManager do
+  @moduledoc """
+  Manages throttling of Kafka client restarts to prevent aggressive reconnection attempts.
+
+  The throttle manager accepts either a fixed delay (integer) or a function that takes
+  the number of restarts and returns a delay value in milliseconds.
+
+  ## Throttling Behavior
+
+  Every restart attempt first asks `can_restart?/1` for permission:
+  1. When no backoff period is running, the attempt is allowed and a backoff period begins
+  2. During the backoff period, all restart attempts are rejected
+  3. When the backoff completes, the next restart attempt is allowed
+  4. That attempt begins a new backoff period, whether or not it succeeds
+
+  ## Configuration
+
+  The restart delay can be configured in three ways:
+
+  1. Fixed delay (integer in milliseconds):
+  ```
+  restart_delay: 10_000  # 10 second delay between restarts
+  ```
+
+  2. Dynamic delay (function that takes the number of restarts allowed so far, starting at 0):
+  ```
+  restart_delay: fn retries -> retries * 5_000 end  # Linear backoff
+  ```
+
+  3. Default delay (when `start_link/1` is used, or the delay is neither of the above):
+  ```
+  @default_backoff 5_000  # 5 second fixed delay
+  ```
+
+  `Kafee.BrodSupervisor` passes 1000ms when no `restart_delay` option is given.
+
+  ## Example Logs
+
+  The throttle manager and the retry manager emit logs to track this behavior:
+  ```
+  [info] Kafka client for MyApp.MyConsumer went down, scheduling retry
+  [debug] Setting restart backoff to 5000ms
+  [info] Allowing restart for MyApp.MyConsumer
+  [info] Successfully connected to Kafka for MyApp.MyConsumer
+  [debug] Restart backoff of 5000ms completed
+  ```
+  """
+
+  use GenServer
+  require Logger
+
+  @default_backoff 5_000
+
+  @doc """
+  Starts the throttle manager for `module`.
+
+  `backoff` is a delay in milliseconds or a one-argument function returning one.
+  """
+  def start_link(module, backoff \\ @default_backoff) do
+    # State is `{allowed_restarts, backoff_timer_or_nil, backoff}`.
+    GenServer.start_link(__MODULE__, {0, nil, backoff}, name: name(module))
+  end
+
+  @doc """
+  Returns `true` when a restart of the client for `module` is allowed right now.
+
+  An allowed restart begins a backoff period during which this returns `false`.
+  """
+  @spec can_restart?(module()) :: boolean()
+  def can_restart?(module) do
+    restartable? =
+      module
+      |> name()
+      |> GenServer.call(:can_restart?)
+
+    if restartable? do
+      Logger.info("Allowing restart for #{inspect(module)}")
+    else
+      Logger.info("Throttling restart for #{inspect(module)}")
+    end
+
+    restartable?
+  end
+
+  @impl GenServer
+  def init(state) do
+    {:ok, state}
+  end
+
+  @impl GenServer
+  def handle_call(:can_restart?, _from, {restarts, nil, backoff}) do
+    delay = calculate_backoff(backoff, restarts)
+    Logger.debug("Setting restart backoff to #{delay}ms")
+    timer = Process.send_after(self(), {:allow_restart, delay}, delay)
+    {:reply, true, {restarts + 1, timer, backoff}}
+  end
+
+  # A backoff timer is still running, so the attempt is rejected.
+  def handle_call(:can_restart?, _from, {_restarts, _timer, _backoff} = state) do
+    {:reply, false, state}
+  end
+
+  @impl GenServer
+  def handle_info({:allow_restart, delay}, {restarts, _timer, backoff}) do
+    Logger.debug("Restart backoff of #{delay}ms completed")
+    {:noreply, {restarts, nil, backoff}}
+  end
+
+  defp calculate_backoff(backoff, _restarts) when is_integer(backoff), do: backoff
+  defp calculate_backoff(backoff_fn, restarts) when is_function(backoff_fn, 1), do: backoff_fn.(restarts)
+  defp calculate_backoff(_, _), do: @default_backoff
+
+  defp name(module) do
+    # credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom
+    :"#{inspect(module)}.BrodThrottleManager"
+  end
+end
diff --git a/lib/kafee/consumer/brod_adapter.ex b/lib/kafee/consumer/brod_adapter.ex
index b858ca3..d5250f6 100644
--- a/lib/kafee/consumer/brod_adapter.ex
+++ b/lib/kafee/consumer/brod_adapter.ex
@@ -53,6 +53,11 @@ defmodule Kafee.Consumer.BrodAdapter do
   @impl Kafee.Consumer.Adapter
   @spec start_link(module(), Kafee.Consumer.options()) :: Supervisor.on_start()
   def start_link(consumer, options) do
+    Kafee.BrodSupervisor.start_link(consumer, options)
+  end
+
+  @doc false
+  def start_brod_client(consumer, options) do
     Supervisor.start_link(__MODULE__, {consumer, options})
   end
 
diff --git a/lib/kafee/producer/async_adapter.ex b/lib/kafee/producer/async_adapter.ex
index d4a4239..ff9ac36 100644
--- a/lib/kafee/producer/async_adapter.ex
+++ b/lib/kafee/producer/async_adapter.ex
@@ -153,7 +153,12 @@ defmodule Kafee.Producer.AsyncAdapter do
   @impl Kafee.Producer.Adapter
   @spec start_link(module(), Kafee.Producer.options()) :: Supervisor.on_start()
   def start_link(producer, options) do
-    Supervisor.start_link(__MODULE__, {producer, options}, name: producer)
+    Kafee.BrodSupervisor.start_link(producer, options)
+  end
+
+  @doc false
+  def start_brod_client(producer, options) do
+    Supervisor.start_link(__MODULE__, {producer, options})
   end
 
   @doc false
diff --git a/lib/kafee/producer/sync_adapter.ex b/lib/kafee/producer/sync_adapter.ex
index 1b4d4c6..bc402eb 100644
--- a/lib/kafee/producer/sync_adapter.ex
+++ b/lib/kafee/producer/sync_adapter.ex
@@ -52,6 +52,11 @@ defmodule Kafee.Producer.SyncAdapter do
   @impl Kafee.Producer.Adapter
   @spec start_link(module(), Kafee.Producer.options()) :: Supervisor.on_start()
   def start_link(producer, options) do
+    Kafee.BrodSupervisor.start_link(producer, options)
+  end
+
+  @doc false
+  def start_brod_client(producer, options) do
     Supervisor.start_link(__MODULE__, {producer, options})
   end
 