From 8607a7adfa951ddcc3d141371fa04c8d1e5e6d34 Mon Sep 17 00:00:00 2001 From: Matt Sutkowski Date: Fri, 29 Nov 2024 13:42:02 -0800 Subject: [PATCH 1/2] wip --- lib/kafee/brod_retry_manager.ex | 45 ++++++++++++++++ lib/kafee/brod_supervisor.ex | 82 +++++++++++++++++++++++++++++ lib/kafee/consumer/brod_adapter.ex | 5 ++ lib/kafee/producer/async_adapter.ex | 7 ++- lib/kafee/producer/sync_adapter.ex | 5 ++ 5 files changed, 143 insertions(+), 1 deletion(-) create mode 100644 lib/kafee/brod_retry_manager.ex create mode 100644 lib/kafee/brod_supervisor.ex diff --git a/lib/kafee/brod_retry_manager.ex b/lib/kafee/brod_retry_manager.ex new file mode 100644 index 0000000..1552019 --- /dev/null +++ b/lib/kafee/brod_retry_manager.ex @@ -0,0 +1,45 @@ +defmodule Kafee.BrodRetryManager do + use GenServer + require Logger + + @min_backoff 5_000 + + def start_link(supervisor, module, options) do + GenServer.start_link(__MODULE__, {supervisor, module, options}, name: manager_name(module)) + end + + @impl GenServer + def init(state = {supervisor, module, options}) do + case Kafee.BrodSupervisor.start_child(supervisor, module, options) do + {:ok, _pid} -> + {:ok, state} + + _error -> + Logger.warning("Initial Kafka connection failed for #{inspect(module)}, will retry in background") + schedule_retry() + {:ok, state} + end + end + + @impl GenServer + def handle_info(:retry, state = {supervisor, module, options}) do + case Kafee.BrodSupervisor.start_child(supervisor, module, options) do + {:ok, _pid} -> + Logger.info("Successfully connected to Kafka for #{inspect(module)}") + {:noreply, state} + + _error -> + schedule_retry() + {:noreply, state} + end + end + + defp schedule_retry do + Process.send_after(self(), :retry, @min_backoff) + end + + defp manager_name(module) do + # credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom + :"#{module}.BrodRetryManager" + end +end diff --git a/lib/kafee/brod_supervisor.ex b/lib/kafee/brod_supervisor.ex new file mode 100644 index 0000000..848c966 --- /dev/null +++ b/lib/kafee/brod_supervisor.ex @@ -0,0 +1,82 @@ +defmodule Kafee.BrodSupervisor do + @moduledoc """ + A supervisor for Brod-based producers and consumers that provides connection resiliency. + This prevents Kafka connection issues from affecting your application's supervision tree + and automatically retries connections when Kafka is unavailable. + """ + use DynamicSupervisor + require Logger + + @min_backoff 5_000 + @max_backoff 300_000 + + def start_link(module, options) do + name = supervisor_name(module) + + case DynamicSupervisor.start_link(__MODULE__, [], name: name) do + {:ok, pid} -> + {:ok, _retry_pid} = Kafee.BrodRetryManager.start_link(pid, module, options) + {:ok, pid} + + error -> + error + end + end + + @impl DynamicSupervisor + def init([]) do + DynamicSupervisor.init( + strategy: :one_for_one, + max_restarts: 100_000, + max_seconds: 1 + ) + end + + def start_child(supervisor, module, options) do + adapter = + case options[:adapter] do + nil -> nil + adapter when is_atom(adapter) -> adapter + {adapter, _opts} -> adapter + end + + backoff = + min( + @max_backoff, + options[:restart_delay] || @min_backoff + ) + + brod_options = + Keyword.merge(options, + retry_backoff_ms: backoff, + max_retries: 0 + ) + + child_spec = %{ + id: module, + start: {adapter, :start_brod_client, [module, brod_options]}, + restart: :permanent, + shutdown: 5_000, + restart_delay: backoff + } + + case DynamicSupervisor.start_child(supervisor, child_spec) do + {:ok, _pid} = ok -> + Logger.info("Started Kafka client for #{inspect(module)} using #{inspect(adapter)}") + ok + + {:error, {:already_started, pid}} -> + {:ok, pid} + + error -> + Logger.warning("Failed to start Kafka client for #{inspect(module)} using #{inspect(adapter)}") + + error + end + end + + defp supervisor_name(module) do + # credo:disable-for-next-line + :"#{module}.BrodSupervisor" + end +end diff --git a/lib/kafee/consumer/brod_adapter.ex b/lib/kafee/consumer/brod_adapter.ex index b858ca3..d5250f6 100644 --- a/lib/kafee/consumer/brod_adapter.ex +++ b/lib/kafee/consumer/brod_adapter.ex @@ -53,6 +53,11 @@ defmodule Kafee.Consumer.BrodAdapter do @impl Kafee.Consumer.Adapter @spec start_link(module(), Kafee.Consumer.options()) :: Supervisor.on_start() def start_link(consumer, options) do + Kafee.BrodSupervisor.start_link(consumer, options) + end + + @doc false + def start_brod_client(consumer, options) do Supervisor.start_link(__MODULE__, {consumer, options}) end diff --git a/lib/kafee/producer/async_adapter.ex b/lib/kafee/producer/async_adapter.ex index d4a4239..ff9ac36 100644 --- a/lib/kafee/producer/async_adapter.ex +++ b/lib/kafee/producer/async_adapter.ex @@ -153,7 +153,12 @@ defmodule Kafee.Producer.AsyncAdapter do @impl Kafee.Producer.Adapter @spec start_link(module(), Kafee.Producer.options()) :: Supervisor.on_start() def start_link(producer, options) do - Supervisor.start_link(__MODULE__, {producer, options}, name: producer) + Kafee.BrodSupervisor.start_link(producer, options) + end + + @doc false + def start_brod_client(producer, options) do + Supervisor.start_link(__MODULE__, {producer, options}) end @doc false diff --git a/lib/kafee/producer/sync_adapter.ex b/lib/kafee/producer/sync_adapter.ex index 1b4d4c6..bc402eb 100644 --- a/lib/kafee/producer/sync_adapter.ex +++ b/lib/kafee/producer/sync_adapter.ex @@ -52,6 +52,11 @@ defmodule Kafee.Producer.SyncAdapter do @impl Kafee.Producer.Adapter @spec start_link(module(), Kafee.Producer.options()) :: Supervisor.on_start() def start_link(producer, options) do + Kafee.BrodSupervisor.start_link(producer, options) + end + + @doc false + def start_brod_client(producer, options) do Supervisor.start_link(__MODULE__, {producer, options}) end From 6bd96673210004b948f6903f7f94dc7fee8f2728 Mon Sep 17 00:00:00 2001 From: Matt Sutkowski Date: Fri, 29 Nov 2024 17:27:58 -0800 Subject: [PATCH 2/2] maybe more betterer --- lib/kafee/brod_retry_manager.ex | 7 ++ lib/kafee/brod_supervisor.ex | 51 +++++++------- lib/kafee/brod_throttle_manager.ex | 105 +++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 27 deletions(-) create mode 100644 lib/kafee/brod_throttle_manager.ex diff --git a/lib/kafee/brod_retry_manager.ex b/lib/kafee/brod_retry_manager.ex index 1552019..5ea2fc1 100644 --- a/lib/kafee/brod_retry_manager.ex +++ b/lib/kafee/brod_retry_manager.ex @@ -34,6 +34,13 @@ defmodule Kafee.BrodRetryManager do end end + # Handle process exits + def handle_info({:DOWN, _ref, :process, _pid, _reason}, state = {_supervisor, module, _options}) do + Logger.info("Kafka client for #{inspect(module)} went down, scheduling retry") + schedule_retry() + {:noreply, state} + end + defp schedule_retry do Process.send_after(self(), :retry, @min_backoff) end diff --git a/lib/kafee/brod_supervisor.ex b/lib/kafee/brod_supervisor.ex index 848c966..9b5bd78 100644 --- a/lib/kafee/brod_supervisor.ex +++ b/lib/kafee/brod_supervisor.ex @@ -1,20 +1,14 @@ defmodule Kafee.BrodSupervisor do - @moduledoc """ - A supervisor for Brod-based producers and consumers that provides connection resiliency. - This prevents Kafka connection issues from affecting your application's supervision tree - and automatically retries connections when Kafka is unavailable. - """ use DynamicSupervisor require Logger - @min_backoff 5_000 - @max_backoff 300_000 - def start_link(module, options) do name = supervisor_name(module) case DynamicSupervisor.start_link(__MODULE__, [], name: name) do {:ok, pid} -> + backoff = options[:restart_delay] || 1_000 + {:ok, _state_pid} = Kafee.BrodThrottleManager.start_link(module, backoff) {:ok, _retry_pid} = Kafee.BrodRetryManager.start_link(pid, module, options) {:ok, pid} @@ -25,6 +19,8 @@ defmodule Kafee.BrodSupervisor do @impl DynamicSupervisor def init([]) do + Process.flag(:trap_exit, true) + DynamicSupervisor.init( strategy: :one_for_one, max_restarts: 100_000, @@ -33,6 +29,16 @@ defmodule Kafee.BrodSupervisor do end def start_child(supervisor, module, options) do + case Kafee.BrodThrottleManager.can_restart?(module) do + true -> + do_start_child(supervisor, module, options) + + false -> + {:error, :throttled} + end + end + + defp do_start_child(supervisor, module, options) do adapter = case options[:adapter] do nil -> nil @@ -40,28 +46,16 @@ defmodule Kafee.BrodSupervisor do {adapter, _opts} -> adapter end - backoff = - min( - @max_backoff, - options[:restart_delay] || @min_backoff - ) - - brod_options = - Keyword.merge(options, - retry_backoff_ms: backoff, - max_retries: 0 - ) - child_spec = %{ id: module, - start: {adapter, :start_brod_client, [module, brod_options]}, - restart: :permanent, - shutdown: 5_000, - restart_delay: backoff + start: {adapter, :start_brod_client, [module, options]}, + restart: :temporary, + shutdown: 5_000 } case DynamicSupervisor.start_child(supervisor, child_spec) do - {:ok, _pid} = ok -> + {:ok, pid} = ok -> + Process.monitor(pid) Logger.info("Started Kafka client for #{inspect(module)} using #{inspect(adapter)}") ok @@ -70,13 +64,16 @@ defmodule Kafee.BrodSupervisor do error -> Logger.warning("Failed to start Kafka client for #{inspect(module)} using #{inspect(adapter)}") - error end end + def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do + {:noreply, state} + end + defp supervisor_name(module) do - # credo:disable-for-next-line + # credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom :"#{module}.BrodSupervisor" end end diff --git a/lib/kafee/brod_throttle_manager.ex b/lib/kafee/brod_throttle_manager.ex new file mode 100644 index 0000000..57af963 --- /dev/null +++ b/lib/kafee/brod_throttle_manager.ex @@ -0,0 +1,105 @@ +defmodule Kafee.BrodThrottleManager do + @moduledoc """ + Manages throttling of Kafka client restarts to prevent aggressive reconnection attempts. + + The throttle manager accepts either a fixed delay (integer) or a function that takes + the number of restarts and returns a delay value in milliseconds. + + ## Throttling Behavior + + When a Kafka client disconnects or fails: + 1. The client process terminates + 2. A backoff period begins (default: 5000ms) + 3. During the backoff period, all restart attempts are rejected + 4. When backoff completes, one restart attempt is allowed + 5. If that attempt fails, a new backoff period begins + + ## Configuration + + The restart delay can be configured in three ways: + + 1. Fixed delay (integer in milliseconds): + ``` + restart_delay: 10_000 # 10 second delay between restarts + ``` + + 2. Dynamic delay (function that takes retry count): + ``` + restart_delay: fn retries -> retries * 5_000 end # Linear backoff + ``` + + 3. Default delay (when no option specified): + ``` + @default_backoff 5_000 # 5 second fixed delay + ``` + + ## Example Logs + + The throttle manager emits logs to track its behavior: + ``` + [info] Kafka client for MyApp.MyConsumer went down, scheduling retry + [debug] Setting restart backoff to 5000ms + [debug] Restart backoff of 5000ms completed + [info] Allowing restart for MyApp.MyConsumer + [info] Successfully connected to Kafka for MyApp.MyConsumer + ``` + """ + + use GenServer + require Logger + + @default_backoff 5_000 + + def start_link(module, backoff \\ @default_backoff) do + GenServer.start_link(__MODULE__, {0, nil, backoff}, name: name(module)) + end + + def can_restart?(module) do + restartable? = + module + |> name() + |> GenServer.call(:can_restart?) + + case restartable? do + true -> + Logger.info("Allowing restart for #{inspect(module)}") + true + + false -> + Logger.info("Throttling restart for #{inspect(module)}") + false + end + end + + @impl GenServer + def init(state) do + {:ok, state} + end + + @impl GenServer + def handle_call(:can_restart?, _from, {restarts, timer, backoff}) do + if timer do + {:reply, false, {restarts, timer, backoff}} + else + delay = calculate_backoff(backoff, restarts) + Logger.debug("Setting restart backoff to #{delay}ms") + new_timer = Process.send_after(self(), {:allow_restart, delay}, delay) + {:reply, true, {restarts + 1, new_timer, backoff}} + end + end + + @impl GenServer + def handle_info({:allow_restart, delay}, {restarts, _timer, backoff}) do + Logger.debug("Restart backoff of #{delay}ms completed") + {:noreply, {restarts, nil, backoff}} + end + + defp calculate_backoff(backoff, _restarts) when is_integer(backoff), do: backoff + defp calculate_backoff(backoff_fn, restarts) when is_function(backoff_fn, 1), do: backoff_fn.(restarts) + defp calculate_backoff(_, _), do: @default_backoff + + defp name(module) do + # credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom + :"#{inspect(module)}.BrodThrottleManager" + end +end