Skip to content

Commit

Permalink
Direct cast to ns_rebalance_observer for rebalance events.
Browse files Browse the repository at this point in the history
Doing a cast to ns_rebalance_observer for interested events instead of
the bunny hop from master_activity_events_ingress to
master_activity_events to cast in ns_rebalance_observer as was done
previously, to prevent delays as much as possible. This is extremely
ugly but effective and easy to maintain.

Change-Id: Icb3ae2df34c3f864b00d8f23223c7e20a8dae546
Reviewed-on: http://review.couchbase.org/103698
Tested-by: Abhijeeth Nuthan <[email protected]>
Well-Formed: Build Bot <[email protected]>
Reviewed-by: Aliaksey Artamonau <[email protected]>
  • Loading branch information
anuthan authored and Aliaksey Artamonau committed Mar 21, 2019
1 parent 5f38660 commit 004fba7
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 44 deletions.
16 changes: 14 additions & 2 deletions src/master_activity_events.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,19 @@

-export([stream_events/2]).

submit_cast(Arg) ->
(catch gen_event:notify(master_activity_events_ingress, {submit_master_event, Arg})).
submit_cast(Event) ->
try
%% Doing a cast to ns_rebalance_observer for interested events instead
%% of the bunny hop from master_activity_events_ingress to
%% master_activity_events to cast in ns_rebalance_observer as was done
%% previously, to prevent delays as much as possible. This is extremely
%% ugly but effective and easy to maintain, I think.
ns_rebalance_observer:submit_master_event(Event),
gen_event:notify(master_activity_events_ingress,
{submit_master_event, Event})
catch T:E ->
?log_debug("Failed to send master activity event: ~p", [{T,E}])
end.

get_stage_list(Stage) when is_atom(Stage) ->
[Stage];
Expand Down Expand Up @@ -114,6 +125,7 @@ note_became_master() ->
note_set_ff_map(BucketName, undefined, _OldMap) ->
submit_cast({set_ff_map, BucketName, undefined});
note_set_ff_map(BucketName, NewMap, OldMap) ->
ns_rebalance_observer:submit_master_event({set_ff_map, BucketName, NewMap}),
Work = fun () ->
{set_ff_map, BucketName,
misc:compute_map_diff(NewMap, OldMap)}
Expand Down
78 changes: 36 additions & 42 deletions src/ns_rebalance_observer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
get_aggregated_progress/1,
get_stage_info/0,
update_stage_info/2,
update_progress/2]).
update_progress/2,
submit_master_event/1]).

%% gen_server callbacks
-export([code_change/3, init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand Down Expand Up @@ -102,35 +103,41 @@ update_progress(Stage, StageProgress) ->
update_stage_info(Stage, StageInfo) ->
gen_server:cast(?SERVER, {update_stage_info, Stage, StageInfo}).

is_interesting_master_event({_, bucket_rebalance_started, _Bucket, _Pid}) ->
get_registered_local_name() ->
?MODULE.

submit_master_event(Event) ->
gen_server:cast(get_registered_local_name(), {note, Event}).

is_interesting_master_event({bucket_rebalance_started, _Bucket, _Pid}) ->
fun handle_bucket_rebalance_started/2;
is_interesting_master_event({_, set_ff_map, _BucketName, _Diff}) ->
is_interesting_master_event({set_ff_map, _BucketName, _FFMap}) ->
fun handle_set_ff_map/2;
is_interesting_master_event({_, vbucket_move_start, _Pid, _BucketName, _Node, _VBucketId, _, _}) ->
is_interesting_master_event({vbucket_move_start, _Pid, _BucketName, _Node, _VBucketId, _, _}) ->
fun handle_vbucket_move_start/2;
is_interesting_master_event({_, vbucket_move_done, _BucketName, _VBucketId}) ->
is_interesting_master_event({vbucket_move_done, _BucketName, _VBucketId}) ->
fun handle_vbucket_move_done/2;
is_interesting_master_event({_, rebalance_stage_started, _Stage}) ->
is_interesting_master_event({rebalance_stage_started, _Stage}) ->
fun handle_rebalance_stage_started/2;
is_interesting_master_event({_, rebalance_stage_completed, _Stage}) ->
is_interesting_master_event({rebalance_stage_completed, _Stage}) ->
fun handle_rebalance_stage_completed/2;
is_interesting_master_event({_, rebalance_stage_event, _Stage, _Event}) ->
is_interesting_master_event({rebalance_stage_event, _Stage, _Event}) ->
fun handle_rebalance_stage_event/2;
is_interesting_master_event({_, compaction_uninhibit_started, _BucketName, _}) ->
is_interesting_master_event({compaction_uninhibit_started, _BucketName, _}) ->
fun handle_compaction_uninhibit/2;
is_interesting_master_event({_, compaction_uninhibit_done, _BucketName, _}) ->
is_interesting_master_event({compaction_uninhibit_done, _BucketName, _}) ->
fun handle_compaction_uninhibit/2;
is_interesting_master_event({_, takeover_started, _BucketName, _VBucketId, _, _}) ->
is_interesting_master_event({takeover_started, _BucketName, _VBucketId, _, _}) ->
fun handle_takeover/2;
is_interesting_master_event({_, takeover_ended, _BucketName, _VBucketId, _, _}) ->
is_interesting_master_event({takeover_ended, _BucketName, _VBucketId, _, _}) ->
fun handle_takeover/2;
is_interesting_master_event({_, backfill_phase_started, _BucketName, _VBucketId}) ->
is_interesting_master_event({backfill_phase_started, _BucketName, _VBucketId}) ->
fun handle_generic_vb_stat_event/2;
is_interesting_master_event({_, backfill_phase_ended, _BucketName, _VBucketId}) ->
is_interesting_master_event({backfill_phase_ended, _BucketName, _VBucketId}) ->
fun handle_generic_vb_stat_event/2;
is_interesting_master_event({_, seqno_waiting_started, _BucketName, _VBucketId, _, _}) ->
is_interesting_master_event({seqno_waiting_started, _BucketName, _VBucketId, _, _}) ->
fun handle_persistence/2;
is_interesting_master_event({_, seqno_waiting_ended, _BucketName, _VBucketId, _, _}) ->
is_interesting_master_event({seqno_waiting_ended, _BucketName, _VBucketId, _, _}) ->
fun handle_persistence/2;
is_interesting_master_event(_) ->
undefined.
Expand Down Expand Up @@ -160,23 +167,14 @@ get_stage_nodes(Services, NodesInfo) ->

init({Services, NodesInfo, Type}) ->
Self = self(),
ns_pubsub:subscribe_link(master_activity_events,
fun (Event, _Ignored) ->
case is_interesting_master_event(Event) of
undefined ->
[];
Fun ->
gen_server:cast(Self, {note, Fun, Event})
end
end, []),

StageInfo = rebalance_stage_info:init(get_stage_nodes(Services, NodesInfo)),
Buckets = ns_bucket:get_bucket_names(),
BucketsCount = length(Buckets),
BucketLevelInfo = dict:from_list([{BN,
#bucket_level_info{bucket_name = BN}} ||
BN <- Buckets]),
proc_lib:spawn_link(erlang, apply, [fun docs_left_updater_init/1, [Self]]),
erlang:register(get_registered_local_name(), self()),

{ok, #state{bucket = undefined,
buckets_count = BucketsCount,
Expand All @@ -202,9 +200,14 @@ handle_call(Req, From, State) ->
?log_error("Got unknown request: ~p from ~p", [Req, From]),
{reply, unknown_request, State}.

handle_cast({note, Fun, Ev}, State) ->
{noreply, NewState} = Fun(Ev, State),
{noreply, NewState};
handle_cast({note, Event}, State) ->
case is_interesting_master_event(Event) of
undefined ->
{noreply, State};
Fun ->
StampedEvent = list_to_tuple([os:timestamp() | tuple_to_list(Event)]),
Fun(StampedEvent, State)
end;

handle_cast({update_stats, BucketName, VBucket, NodeToDocsLeft}, State) ->
?log_debug("Got update_stats: ~p, ~p", [VBucket, NodeToDocsLeft]),
Expand Down Expand Up @@ -265,20 +268,11 @@ handle_cast(Req, _State) ->
?log_error("Got unknown cast: ~p", [Req]),
erlang:error({unknown_cast, Req}).

initiate_bucket_rebalance(BucketName, OldState) when OldState#state.bucket =:= BucketName ->
initiate_bucket_rebalance(BucketName, _FFMap, OldState) when OldState#state.bucket =:= BucketName ->
OldState;
initiate_bucket_rebalance(BucketName, OldState) ->
initiate_bucket_rebalance(BucketName, FFMap, OldState) ->
{ok, BucketConfig} = ns_bucket:get_bucket(BucketName),
Map = proplists:get_value(map, BucketConfig),
FFMap = case proplists:get_value(fastForwardMap, BucketConfig) of
undefined ->
%% yes this is possible if rebalance completes
%% faster than we can start observing it's
%% progress
Map;
FFMap0 ->
FFMap0
end,
VBCount = length(Map),
Diff = [Triple
|| {_, [MasterNode|_] = ChainBefore, ChainAfter} = Triple <- lists:zip3(lists:seq(0, VBCount-1),
Expand Down Expand Up @@ -367,8 +361,8 @@ handle_bucket_rebalance_started({_, bucket_rebalance_started, _BucketName, _Pid}
NewState = State#state{bucket_number=Number + 1},
{noreply, NewState}.

handle_set_ff_map({_, set_ff_map, BucketName, _Diff}, State) ->
{noreply, initiate_bucket_rebalance(BucketName, State)}.
handle_set_ff_map({_, set_ff_map, BucketName, Map}, State) ->
{noreply, initiate_bucket_rebalance(BucketName, Map, State)}.

handle_vbucket_move_start({TS, vbucket_move_start, _Pid, BucketName,
_Node, VBucketId, _, _},
Expand Down

0 comments on commit 004fba7

Please sign in to comment.