Skip to content

Commit

Permalink
The grpcbox_client get_channel function adds support for hash and dir…
Browse files Browse the repository at this point in the history
…ect strategies
  • Loading branch information
heshaoqiong committed Mar 30, 2023
1 parent 6ea2b84 commit 7c1ca47
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
30 changes: 22 additions & 8 deletions src/grpcbox_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
-export([start_link/3,
is_ready/1,
pick/2,
pick/3,
add_endpoints/2,
remove_endpoints/3,
stop/1,
Expand Down Expand Up @@ -57,11 +58,19 @@ is_ready(Name) ->
gen_statem:call(?CHANNEL(Name), is_ready).

%% @doc Picks a subchannel from a pool using the configured strategy.
-spec pick(name(), unary | stream) -> {ok, {pid(), grpcbox_client:interceptor() | undefined}} |
{error, undefined_channel | no_endpoints}.
-spec pick(name(), unary | stream) ->
{ok, {pid(), grpcbox_client:interceptor() | undefined}} |
{error, undefined_channel | no_endpoints}.
pick(Name, CallType) ->
pick(Name, CallType, undefined).

%% @doc Picks a subchannel from a pool using the configured strategy.
-spec pick(name(), unary | stream, term() | undefined) ->
{ok, {pid(), grpcbox_client:interceptor() | undefined}} |
{error, undefined_channel | no_endpoints}.
pick(Name, CallType, Key) ->
try
case gproc_pool:pick_worker(Name) of
case pick_worker(Name, Key) of
false -> {error, no_endpoints};
Pid when is_pid(Pid) ->
{ok, {Pid, interceptor(Name, CallType)}}
Expand All @@ -71,6 +80,11 @@ pick(Name, CallType) ->
{error, undefined_channel}
end.

pick_worker(Name, undefined) ->
gproc_pool:pick_worker(Name);
pick_worker(Name, Key) ->
gproc_pool:pick_worker(Name, Key).

add_endpoints(Name, Endpoints) ->
gen_statem:call(?CHANNEL(Name), {add_endpoints, Endpoints}).

Expand Down Expand Up @@ -123,9 +137,9 @@ connected({call, From}, is_ready, _Data) ->
{keep_state_and_data, [{reply, From, true}]};
connected({call, From}, {add_endpoints, Endpoints},
Data=#data{pool=Pool,
stats_handler=StatsHandler,
encoding=Encoding,
endpoints=TotalEndpoints}) ->
stats_handler=StatsHandler,
encoding=Encoding,
endpoints=TotalEndpoints}) ->
NewEndpoints = lists:subtract(Endpoints, TotalEndpoints),
NewTotalEndpoints = lists:umerge(TotalEndpoints, Endpoints),
start_workers(Pool, StatsHandler, Encoding, NewEndpoints),
Expand Down Expand Up @@ -197,8 +211,8 @@ insert_stream_interceptor(Name, _Type, Interceptors) ->
start_workers(Pool, StatsHandler, Encoding, Endpoints) ->
[begin
gproc_pool:add_worker(Pool, Endpoint),
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint,
Pool, Endpoint, Encoding, StatsHandler),
gproc_pool:add_worker({Pool, active}, Endpoint),
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, Endpoint, Encoding, StatsHandler),
Pid
end || Endpoint <- Endpoints].

Expand Down
3 changes: 2 additions & 1 deletion src/grpcbox_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@

get_channel(Options, Type) ->
Channel = maps:get(channel, Options, default_channel),
grpcbox_channel:pick(Channel, Type).
Key = maps:get(key, Options, undefined),
grpcbox_channel:pick(Channel, Type, Key).

unary(Ctx, Service, Method, Input, Def, Options) ->
unary(Ctx, filename:join([<<>>, Service, Method]), Input, Def, Options).
Expand Down
25 changes: 22 additions & 3 deletions test/grpcbox_channel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

all() ->
[
add_and_remove_endpoints
add_and_remove_endpoints,
pick_worker_strategy
].
init_per_suite(_Config) ->
application:set_env(grpcbox, servers, []),
Expand All @@ -35,6 +36,24 @@ end_per_suite(_Config) ->

add_and_remove_endpoints(_Config) ->
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}]),
?assertMatch(4, length(gproc_pool:active_workers(default_channel))),
?assertEqual(4, length(gproc_pool:active_workers(default_channel))),
grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], normal),
?assertMatch(1, length(gproc_pool:active_workers(default_channel))).
?assertEqual(1, length(gproc_pool:active_workers(default_channel))).

pick_worker_strategy(_Config) ->
?assertEqual(ok, pick_worker(default_channel)),
?assertEqual(ok, pick_worker(random_channel)),
?assertEqual(ok, pick_worker(direct_channel, 1)),
?assertEqual(ok, pick_worker(hash_channel, 1)),
?assertEqual(error, pick_worker(default_channel, 1)),
?assertEqual(error, pick_worker(random_channel, 1)),
?assertEqual(error, pick_worker(direct_channel)),
?assertEqual(error, pick_worker(hash_channel)),
ok.

pick_worker(Name, N) ->
{R, _} = grpcbox_channel:pick(Name, unary, N),
R.
pick_worker(Name) ->
{R, _} = grpcbox_channel:pick(Name, unary, undefined),
R.

0 comments on commit 7c1ca47

Please sign in to comment.