Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adt/immediate sends #20

Open
wants to merge 41 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
1fdf8dc
API to open new channel
Nov 11, 2022
b1dea71
Async reply interface
Nov 16, 2022
6511c77
ubuntu-20-04 is the last version that supports all OTP test versions
Nov 16, 2022
cd19ea9
20.04 still fails. Try older versions
Nov 16, 2022
361f741
Problem may be with OTP versions supported by latest version of rebar…
Nov 16, 2022
1bdf9d4
split out OTP 22
Nov 17, 2022
98414c8
fix names
Nov 17, 2022
2d7f496
allow for flaky test result
Nov 18, 2022
bbe782c
Pass Options through from craeting stream to h2 stream init callback.
Nov 18, 2022
595b77b
Fix otp, os, rebar3 versions to workable combinations
Nov 18, 2022
33c54a8
spec full supervisor return value
michaeldjeffrey Nov 28, 2022
005db1f
Merge branch 'sugiyama/client-connect' into mj/full-update
michaeldjeffrey Nov 28, 2022
ba34f46
Merge remote-tracking branch 'origin/sugiyama/async-client' into mj/f…
michaeldjeffrey Nov 28, 2022
51c13d3
update spec to accept any value gproc accepts for registering
michaeldjeffrey Nov 29, 2022
1898db9
add callback module to client options type
michaeldjeffrey Nov 29, 2022
9d1793d
remove old unary/1 from merge
michaeldjeffrey Dec 1, 2022
8df87ad
Merge branch 'master' into mj/full-update
macpie Mar 14, 2023
2c95b70
reinstate stop stream calls when handling stop directive from handler
andymck Mar 21, 2023
03f763e
Merge pull request #18 from novalabsxyz/andymck/reinstate-stop-stream…
macpie Mar 21, 2023
840dd52
Send immediate when possible
Vagabond Apr 25, 2023
05ba01c
Bump chatterbox
Vagabond Apr 25, 2023
f0a14c9
Bump chatterbox
Vagabond Apr 25, 2023
a30d05b
Bump for better chatterbox
Vagabond Apr 26, 2023
3074abc
Bump chatterbox for lockless hpack
Vagabond Apr 26, 2023
2d5f518
Bump chatterbox
Vagabond Apr 27, 2023
0fe71ca
Fix grpcbox_subchannel connection EXIT handling
Vagabond May 2, 2023
99819f8
Be able to configure rcv timeout in unary call
macpie May 3, 2023
3241b0b
Bump chatterbox for window handling offloading from connection pid
Vagabond May 3, 2023
d18687b
Bump chatterbox for bugfix
Vagabond May 3, 2023
c7ce250
Bump chatterbox for hpack encoder lock fix
Vagabond May 3, 2023
ba5e48a
Bump chatterbox for better stream GC and better stream id tracking
Vagabond May 4, 2023
c663cfd
Update chatterbox to cleanup orphaned streams
Vagabond May 4, 2023
44ee3a7
Upgrade chatterbox to d7a33
macpie May 8, 2023
5deef8c
Bump chatterbox for unhandled message bugfix
Vagabond May 8, 2023
d8d3c46
Upgrade chatterbox
macpie May 8, 2023
4d27295
Bump chatterbox for cpu usage improvements
Vagabond May 10, 2023
1e49731
Bump chatterbox for stream stall
Vagabond May 10, 2023
460a6c9
Upgrade chatterbox
macpie May 11, 2023
775c9fa
bump chatterbox version
michaeldjeffrey Jul 20, 2023
6243151
Upgrade chatterbox
macpie Jul 31, 2023
955ec02
Upgrade chatterbox to fix exit handle
macpie Oct 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
- 'master'

jobs:
build:
build23andnewer:
name: Test on OTP ${{ matrix.otp_version }} and ${{ matrix.os }}
runs-on: ${{ matrix.os }}

Expand Down Expand Up @@ -37,3 +37,33 @@ jobs:
- uses: codecov/codecov-action@v1
with:
file: _build/test/covertool/grpcbox.covertool.xml

build22andolder:
name: Test on OTP ${{ matrix.otp_version }} and ${{ matrix.os }}
runs-on: ${{ matrix.os }}

strategy:
matrix:
otp_version: ['22.3.4.20']
os: [ubuntu-20.04]

steps:
- uses: actions/checkout@v2

- uses: erlef/setup-beam@v1
with:
otp-version: ${{ matrix.otp_version }}
rebar3-version: '3.18.0'

- name: Compile
run: rebar3 compile
- name: Tests
run: rebar3 ct --cover
- name: Dialyzer
run: rebar3 dialyzer
- name: Covertool
run: rebar3 covertool generate

- uses: codecov/codecov-action@v1
with:
file: _build/test/covertool/grpcbox.covertool.xml
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{deps, [
{throttle, "0.2.0", {pkg, lambda_throttle}},
{chatterbox, ".*", {git, "https://github.com/novalabsxyz/chatterbox", {branch, "master"}}},
{chatterbox, ".*", {git, "https://github.com/novalabsxyz/chatterbox", {branch, "mj/remove-write-concurrency"}}},
ctx,
{acceptor_pool, {git, "https://github.com/novalabsxyz/acceptor_pool", {branch, "master"}}},
gproc
Expand Down
8 changes: 4 additions & 4 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@
0},
{<<"chatterbox">>,
{git,"https://github.com/novalabsxyz/chatterbox",
{ref,"cbfe6e46b273f1552b57685c9f6daf710473c609"}},
{ref,"1576025ed66ad65489f62cd5d897433f192eebf5"}},
0},
{<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},0},
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},1},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},1},
{<<"throttle">>,{pkg,<<"lambda_throttle">>,<<"0.2.0">>},0}]}.
[
{pkg_hash,[
{<<"ctx">>, <<"8FF88B70E6400C4DF90142E7F130625B82086077A45364A78D208ED3ED53C7FE">>},
{<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>},
{<<"hpack">>, <<"17670F83FF984AE6CD74B1C456EDDE906D27FF013740EE4D9EFAA4F1BF999633">>},
{<<"hpack">>, <<"2461899CC4AB6A0EF8E970C1661C5FC6A52D3C25580BC6DD204F84CE94669926">>},
{<<"throttle">>, <<"E881B46D9836AFB70F3E2FA3BE9B0140805BA324ED26AA734FF6C5C1568C6CA7">>}]},
{pkg_hash_ext,[
{<<"ctx">>, <<"A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2">>},
{<<"gproc">>, <<"580ADAFA56463B75263EF5A5DF4C86AF321F68694E7786CB057FD805D1E2A7DE">>},
{<<"hpack">>, <<"06F580167C4B8B8A6429040DF36CC93BBA6D571FAEAEC1B28816523379CBB23A">>},
{<<"hpack">>, <<"D6137D7079169D8C485C6962DFE261AF5B9EF60FBC557344511C1E65E3D95FB0">>},
{<<"throttle">>, <<"3EACFAAC1C2EBD0F17D77D9E96B1029BF07DED4AC233BA38883D70CDF1FFF740">>}]}
].
3 changes: 2 additions & 1 deletion src/grpcbox_channel_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).

%% @doc Start a channel under the grpcbox channel supervisor.
-spec start_child(atom(), [grpcbox_channel:endpoint()], grpcbox_channel:options()) -> {ok, pid()}.
-spec start_child(gproc:value(), [grpcbox_channel:endpoint()], grpcbox_channel:options())
-> supervisor:startchild_ret().
start_child(Name, Endpoints, Options) ->
supervisor:start_child(?SERVER, [Name, Endpoints, Options]).

Expand Down
13 changes: 10 additions & 3 deletions src/grpcbox_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

-module(grpcbox_client).

-export([unary/6,
-export([connect/3,

unary/6,
unary/5,
stream/4,
stream/5,
Expand All @@ -25,7 +27,9 @@
-include("grpcbox.hrl").

-type options() :: #{channel => grpcbox_channel:t(),
encoding => grpcbox:encoding()}.
encoding => grpcbox:encoding(),
callback_module => {module(), any()},
rcv_timeout => non_neg_integer()}.

-type unary_interceptor() :: term().
-type stream_interceptor() :: term().
Expand All @@ -44,6 +48,9 @@
stream_interceptor/0,
interceptor/0]).

connect(ChannelName, Endpoints, Options) ->
grpcbox_channel_sup:start_child(ChannelName, Endpoints, Options).

get_channel(Options, Type) ->
Channel = maps:get(channel, Options, default_channel),
grpcbox_channel:pick(Channel, Type).
Expand Down Expand Up @@ -78,7 +85,7 @@ unary_handler(Ctx, Channel, Path, Input, Def, Options) ->
stream_pid => Pid,
monitor_ref => Ref,
service_def => Def},
case recv_end(S, 5000) of
case recv_end(S, maps:get(rcv_timeout, Options, 5000)) of
eos ->
{ok, Headers} = recv_headers(S, 0),
case recv_trailers(S) of
Expand Down
105 changes: 73 additions & 32 deletions src/grpcbox_client_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@

-include("grpcbox.hrl").

-type client_stream_callback_data() :: term().

-callback(init(ClientPid::pid(), StreamId::non_neg_integer(), term()) -> {ok, client_stream_callback_data()}).
-callback(handle_message(Message::term(), client_stream_callback_data()) -> {ok, client_stream_callback_data()}).
-callback(handle_headers(Metadata::map(), client_stream_callback_data()) -> {ok, client_stream_callback_data()}).
-callback(handle_trailers(Status::binary(), Message::term(), Metadata::map(), client_stream_callback_data()) -> {ok, client_stream_callback_data()}).
-callback(handle_eos(client_stream_callback_data()) -> {ok, client_stream_callback_data()}).

-define(headers(Scheme, Host, Path, Encoding, MessageType, MD), [{<<":method">>, <<"POST">>},
{<<":path">>, Path},
{<<":scheme">>, Scheme},
Expand All @@ -33,6 +41,7 @@ new_stream(Ctx, Channel, Path, Def=#grpcbox_def{service=Service,
encoding := DefaultEncoding,
stats_handler := StatsHandler}} ->
Encoding = maps:get(encoding, Options, DefaultEncoding),
Callback = callback_module(Options),
RequestHeaders = ?headers(Scheme, Authority, Path, encoding_to_binary(Encoding),
MessageType, metadata_headers(Ctx)),
case h2_connection:new_stream(Conn, ?MODULE, [#{service => Service,
Expand All @@ -42,11 +51,11 @@ new_stream(Ctx, Channel, Path, Def=#grpcbox_def{service=Service,
buffer => <<>>,
stats_handler => StatsHandler,
stats => #{},
client_pid => self()}], self()) of
callback_module => Callback,
client_pid => self()}], RequestHeaders, [], self()) of
{error, _Code} = Err ->
Err;
{StreamId, Pid} ->
h2_connection:send_headers(Conn, StreamId, RequestHeaders),
Ref = erlang:monitor(process, Pid),
{ok, #{channel => Conn,
stream_id => StreamId,
Expand All @@ -69,25 +78,26 @@ send_request(Ctx, Channel, Path, Input, #grpcbox_def{service=Service,
encoding := DefaultEncoding,
stats_handler := StatsHandler}} ->
Encoding = maps:get(encoding, Options, DefaultEncoding),
Callback = callback_module(Options),
Body = grpcbox_frame:encode(Encoding, MarshalFun(Input)),
Headers = ?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), MessageType, metadata_headers(Ctx)),

%% headers are sent in the same request as creating a new stream to ensure
%% concurrent calls can't end up interleaving the sending of headers in such
%% a way that a lower stream id's headers are sent after another's, which results
%% in the server closing the connection when it gets them out of order
case h2_connection:new_stream(Conn, grpcbox_client_stream, [#{service => Service,
case h2_connection:new_stream(Conn, ?MODULE, [#{service => Service,
marshal_fun => MarshalFun,
unmarshal_fun => UnMarshalFun,
path => Path,
buffer => <<>>,
stats_handler => StatsHandler,
stats => #{},
client_pid => self()}], Headers, [], self()) of
callback_module => Callback,
client_pid => self()}], Headers, Body, [], self()) of
{error, _Code} = Err ->
Err;
{StreamId, Pid} ->
h2_connection:send_body(Conn, StreamId, Body),
{ok, Conn, StreamId, Pid}
end;
{error, _}=Error ->
Expand Down Expand Up @@ -136,70 +146,101 @@ metadata_headers(Ctx) ->

%% callbacks

init(_ConnectionPid, StreamId, [_, State=#{path := Path}]) ->
init(ConnectionPid, StreamId, [_, State=#{path := Path, callback_module := {CallbackModule, CallbackInitArgs}}]) ->
_ = process_flag(trap_exit, true),
Ctx1 = ctx:with_value(ctx:new(), grpc_client_method, Path),
State1 = stats_handler(Ctx1, rpc_begin, {}, State),
{ok, State1#{stream_id => StreamId}};
{ok, CallbackData} = init_stream_callback(CallbackModule, CallbackInitArgs, ConnectionPid, StreamId),
{ok, State1#{stream_id => StreamId, callback_data => CallbackData}};
init(_, _, State) ->
{ok, State}.

on_receive_headers(H, State=#{resp_headers := _,
ctx := Ctx,
stream_id := StreamId,
client_pid := Pid}) ->
ctx := Ctx} = State) ->
Status = proplists:get_value(<<"grpc-status">>, H, undefined),
Message = proplists:get_value(<<"grpc-message">>, H, undefined),
Metadata = grpcbox_utils:headers_to_metadata(H),
Pid ! {trailers, StreamId, {Status, Message, Metadata}},
State1 = handle_trailers_stream_callback(Status, Message, Metadata, State),
Ctx1 = ctx:with_value(Ctx, grpc_client_status, grpcbox_utils:status_to_string(Status)),
{ok, State#{ctx => Ctx1,
{ok, State1#{ctx := Ctx1,
resp_trailers => H}};
on_receive_headers(H, State=#{stream_id := StreamId,
ctx := Ctx,
client_pid := Pid}) ->
on_receive_headers(H, State=#{ctx := Ctx} = State) ->
Encoding = proplists:get_value(<<"grpc-encoding">>, H, identity),
Metadata = grpcbox_utils:headers_to_metadata(H),
Pid ! {headers, StreamId, Metadata},

State1 = handle_headers_stream_callback(Metadata, State),
%% TODO: better way to know if it is a Trailers-Only response?
%% maybe chatterbox should include information about the end of the stream
case proplists:get_value(<<"grpc-status">>, H, undefined) of
undefined ->
{ok, State#{resp_headers => H,
{ok, State1#{resp_headers => H,
encoding => encoding_to_atom(Encoding)}};
Status ->
Message = proplists:get_value(<<"grpc-message">>, H, undefined),
Pid ! {trailers, StreamId, {Status, Message, Metadata}},
State2 = handle_trailers_stream_callback(Status, Message, Metadata, State),
Ctx1 = ctx:with_value(Ctx, grpc_client_status, grpcbox_utils:status_to_string(Status)),
{ok, State#{resp_headers => H,
{ok, State2#{resp_headers => H,
ctx => Ctx1,
status => Status,
encoding => encoding_to_atom(Encoding)}}
end.

on_receive_data(Data, State=#{stream_id := StreamId,
client_pid := Pid,
buffer := Buffer,
encoding := Encoding,
unmarshal_fun := UnmarshalFun}) ->
on_receive_data(Data, State=#{buffer := Buffer,
encoding := Encoding} = State) ->
{Remaining, Messages} = grpcbox_frame:split(<<Buffer/binary, Data/binary>>, Encoding),
[Pid ! {data, StreamId, UnmarshalFun(Message)} || Message <- Messages],
{ok, State#{buffer => Remaining}};
State1 = handle_message_stream_callback(Messages, State),
{ok, State1#{buffer => Remaining}};
on_receive_data(_Data, State) ->
{ok, State}.

on_end_stream(State=#{stream_id := StreamId,
ctx := Ctx,
client_pid := Pid}) ->
Pid ! {eos, StreamId},
State1 = stats_handler(Ctx, rpc_end, {}, State),
{ok, State1}.
on_end_stream(State=#{ctx := Ctx}) ->
State1 = handle_eos_stream_callback(State),
State2 = stats_handler(Ctx, rpc_end, {}, State1),
{ok, State2}.

handle_info(_, State) ->
State.

%%

callback_module(#{callback_module := CallbackModule}) ->
CallbackModule;
callback_module(_) ->
{grpcbox_client_stream_callback, #{client_pid => self()}}.

init_stream_callback(CallbackModule, CallbackInitArgs, ConnectionPid, StreamId) ->
CallbackModule:init(ConnectionPid, StreamId, CallbackInitArgs).

handle_message_stream_callback(Messages, State) ->
#{unmarshal_fun := UnmarshalFun,
callback_module := {CallbackModule, _},
callback_data := CallbackData} = State,
CallbackData1 = lists:foldl(
fun(M, D) ->
{ok, D1} = CallbackModule:handle_message(UnmarshalFun(M), D),
D1
end, CallbackData, Messages),
State#{callback_data := CallbackData1}.

handle_trailers_stream_callback(Status, Message, Metadata, State) ->
#{callback_module := {CallbackModule, _},
callback_data := CallbackData} = State,
{ok, NewCallbackData} = CallbackModule:handle_trailers(Status, Message, Metadata, CallbackData),
State#{callback_data := NewCallbackData}.

handle_headers_stream_callback(Metadata, State) ->
#{callback_module := {CallbackModule, _},
callback_data := CallbackData} = State,
{ok, NewCallbackData} = CallbackModule:handle_headers(Metadata, CallbackData),
State#{callback_data := NewCallbackData}.

handle_eos_stream_callback(State) ->
#{callback_module := {CallbackModule, _},
callback_data := CallbackData} = State,
{ok, NewCallbackData} = CallbackModule:handle_eos(CallbackData),
State#{callback_data := NewCallbackData}.


stats_handler(Ctx, _, _, State=#{stats_handler := undefined}) ->
State#{ctx => Ctx};
stats_handler(Ctx, Event, Stats, State=#{stats_handler := StatsHandler,
Expand Down
42 changes: 42 additions & 0 deletions src/grpcbox_client_stream_callback.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-module(grpcbox_client_stream_callback).

-behaviour(grpcbox_client_stream).

-export([
init/3,
handle_message/2,
handle_headers/2,
handle_trailers/4,
handle_eos/1
]).

-type stream_id() :: non_neg_integer().
-type callback_data() :: #{client_pid => pid(), stream_id => stream_id()}.

-spec init(pid(), stream_id(), term()) -> {ok, callback_data()}.
init(_ConnectionPid, StreamId, #{client_pid := ClientPid}) ->
{ok, #{client_pid => ClientPid, stream_id => StreamId}}.

-spec handle_message(term(), callback_data()) -> {ok, callback_data()}.
handle_message(UnmarshalledMessage, CBData) ->
#{client_pid := ClientPid, stream_id := StreamId} = CBData,
ClientPid ! {data, StreamId, UnmarshalledMessage},
{ok, CBData}.

-spec handle_headers(map(), callback_data()) -> {ok, callback_data()}.
handle_headers(Metadata, CBData) ->
#{client_pid := ClientPid, stream_id := StreamId} = CBData,
ClientPid ! {headers, StreamId, Metadata},
{ok, CBData}.

-spec handle_trailers(binary(), term(), map(), callback_data()) -> {ok, callback_data()}.
handle_trailers(Status, Message, Metadata, CBData) ->
#{client_pid := ClientPid, stream_id := StreamId} = CBData,
ClientPid ! {trailers, StreamId, {Status, Message, Metadata}},
{ok, CBData}.

-spec handle_eos(callback_data()) -> {ok, callback_data()}.
handle_eos(CBData) ->
#{client_pid := ClientPid, stream_id := StreamId} = CBData,
ClientPid ! {eos, StreamId},
{ok, CBData}.
Loading
Loading