From de1e10fdda6d70d4ca92b62352f0a44290864681 Mon Sep 17 00:00:00 2001 From: Henry Sun Date: Fri, 8 Nov 2024 09:36:34 -0800 Subject: [PATCH] Use an ETS-based queue for assigning files to transport senders Summary: Instead of instructing senders to send specific files, use an ETS table as a queue listing the remaining files in an ongoing transport. Transport senders will be notified of the new transport and remove files from the queue. This avoids a failure scenario where a stuck sender causes files to be dropped from a transport. Reviewed By: jaher Differential Revision: D65612901 fbshipit-source-id: 896c210d5fb585fbaf211b85c535b49066537bbe --- src/wa_raft_transport.erl | 101 +++++++++++++++++---------- src/wa_raft_transport_target_sup.erl | 2 +- src/wa_raft_transport_worker.erl | 72 ++++++++++--------- 3 files changed, 106 insertions(+), 69 deletions(-) diff --git a/src/wa_raft_transport.erl b/src/wa_raft_transport.erl index 149623c..fec07c3 100644 --- a/src/wa_raft_transport.erl +++ b/src/wa_raft_transport.erl @@ -34,8 +34,7 @@ %% Transport API -export([ cancel/2, - complete/3, - complete/4 + complete/3 ]). %% ETS API @@ -48,13 +47,18 @@ update_file_info/3 ]). -%% Internal API +%% Internal API - Configuration -export([ default_directory/1, registered_directory/2, registered_module/2 ]). +%% Internal API - Transport Workers +-export([ + pop_file/1 +]). + %% gen_server callbacks -export([ init/1, @@ -104,8 +108,8 @@ end_ts => Millis :: integer(), total_files := non_neg_integer(), - next_file => non_neg_integer(), completed_files := non_neg_integer(), + queue => ets:table(), error => term() }. @@ -134,6 +138,7 @@ start_ts => Millis :: integer(), end_ts => Millis :: integer(), + retries => non_neg_integer(), total_bytes := non_neg_integer(), completed_bytes := non_neg_integer(), @@ -142,6 +147,8 @@ error => Reason :: term() }. +%%% ------------------------------------------------------------------------ + -record(state, { counters :: counters:counters_ref() }). 
@@ -235,13 +242,9 @@ transfer_snapshot(Peer, Table, Partition, LogPos, Root, Timeout) -> cancel(ID, Reason) -> gen_server:call(?MODULE, {cancel, ID, Reason}). --spec complete(ID :: transport_id(), FileID :: file_id(), Status :: term()) -> ok | invalid. +-spec complete(ID :: transport_id(), FileID :: file_id(), Status :: term()) -> ok. complete(ID, FileID, Status) -> - complete(ID, FileID, Status, undefined). - --spec complete(ID :: transport_id(), FileID :: file_id(), Status :: term(), Pid :: pid() | undefined) -> ok | invalid. -complete(ID, FileID, Status, Pid) -> - gen_server:cast(?MODULE, {complete, ID, FileID, Status, Pid}). + gen_server:cast(?MODULE, {complete, ID, FileID, Status}). %%% ------------------------------------------------------------------------ %%% ETS table helper functions @@ -305,9 +308,11 @@ update_and_get_transport_info(ID, Fun, Counters) -> -spec delete_transport_info(ID :: transport_id()) -> ok | not_found. delete_transport_info(ID) -> case transport_info(ID) of - {ok, #{total_files := TotalFiles}} -> + {ok, #{total_files := TotalFiles} = Info} -> lists:foreach(fun (FileID) -> delete_file_info(ID, FileID) end, lists:seq(1, TotalFiles)), ets:delete(?TRANSPORT_TABLE, ID), + Queue = maps:get(queue, Info, undefined), + Queue =/= undefined andalso catch ets:delete(Queue), ok; not_found -> not_found @@ -365,7 +370,7 @@ delete_file_info(ID, FileID) -> ok. %%------------------------------------------------------------------- -%% Internal API +%% Internal API - Configuration %%------------------------------------------------------------------- %% Get the default directory for incoming transports associated with the @@ -392,6 +397,33 @@ registered_module(Table, Partition) -> Options -> Options#raft_options.transport_module end. 
+%%------------------------------------------------------------------- +%% Internal API - Transport Workers +%%------------------------------------------------------------------- + +-spec pop_file(ID :: transport_id()) -> {ok, FileID :: file_id()} | empty | not_found. +pop_file(ID) -> + case transport_info(ID) of + {ok, #{queue := Queue}} -> try_pop_file(Queue); + _Other -> not_found + end. + +-spec try_pop_file(Queue :: ets:table()) -> {ok, FileID :: file_id()} | empty | not_found. +try_pop_file(Queue) -> + try ets:first(Queue) of + '$end_of_table' -> + empty; + FileID -> + try ets:select_delete(Queue, [{{FileID}, [], [true]}]) of + 0 -> try_pop_file(Queue); + 1 -> {ok, FileID} + catch + error:badarg -> not_found + end + catch + error:badarg -> not_found + end. + %%% ------------------------------------------------------------------------ %%% gen_server callbacks %%% @@ -439,7 +471,7 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters = % Force the receiving directory to always exist catch filelib:ensure_dir([RootDir, $/]), - % Initialize info in ETS about transport and contained files. + % Set up overall transport info set_transport_info(ID, #{ type => receiver, status => running, @@ -452,6 +484,8 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters = total_files => TotalFiles, completed_files => 0 }, Counters), + + % Set up file info for each file [ begin FileAtomics = atomics:new(?RAFT_TRANSPORT_FILE_ATOMICS_COUNT, []), @@ -466,6 +500,7 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters = end || {FileID, RelativePath, Size} <- Files ], + % If the transport is empty, then immediately complete it TotalFiles =:= 0 andalso update_and_get_transport_info( ID, @@ -527,8 +562,8 @@ handle_call(Request, _From, #state{} = State) -> {noreply, State}. 
-spec handle_cast(Request, State :: #state{}) -> {noreply, NewState :: #state{}} - when Request :: {complete, ID :: transport_id(), FileID :: file_id(), Status :: term(), Pid :: pid()}. -handle_cast({complete, ID, FileID, Status, Pid}, #state{counters = Counters} = State) -> + when Request :: {complete, ID :: transport_id(), FileID :: file_id(), Status :: term()}. +handle_cast({complete, ID, FileID, Status}, #state{counters = Counters} = State) -> NowMillis = erlang:system_time(millisecond), ?RAFT_COUNT({'raft.transport.file.send', normalize_status(Status)}), Result0 = update_file_info(ID, FileID, @@ -551,7 +586,7 @@ handle_cast({complete, ID, FileID, Status, Pid}, #state{counters = Counters} = S update_and_get_transport_info( ID, fun - (#{status := running, type := Type, completed_files := CompletedFiles, total_files := TotalFiles} = Info0) -> + (#{status := running, completed_files := CompletedFiles, total_files := TotalFiles} = Info0) -> Info1 = Info0#{completed_files => CompletedFiles + 1}, Info2 = case CompletedFiles + 1 of TotalFiles -> Info1#{status => completed, end_ts => NowMillis}; @@ -565,11 +600,7 @@ handle_cast({complete, ID, FileID, Status, Pid}, #state{counters = Counters} = S ok -> Info3; {error, Reason} -> Info3#{status => failed, error => {notify_failed, Reason}} end, - Info5 = case Type of - sender -> maybe_submit_one(ID, Info4, Pid); - _ -> Info4 - end, - maybe_notify(ID, Info5); + maybe_notify(ID, Info4); (Info) -> Info end, @@ -598,6 +629,7 @@ handle_info(scan, #state{counters = Counters} = State) -> ExcessTransportIDs = lists:sublist(lists:sort(InactiveTransports), ExcessTransports), lists:foreach(fun delete_transport_info/1, ExcessTransportIDs) end, + schedule_scan(), {noreply, State}; handle_info(Info, State) -> @@ -631,8 +663,9 @@ handle_transport_start(From, Peer, Meta, Root, Counters) -> Module = transport_module(Meta), TotalFiles = length(Files), NowMillis = erlang:system_time(millisecond), + Queue = ets:new(?MODULE, [ordered_set, 
public]), - % Initialize info in ETS about transport and contained files. + % Set up overall transport info set_transport_info(ID, #{ type => sender, status => requested, @@ -643,8 +676,11 @@ handle_transport_start(From, Peer, Meta, Root, Counters) -> root => Root, start_ts => NowMillis, total_files => TotalFiles, - completed_files => 0 + completed_files => 0, + queue => Queue }, Counters), + + % Set up file info for each file [ begin FileAtomics = atomics:new(?RAFT_TRANSPORT_FILE_ATOMICS_COUNT, []), @@ -664,6 +700,10 @@ handle_transport_start(From, Peer, Meta, Root, Counters) -> FileData = [{FileID, Filename, Size} || {FileID, Filename, _, _, Size} <- Files], case gen_server:call({?MODULE, Peer}, {transport, ID, node(), Module, Meta, FileData}) of ok -> + % Add all files to the queue + ets:insert(Queue, [{FileID} || {FileID, _, _, _, _} <- Files]), + + % Start workers update_and_get_transport_info( ID, fun (Info0) -> @@ -676,10 +716,9 @@ handle_transport_start(From, Peer, Meta, Root, Counters) -> Info2 = Info1#{status => completed, end_ts => NowMillis}, maybe_notify(ID, Info2); _ -> - Info2 = Info1#{status => running, next_file => 1}, Sup = wa_raft_transport_sup:get_or_start(Peer), - Workers = [Pid || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(Sup), is_pid(Pid)], - lists:foldl(fun (Pid, InfoN) -> maybe_submit_one(ID, InfoN, Pid) end, Info2, Workers) + [gen_server:cast(Pid, {notify, ID}) || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(Sup), is_pid(Pid)], + Info1#{status => running} end end, Counters @@ -796,16 +835,6 @@ collect_files_impl(Root, [Filename | Queue], Fun, Acc0) -> join_names("", Name) -> Name; join_names(Dir, Name) -> [Dir, $/, Name]. --spec maybe_submit_one(transport_id(), transport_info(), pid()) -> transport_info(). 
-maybe_submit_one(ID, #{status := running, next_file := NextFileID, total_files := LastFileID} = Info, Pid) when is_pid(Pid) -> - gen_server:cast(Pid, {send, ID, NextFileID}), - case NextFileID of - LastFileID -> maps:remove(next_file, Info); - _ -> Info#{next_file => NextFileID + 1} - end; -maybe_submit_one(_ID, Info, _Pid) -> - Info. - -spec maybe_notify_complete(transport_id(), transport_info(), #state{}) -> ok | {error, term()}. maybe_notify_complete(_ID, #{type := sender}, _State) -> ok; diff --git a/src/wa_raft_transport_target_sup.erl b/src/wa_raft_transport_target_sup.erl index bc1a0e8..246feb4 100644 --- a/src/wa_raft_transport_target_sup.erl +++ b/src/wa_raft_transport_target_sup.erl @@ -34,7 +34,7 @@ -spec name(node()) -> atom(). name(Name) -> - list_to_atom("raft_transport_target_sup_" ++ atom_to_list(Name)). + binary_to_atom(<<"raft_transport_target_sup_", (atom_to_binary(Name))/binary>>). %%% ------------------------------------------------------------------------ %%% OTP supervision callbacks diff --git a/src/wa_raft_transport_worker.erl b/src/wa_raft_transport_worker.erl index e798443..caa74f7 100644 --- a/src/wa_raft_transport_worker.erl +++ b/src/wa_raft_transport_worker.erl @@ -35,13 +35,20 @@ -record(state, { node :: node(), number :: non_neg_integer(), - table :: ets:tid(), - - states = #{} :: #{module() => state()}, - marker :: undefined | 0 | reference() + jobs = queue:new() :: queue:queue(job()), + states = #{} :: #{module() => state()} }). -type state() :: #state{}. +-record(transport, { + id :: wa_raft_transport:transport_id() +}). +-record(file, { + id :: wa_raft_transport:transport_id(), + file :: wa_raft_transport:file_id() +}). +-type job() :: #transport{} | #file{}. + %%% ------------------------------------------------------------------------ %%% Internal API %%% @@ -74,9 +81,7 @@ start_link(Node, Number) -> -spec init(Args :: {node(), non_neg_integer()}) -> {ok, State :: state(), Timeout :: timeout()}. 
init({Node, Number}) -> - Table = ets:new(?MODULE, [ordered_set, public]), - % eqwalizer:fixme - better ets:new - {ok, #state{node = Node, number = Number, table = Table}, ?CONTINUE_TIMEOUT}. + {ok, #state{node = Node, number = Number}, ?CONTINUE_TIMEOUT}. -spec handle_call(Request :: term(), From :: {Pid :: pid(), Tag :: term()}, State :: state()) -> {noreply, NewState :: state(), Timeout :: timeout()}. @@ -86,13 +91,9 @@ handle_call(Request, From, #state{number = Number} = State) -> {noreply, State, ?CONTINUE_TIMEOUT}. -spec handle_cast(Request, State :: state()) -> {noreply, NewState :: state(), Timeout :: timeout()} - when Request :: {send, wa_raft_transport:transport_id(), wa_raft_transport:file_id()}. -handle_cast({send, ID, FileID}, #state{table = Table} = State) -> - ?RAFT_COUNT('raft.transport.file.send'), - wa_raft_transport:update_file_info(ID, FileID, - fun (Info) -> Info#{status => sending, start_ts => erlang:system_time(millisecond)} end), - true = ets:insert_new(Table, {make_ref(), ID, FileID}), - {noreply, State, ?CONTINUE_TIMEOUT}; + when Request :: {notify, wa_raft_transport:transport_id()}. +handle_cast({notify, ID}, #state{jobs = Jobs} = State) -> + {noreply, State#state{jobs = queue:in(#transport{id = ID}, Jobs)}, ?CONTINUE_TIMEOUT}; handle_cast(Request, #state{number = Number} = State) -> ?LOG_WARNING("[~p] received unrecognized cast ~p", [Number, Request], #{domain => [whatsapp, wa_raft]}), @@ -101,17 +102,22 @@ handle_cast(Request, #state{number = Number} = State) -> -spec handle_info(Info :: term(), State :: state()) -> {noreply, NewState :: state()} | {noreply, NewState :: state(), Timeout :: timeout() | hibernate}. 
-handle_info(timeout, #state{table = Table, marker = undefined} = State) -> - case ets:first(Table) of - '$end_of_table' -> {noreply, State, hibernate}; % table is empty so wait until there is work - _FirstKey -> {noreply, State#state{marker = 0}, ?CONTINUE_TIMEOUT} % 0 compares smaller than any ref - end; -handle_info(timeout, #state{number = Number, table = Table, states = States, marker = Marker} = State) -> - case ets:next(Table, Marker) of - '$end_of_table' -> - {noreply, State#state{marker = undefined}, ?CONTINUE_TIMEOUT}; - NextKey -> - [{NextKey, ID, FileID}] = ets:lookup(Table, NextKey), +handle_info(timeout, #state{number = Number, jobs = Jobs, states = States} = State) -> + case queue:out(Jobs) of + {empty, NewJobs} -> + {noreply, State#state{jobs = NewJobs}, ?CONTINUE_TIMEOUT}; + {{value, #transport{id = ID}}, NewJobs} -> + case wa_raft_transport:pop_file(ID) of + {ok, FileID} -> + ?RAFT_COUNT('raft.transport.file.send'), + wa_raft_transport:update_file_info(ID, FileID, + fun (Info) -> Info#{status => sending, start_ts => erlang:system_time(millisecond)} end), + NewJob = #file{id = ID, file = FileID}, + {noreply, State#state{jobs = queue:in(NewJob, NewJobs)}, ?CONTINUE_TIMEOUT}; + _Other -> + {noreply, State#state{jobs = NewJobs}, ?CONTINUE_TIMEOUT} + end; + {{value, #file{id = ID, file = FileID} = Job}, NewJobs} -> {Result, NewState} = case wa_raft_transport:transport_info(ID) of {ok, #{module := Module}} -> try get_module_state(Module, State) of @@ -129,8 +135,8 @@ handle_info(timeout, #state{number = Number, table = Table, states = States, mar [Number, Module, ID, FileID, T, E, S], #{domain => [whatsapp, wa_raft]}), {{T, E}, State} end; - {stop, Reason} -> - {{stop, Reason}, State} + Other -> + {Other, State} catch T:E:S -> ?LOG_WARNING("[~p] module ~p failed to get/init module state due to ~p ~p: ~p", @@ -142,11 +148,13 @@ handle_info(timeout, #state{number = Number, table = Table, states = States, mar [Number, ID], #{domain => [whatsapp, 
wa_raft]}), {{stop, invalid_transport}, State} end, - Result =/= continue andalso begin - ets:delete(Table, NextKey), - wa_raft_transport:complete(ID, FileID, Result, self()) - end, - {noreply, NewState#state{marker = NextKey}, ?CONTINUE_TIMEOUT} + case Result =:= continue of + true -> + {noreply, NewState#state{jobs = queue:in(Job, NewJobs)}, ?CONTINUE_TIMEOUT}; + false -> + wa_raft_transport:complete(ID, FileID, Result), + {noreply, NewState#state{jobs = queue:in(#transport{id = ID}, NewJobs)}, ?CONTINUE_TIMEOUT} + end end; handle_info(Info, #state{number = Number} = State) -> ?LOG_WARNING("[~p] received unrecognized info ~p",