Skip to content

Commit

Permalink
grpcbox_channel can add and remove dynamic endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
heshaoqiong committed Apr 7, 2023
1 parent 21a41ff commit b55785d
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 12 deletions.
57 changes: 45 additions & 12 deletions src/grpcbox_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
-export([start_link/3,
is_ready/1,
pick/2,
add_endpoints/2,
remove_endpoints/3,
stop/1,
stop/2]).
-export([init/1,
Expand Down Expand Up @@ -69,6 +71,12 @@ pick(Name, CallType) ->
{error, undefined_channel}
end.

add_endpoints(Name, Endpoints) ->
gen_statem:call(?CHANNEL(Name), {add_endpoints, Endpoints}).

remove_endpoints(Name, Endpoints, Reason) ->
gen_statem:call(?CHANNEL(Name), {remove_endpoints, Endpoints, Reason}).

-spec interceptor(name(), unary | stream) -> grpcbox_client:interceptor() | undefined.
interceptor(Name, CallType) ->
case ets:lookup(?CHANNELS_TAB, {Name, CallType}) of
Expand Down Expand Up @@ -98,14 +106,13 @@ init([Name, Endpoints, Options]) ->
pool = Name,
encoding = Encoding,
stats_handler = StatsHandler,
endpoints = Endpoints
endpoints = lists:usort(Endpoints)
},

case maps:get(sync_start, Options, false) of
false ->
{ok, idle, Data, [{next_event, internal, connect}]};
true ->
_ = start_workers(Name, StatsHandler, Encoding, Endpoints),
start_workers(Name, StatsHandler, Encoding, Endpoints),
{ok, connected, Data}
end.

Expand All @@ -114,14 +121,33 @@ callback_mode() ->

connected({call, From}, is_ready, _Data) ->
{keep_state_and_data, [{reply, From, true}]};
connected({call, From}, {add_endpoints, Endpoints},
Data=#data{pool=Pool,
stats_handler=StatsHandler,
encoding=Encoding,
endpoints=TotalEndpoints}) ->
NewEndpoints = lists:subtract(Endpoints, TotalEndpoints),
NewTotalEndpoints = lists:umerge(TotalEndpoints, Endpoints),
start_workers(Pool, StatsHandler, Encoding, NewEndpoints),
{keep_state, Data#data{endpoints=NewTotalEndpoints}, [{reply, From, ok}]};
connected({call, From}, {remove_endpoints, Endpoints, Reason},
Data=#data{pool=Pool,
endpoints=TotalEndpoints}) ->

NewEndpoints = sets:to_list(sets:intersection(sets:from_list(Endpoints),
sets:from_list(TotalEndpoints))),
NewTotalEndpoints = lists:subtract(TotalEndpoints, Endpoints),
stop_workers(Pool, NewEndpoints, Reason),
{keep_state, Data#data{endpoints = NewTotalEndpoints}, [{reply, From, ok}]};
connected(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, Data).

idle(internal, connect, Data=#data{pool=Pool,
stats_handler=StatsHandler,
encoding=Encoding,
endpoints=Endpoints}) ->
_ = start_workers(Pool, StatsHandler, Encoding, Endpoints),
stats_handler=StatsHandler,
encoding=Encoding,
endpoints=Endpoints}) ->

start_workers(Pool, StatsHandler, Encoding, Endpoints),
{next_state, connected, Data};
idle({call, From}, is_ready, _Data) ->
{keep_state_and_data, [{reply, From, false}]};
Expand Down Expand Up @@ -170,9 +196,16 @@ insert_stream_interceptor(Name, _Type, Interceptors) ->

start_workers(Pool, StatsHandler, Encoding, Endpoints) ->
[begin
gproc_pool:add_worker(Pool, Endpoint),
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, SSLOptions},
Encoding, StatsHandler),
Pid
end || Endpoint={Transport, Host, Port, SSLOptions} <- Endpoints].
gproc_pool:add_worker(Pool, Endpoint),
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint,
Pool, Endpoint, Encoding, StatsHandler),
Pid
end || Endpoint <- Endpoints].

stop_workers(Pool, Endpoints, Reason) ->
[begin
case gproc_pool:whereis_worker(Pool, Endpoint) of
undefined -> ok;
Pid -> grpcbox_subchannel:stop(Pid, Reason)
end
end || Endpoint <- Endpoints].
40 changes: 40 additions & 0 deletions test/grpcbox_channel_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-module(grpcbox_channel_SUITE).

-export([all/0,
init_per_suite/1,
end_per_suite/1,
add_and_remove_endpoints/1,
pick_worker_strategy/1]).

-include_lib("eunit/include/eunit.hrl").

all() ->
[
add_and_remove_endpoints
].
init_per_suite(_Config) ->
application:set_env(grpcbox, servers, []),
application:ensure_all_started(grpcbox),
grpcbox_channel_sup:start_link(),
grpcbox_channel_sup:start_child(default_channel, [{https, "127.0.0.1", 8080, #{}}], #{}),
grpcbox_channel_sup:start_child(random_channel,
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
#{balancer => random}),
grpcbox_channel_sup:start_child(hash_channel,
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
#{balancer => hash}),
grpcbox_channel_sup:start_child(direct_channel,
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
#{ balancer => direct}),

_Config.

end_per_suite(_Config) ->
application:stop(grpcbox),
ok.

add_and_remove_endpoints(_Config) ->
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}]),
?assertMatch(4, length(gproc_pool:active_workers(default_channel))),
grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], normal),
?assertMatch(1, length(gproc_pool:active_workers(default_channel))).

0 comments on commit b55785d

Please sign in to comment.