Skip to content

Commit

Permalink
Make adding new stages easy.
Browse files Browse the repository at this point in the history
Adding a new stage is now equivalent to signalling the start of the
stage,
master_activity_events:note_rebalance_stage_started(Stage,
                                                    NodesInvolved)
If no nodes are involved in the stage i.e., NodesInvolved are [], we
ignore the stage as a part of rebalance visibility.

This can result in new stages showing up in the UI which weren't
part of the rebalance visibility output at the start of rebalance.

To mark stage as completed,
master_activity_events:note_rebalance_stage_completed(Stage)

Part of EPIC,
MB-30894: Rebalance visibility and reporting

Change-Id: I95f7542f4fa6b1e0771e2ab83879efb98ac48e03
Reviewed-on: http://review.couchbase.org/105567
Tested-by: Abhijeeth Nuthan <[email protected]>
Well-Formed: Build Bot <[email protected]>
Reviewed-by: Aliaksey Artamonau <[email protected]>
  • Loading branch information
anuthan authored and Aliaksey Artamonau committed Mar 21, 2019
1 parent 004fba7 commit 2784673
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 54 deletions.
11 changes: 6 additions & 5 deletions src/master_activity_events.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
note_autofailover_node_state_change/4,
note_autofailover_server_group_state_change/4,
note_autofailover_done/2,
note_rebalance_stage_started/1,
note_rebalance_stage_started/2,
note_rebalance_stage_completed/1,
note_rebalance_stage_event/2
]).
Expand All @@ -87,8 +87,8 @@ get_stage_list(Stage) when is_atom(Stage) ->
get_stage_list(Stage) when is_list(Stage) ->
Stage.

note_rebalance_stage_started(Stage) ->
submit_cast({rebalance_stage_started, get_stage_list(Stage)}).
note_rebalance_stage_started(Stage, Nodes) ->
submit_cast({rebalance_stage_started, get_stage_list(Stage), Nodes}).

note_rebalance_stage_completed(Stage) ->
submit_cast({rebalance_stage_completed, get_stage_list(Stage)}).
Expand Down Expand Up @@ -408,10 +408,11 @@ maybe_get_pids_node(Pid) when is_pid(Pid) ->
maybe_get_pids_node(_PerhapsBinary) ->
skip_this_pair_please.

event_to_jsons({TS, rebalance_stage_started, Stage}) ->
event_to_jsons({TS, rebalance_stage_started, Stage, Nodes}) ->
[format_simple_plist_as_json([{type, rebalanceStageStarted},
{ts, misc:time_to_epoch_float(TS)},
{stage, {list, Stage}}])];
{stage, {list, Stage}},
{nodes, {list, Nodes}}])];

event_to_jsons({TS, rebalance_stage_completed, Stage}) ->
[format_simple_plist_as_json([{type, rebalanceStageCompleted},
Expand Down
20 changes: 5 additions & 15 deletions src/ns_rebalance_observer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ is_interesting_master_event({vbucket_move_start, _Pid, _BucketName, _Node, _VBuc
fun handle_vbucket_move_start/2;
is_interesting_master_event({vbucket_move_done, _BucketName, _VBucketId}) ->
fun handle_vbucket_move_done/2;
is_interesting_master_event({rebalance_stage_started, _Stage}) ->
is_interesting_master_event({rebalance_stage_started, _Stage, _Nodes}) ->
fun handle_rebalance_stage_started/2;
is_interesting_master_event({rebalance_stage_completed, _Stage}) ->
fun handle_rebalance_stage_completed/2;
Expand All @@ -142,16 +142,6 @@ is_interesting_master_event({seqno_waiting_ended, _BucketName, _VBucketId, _, _}
is_interesting_master_event(_) ->
undefined.

possible_substages(kv, NodesInfo) ->
case proplists:get_value(delta_nodes, NodesInfo, []) of
[] ->
[];
DeltaNodes ->
[{kv_delta_recovery, DeltaNodes, []}]
end;
possible_substages(_,_) ->
[].

get_stage_nodes(Services, NodesInfo) ->
ActiveNodes = proplists:get_value(active_nodes, NodesInfo, []),
lists:filtermap(
Expand All @@ -160,8 +150,7 @@ get_stage_nodes(Services, NodesInfo) ->
[] ->
false;
Nodes ->
SubStages = possible_substages(Service, NodesInfo),
{true, {Service, Nodes, SubStages}}
{true, {Service, Nodes}}
end
end, lists:usort(Services)).

Expand Down Expand Up @@ -339,9 +328,10 @@ initiate_bucket_rebalance(BucketName, FFMap, OldState) ->
TmpState = update_all_vb_info(OldState, BucketName, dict:from_list(Moves)),
TmpState#state{bucket = BucketName}.

handle_rebalance_stage_started({TS, rebalance_stage_started, Stage},
handle_rebalance_stage_started({TS, rebalance_stage_started, Stage, Nodes},
#state{stage_info = Old} = State) ->
New = rebalance_stage_info:update_stage_info(Stage, {started, TS}, Old),
New = rebalance_stage_info:update_stage_info(Stage, {started, {TS, Nodes}},
Old),
{noreply, State#state{stage_info = New}}.

handle_rebalance_stage_completed({TS, rebalance_stage_completed, Stage},
Expand Down
16 changes: 10 additions & 6 deletions src/ns_rebalancer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,9 @@ rebalance_simple_services(Config, Services, KeepNodes) ->
true ->
lists:filtermap(
fun (Service) ->
master_activity_events:note_rebalance_stage_started(
Service),
ServiceNodes = ns_cluster_membership:service_nodes(KeepNodes, Service),
master_activity_events:note_rebalance_stage_started(
Service, ServiceNodes),
Updated = update_service_map_with_config(Config, Service, ServiceNodes),

master_activity_events:note_rebalance_stage_completed(
Expand Down Expand Up @@ -601,7 +601,7 @@ rebalance_topology_aware_services(Config, Services, KeepNodesAll, EjectNodesAll)
false;
_ ->
master_activity_events:note_rebalance_stage_started(
Service),
Service, AllNodes),
update_service_map_with_config(Config, Service, AllNodes),
ok = rebalance_topology_aware_service(Service, KeepNodes,
EjectNodes, DeltaNodes),
Expand Down Expand Up @@ -716,7 +716,9 @@ rebalance_body(KeepNodes,

ok = drop_old_2i_indexes(KeepNodes),

master_activity_events:note_rebalance_stage_started(kv),
LiveKVNodes = ns_cluster_membership:service_nodes(KeepNodes ++ EjectNodesAll,
kv),
master_activity_events:note_rebalance_stage_started(kv, LiveKVNodes),
%% wait till all bucket shutdowns are done on nodes we're
%% adding (or maybe adding).
do_wait_buckets_shutdown(KeepNodes),
Expand All @@ -731,7 +733,7 @@ rebalance_body(KeepNodes,
end, BucketConfigs),

master_activity_events:note_rebalance_stage_started(
[kv, kv_delta_recovery]),
[kv, kv_delta_recovery], KVDeltaNodes),
ok = apply_delta_recovery_buckets(DeltaRecoveryBuckets,
KVDeltaNodes, BucketConfigs),
ok = maybe_clear_recovery_type(KeepNodes),
Expand Down Expand Up @@ -1371,7 +1373,9 @@ do_run_graceful_failover_moves(Nodes, BucketName, BucketConfig, I, N) ->
Map = proplists:get_value(map, BucketConfig, []),
Map1 = mb_map:promote_replicas_for_graceful_failover(Map, Nodes),

master_activity_events:note_rebalance_stage_started(kv),
ActiveNodes = ns_cluster_membership:active_nodes(),
InvolvedNodes = ns_cluster_membership:service_nodes(ActiveNodes, kv),
master_activity_events:note_rebalance_stage_started(kv, InvolvedNodes),
ProgressFun = make_progress_fun(I, N),
RV = run_mover(BucketName, BucketConfig,
proplists:get_value(servers, BucketConfig),
Expand Down
84 changes: 56 additions & 28 deletions src/rebalance_stage_info.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,27 @@

-type stage_info() :: #stage_info{}.

init(Stages) ->
PerStageProgress = dict:from_list(init_per_stage_progress(Stages)),
%% Need StageNodes as when rebalance starts we need to show a minimum stages of
%% rebalance that are expected to occur, usually the services involved.
init(StageNodes) ->
PerStageProgress = dict:from_list(init_per_stage_progress(StageNodes)),
Aggregated = aggregate(PerStageProgress),
StageInfo = init_per_stage_info(Stages),
StageInfo = init_per_stage_info(StageNodes),
#stage_info{per_stage_progress = PerStageProgress,
aggregated = Aggregated,
per_stage_info = StageInfo}.

init_per_stage_progress(Stages) ->
lists:flatten([init_stage_progress(S, N, SS) || {S, N, SS} <- Stages]).
init_per_stage_progress(StageNodes) ->
[{Stage, dict:from_list([{N, 0} || N <- Nodes])} ||
{Stage, Nodes} <- StageNodes, Nodes =/= []].

init_stage_progress(_Stage, [], _SubStage) ->
[];
init_stage_progress(Stage, Nodes, SubStages) ->
SubStageNodes = init_per_stage_progress(SubStages),
[{Stage, dict:from_list([{N, 0} || N <- Nodes])} | SubStageNodes].
init_per_stage_info(StageNodes) ->
[{Stage, #stage_details{}} || {Stage, Nodes} <- StageNodes, Nodes =/= []].

%% For backward compatibility.
get_progress(#stage_info{aggregated = Aggregated}) ->
Aggregated.

init_per_stage_info(Stages) ->
[{Stage, #stage_details{
start_time = false,
complete_time = false,
sub_stages = init_per_stage_info(SubStages),
notable_events = []
}} || {Stage, Nodes, SubStages} <- Stages, Nodes =/= []].

update_progress(
Stage, StageProgress,
#stage_info{per_stage_progress = OldPerStageProgress} = StageInfo) ->
Expand All @@ -85,7 +77,7 @@ do_update_progress(Stage, StageProgress, PerStage) ->
dict:merge(fun (_, _, New) ->
New
end, OldStageProgress, StageProgress)
end, PerStage).
end, StageProgress, PerStage).

aggregate(PerStage) ->
TmpAggr = dict:fold(
Expand Down Expand Up @@ -212,32 +204,39 @@ get_per_stage_progress(PerStageProgress) ->
dict:to_list(StageProgress)
end, PerStageProgress).

update_stage_info({started, Time}, StageInfo) ->
update_stage({started, {Time, _}}, StageInfo) ->
StageInfo#stage_details{start_time = Time,
complete_time = false};
update_stage_info({completed, Time}, StageInfo) ->
update_stage({completed, Time}, StageInfo) ->
StageInfo#stage_details{complete_time = Time};
update_stage_info({notable_event, TS, Text},
#stage_details{notable_events = NotableEvents} = StageInfo) ->
update_stage({notable_event, TS, Text},
#stage_details{notable_events = NotableEvents} = StageInfo) ->
Time = binarify_timestamp(TS),
Msg = list_to_binary(Text),
StageInfo#stage_details{notable_events = [{Time, Msg} | NotableEvents]}.

update_stage_info(Stage, StageInfoUpdate,
#stage_info{per_stage_info = PerStageInfo} = StageInfo) ->
update_stage_info(Stage, StageInfoUpdate, StageInfo) ->
NewStageInfo = maybe_create(Stage, StageInfoUpdate, StageInfo,
fun maybe_create_new_stage_progress/3),
update_stage_info_inner(Stage, StageInfoUpdate, NewStageInfo).

update_stage_info_inner(Stage, StageInfoUpdate,
#stage_info{per_stage_info = PerStageInfo} = StageInfo) ->
NewPerStageInfo = update_stage_info_rec(Stage, StageInfoUpdate,
PerStageInfo),
StageInfo#stage_info{per_stage_info = NewPerStageInfo}.

update_stage_info_rec([Stage | SubStages], StageInfoUpdate, AllStageInfo) ->
update_stage_info_rec([Stage | SubStages] = AllStages, StageInfoUpdate,
AllStageInfo) ->
case lists:keysearch(Stage, 1, AllStageInfo) of
false ->
AllStageInfo;
maybe_create(AllStages, StageInfoUpdate, AllStageInfo,
fun create_stage/3);
{value, {Stage, OldStageInfo}} ->
NewStageInfo =
case SubStages of
[] ->
update_stage_info(StageInfoUpdate, OldStageInfo);
update_stage(StageInfoUpdate, OldStageInfo);
_ ->
NewSubStages = update_stage_info_rec(
SubStages,
Expand All @@ -248,3 +247,32 @@ update_stage_info_rec([Stage | SubStages], StageInfoUpdate, AllStageInfo) ->
end,
lists:keyreplace(Stage, 1, AllStageInfo, {Stage, NewStageInfo})
end.

create_new_field({started, {_, []}}) ->
false;
create_new_field({started, {_, _}}) ->
true;
create_new_field(_) ->
false.

maybe_create(Stage, Info, Old, Fun) ->
case create_new_field(Info) of
true -> Fun(Stage, Info, Old);
false -> Old
end.

create_stage([Stage | _] = AllStages, {started, {_,_}} = Info, AllStageInfo) ->
update_stage_info_rec(AllStages, Info,
[{Stage, #stage_details{}} | AllStageInfo]).

maybe_create_new_stage_progress(
Stage, {started, {_, Nodes}},
#stage_info{per_stage_progress = PerStageProgress} = StageInfo) ->
ProgressStage = lists:last(Stage),
case dict:find(ProgressStage, PerStageProgress) of
{ok, _} ->
StageInfo;
_ ->
[{ProgressStage, Dict}] = init_per_stage_progress([{ProgressStage, Nodes}]),
update_progress(ProgressStage, Dict, StageInfo)
end.

0 comments on commit 2784673

Please sign in to comment.