Skip to content

Commit

Permalink
feat: add communication via partisan for client and db handlers inter…
Browse files Browse the repository at this point in the history
…action (#317)

* feat: add communication via partisan for client and db handlers interaction
* extend the postgre cluster strategy to connect partisan peers
* fix rust setup for GA
* fix metrics
  • Loading branch information
abc3 authored Apr 11, 2024
1 parent 1b0b2b2 commit dbb48ff
Show file tree
Hide file tree
Showing 15 changed files with 201 additions and 41 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/stage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ jobs:
echo 'erlang 25.3.2.7' >> ~/.tool-versions
elixir -v
- name: Set up Rust
uses: ATiltedTree/setup-rust@v1
uses: dtolnay/rust-toolchain@v1
with:
rust-version: stable
toolchain: stable
- name: Get git tags
run: git fetch --tags origin
- name: Checkout RELEASE_FROM
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/staging_linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ jobs:
echo 'erlang 25.3.2.7' >> ~/.tool-versions
elixir -v
- name: Set up Rust
uses: ATiltedTree/setup-rust@v1
uses: dtolnay/rust-toolchain@v1
with:
rust-version: stable
toolchain: stable
- name: Cache Mix
uses: actions/cache@v3
with:
Expand Down
18 changes: 17 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ dev:
SECRET_KEY_BASE="dev" \
CLUSTER_POSTGRES="true" \
DB_POOL_SIZE="5" \
ERL_AFLAGS="-kernel shell_history enabled" \
ERL_AFLAGS="-kernel shell_history enabled +zdbbl 2097151" \
iex --name [email protected] --cookie cookie -S mix run --no-halt

dev.node2:
Expand All @@ -26,9 +26,25 @@ dev.node2:
CLUSTER_POSTGRES="true" \
PROXY_PORT_SESSION="5442" \
PROXY_PORT_TRANSACTION="6553" \
PARTISAN_PEER_PORT="10201" \
ERL_AFLAGS="-kernel shell_history enabled" \
iex --name [email protected] --cookie cookie -S mix phx.server

dev.node3:
PORT=4002 \
MIX_ENV=dev \
VAULT_ENC_KEY="aHD8DZRdk2emnkdktFZRh3E9RNg4aOY7" \
API_JWT_SECRET=dev \
METRICS_JWT_SECRET=dev \
REGION=eu \
SECRET_KEY_BASE="dev" \
CLUSTER_POSTGRES="true" \
PROXY_PORT_SESSION="5443" \
PROXY_PORT_TRANSACTION="6554" \
PARTISAN_PEER_PORT="10202" \
ERL_AFLAGS="-kernel shell_history enabled" \
iex --name [email protected] --cookie cookie -S mix phx.server

dev_bin:
MIX_ENV=dev mix release supavisor_bin && ls -l burrito_out

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.48
1.1.49
20 changes: 19 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ topologies =
config: [
url: System.get_env("DATABASE_URL", "ecto://postgres:postgres@localhost:6432/postgres"),
heartbeat_interval: 5_000,
channel_name: "supavisor_#{region}_#{maj}_#{min}"
channel_name: "supavisor_#{region}_#{maj}_#{min}",
channel_name_partisan: "supavisor_partisan_#{region}_#{maj}_#{min}"
]
]

Expand Down Expand Up @@ -171,6 +172,23 @@ if config_env() != :test do
tag: "AES.GCM.V1", key: System.get_env("VAULT_ENC_KEY")
}
]

config :partisan,
# Which overlay to use
peer_service_manager: :partisan_pluggable_peer_service_manager,
listen_addrs: [
{
System.get_env("PARTISAN_PEER_IP", "127.0.0.1"),
String.to_integer(System.get_env("PARTISAN_PEER_PORT", "20100"))
}
],
channels: [
data: %{parallelism: System.get_env("PARTISAN_PARALLELISM", "5") |> String.to_integer()}
],
# Encoding for pid(), reference() and names
pid_encoding: false,
ref_encoding: false,
remote_ref_format: :improper_list
end

if System.get_env("LOGS_ENGINE") == "logflare" do
Expand Down
26 changes: 26 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ config :supavisor, Supavisor.Repo,
pool_size: 10,
port: 6432

config :partisan,
# Which overlay to use
peer_service_manager: :partisan_pluggable_peer_service_manager,
# The listening port for Partisan TCP/IP connections
peer_port: 10200,
channels: [data: %{parallelism: 1}],
# Encoding for pid(), reference() and names
pid_encoding: false,
ref_encoding: false,
remote_ref_format: :improper_list

# We don't run a server during test. If one is required,
# you can enable the server option below.
config :supavisor, SupavisorWeb.Endpoint,
Expand All @@ -43,3 +54,18 @@ config :logger, :console,

# Initialize plugs at runtime for faster test compilation
config :phoenix, :plug_init_mode, :runtime

config :partisan,
peer_service_manager: :partisan_pluggable_peer_service_manager,
listen_addrs: [
{
System.get_env("PARTISAN_PEER_IP", "127.0.0.1"),
String.to_integer(System.get_env("PARTISAN_PEER_PORT", "10200"))
}
],
channels: [
data: %{parallelism: System.get_env("PARTISAN_PARALLELISM", "5") |> String.to_integer()}
],
pid_encoding: false,
ref_encoding: false,
remote_ref_format: :improper_list
96 changes: 85 additions & 11 deletions lib/cluster/strategy/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Cluster.Strategy.Postgres do
"""
use GenServer

@vsn "1.1.49"

alias Cluster.Strategy
alias Cluster.Logger
alias Postgrex, as: P
Expand All @@ -36,6 +38,7 @@ defmodule Cluster.Strategy.Postgres do
state.config
|> Keyword.put_new(:heartbeat_interval, 5_000)
|> Keyword.put_new(:channel_name, "cluster")
|> Keyword.put_new(:channel_name_partisan, "cluster_partisan")
|> Keyword.delete(:url)

meta = %{
Expand All @@ -51,7 +54,8 @@ defmodule Cluster.Strategy.Postgres do
def handle_continue(:connect, state) do
with {:ok, conn} <- P.start_link(state.meta.opts.()),
{:ok, conn_notif} <- P.Notifications.start_link(state.meta.opts.()),
{_, _} <- P.Notifications.listen(conn_notif, state.config[:channel_name]) do
{_, _} <- P.Notifications.listen(conn_notif, state.config[:channel_name]),
{_, _} <- P.Notifications.listen(conn_notif, state.config[:channel_name_partisan]) do
Logger.info(state.topology, "Connected to Postgres database")

meta = %{
Expand All @@ -72,12 +76,60 @@ defmodule Cluster.Strategy.Postgres do
def handle_info(:heartbeat, state) do
Process.cancel_timer(state.meta.heartbeat_ref)
P.query(state.meta.conn, "NOTIFY #{state.config[:channel_name]}, '#{node()}'", [])

P.query(
state.meta.conn,
"NOTIFY #{state.config[:channel_name_partisan]}, '#{partisan_peer_spec_enc()}'",
[]
)

ref = heartbeat(state.config[:heartbeat_interval])
{:noreply, put_in(state.meta.heartbeat_ref, ref)}
end

def handle_info({:notification, _, _, _, node}, state) do
node = String.to_atom(node)
def handle_info({:notification, _, _, channel, msg}, state) do
disterl = state.config[:channel_name]
partisan = state.config[:channel_name_partisan]

case channel do
^disterl -> handle_channels(:disterl, msg, state)
^partisan -> handle_channels(:partisan, msg, state)
other -> Logger.error(state.topology, "Unknown channel: #{other}")
end

{:noreply, state}
end

def handle_info(msg, state) do
Logger.error(state.topology, "Undefined message #{inspect(msg, pretty: true)}")
{:noreply, state}
end

def code_change("1.1.48", state, _) do
Logger.info(state.topology, "Update state from 1.1.48")

partisan_channel =
Application.get_env(:libcluster, :topologies)
|> get_in([:postgres, :config, :channel_name_partisan])

new_config =
state.config
|> Keyword.put(:channel_name_partisan, partisan_channel)

{:ok, %{state | config: new_config}}
end

def code_change(_, state, _), do: {:ok, state}

### Internal functions
@spec heartbeat(non_neg_integer()) :: reference()
defp heartbeat(interval) when interval >= 0 do
Process.send_after(self(), :heartbeat, interval)
end

@spec handle_channels(:disterl | :partisan, String.t(), map()) :: any()
def handle_channels(:disterl, msg, state) do
node = String.to_atom(msg)

if node != node() do
topology = state.topology
Expand All @@ -91,18 +143,40 @@ defmodule Cluster.Strategy.Postgres do
Logger.error(topology, "Failed to connect to node: #{node}")
end
end
end

{:noreply, state}
def handle_channels(:partisan, msg, state) do
spec = partisan_peer_spec_dec(msg)

if spec.name not in [:partisan.node() | :partisan.nodes()] do
spec = partisan_peer_spec_dec(msg)
topology = state.topology

Logger.debug(
topology,
"Trying to connect to partisan node: #{inspect(spec, pretty: true)}"
)

case :partisan_peer_service.join(spec) do
:ok ->
Logger.debug(topology, "Connected to node: #{inspect(spec, pretty: true)}")

other ->
Logger.error(topology, "Failed to connect to partisan node: #{other}")
end
end
end

def handle_info(msg, state) do
Logger.error(state.topology, "Undefined message #{inspect(msg, pretty: true)}")
{:noreply, state}
@spec partisan_peer_spec_enc() :: String.t()
def partisan_peer_spec_enc() do
:partisan.node_spec()
|> :erlang.term_to_binary()
|> Base.encode64()
end

### Internal functions
@spec heartbeat(non_neg_integer()) :: reference()
defp heartbeat(interval) when interval >= 0 do
Process.send_after(self(), :heartbeat, interval)
@spec partisan_peer_spec_dec(String.t()) :: term()
def partisan_peer_spec_dec(spec) do
Base.decode64!(spec)
|> :erlang.binary_to_term()
end
end
10 changes: 5 additions & 5 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
defmodule Supavisor.ClientHandler do
@moduledoc """
This module is responsible for handling incoming connections to the Supavisor server. It is
implemented as a Ranch protocol behavior and a gen_statem behavior. It handles SSL negotiation,
implemented as a Ranch protocol behavior and a partisan_gen_statem behavior. It handles SSL negotiation,
user authentication, tenant subscription, and dispatching of messages to the appropriate tenant
supervisor. Each client connection is assigned to a specific tenant supervisor.
"""

require Logger

@behaviour :ranch_protocol
@behaviour :gen_statem
@behaviour :partisan_gen_statem

alias Supavisor, as: S
alias Supavisor.DbHandler, as: Db
Expand All @@ -27,12 +27,12 @@ defmodule Supavisor.ClientHandler do
def callback_mode, do: [:handle_event_function]

def client_cast(pid, bin, status) do
:gen_statem.cast(pid, {:client_cast, bin, status})
:partisan_gen_statem.cast(pid, {:client_cast, bin, status})
end

@spec client_call(pid, iodata(), atom()) :: :ok | {:error, term()}
def client_call(pid, bin, status),
do: :gen_statem.call(pid, {:client_call, bin, status}, 30_000)
do: :partisan_gen_statem.call(pid, {:client_call, bin, status}, 30_000)

@impl true
def init(_), do: :ignore
Expand Down Expand Up @@ -70,7 +70,7 @@ defmodule Supavisor.ClientHandler do
log_level: nil
}

:gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :exchange, data)
:partisan_gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :exchange, data)
end

@impl true
Expand Down
10 changes: 5 additions & 5 deletions lib/supavisor/db_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Supavisor.DbHandler do

require Logger

@behaviour :gen_statem
@behaviour :partisan_gen_statem

alias Supavisor, as: S
alias Supavisor.ClientHandler, as: Client
Expand All @@ -22,23 +22,23 @@ defmodule Supavisor.DbHandler do
@async_send_limit 1_000

def start_link(config) do
:gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000)
:partisan_gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000)
end

@spec call(pid(), pid(), binary()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()}
def call(pid, caller, msg), do: :gen_statem.call(pid, {:db_call, caller, msg}, 15_000)
def call(pid, caller, msg), do: :partisan_gen_statem.call(pid, {:db_call, caller, msg}, 15_000)

@spec get_state_and_mode(pid()) :: {:ok, {state, Supavisor.mode()}} | {:error, term()}
def get_state_and_mode(pid) do
try do
{:ok, :gen_statem.call(pid, :get_state_and_mode, 5_000)}
{:ok, :partisan_gen_statem.call(pid, :get_state_and_mode, 5_000)}
catch
error, reason -> {:error, {error, reason}}
end
end

@spec stop(pid()) :: :ok
def stop(pid), do: :gen_statem.stop(pid, :client_termination, 5_000)
def stop(pid), do: :partisan_gen_statem.stop(pid, :client_termination, 5_000)

@impl true
def init(args) do
Expand Down
20 changes: 18 additions & 2 deletions lib/supavisor/hot_upgrade.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ defmodule Supavisor.HotUpgrade do
{:supervisor, :poolboy_sup, _},
Process.info(linked_pid)[:dictionary][:"$initial_call"]
),
state = :sys.get_state(linked_pid),
{status, state} = get_state(linked_pid),
match?(:ok, status),
Record.is_record(state, :state),
state(state, :module) == :poolboy_sup do
:sys.replace_state(linked_pid, fn state ->
Expand Down Expand Up @@ -136,10 +137,15 @@ defmodule Supavisor.HotUpgrade do
|> Enum.each(fn entry(key: key, value: value) ->
case value do
{:cached, {:ok, {:auth_query, auth}}} when is_function(auth) ->
Logger.debug("Reinitializing secret: #{inspect(key)}")
Logger.debug("Reinitializing secret auth_query: #{inspect(key)}")
new = {:cached, {:ok, {:auth_query, enc(auth.())}}}
Cachex.put(Supavisor.Cache, key, new)

{:cached, {:ok, {:auth_query_md5, auth}}} when is_function(auth) ->
Logger.debug("Reinitializing secret auth_query_md5: #{inspect(key)}")
new = {:cached, {:ok, {:auth_query_md5, enc(auth.())}}}
Cachex.put(Supavisor.Cache, key, new)

other ->
Logger.debug("Skipping:#{inspect(key)} #{inspect(other)}")
end
Expand All @@ -151,4 +157,14 @@ defmodule Supavisor.HotUpgrade do

@spec do_enc(term) :: fun
def do_enc(val), do: fn -> val end

def get_state(pid) do
try do
{:ok, :sys.get_state(pid)}
catch
type, exception ->
IO.write("Error getting state: #{inspect(exception)}")
{:error, {type, exception}}
end
end
end
Loading

0 comments on commit dbb48ff

Please sign in to comment.