From b55785d4b3f52c4289812f49b6324e5281c1bf86 Mon Sep 17 00:00:00 2001 From: sevenhe Date: Thu, 23 Mar 2023 10:12:14 +0800 Subject: [PATCH] grpcbox_channel can add and remove dynamic endpoints --- src/grpcbox_channel.erl | 57 +++++++++++++++++++++++++++------- test/grpcbox_channel_SUITE.erl | 40 ++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 12 deletions(-) create mode 100644 test/grpcbox_channel_SUITE.erl diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index 4c9a8a7..17bfe57 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -5,6 +5,8 @@ -export([start_link/3, is_ready/1, pick/2, + add_endpoints/2, + remove_endpoints/3, stop/1, stop/2]). -export([init/1, @@ -69,6 +71,12 @@ pick(Name, CallType) -> {error, undefined_channel} end. +add_endpoints(Name, Endpoints) -> + gen_statem:call(?CHANNEL(Name), {add_endpoints, Endpoints}). + +remove_endpoints(Name, Endpoints, Reason) -> + gen_statem:call(?CHANNEL(Name), {remove_endpoints, Endpoints, Reason}). + -spec interceptor(name(), unary | stream) -> grpcbox_client:interceptor() | undefined. interceptor(Name, CallType) -> case ets:lookup(?CHANNELS_TAB, {Name, CallType}) of @@ -98,14 +106,13 @@ init([Name, Endpoints, Options]) -> pool = Name, encoding = Encoding, stats_handler = StatsHandler, - endpoints = Endpoints + endpoints = lists:usort(Endpoints) }, - case maps:get(sync_start, Options, false) of false -> {ok, idle, Data, [{next_event, internal, connect}]}; true -> - _ = start_workers(Name, StatsHandler, Encoding, Endpoints), + start_workers(Name, StatsHandler, Encoding, Endpoints), {ok, connected, Data} end. @@ -114,14 +121,33 @@ callback_mode() -> connected({call, From}, is_ready, _Data) -> {keep_state_and_data, [{reply, From, true}]}; +connected({call, From}, {add_endpoints, Endpoints}, + Data=#data{pool=Pool, + stats_handler=StatsHandler, + encoding=Encoding, + endpoints=TotalEndpoints}) -> + NewEndpoints = lists:subtract(Endpoints, TotalEndpoints), + NewTotalEndpoints = lists:umerge(TotalEndpoints, Endpoints), + start_workers(Pool, StatsHandler, Encoding, NewEndpoints), + {keep_state, Data#data{endpoints=NewTotalEndpoints}, [{reply, From, ok}]}; +connected({call, From}, {remove_endpoints, Endpoints, Reason}, + Data=#data{pool=Pool, + endpoints=TotalEndpoints}) -> + + NewEndpoints = sets:to_list(sets:intersection(sets:from_list(Endpoints), + sets:from_list(TotalEndpoints))), + NewTotalEndpoints = lists:subtract(TotalEndpoints, Endpoints), + stop_workers(Pool, NewEndpoints, Reason), + {keep_state, Data#data{endpoints = NewTotalEndpoints}, [{reply, From, ok}]}; connected(EventType, EventContent, Data) -> handle_event(EventType, EventContent, Data). idle(internal, connect, Data=#data{pool=Pool, - stats_handler=StatsHandler, - encoding=Encoding, - endpoints=Endpoints}) -> - _ = start_workers(Pool, StatsHandler, Encoding, Endpoints), + stats_handler=StatsHandler, + encoding=Encoding, + endpoints=Endpoints}) -> + + start_workers(Pool, StatsHandler, Encoding, Endpoints), {next_state, connected, Data}; idle({call, From}, is_ready, _Data) -> {keep_state_and_data, [{reply, From, false}]}; @@ -170,9 +196,16 @@ insert_stream_interceptor(Name, _Type, Interceptors) -> start_workers(Pool, StatsHandler, Encoding, Endpoints) -> [begin - gproc_pool:add_worker(Pool, Endpoint), - {ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, SSLOptions}, - Encoding, StatsHandler), - Pid - end || Endpoint={Transport, Host, Port, SSLOptions} <- Endpoints]. + gproc_pool:add_worker(Pool, Endpoint), + {ok, Pid} = grpcbox_subchannel:start_link(Endpoint, + Pool, Endpoint, Encoding, StatsHandler), + Pid + end || Endpoint <- Endpoints]. +stop_workers(Pool, Endpoints, Reason) -> + [begin + case gproc_pool:whereis_worker(Pool, Endpoint) of + undefined -> ok; + Pid -> grpcbox_subchannel:stop(Pid, Reason) + end + end || Endpoint <- Endpoints]. diff --git a/test/grpcbox_channel_SUITE.erl b/test/grpcbox_channel_SUITE.erl new file mode 100644 index 0000000..33b40ca --- /dev/null +++ b/test/grpcbox_channel_SUITE.erl @@ -0,0 +1,40 @@ +-module(grpcbox_channel_SUITE). + +-export([all/0, + init_per_suite/1, + end_per_suite/1, + add_and_remove_endpoints/1, + pick_worker_strategy/1]). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + add_and_remove_endpoints + ]. +init_per_suite(_Config) -> + application:set_env(grpcbox, servers, []), + application:ensure_all_started(grpcbox), + grpcbox_channel_sup:start_link(), + grpcbox_channel_sup:start_child(default_channel, [{https, "127.0.0.1", 8080, #{}}], #{}), + grpcbox_channel_sup:start_child(random_channel, + [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], + #{balancer => random}), + grpcbox_channel_sup:start_child(hash_channel, + [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], + #{balancer => hash}), + grpcbox_channel_sup:start_child(direct_channel, + [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], + #{ balancer => direct}), + + _Config. + +end_per_suite(_Config) -> + application:stop(grpcbox), + ok. + +add_and_remove_endpoints(_Config) -> + grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}]), + ?assertMatch(4, length(gproc_pool:active_workers(default_channel))), + grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], normal), + ?assertMatch(1, length(gproc_pool:active_workers(default_channel))).