chore: watch brod client restarts for manual control #124
base: main
Conversation
require Logger

# credo:disable-for-next-line Credo.Check.Readability.NestedFunctionCalls
@log_prefix "#{inspect(__MODULE__)}]"
I'm not a huge fan of this. Elixir already has built-in metadata about what is calling the logger, which is more consistent and should be used instead of custom prefixes like this. In Datadog this would be @logger.method_name.
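For reference, a minimal sketch of what that built-in metadata could look like in config (the format string and metadata keys here are illustrative, not taken from this PR or the kafee config):

import Config

# Include the calling module/function (:mfa) and pid in every console log line
# instead of hand-rolling a prefix in each message.
config :logger, :console,
  format: "$time [$level] $metadata$message\n",
  metadata: [:mfa, :pid]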
Yeah, ultimately it depends on a preference for how you read logs (and this is mine 😂). If you imagine a case of N jobs that all have similar logs and you're trying to look at a few different cases, the metadata approach is just annoying, even if you pull it into a column in the view.
)

Process.sleep(@restart_delay)
{:noreply, state, {:continue, :start_child}}
Huh, TIL. I didn't realize you could return :continue from a handle_continue function 🤯
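A small illustrative sketch of that (not code from this PR): a handle_continue callback can itself return {:continue, ...}, so multi-step startup work can be chained across callbacks.

defmodule ChainedContinueExample do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  @impl GenServer
  def init(:ok) do
    # Defer work out of init/1 into the first continue.
    {:ok, %{}, {:continue, :step_one}}
  end

  @impl GenServer
  def handle_continue(:step_one, state) do
    # Do the first chunk of work, then hand off to another continue.
    {:noreply, state, {:continue, :step_two}}
  end

  @impl GenServer
  def handle_continue(:step_two, state) do
    {:noreply, state}
  end
end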
lib/kafee/process_manager.ex
Outdated
{:noreply, state, {:continue, :start_child}}
end

defp start_child(%{child_spec: child_spec, supervisor: supervisor} = state) do
Since we can just do {:continue, :start_child}, we can probably get rid of this function and make the handle_continue function more direct.
So, just to be clear on the trade-offs:

First, this sets the brod client to a temporary process, so there will be times it does not exist. That means any sync work (like the sync producer) will error if it tries to send a message while the client is down. You will want to use the async producer if you want better handling of errors.

Second, there is a very small chance that this process manager process crashes or for some reason doesn't restart the client. I think it's pretty rare (thanks, BEAM!) but worth mentioning.

Lastly, it might be useful to purge some of the brod client logs and rewrite them into something more helpful. Otherwise, my much longer-term thought is writing our own Kafka client on top of the kpro library that brod uses 🤷

Code looks good to me. Limited testing seems to fix the issue.
For the first point... that's what I'd expect from the sync producer. We're typically outboxing all Kafka messages, or throwing them into a transaction that would ultimately get retried. The second one is more concerning, but I don't currently have an idea for that.
I like this PR; it's more concise and seems more along the grain of OTP than #123.
There are some comments / questions surrounding the work section of the new process.
One thing to add to the above comments, for the possibility of ProcessManager not starting the child: I think you can have a threshold, or a max retry attempt number, that, when reached, would just shut down ProcessManager. Relying on its parent supervisor to restart ProcessManager (it will restart based on the restart strategy supervisors have), ProcessManager would then reset / retry / crash again when the max is reached. I think this is a good two-phase crash triage loop that could work, imho.
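A rough, hypothetical sketch of that threshold idea (the module name, @max_restart_attempts, and the :attempts key are all made up for illustration, not part of this PR):

defmodule BoundedRestartExample do
  use GenServer

  # Hypothetical limit; the real number would be a tuning decision.
  @max_restart_attempts 5

  def start_link(child_spec), do: GenServer.start_link(__MODULE__, child_spec)

  @impl GenServer
  def init(child_spec) do
    {:ok, %{child_spec: child_spec, attempts: 0}, {:continue, :start_child}}
  end

  @impl GenServer
  def handle_continue(:start_child, %{attempts: attempts} = state)
      when attempts >= @max_restart_attempts do
    # Give up and exit abnormally; the parent supervisor restarts this process,
    # which resets the counter and begins the retry loop again.
    {:stop, :max_restart_attempts_reached, state}
  end

  @impl GenServer
  def handle_continue(:start_child, state) do
    # ... start and monitor the child here ...
    {:noreply, %{state | attempts: state.attempts + 1}}
  end
end

Because the stop reason is not :normal, a :permanent or :transient ProcessManager child would be restarted by its supervisor, giving the two-phase loop described above.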
Process.sleep(@restart_delay)
{:noreply, state, {:continue, :start_child}}
How about moving it into its own function?
Also, to my knowledge GenServer's way of doing things shouldn't involve Process.sleep() most of the time if it can help it. How about perhaps Process.send_after -> handle_info() -> running the abstracted-out logic inside handle_continue? That way you can use handle_continue() for init().
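Something like this hypothetical shape (names are illustrative, not from the PR): schedule the retry instead of sleeping inside the callback, so the process stays responsive while it waits.

defmodule ScheduledRestartExample do
  use GenServer

  @restart_delay 1_000

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  @impl GenServer
  def init(:ok), do: {:ok, %{}, {:continue, :start_child}}

  @impl GenServer
  def handle_continue(:start_child, state) do
    # ... attempt to start the child; on failure, schedule a retry ...
    Process.send_after(self(), :retry_start_child, @restart_delay)
    {:noreply, state}
  end

  @impl GenServer
  def handle_info(:retry_start_child, state) do
    # Re-enter the shared start logic via :continue.
    {:noreply, state, {:continue, :start_child}}
  end
end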
An alternative way of shortening the timeout logic, with protection against repeated calls to start a child, is the GenServer timeout feature instead of Process.send_after() or Process.sleep(). You'd write less boilerplate handler code.
For example:
defmodule SimpleTimeoutServer do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    # The third element here (or the last one after :continue) is the timeout in milliseconds.
    # If no messages come into the process inbox before it elapses, a :timeout message is
    # delivered to the process; if any message does come in first, the timeout is canceled.
    {:ok, 0, 5000}
  end

  @impl true
  def handle_info(:timeout, state) do
    IO.puts("Timeout occurred. Current state: #{state}")
    # Returning a timeout again schedules another :timeout message.
    {:noreply, state, 5000}
  end
end
An example from wms-service is in the StreamReleaseAgent.
@impl GenServer
def handle_info({:DOWN, ref, :process, pid, reason}, %{monitor_ref: ref, child_pid: pid} = state) do
  Logger.info("#{@log_prefix} Child process down. Restarting in #{@restart_delay}ms...",
    reason: reason
Adding the @log_prefix here can allow for easier filtering in Datadog, I think?
  start: {Agent, :start_link, [fn -> %{} end]}
}

ProcessManager.start_link(opts)
Suggested change:
ProcessManager.start_link(opts)
assert {:error, {{:badkey, :supervisor}, _}} = ProcessManager.start_link(opts)
I think that'd be enough, and you don't need to check with the assert_receive.
Checking the return value of this function showed:
{:error,
{{:badkey, :supervisor},
[
{:erlang, :map_get,
[
:supervisor,
%{
id: :test_child,
start: {Agent, :start_link,
[#Function<3.102245195/0 in Kafee.ProcessManagerTest."test start_link/1 fails to start without required supervisor option"/1>]}
}
], [error_info: %{module: :erl_erts_errors}]},
{Kafee.ProcessManager, :init, 1,
[file: ~c"lib/kafee/process_manager.ex", line: 28]},
{:gen_server, :init_it, 2, [file: ~c"gen_server.erl", line: 980]},
{:gen_server, :init_it, 6, [file: ~c"gen_server.erl", line: 935]},
{:proc_lib, :init_p_do_apply, 3, [file: ~c"proc_lib.erl", line: 241]}
]}}
Checklist
Problem
The brod client crashes everything when it is unable to connect to Kafka in some instances.
Details
This is an alternative approach to #123 that should hopefully simplify things by using the same supervisor as what's currently in use. It still needs testing.