Skip to content

Commit

Permalink
Use an ETS-based queue for assigning files to transport senders
Browse files Browse the repository at this point in the history
Summary:
Instead of instructing senders to send specific files, use an
ETS table as a queue listing the remaining files in an
ongoing transport. Transport senders will be notified of the
new transport and remove files from the queue. This avoids
a failure scenario where a stuck sender causes files to be
dropped from a transport.

Reviewed By: jaher

Differential Revision: D65612901

fbshipit-source-id: 896c210d5fb585fbaf211b85c535b49066537bbe
  • Loading branch information
hsun324 authored and facebook-github-bot committed Nov 8, 2024
1 parent dfc2575 commit de1e10f
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 69 deletions.
101 changes: 65 additions & 36 deletions src/wa_raft_transport.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
%% Transport API
-export([
cancel/2,
complete/3,
complete/4
complete/3
]).

%% ETS API
Expand All @@ -48,13 +47,18 @@
update_file_info/3
]).

%% Internal API
%% Internal API - Configuration
-export([
default_directory/1,
registered_directory/2,
registered_module/2
]).

%% Internal API - Transport Workers
-export([
pop_file/1
]).

%% gen_server callbacks
-export([
init/1,
Expand Down Expand Up @@ -104,8 +108,8 @@
end_ts => Millis :: integer(),

total_files := non_neg_integer(),
next_file => non_neg_integer(),
completed_files := non_neg_integer(),
queue => ets:table(),

error => term()
}.
Expand Down Expand Up @@ -134,6 +138,7 @@

start_ts => Millis :: integer(),
end_ts => Millis :: integer(),
retries => non_neg_integer(),

total_bytes := non_neg_integer(),
completed_bytes := non_neg_integer(),
Expand All @@ -142,6 +147,8 @@
error => Reason :: term()
}.

%%% ------------------------------------------------------------------------

-record(state, {
counters :: counters:counters_ref()
}).
Expand Down Expand Up @@ -235,13 +242,9 @@ transfer_snapshot(Peer, Table, Partition, LogPos, Root, Timeout) ->
cancel(ID, Reason) ->
gen_server:call(?MODULE, {cancel, ID, Reason}).

-spec complete(ID :: transport_id(), FileID :: file_id(), Status :: term()) -> ok | invalid.
-spec complete(ID :: transport_id(), FileID :: file_id(), Status :: term()) -> ok.
complete(ID, FileID, Status) ->
complete(ID, FileID, Status, undefined).

-spec complete(ID :: transport_id(), FileID :: file_id(), Status :: term(), Pid :: pid() | undefined) -> ok | invalid.
complete(ID, FileID, Status, Pid) ->
gen_server:cast(?MODULE, {complete, ID, FileID, Status, Pid}).
gen_server:cast(?MODULE, {complete, ID, FileID, Status}).

%%% ------------------------------------------------------------------------
%%% ETS table helper functions
Expand Down Expand Up @@ -305,9 +308,11 @@ update_and_get_transport_info(ID, Fun, Counters) ->
-spec delete_transport_info(ID :: transport_id()) -> ok | not_found.
delete_transport_info(ID) ->
case transport_info(ID) of
{ok, #{total_files := TotalFiles}} ->
{ok, #{total_files := TotalFiles} = Info} ->
lists:foreach(fun (FileID) -> delete_file_info(ID, FileID) end, lists:seq(1, TotalFiles)),
ets:delete(?TRANSPORT_TABLE, ID),
Queue = maps:get(queue, Info, undefined),
Queue =/= undefined andalso catch ets:delete(Queue),
ok;
not_found ->
not_found
Expand Down Expand Up @@ -365,7 +370,7 @@ delete_file_info(ID, FileID) ->
ok.

%%-------------------------------------------------------------------
%% Internal API
%% Internal API - Configuration
%%-------------------------------------------------------------------

%% Get the default directory for incoming transports associated with the
Expand All @@ -392,6 +397,33 @@ registered_module(Table, Partition) ->
Options -> Options#raft_options.transport_module
end.

%%-------------------------------------------------------------------
%% Internal API - Transport Workers
%%-------------------------------------------------------------------

-spec pop_file(ID :: transport_id()) -> {ok, FileID :: file_id()} | empty | not_found.
pop_file(ID) ->
case transport_info(ID) of
{ok, #{queue := Queue}} -> try_pop_file(Queue);
_Other -> not_found
end.

-spec try_pop_file(Queue :: ets:table()) -> {ok, FileID :: file_id()} | empty | not_found.
try_pop_file(Queue) ->
try ets:first(Queue) of
'$end_of_table' ->
empty;
FileID ->
try ets:select_delete(Queue, [{{FileID}, [], [true]}]) of
0 -> try_pop_file(Queue);
1 -> {ok, FileID}
catch
error:badarg -> not_found
end
catch
error:badarg -> not_found
end.

%%% ------------------------------------------------------------------------
%%% gen_server callbacks
%%%
Expand Down Expand Up @@ -439,7 +471,7 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters =
% Force the receiving directory to always exist
catch filelib:ensure_dir([RootDir, $/]),

% Initialize info in ETS about transport and contained files.
% Setup overall transport info
set_transport_info(ID, #{
type => receiver,
status => running,
Expand All @@ -452,6 +484,8 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters =
total_files => TotalFiles,
completed_files => 0
}, Counters),

% Setup file info for each file
[
begin
FileAtomics = atomics:new(?RAFT_TRANSPORT_FILE_ATOMICS_COUNT, []),
Expand All @@ -466,6 +500,7 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters =
end || {FileID, RelativePath, Size} <- Files
],

% If the transport is empty, then immediately complete it
TotalFiles =:= 0 andalso
update_and_get_transport_info(
ID,
Expand Down Expand Up @@ -527,8 +562,8 @@ handle_call(Request, _From, #state{} = State) ->
{noreply, State}.

-spec handle_cast(Request, State :: #state{}) -> {noreply, NewState :: #state{}}
when Request :: {complete, ID :: transport_id(), FileID :: file_id(), Status :: term(), Pid :: pid()}.
handle_cast({complete, ID, FileID, Status, Pid}, #state{counters = Counters} = State) ->
when Request :: {complete, ID :: transport_id(), FileID :: file_id(), Status :: term()}.
handle_cast({complete, ID, FileID, Status}, #state{counters = Counters} = State) ->
NowMillis = erlang:system_time(millisecond),
?RAFT_COUNT({'raft.transport.file.send', normalize_status(Status)}),
Result0 = update_file_info(ID, FileID,
Expand All @@ -551,7 +586,7 @@ handle_cast({complete, ID, FileID, Status, Pid}, #state{counters = Counters} = S
update_and_get_transport_info(
ID,
fun
(#{status := running, type := Type, completed_files := CompletedFiles, total_files := TotalFiles} = Info0) ->
(#{status := running, completed_files := CompletedFiles, total_files := TotalFiles} = Info0) ->
Info1 = Info0#{completed_files => CompletedFiles + 1},
Info2 = case CompletedFiles + 1 of
TotalFiles -> Info1#{status => completed, end_ts => NowMillis};
Expand All @@ -565,11 +600,7 @@ handle_cast({complete, ID, FileID, Status, Pid}, #state{counters = Counters} = S
ok -> Info3;
{error, Reason} -> Info3#{status => failed, error => {notify_failed, Reason}}
end,
Info5 = case Type of
sender -> maybe_submit_one(ID, Info4, Pid);
_ -> Info4
end,
maybe_notify(ID, Info5);
maybe_notify(ID, Info4);
(Info) ->
Info
end,
Expand Down Expand Up @@ -598,6 +629,7 @@ handle_info(scan, #state{counters = Counters} = State) ->
ExcessTransportIDs = lists:sublist(lists:sort(InactiveTransports), ExcessTransports),
lists:foreach(fun delete_transport_info/1, ExcessTransportIDs)
end,

schedule_scan(),
{noreply, State};
handle_info(Info, State) ->
Expand Down Expand Up @@ -631,8 +663,9 @@ handle_transport_start(From, Peer, Meta, Root, Counters) ->
Module = transport_module(Meta),
TotalFiles = length(Files),
NowMillis = erlang:system_time(millisecond),
Queue = ets:new(?MODULE, [ordered_set, public]),

% Initialize info in ETS about transport and contained files.
% Setup overall transport info
set_transport_info(ID, #{
type => sender,
status => requested,
Expand All @@ -643,8 +676,11 @@ handle_transport_start(From, Peer, Meta, Root, Counters) ->
root => Root,
start_ts => NowMillis,
total_files => TotalFiles,
completed_files => 0
completed_files => 0,
queue => Queue
}, Counters),

% Setup file info for each file
[
begin
FileAtomics = atomics:new(?RAFT_TRANSPORT_FILE_ATOMICS_COUNT, []),
Expand All @@ -664,6 +700,10 @@ handle_transport_start(From, Peer, Meta, Root, Counters) ->
FileData = [{FileID, Filename, Size} || {FileID, Filename, _, _, Size} <- Files],
case gen_server:call({?MODULE, Peer}, {transport, ID, node(), Module, Meta, FileData}) of
ok ->
% Add all files to the queue
ets:insert(Queue, [{FileID} || {FileID, _, _, _, _} <- Files]),

% Start workers
update_and_get_transport_info(
ID,
fun (Info0) ->
Expand All @@ -676,10 +716,9 @@ handle_transport_start(From, Peer, Meta, Root, Counters) ->
Info2 = Info1#{status => completed, end_ts => NowMillis},
maybe_notify(ID, Info2);
_ ->
Info2 = Info1#{status => running, next_file => 1},
Sup = wa_raft_transport_sup:get_or_start(Peer),
Workers = [Pid || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(Sup), is_pid(Pid)],
lists:foldl(fun (Pid, InfoN) -> maybe_submit_one(ID, InfoN, Pid) end, Info2, Workers)
[gen_server:cast(Pid, {notify, ID}) || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(Sup), is_pid(Pid)],
Info1#{status => running}
end
end,
Counters
Expand Down Expand Up @@ -796,16 +835,6 @@ collect_files_impl(Root, [Filename | Queue], Fun, Acc0) ->
join_names("", Name) -> Name;
join_names(Dir, Name) -> [Dir, $/, Name].

-spec maybe_submit_one(transport_id(), transport_info(), pid()) -> transport_info().
maybe_submit_one(ID, #{status := running, next_file := NextFileID, total_files := LastFileID} = Info, Pid) when is_pid(Pid) ->
gen_server:cast(Pid, {send, ID, NextFileID}),
case NextFileID of
LastFileID -> maps:remove(next_file, Info);
_ -> Info#{next_file => NextFileID + 1}
end;
maybe_submit_one(_ID, Info, _Pid) ->
Info.

-spec maybe_notify_complete(transport_id(), transport_info(), #state{}) -> ok | {error, term()}.
maybe_notify_complete(_ID, #{type := sender}, _State) ->
ok;
Expand Down
2 changes: 1 addition & 1 deletion src/wa_raft_transport_target_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

-spec name(node()) -> atom().
name(Name) ->
list_to_atom("raft_transport_target_sup_" ++ atom_to_list(Name)).
binary_to_atom(<<"raft_transport_target_sup_", (atom_to_binary(Name))/binary>>).

%%% ------------------------------------------------------------------------
%%% OTP supervision callbacks
Expand Down
72 changes: 40 additions & 32 deletions src/wa_raft_transport_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,20 @@
-record(state, {
node :: node(),
number :: non_neg_integer(),
table :: ets:tid(),

states = #{} :: #{module() => state()},
marker :: undefined | 0 | reference()
jobs = queue:new() :: queue:queue(job()),
states = #{} :: #{module() => state()}
}).
-type state() :: #state{}.

-record(transport, {
id :: wa_raft_transport:transport_id()
}).
-record(file, {
id :: wa_raft_transport:transport_id(),
file :: wa_raft_transport:file_id()
}).
-type job() :: #transport{} | #file{}.

%%% ------------------------------------------------------------------------
%%% Internal API
%%%
Expand Down Expand Up @@ -74,9 +81,7 @@ start_link(Node, Number) ->

-spec init(Args :: {node(), non_neg_integer()}) -> {ok, State :: state(), Timeout :: timeout()}.
init({Node, Number}) ->
Table = ets:new(?MODULE, [ordered_set, public]),
% eqwalizer:fixme - better ets:new
{ok, #state{node = Node, number = Number, table = Table}, ?CONTINUE_TIMEOUT}.
{ok, #state{node = Node, number = Number}, ?CONTINUE_TIMEOUT}.

-spec handle_call(Request :: term(), From :: {Pid :: pid(), Tag :: term()}, State :: state()) ->
{noreply, NewState :: state(), Timeout :: timeout()}.
Expand All @@ -86,13 +91,9 @@ handle_call(Request, From, #state{number = Number} = State) ->
{noreply, State, ?CONTINUE_TIMEOUT}.

-spec handle_cast(Request, State :: state()) -> {noreply, NewState :: state(), Timeout :: timeout()}
when Request :: {send, wa_raft_transport:transport_id(), wa_raft_transport:file_id()}.
handle_cast({send, ID, FileID}, #state{table = Table} = State) ->
?RAFT_COUNT('raft.transport.file.send'),
wa_raft_transport:update_file_info(ID, FileID,
fun (Info) -> Info#{status => sending, start_ts => erlang:system_time(millisecond)} end),
true = ets:insert_new(Table, {make_ref(), ID, FileID}),
{noreply, State, ?CONTINUE_TIMEOUT};
when Request :: {notify, wa_raft_transport:transport_id()}.
handle_cast({notify, ID}, #state{jobs = Jobs} = State) ->
{noreply, State#state{jobs = queue:in(#transport{id = ID}, Jobs)}, ?CONTINUE_TIMEOUT};
handle_cast(Request, #state{number = Number} = State) ->
?LOG_WARNING("[~p] received unrecognized cast ~p",
[Number, Request], #{domain => [whatsapp, wa_raft]}),
Expand All @@ -101,17 +102,22 @@ handle_cast(Request, #state{number = Number} = State) ->
-spec handle_info(Info :: term(), State :: state()) ->
{noreply, NewState :: state()}
| {noreply, NewState :: state(), Timeout :: timeout() | hibernate}.
handle_info(timeout, #state{table = Table, marker = undefined} = State) ->
case ets:first(Table) of
'$end_of_table' -> {noreply, State, hibernate}; % table is empty so wait until there is work
_FirstKey -> {noreply, State#state{marker = 0}, ?CONTINUE_TIMEOUT} % 0 compares smaller than any ref
end;
handle_info(timeout, #state{number = Number, table = Table, states = States, marker = Marker} = State) ->
case ets:next(Table, Marker) of
'$end_of_table' ->
{noreply, State#state{marker = undefined}, ?CONTINUE_TIMEOUT};
NextKey ->
[{NextKey, ID, FileID}] = ets:lookup(Table, NextKey),
handle_info(timeout, #state{number = Number, jobs = Jobs, states = States} = State) ->
case queue:out(Jobs) of
{empty, NewJobs} ->
{noreply, State#state{jobs = NewJobs}, ?CONTINUE_TIMEOUT};
{{value, #transport{id = ID}}, NewJobs} ->
case wa_raft_transport:pop_file(ID) of
{ok, FileID} ->
?RAFT_COUNT('raft.transport.file.send'),
wa_raft_transport:update_file_info(ID, FileID,
fun (Info) -> Info#{status => sending, start_ts => erlang:system_time(millisecond)} end),
NewJob = #file{id = ID, file = FileID},
{noreply, State#state{jobs = queue:in(NewJob, NewJobs)}, ?CONTINUE_TIMEOUT};
_Other ->
{noreply, State#state{jobs = NewJobs}, ?CONTINUE_TIMEOUT}
end;
{{value, #file{id = ID, file = FileID} = Job}, NewJobs} ->
{Result, NewState} = case wa_raft_transport:transport_info(ID) of
{ok, #{module := Module}} ->
try get_module_state(Module, State) of
Expand All @@ -129,8 +135,8 @@ handle_info(timeout, #state{number = Number, table = Table, states = States, mar
[Number, Module, ID, FileID, T, E, S], #{domain => [whatsapp, wa_raft]}),
{{T, E}, State}
end;
{stop, Reason} ->
{{stop, Reason}, State}
Other ->
{Other, State}
catch
T:E:S ->
?LOG_WARNING("[~p] module ~p failed to get/init module state due to ~p ~p: ~p",
Expand All @@ -142,11 +148,13 @@ handle_info(timeout, #state{number = Number, table = Table, states = States, mar
[Number, ID], #{domain => [whatsapp, wa_raft]}),
{{stop, invalid_transport}, State}
end,
Result =/= continue andalso begin
ets:delete(Table, NextKey),
wa_raft_transport:complete(ID, FileID, Result, self())
end,
{noreply, NewState#state{marker = NextKey}, ?CONTINUE_TIMEOUT}
case Result =:= continue of
true ->
{noreply, NewState#state{jobs = queue:in(Job, NewJobs)}, ?CONTINUE_TIMEOUT};
false ->
wa_raft_transport:complete(ID, FileID, Result),
{noreply, NewState#state{jobs = queue:in(#transport{id = ID}, NewJobs)}, ?CONTINUE_TIMEOUT}
end
end;
handle_info(Info, #state{number = Number} = State) ->
?LOG_WARNING("[~p] received unrecognized info ~p",
Expand Down

0 comments on commit de1e10f

Please sign in to comment.