Skip to content

Commit

Permalink
connection manager
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-bsft committed Sep 22, 2021
1 parent c717503 commit 459381b
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 77 deletions.
6 changes: 4 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use Mix.Config

config :logger_json, :backend, metadata: :all
config :logger, :console,
format: "$time [$level] $message $metadata\n",
metadata: [:broker, :topic, :partition]

config :pulsar_ex, producer_module: PulsarEx

config :pulsar, brokers: ["localhost"], port: 6650, admin_port: 8080
config :pulsar, brokers: ["localhost"], admin_port: 8080

config :pulsar, topics: ["persistent://utx/jobs/users.json"]

Expand Down
19 changes: 5 additions & 14 deletions lib/pulsar.ex
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
defmodule Pulsar do
alias Pulsar.{Topic, PartitionProducer, ProducerManager, ProducerRegistry}
alias Pulsar.{PartitionProducer, ProducerManager}

def produce(topic_name, %{partition_key: nil} = message) when is_binary(topic_name) do
with [] <- ProducerRegistry.lookup(topic_name),
:ok <- ProducerManager.create(topic_name) do
produce(topic_name, message)
else
producers when is_list(producers) ->
{_, {pool, _}} = producers |> Enum.random()

:poolboy.transaction(pool, fn pid ->
PartitionProducer.produce(pid, message)
end)

err ->
err
with {:ok, pool} <- ProducerManager.get_producer(topic_name) do
:poolboy.transaction(pool, fn pid ->
PartitionProducer.produce(pid, message)
end)
end
end

Expand Down
18 changes: 18 additions & 0 deletions lib/pulsar/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Pulsar.Application do
@moduledoc false

use Application

alias Pulsar.{ConnectionSupervisor, ProducerSupervisor}

@impl true
def start(_type, _args) do
children = [
ConnectionSupervisor,
ProducerSupervisor
]

opts = [strategy: :one_for_one, name: Pulsar.Supervisor]
Supervisor.start_link(children, opts)
end
end
4 changes: 4 additions & 0 deletions lib/pulsar/broker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ defmodule Pulsar.Broker do
_ -> {:error, :malformatted_broker_url}
end
end

def to_name(%Pulsar.Broker{host: host, port: port}) do
"#{host}:#{port}"
end
end
216 changes: 187 additions & 29 deletions lib/pulsar/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,85 @@ defmodule Pulsar.Connection do

require Logger

alias Pulsar.Proto
alias Pulsar.{Proto, Broker, Topic}

@connection_timeout 15_000
@client_version "Pulsar Ex #{Mix.Project.config()[:version]}"
@protocol_version 13
@ping_interval 45_000
@request_timeout Timex.Duration.from_seconds(5)

@route_trip_command MapSet.new([:PRODUCER_SUCCESS])

defmodule State do
@enforce_keys [
:broker,
:socket_opts,
:timeout,
:last_request_id,
:last_producer_id,
:requests,
:metadata
]

defstruct [
:socket,
:host,
:port,
:broker,
# socket_opts: https://erlang.org/doc/man/gen_tcp.html#type-option
:socket_opts,
:timeout,
:last_server_ts
:last_request_id,
:last_producer_id,
:requests,
:last_server_ts,
:metadata
]
end

def start_link(host, port, socket_opts \\ [], timeout \\ @connection_timeout)
def create_producer(connection, %Topic{} = partition) do
with {:ok, command} <- GenServer.call(connection, {:create_producer, partition}) do
{:ok, {connection, command}}
end
end

def start_link(host, port, socket_opts, timeout) do
Connection.start_link(__MODULE__, {host, port, socket_opts, timeout})
def start_link(%Broker{} = broker) do
Connection.start_link(__MODULE__, broker)
end

def close(conn), do: Connection.call(conn, :close)

def init({host, port, socket_opts, timeout}) do
state = %State{host: host, port: port, socket_opts: socket_opts, timeout: timeout}
def init(%Broker{} = broker) do
socket_opts = Application.get_env(:pulsar, :socket_opts, [])
timeout = Application.get_env(:pulsar, :connection_timneout, @connection_timeout)

state = %State{
broker: broker,
socket_opts: socket_opts,
timeout: timeout,
last_request_id: 0,
last_producer_id: 0,
requests: %{},
metadata: [broker: Broker.to_name(broker)]
}

{:connect, :init, state}
end

@doc """
socket_opts: https://erlang.org/doc/man/gen_tcp.html#type-option
"""
def connect(
:init,
%{host: host, port: port, socket_opts: socket_opts, timeout: timeout} = state
%{
broker: %{host: host, port: port},
socket_opts: socket_opts,
timeout: timeout
} = state
) do
Logger.debug("Connecting to broker", state: state)
Logger.debug("Connecting to broker", state.metadata)

with {:ok, socket} <-
:gen_tcp.connect(to_charlist(host), port, optimize_socket_opts(socket_opts), timeout),
:ok <- do_handshake(socket, state) do
:inet.setopts(socket, active: :once)
Process.send_after(self(), :ping, @ping_interval)
Process.send_after(self(), :send_ping, @ping_interval)

{:ok,
%{
Expand All @@ -57,7 +91,7 @@ defmodule Pulsar.Connection do
}}
else
err ->
Logger.error("Error connecting to pulsar #{inspect(err)}", state: state)
Logger.error("Error connecting to pulsar #{inspect(err)}", state.metadata)
{:stop, err, state}
end
end
Expand All @@ -76,7 +110,7 @@ defmodule Pulsar.Connection do
end

defp do_handshake(socket, state) do
Logger.debug("Sending handshake to broker", state: state)
Logger.debug("Sending handshake to broker", state.metadata)

packet =
Proto.CommandConnect.new(
Expand All @@ -91,7 +125,8 @@ defmodule Pulsar.Connection do
:ok
else
err ->
Logger.error("Error establishing handshake with pulsar #{inspect(err)}", state: state)
Logger.error("Error establishing handshake with pulsar #{inspect(err)}", state.metadata)

err
end
end
Expand Down Expand Up @@ -124,20 +159,65 @@ defmodule Pulsar.Connection do

case info do
{:close, from} ->
Logger.info("Disconnecting from broker as requested", state: state)
Logger.info("Disconnecting from broker as requested", state.metadata)
Connection.reply(from, :ok)
{:stop, :normal, state}

{:error, :closed} ->
Logger.warn("Underlying connection closed", state: state)
Logger.warn("Underlying connection closed", state.metadata)
{:stop, :closed, state}

{:error, reason} = err ->
Logger.error("Disconnecting from broker for #{inspect(reason)}", state: state)
Logger.error("Disconnecting from broker for #{inspect(reason)}", state.metadata)

{:stop, err, state}
end
end

def handle_call(
{:create_producer, %Topic{} = partition},
from,
%{
socket: socket,
last_request_id: last_request_id,
last_producer_id: last_producer_id,
requests: requests
} = state
) do
Logger.debug(
"Creating producer to broker",
state.metadata ++ [topic: Topic.to_logical_name(partition), partition: partition.partition]
)

request_id = last_request_id + 1
producer_id = last_producer_id + 1

packet =
Proto.CommandProducer.new(
topic: Topic.to_name(partition),
producer_id: producer_id,
request_id: request_id,
epoch: 0,
user_provided_producer_name: false
)
|> encode_command()

new_state = %{
state
| last_request_id: request_id,
last_producer_id: producer_id
}

case :gen_tcp.send(socket, packet) do
:ok ->
{:noreply,
%{new_state | requests: Map.put(requests, request_id, {from, Timex.now(), partition})}}

err ->
{:reply, err, new_state}
end
end

def handle_call(:close, from, state) do
{:disconnect, {:close, from}, state}
end
Expand All @@ -152,28 +232,33 @@ defmodule Pulsar.Connection do
{command, _} = decode_packet(packet)

Logger.debug(
"Received #{command.type} command from broker #{inspect(get_inner_command(command))}",
state: state
"Received #{command.type} command from broker, #{inspect(get_inner_command(command))}",
state.metadata
)

if command.ping != nil do
Process.send(self(), :pong, [])
Process.send(self(), :send_pong, [])
end

state = handle_command(command, state)

:inet.setopts(socket, active: :once)
{:noreply, %{state | last_server_ts: Timex.now()}}
end

def handle_info(:ping, %{socket: socket, last_server_ts: last_server_ts} = state) do
def handle_info(
:send_ping,
%{socket: socket, last_server_ts: last_server_ts} = state
) do
expiry = @ping_interval |> Timex.Duration.from_milliseconds() |> Timex.Duration.scale(2)

cond do
Timex.after?(Timex.now(), Timex.add(last_server_ts, expiry)) ->
Logger.warn("Connection expired", state: state)
Logger.warn("Connection expired", state.metadata)
{:disconnect, {:error, :closed}, state}

true ->
Logger.debug("Sending PING to broker", state: state)
Logger.debug("Sending PING to broker", state.metadata)

packet = Proto.CommandPing.new() |> encode_command()

Expand All @@ -184,8 +269,8 @@ defmodule Pulsar.Connection do
end
end

def handle_info(:pong, %{socket: socket} = state) do
Logger.debug("Sending PONG to broker", state: state)
def handle_info(:send_pong, %{socket: socket} = state) do
Logger.debug("Sending PONG to broker", state.metadata)

packet = Proto.CommandPong.new() |> encode_command()

Expand All @@ -195,6 +280,79 @@ defmodule Pulsar.Connection do
end
end

defp handle_command(%Proto.BaseCommand{type: nil}, state) do
Logger.error("Command type is missing", state.metadata)
state
end

defp handle_command(
%Proto.BaseCommand{type: type} = command,
%{requests: requests} = state
) do
command = get_inner_command(command)

cond do
command == nil ->
Logger.error("#{type} has nil inner command", state.metadata)

state

MapSet.member?(@route_trip_command, type) && command.request_id == nil ->
Logger.error("#{type} has nil request_id", state.metadata)

state

MapSet.member?(@route_trip_command, type) && Map.get(requests, command.request_id) == nil ->
Logger.error("#{type} arrived unexpectedly", state.metadata)

state

MapSet.member?(@route_trip_command, type) ->
{_, ts, _} = Map.get(requests, command.request_id)

if Timex.after?(Timex.now(), Timex.add(ts, @request_timeout)) do
Logger.warn(
"#{type} took #{Timex.diff(Timex.now(), ts, :milliseconds)} to arrive",
state.metadata
)
end

handle_command(command, state)

true ->
handle_command(command, state)
end
end

defp handle_command(
%Proto.CommandProducerSuccess{} = command,
%{requests: requests} = state
) do
{from, _, partition} = Map.get(requests, command.request_id)

case command do
%{producer_ready: true} ->
GenServer.reply(from, {:ok, command})
%{state | requests: Map.delete(requests, command.request_id)}

_ ->
Logger.warn(
"producer not ready yet for topic #{Topic.to_name(partition)}",
state.metadata
)

state
end
end

defp handle_command(%Proto.CommandPing{}, state), do: state
defp handle_command(%Proto.CommandPong{}, state), do: state

defp handle_command(command, state) do
Logger.warn("unexpected command received #{inspect(command)}", state.metadata)
state
end

defp to_base_command(%Proto.BaseCommand{} = command), do: command

defp to_base_command(%Proto.CommandConnect{} = command),
Expand Down
Loading

0 comments on commit 459381b

Please sign in to comment.