Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: more resilient brod-based kafka connection failure handling [wip] #123

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions lib/kafee/brod_retry_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Kafee.BrodRetryManager do
use GenServer
require Logger

@min_backoff 5_000

def start_link(supervisor, module, options) do
GenServer.start_link(__MODULE__, {supervisor, module, options}, name: manager_name(module))
end

@impl GenServer
def init(state = {supervisor, module, options}) do
case Kafee.BrodSupervisor.start_child(supervisor, module, options) do
{:ok, _pid} ->
{:ok, state}

_error ->
Logger.warning("Initial Kafka connection failed for #{inspect(module)}, will retry in background")
schedule_retry()
{:ok, state}
end
end

@impl GenServer
def handle_info(:retry, state = {supervisor, module, options}) do
case Kafee.BrodSupervisor.start_child(supervisor, module, options) do
{:ok, _pid} ->
Logger.info("Successfully connected to Kafka for #{inspect(module)}")
{:noreply, state}

_error ->
schedule_retry()
{:noreply, state}
end
end

# Handle process exits
def handle_info({:DOWN, _ref, :process, _pid, _reason}, state = {_supervisor, module, _options}) do
Logger.info("Kafka client for #{inspect(module)} went down, scheduling retry")
schedule_retry()
{:noreply, state}
end

defp schedule_retry do
Process.send_after(self(), :retry, @min_backoff)
end

defp manager_name(module) do
# credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom
:"#{module}.BrodRetryManager"
end
end
79 changes: 79 additions & 0 deletions lib/kafee/brod_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
defmodule Kafee.BrodSupervisor do
use DynamicSupervisor
require Logger

def start_link(module, options) do
name = supervisor_name(module)

case DynamicSupervisor.start_link(__MODULE__, [], name: name) do
{:ok, pid} ->
backoff = options[:restart_delay] || 1_000
{:ok, _state_pid} = Kafee.BrodThrottleManager.start_link(module, backoff)
{:ok, _retry_pid} = Kafee.BrodRetryManager.start_link(pid, module, options)
{:ok, pid}

error ->
error
end
end

@impl DynamicSupervisor
def init([]) do
Process.flag(:trap_exit, true)

DynamicSupervisor.init(
strategy: :one_for_one,
max_restarts: 100_000,
max_seconds: 1
)
end

def start_child(supervisor, module, options) do
case Kafee.BrodThrottleManager.can_restart?(module) do
true ->
do_start_child(supervisor, module, options)

false ->
{:error, :throttled}
end
end

defp do_start_child(supervisor, module, options) do
adapter =
case options[:adapter] do
nil -> nil
adapter when is_atom(adapter) -> adapter
{adapter, _opts} -> adapter
end

child_spec = %{
id: module,
start: {adapter, :start_brod_client, [module, options]},
restart: :temporary,
shutdown: 5_000
}

case DynamicSupervisor.start_child(supervisor, child_spec) do
{:ok, pid} = ok ->
Process.monitor(pid)
Logger.info("Started Kafka client for #{inspect(module)} using #{inspect(adapter)}")
ok

{:error, {:already_started, pid}} ->
{:ok, pid}

error ->
Logger.warning("Failed to start Kafka client for #{inspect(module)} using #{inspect(adapter)}")
error
end
end

def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
{:noreply, state}
end

defp supervisor_name(module) do
# credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom
:"#{module}.BrodSupervisor"
end
end
105 changes: 105 additions & 0 deletions lib/kafee/brod_throttle_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
defmodule Kafee.BrodThrottleManager do
@moduledoc """
Manages throttling of Kafka client restarts to prevent aggressive reconnection attempts.

The throttle manager accepts either a fixed delay (integer) or a function that takes
the number of restarts and returns a delay value in milliseconds.

## Throttling Behavior

When a Kafka client disconnects or fails:
1. The client process terminates
2. A backoff period begins (default: 5000ms)
3. During the backoff period, all restart attempts are rejected
4. When backoff completes, one restart attempt is allowed
5. If that attempt fails, a new backoff period begins

## Configuration

The restart delay can be configured in three ways:

1. Fixed delay (integer in milliseconds):
```
restart_delay: 10_000 # 10 second delay between restarts
```

2. Dynamic delay (function that takes retry count):
```
restart_delay: fn retries -> retries * 5_000 end # Linear backoff
```

3. Default delay (when no option specified):
```
@default_backoff 5_000 # 5 second fixed delay
```

## Example Logs

The throttle manager emits logs to track its behavior:
```
[info] Kafka client for MyApp.MyConsumer went down, scheduling retry
[debug] Setting restart backoff to 5000ms
[debug] Restart backoff of 5000ms completed
[info] Allowing restart for MyApp.MyConsumer
[info] Successfully connected to Kafka for MyApp.MyConsumer
```
"""

use GenServer
require Logger

@default_backoff 5_000

def start_link(module, backoff \\ @default_backoff) do
GenServer.start_link(__MODULE__, {0, nil, backoff}, name: name(module))
end

def can_restart?(module) do
restartable? =
module
|> name()
|> GenServer.call(:can_restart?)

case restartable? do
true ->
Logger.info("Allowing restart for #{inspect(module)}")
true

false ->
Logger.info("Throttling restart for #{inspect(module)}")
false
end
end

@impl GenServer
def init(state) do
{:ok, state}
end

@impl GenServer
def handle_call(:can_restart?, _from, {restarts, timer, backoff}) do
if timer do
{:reply, false, {restarts, timer, backoff}}
else
delay = calculate_backoff(backoff, restarts)
Logger.debug("Setting restart backoff to #{delay}ms")
new_timer = Process.send_after(self(), {:allow_restart, delay}, delay)
{:reply, true, {restarts + 1, new_timer, backoff}}
end
end

@impl GenServer
def handle_info({:allow_restart, delay}, {restarts, _timer, backoff}) do
Logger.debug("Restart backoff of #{delay}ms completed")
{:noreply, {restarts, nil, backoff}}
end

defp calculate_backoff(backoff, _restarts) when is_integer(backoff), do: backoff
defp calculate_backoff(backoff_fn, restarts) when is_function(backoff_fn, 1), do: backoff_fn.(restarts)
defp calculate_backoff(_, _), do: @default_backoff

defp name(module) do
# credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom
:"#{inspect(module)}.BrodThrottleManager"
end
end
5 changes: 5 additions & 0 deletions lib/kafee/consumer/brod_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ defmodule Kafee.Consumer.BrodAdapter do
@impl Kafee.Consumer.Adapter
@spec start_link(module(), Kafee.Consumer.options()) :: Supervisor.on_start()
def start_link(consumer, options) do
Kafee.BrodSupervisor.start_link(consumer, options)
end

@doc false
def start_brod_client(consumer, options) do
Supervisor.start_link(__MODULE__, {consumer, options})
end

Expand Down
7 changes: 6 additions & 1 deletion lib/kafee/producer/async_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,12 @@ defmodule Kafee.Producer.AsyncAdapter do
@impl Kafee.Producer.Adapter
@spec start_link(module(), Kafee.Producer.options()) :: Supervisor.on_start()
def start_link(producer, options) do
Supervisor.start_link(__MODULE__, {producer, options}, name: producer)
Kafee.BrodSupervisor.start_link(producer, options)
end

@doc false
def start_brod_client(producer, options) do
Supervisor.start_link(__MODULE__, {producer, options})
end

@doc false
Expand Down
5 changes: 5 additions & 0 deletions lib/kafee/producer/sync_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ defmodule Kafee.Producer.SyncAdapter do
@impl Kafee.Producer.Adapter
@spec start_link(module(), Kafee.Producer.options()) :: Supervisor.on_start()
def start_link(producer, options) do
Kafee.BrodSupervisor.start_link(producer, options)
end

@doc false
def start_brod_client(producer, options) do
Supervisor.start_link(__MODULE__, {producer, options})
end

Expand Down
Loading