
chore: watch brod client restarts for manual control #124

Draft: wants to merge 8 commits into main

Conversation

btkostner (Contributor) opened this pull request:
Checklist

Problem

The brod client crashes everything when it is unable to connect to Kafka in some instances.

Details

This is an alternative approach to #123 that should hopefully simplify things by using the same supervisor as what's currently in use. Still needs testing.

require Logger

# credo:disable-for-next-line Credo.Check.Readability.NestedFunctionCalls
@log_prefix "[#{inspect(__MODULE__)}]"
btkostner (Contributor, Author) commented:

I'm not a huge fan of this. Elixir already has some built-in metadata for identifying what is calling the logger, which is more consistent and should be used instead of custom prefixes like this. In Datadog this would be @logger.method_name.
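As a hedged illustration (not part of this PR; the console backend config below is only a sketch), relying on the built-in metadata could look like this, since :mfa is a standard metadata key set by the Logger macros:

# config/config.exs (import Config assumed) -- include caller metadata instead of a custom prefix
config :logger, :console,
  format: "$time [$level] $metadata$message\n",
  metadata: [:mfa]

# At the call site no prefix is needed; the metadata already identifies the caller.
Logger.info("Child process down")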

msutkowski (Member) replied:

Yeah, ultimately it depends on a preference for reading logs (and this is mine 😂). If you imagine a case of N jobs that all have similar logs and you're trying to look at a few different cases, the metadata part is just annoying -- even if you pull it into a column in the view.

)

Process.sleep(@restart_delay)
{:noreply, state, {:continue, :start_child}}
btkostner (Contributor, Author) commented:

Huh, TIL. I didn't realize you could return :continue from a handle_continue function 🤯
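A minimal standalone sketch (module name made up, not from this PR) showing that handle_continue/2 can itself return a continue tuple, chaining continuations:

defmodule ChainedContinue do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok), do: {:ok, %{}, {:continue, :step_one}}

  @impl true
  def handle_continue(:step_one, state) do
    # handle_continue/2 may return another continue tuple, chaining straight into :step_two
    {:noreply, state, {:continue, :step_two}}
  end

  def handle_continue(:step_two, state), do: {:noreply, state}
end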

{:noreply, state, {:continue, :start_child}}
end

defp start_child(%{child_spec: child_spec, supervisor: supervisor} = state) do
btkostner (Contributor, Author) commented:

Since we can just do {:continue, :start_child}, we can probably get rid of this function and make the handle_continue function more direct.
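A rough sketch of what folding start_child/1 into handle_continue/2 could look like; the Supervisor.start_child/2 call, the :child_pid / :monitor_ref keys, and the error branch are assumptions based on the snippets above, not the PR's actual code:

@impl GenServer
def handle_continue(:start_child, %{child_spec: child_spec, supervisor: supervisor} = state) do
  # Assumed shape: start the child directly here instead of delegating to start_child/1.
  case Supervisor.start_child(supervisor, child_spec) do
    {:ok, pid} ->
      {:noreply, %{state | child_pid: pid, monitor_ref: Process.monitor(pid)}}

    {:error, reason} ->
      Logger.warning("#{@log_prefix} Failed to start child: #{inspect(reason)}")
      Process.sleep(@restart_delay)
      {:noreply, state, {:continue, :start_child}}
  end
end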

@btkostner (Contributor, Author) commented:

So, just to be clear on trade-offs: this sets the brod client to a temporary process, so there will be times it does not exist. This means any sync work (like the sync producer) will error if it tries to send a message while the client is down. You will want to use the async producer if you want better handling of errors.

Two, there is a very small chance that this process manager process crashes or for some reason doesn't restart the client. I think it's pretty rare (thanks BEAM!) but worth mentioning.

Lastly, it might be useful to purge some of the brod client logs and attempt to rewrite them into something more helpful. Otherwise my much longer term thought is writing our own Kafka client on top of the kpro library that brod uses 🤷

Code looks good to me. Limited testing seems to fix the issue.

@msutkowski (Member) commented:

> So, just to be clear on trade-offs: this sets the brod client to a temporary process, so there will be times it does not exist. This means any sync work (like the sync producer) will error if it tries to send a message while the client is down. You will want to use the async producer if you want better handling of errors.
>
> Two, there is a very small chance that this process manager process crashes or for some reason doesn't restart the client. I think it's pretty rare (thanks BEAM!) but worth mentioning.
>
> Lastly, it might be useful to purge some of the brod client logs and attempt to rewrite them into something more helpful. Otherwise my much longer term thought is writing our own Kafka client on top of the kpro library that brod uses 🤷
>
> Code looks good to me. Limited testing seems to fix the issue.

For the first point... that's what I'd expect from the sync producer. We're typically outboxing all Kafka messages, or throwing them into a transaction that would ultimately get retried.

The second one is more concerning, but I don't currently have an idea for that.

@seungjinstord (Contributor) left a comment:

I like this PR as it's more concise and seems more along the grain of OTP than #123.
There are some comments / questions surrounding the work section of the new process.

One thing to add to the comments above: for the possibility of ProcessManager not starting the child, I suggest having a threshold, or a max retry attempt number, that when reached would just shut down ProcessManager.

ProcessManager's parent supervisor will then restart it (based on the restart strategy Supervisors have), and the restarted ProcessManager will reset / retry / crash again when the max is reached. I think this is a good two-phase crash triage loop that could work, imho. A rough sketch is below.
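A hypothetical sketch of that threshold idea (the @max_restart_attempts value and the :attempts key in state are made up for illustration): once the cap is hit, the ProcessManager stops itself and its parent supervisor takes over as the second phase.

@max_restart_attempts 5

@impl GenServer
def handle_continue(:start_child, %{attempts: attempts} = state)
    when attempts >= @max_restart_attempts do
  # Give up and let the parent supervisor restart the whole ProcessManager.
  {:stop, :max_restart_attempts_reached, state}
end

def handle_continue(:start_child, %{child_spec: child_spec, supervisor: supervisor} = state) do
  case Supervisor.start_child(supervisor, child_spec) do
    {:ok, pid} ->
      {:noreply, %{state | attempts: 0, child_pid: pid, monitor_ref: Process.monitor(pid)}}

    {:error, _reason} ->
      # Retry (delay between attempts elided for brevity).
      {:noreply, %{state | attempts: state.attempts + 1}, {:continue, :start_child}}
  end
end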

Comment on lines +61 to +62
Process.sleep(@restart_delay)
{:noreply, state, {:continue, :start_child}}
@seungjinstord (Contributor) commented on Dec 3, 2024:

How about moving it into its own function?
Also, to my knowledge, GenServer's way of doing things shouldn't involve Process.sleep() most of the time if it can be helped.

How about perhaps Process.send_after -> handle_info() -> running some abstracted-out logic shared with handle_continue? A rough sketch of that shape is below.

That way you can still use handle_continue() for init().
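As a hedged illustration (the do_start_child/1 helper name and the state keys are assumptions, not from this PR), that could look roughly like:

@impl GenServer
def handle_continue(:start_child, state), do: {:noreply, do_start_child(state)}

@impl GenServer
def handle_info(:start_child, state), do: {:noreply, do_start_child(state)}

defp do_start_child(state) do
  case Supervisor.start_child(state.supervisor, state.child_spec) do
    {:ok, pid} ->
      %{state | child_pid: pid, monitor_ref: Process.monitor(pid)}

    {:error, _reason} ->
      # Schedule a retry message instead of sleeping; the GenServer stays responsive meanwhile.
      Process.send_after(self(), :start_child, @restart_delay)
      state
  end
end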

An alternative way of shortening the retry handling, with protection against repeated calls to start a child, is the GenServer timeout feature instead of Process.send_after() or Process.sleep(). You'd write less boilerplate handler code.

For example:

defmodule SimpleTimeoutServer do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    # The third element of the return tuple (or the element after the :continue tuple) is a timeout in milliseconds.
    # If no message arrives in the process mailbox within that window, a `:timeout` message is delivered to the process;
    # if a message does arrive first, the pending timeout is cancelled.
    {:ok, 0, 5000}
  end

  @impl true
  def handle_info(:timeout, state) do
    IO.puts("Timeout occurred. Current state: #{state}")
    # below code would set another timeout to happen
    {:noreply, state, 5000}
  end
end

An example in wms-service is the StreamReleaseAgent.

@impl GenServer
def handle_info({:DOWN, ref, :process, pid, reason}, %{monitor_ref: ref, child_pid: pid} = state) do
  Logger.info("#{@log_prefix} Child process down. Restarting in #{@restart_delay}ms...",
    reason: reason
seungjinstord (Contributor) commented:

Adding the @log_prefix here can allow for easier filtering in Datadog, I think?

start: {Agent, :start_link, [fn -> %{} end]}
}

ProcessManager.start_link(opts)
@seungjinstord (Contributor) commented on Dec 3, 2024:

Suggested change
ProcessManager.start_link(opts)
assert {:error, {{:badkey, :supervisor}, _}} = ProcessManager.start_link(opts)

I think that'd be enough, and you don't need to check with assert_receive.
Checking the return value of this function showed:

{:error,
 {{:badkey, :supervisor},
  [
    {:erlang, :map_get,
     [
       :supervisor,
       %{
         id: :test_child,
         start: {Agent, :start_link,
          [#Function<3.102245195/0 in Kafee.ProcessManagerTest."test start_link/1 fails to start without required supervisor option"/1>]}
       }
     ], [error_info: %{module: :erl_erts_errors}]},
    {Kafee.ProcessManager, :init, 1,
     [file: ~c"lib/kafee/process_manager.ex", line: 28]},
    {:gen_server, :init_it, 2, [file: ~c"gen_server.erl", line: 980]},
    {:gen_server, :init_it, 6, [file: ~c"gen_server.erl", line: 935]},
    {:proc_lib, :init_p_do_apply, 3, [file: ~c"proc_lib.erl", line: 241]}
  ]}}
