feat: Introduce scale-in in recovery. (#13270)
Signed-off-by: Shanicky Chen <peng@risingwave-labs.com>
shanicky authored Nov 8, 2023
1 parent 61c3e2c commit 306801b
Showing 14 changed files with 697 additions and 263 deletions.
4 changes: 4 additions & 0 deletions src/common/src/config.rs
@@ -208,6 +208,10 @@ pub struct MetaConfig {
#[serde(default)]
pub disable_recovery: bool,

/// Whether to enable scale-in during recovery.
#[serde(default)]
pub enable_scale_in_when_recovery: bool,

#[serde(default = "default::meta::meta_leader_lease_secs")]
pub meta_leader_lease_secs: u64,

1 change: 1 addition & 0 deletions src/config/example.toml
@@ -22,6 +22,7 @@ hummock_version_checkpoint_interval_sec = 30
min_delta_log_num_for_hummock_version_checkpoint = 10
max_heartbeat_interval_secs = 300
disable_recovery = false
enable_scale_in_when_recovery = false
meta_leader_lease_secs = 30
default_parallelism = "Full"
enable_compaction_deterministic = false
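
For context, here is a minimal sketch (not part of this commit) of how the new flag behaves under #[serde(default)]: when the key is absent from the TOML it falls back to false, while the line added to example.toml above sets it explicitly. The struct is a stripped-down stand-in for MetaConfig, and the toml crate is assumed as the parser.

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct MetaSection {
    // Mirrors the two flags shown in the config.rs hunk above.
    #[serde(default)]
    disable_recovery: bool,
    #[serde(default)]
    enable_scale_in_when_recovery: bool,
}

fn main() {
    // Key omitted: serde falls back to bool::default(), i.e. false.
    let implicit: MetaSection = toml::from_str("disable_recovery = false").unwrap();
    assert!(!implicit.enable_scale_in_when_recovery);

    // Key present, as in the updated example.toml: parsed as written.
    let explicit: MetaSection =
        toml::from_str("disable_recovery = false\nenable_scale_in_when_recovery = true").unwrap();
    assert!(explicit.enable_scale_in_when_recovery);
}
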
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
@@ -249,6 +249,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
config.meta.meta_leader_lease_secs,
MetaOpts {
enable_recovery: !config.meta.disable_recovery,
enable_scale_in_when_recovery: config.meta.enable_scale_in_when_recovery,
in_flight_barrier_nums,
max_idle_ms,
compaction_deterministic_test: config.meta.enable_compaction_deterministic,
13 changes: 12 additions & 1 deletion src/meta/service/src/scale_service.rs
@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_meta::stream::{ScaleController, ScaleControllerRef};
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
@@ -35,6 +38,7 @@ pub struct ScaleServiceImpl {
catalog_manager: CatalogManagerRef,
stream_manager: GlobalStreamManagerRef,
barrier_manager: BarrierManagerRef,
scale_controller: ScaleControllerRef,
}

impl ScaleServiceImpl {
@@ -46,13 +50,20 @@ impl ScaleServiceImpl {
stream_manager: GlobalStreamManagerRef,
barrier_manager: BarrierManagerRef,
) -> Self {
let scale_controller = Arc::new(ScaleController::new(
fragment_manager.clone(),
cluster_manager.clone(),
source_manager.clone(),
stream_manager.env.clone(),
));
Self {
fragment_manager,
cluster_manager,
source_manager,
catalog_manager,
stream_manager,
barrier_manager,
scale_controller,
}
}
}
@@ -203,7 +214,7 @@ impl ScaleService for ScaleServiceImpl {
.policy
.ok_or_else(|| Status::invalid_argument("policy is required"))?;

let plan = self.stream_manager.get_reschedule_plan(policy).await?;
let plan = self.scale_controller.get_reschedule_plan(policy).await?;

let next_revision = self.fragment_manager.get_revision().await;

65 changes: 11 additions & 54 deletions src/meta/src/barrier/command.rs
@@ -39,7 +39,9 @@ use crate::barrier::CommandChanges;
use crate::hummock::HummockManagerRef;
use crate::manager::{CatalogManagerRef, FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment};
use crate::stream::{
build_actor_connector_splits, ScaleControllerRef, SourceManagerRef, SplitAssignment,
};
use crate::MetaResult;

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
@@ -238,6 +240,8 @@ pub struct CommandContext {

source_manager: SourceManagerRef,

scale_controller: ScaleControllerRef,

/// The tracing span of this command.
///
/// Differs from [`TracedEpoch`], this span focuses on the lifetime of the corresponding
@@ -260,6 +264,7 @@ impl CommandContext {
command: Command,
kind: BarrierKind,
source_manager: SourceManagerRef,
scale_controller: ScaleControllerRef,
span: tracing::Span,
) -> Self {
Self {
@@ -274,6 +279,7 @@ impl CommandContext {
command,
kind,
source_manager,
scale_controller,
span,
}
}
@@ -760,60 +766,11 @@ impl CommandContext {
}

Command::RescheduleFragment { reschedules } => {
let mut node_dropped_actors = HashMap::new();
for table_fragments in self
.fragment_manager
.get_fragment_read_guard()
.await
.table_fragments()
.values()
{
for fragment_id in table_fragments.fragments.keys() {
if let Some(reschedule) = reschedules.get(fragment_id) {
for actor_id in &reschedule.removed_actors {
let node_id = table_fragments
.actor_status
.get(actor_id)
.unwrap()
.parallel_unit
.as_ref()
.unwrap()
.worker_node_id;
node_dropped_actors
.entry(node_id as WorkerId)
.or_insert(vec![])
.push(*actor_id as ActorId);
}
}
}
}
self.clean_up(node_dropped_actors).await?;

// Update fragment info after rescheduling in meta store.
self.fragment_manager
.post_apply_reschedules(reschedules.clone())
let node_dropped_actors = self
.scale_controller
.post_apply_reschedule(reschedules)
.await?;

let mut stream_source_actor_splits = HashMap::new();
let mut stream_source_dropped_actors = HashSet::new();

for (fragment_id, reschedule) in reschedules {
if !reschedule.actor_splits.is_empty() {
stream_source_actor_splits
.insert(*fragment_id as FragmentId, reschedule.actor_splits.clone());
stream_source_dropped_actors.extend(reschedule.removed_actors.clone());
}
}

if !stream_source_actor_splits.is_empty() {
self.source_manager
.apply_source_change(
None,
Some(stream_source_actor_splits),
Some(stream_source_dropped_actors),
)
.await;
}
self.clean_up(node_dropped_actors).await?;
}

Command::ReplaceTable {
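
The inline bookkeeping removed above (grouping each reschedule's removed actors by the worker node hosting them, then cleaning them up and applying source-split changes) is now consolidated in ScaleController::post_apply_reschedule. Below is a minimal sketch of just that grouping step, using simplified stand-in types rather than the real meta-store structures.

use std::collections::HashMap;

type WorkerId = u32;
type ActorId = u32;
type FragmentId = u32;

// Simplified stand-in for the reschedule entries used above.
struct Reschedule {
    removed_actors: Vec<ActorId>,
}

// Group every removed actor under the worker node it currently runs on,
// mirroring the node_dropped_actors map that clean_up expects.
fn group_dropped_actors(
    reschedules: &HashMap<FragmentId, Reschedule>,
    actor_to_worker: &HashMap<ActorId, WorkerId>,
) -> HashMap<WorkerId, Vec<ActorId>> {
    let mut node_dropped_actors: HashMap<WorkerId, Vec<ActorId>> = HashMap::new();
    for reschedule in reschedules.values() {
        for actor_id in &reschedule.removed_actors {
            if let Some(worker_id) = actor_to_worker.get(actor_id) {
                node_dropped_actors
                    .entry(*worker_id)
                    .or_default()
                    .push(*actor_id);
            }
        }
    }
    node_dropped_actors
}
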
12 changes: 11 additions & 1 deletion src/meta/src/barrier/mod.rs
@@ -60,7 +60,7 @@ use crate::manager::{
};
use crate::model::{ActorId, BarrierManagerState, TableFragments};
use crate::rpc::metrics::MetaMetrics;
use crate::stream::SourceManagerRef;
use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

mod command;
@@ -176,6 +176,8 @@ pub struct GlobalBarrierManager {

source_manager: SourceManagerRef,

scale_controller: ScaleControllerRef,

sink_manager: SinkCoordinatorManager,

metrics: Arc<MetaMetrics>,
@@ -529,6 +531,12 @@ impl GlobalBarrierManager {
let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

let tracker = CreateMviewProgressTracker::new();
let scale_controller = Arc::new(ScaleController::new(
fragment_manager.clone(),
cluster_manager.clone(),
source_manager.clone(),
env.clone(),
));
Self {
enable_recovery,
status: Mutex::new(BarrierManagerStatus::Starting),
@@ -539,6 +547,7 @@ impl GlobalBarrierManager {
fragment_manager,
hummock_manager,
source_manager,
scale_controller,
sink_manager,
metrics,
env,
@@ -733,6 +742,7 @@ impl GlobalBarrierManager {
command,
kind,
self.source_manager.clone(),
self.scale_controller.clone(),
span.clone(),
));

105 changes: 98 additions & 7 deletions src/meta/src/barrier/recovery.rs
@@ -21,6 +21,7 @@ use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::get_reschedule_plan_request::{PbWorkerChanges, StableResizePolicy};
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation};
use risingwave_pb::stream_plan::AddMutation;
@@ -40,7 +41,7 @@ use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager};
use crate::manager::WorkerId;
use crate::model::{BarrierManagerState, MigrationPlan};
use crate::stream::build_actor_connector_splits;
use crate::stream::{build_actor_connector_splits, RescheduleOptions};
use crate::MetaResult;

impl GlobalBarrierManager {
@@ -254,12 +255,21 @@ impl GlobalBarrierManager {
// following steps will be no-op, while the compute nodes will still be reset.
let mut info = self.resolve_actor_info_for_recovery().await;

// Migrate actors in expired CN to newly joined one.
let migrated = self.migrate_actors(&info).await.inspect_err(|err| {
warn!(err = ?err, "migrate actors failed");
})?;
if migrated {
info = self.resolve_actor_info_for_recovery().await;
if self.env.opts.enable_scale_in_when_recovery {
let scaled = self.scale_actors(&info).await.inspect_err(|err| {
warn!(err = ?err, "scale actors failed");
})?;
if scaled {
info = self.resolve_actor_info_for_recovery().await;
}
} else {
// Migrate actors on expired CNs to newly joined ones.
let migrated = self.migrate_actors(&info).await.inspect_err(|err| {
warn!(err = ?err, "migrate actors failed");
})?;
if migrated {
info = self.resolve_actor_info_for_recovery().await;
}
}

// Reset all compute nodes, stop and drop existing actors.
@@ -301,6 +311,7 @@ impl GlobalBarrierManager {
command,
BarrierKind::Initial,
self.source_manager.clone(),
self.scale_controller.clone(),
tracing::Span::current(), // recovery span
));

@@ -386,6 +397,86 @@ impl GlobalBarrierManager {
Ok(true)
}

async fn scale_actors(&self, info: &BarrierActorInfo) -> MetaResult<bool> {
debug!("start scaling-in offline actors.");

let expired_workers: HashSet<WorkerId> = info
.actor_map
.iter()
.filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker))
.map(|(&worker, _)| worker)
.collect();

if expired_workers.is_empty() {
debug!("no expired workers, skipping.");
return Ok(false);
}

let all_worker_parallel_units = self.fragment_manager.all_worker_parallel_units().await;

let expired_worker_parallel_units: HashMap<_, _> = all_worker_parallel_units
.into_iter()
.filter(|(worker, _)| expired_workers.contains(worker))
.collect();

let fragment_worker_changes = {
let guard = self.fragment_manager.get_fragment_read_guard().await;
let mut policy = HashMap::new();
for table_fragments in guard.table_fragments().values() {
for fragment_id in table_fragments.fragment_ids() {
policy.insert(
fragment_id,
PbWorkerChanges {
exclude_worker_ids: expired_workers.iter().cloned().collect(),
..Default::default()
},
);
}
}
policy
};

let plan = self
.scale_controller
.generate_stable_resize_plan(
StableResizePolicy {
fragment_worker_changes,
},
Some(expired_worker_parallel_units),
)
.await?;

let (reschedule_fragment, applied_reschedules) = self
.scale_controller
.prepare_reschedule_command(
plan,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
)
.await?;

if let Err(e) = self
.scale_controller
.post_apply_reschedule(&reschedule_fragment)
.await
{
tracing::error!(
"failed to apply reschedule for offline scaling in recovery: {}",
e.to_string()
);

self.fragment_manager
.cancel_apply_reschedules(applied_reschedules)
.await;

return Err(e);
}

debug!("scaling-in actors succeed.");
Ok(true)
}

/// This function will generate a migration plan, which includes the mapping for all expired and
/// in-used parallel unit to a new one.
async fn generate_migration_plan(
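
Distilling the new scale_actors path above into a standalone sketch: workers that still own actors but are missing from the node map are treated as expired, and every fragment excludes them so the stable-resize plan moves their actors onto the remaining workers. The types here are simplified stand-ins (WorkerChanges for PbWorkerChanges, plain maps for BarrierActorInfo), not the actual meta types.

use std::collections::{HashMap, HashSet};

type WorkerId = u32;
type ActorId = u32;
type FragmentId = u32;

// Simplified stand-in for the per-fragment worker-change request
// (PbWorkerChanges in the real code).
struct WorkerChanges {
    exclude_worker_ids: Vec<WorkerId>,
}

// Workers that still host actors but are absent from the cluster are expired;
// each fragment then excludes them so the resize plan scales those actors in.
fn build_exclusions(
    actor_map: &HashMap<WorkerId, Vec<ActorId>>,
    node_map: &HashSet<WorkerId>,
    fragment_ids: &[FragmentId],
) -> HashMap<FragmentId, WorkerChanges> {
    let expired_workers: Vec<WorkerId> = actor_map
        .iter()
        .filter(|(worker, actors)| !actors.is_empty() && !node_map.contains(*worker))
        .map(|(worker, _)| *worker)
        .collect();

    fragment_ids
        .iter()
        .map(|fragment_id| {
            (
                *fragment_id,
                WorkerChanges {
                    exclude_worker_ids: expired_workers.clone(),
                },
            )
        })
        .collect()
}

In the actual recovery path, the resulting per-fragment changes feed generate_stable_resize_plan together with the expired workers' parallel units, and a failure in post_apply_reschedule is rolled back via cancel_apply_reschedules, as shown in the hunk above.
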
3 changes: 3 additions & 0 deletions src/meta/src/manager/env.rs
@@ -81,6 +81,8 @@ pub struct MetaOpts {
/// Whether to enable the recovery of the cluster. If disabled, the meta service will exit on
/// abnormal cases.
pub enable_recovery: bool,
/// Whether to enable the scale-in feature when a compute node is removed.
pub enable_scale_in_when_recovery: bool,
/// The maximum number of barriers in-flight in the compute nodes.
pub in_flight_barrier_nums: usize,
/// After specified seconds of idle (no mview or flush), the process will be exited.
@@ -174,6 +176,7 @@ impl MetaOpts {
pub fn test(enable_recovery: bool) -> Self {
Self {
enable_recovery,
enable_scale_in_when_recovery: false,
in_flight_barrier_nums: 40,
max_idle_ms: 0,
compaction_deterministic_test: false,