feat: Introduce scale-in in recovery. #13270

Merged: 10 commits, Nov 8, 2023
4 changes: 4 additions & 0 deletions src/common/src/config.rs
@@ -208,6 +208,10 @@ pub struct MetaConfig {
#[serde(default)]
pub disable_recovery: bool,

/// Whether to enable scale-in during recovery.
#[serde(default)]
pub enable_scale_in_when_recovery: bool,

#[serde(default = "default::meta::meta_leader_lease_secs")]
pub meta_leader_lease_secs: u64,

1 change: 1 addition & 0 deletions src/config/example.toml
@@ -22,6 +22,7 @@ hummock_version_checkpoint_interval_sec = 30
min_delta_log_num_for_hummock_version_checkpoint = 10
max_heartbeat_interval_secs = 300
disable_recovery = false
enable_scale_in_when_recovery = false
meta_leader_lease_secs = 30
default_parallelism = "Full"
enable_compaction_deterministic = false
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
@@ -249,6 +249,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
config.meta.meta_leader_lease_secs,
MetaOpts {
enable_recovery: !config.meta.disable_recovery,
enable_scale_in_when_recovery: config.meta.enable_scale_in_when_recovery,
in_flight_barrier_nums,
max_idle_ms,
compaction_deterministic_test: config.meta.enable_compaction_deterministic,
13 changes: 12 additions & 1 deletion src/meta/service/src/scale_service.rs
@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_meta::stream::{ScaleController, ScaleControllerRef};
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
@@ -35,6 +38,7 @@ pub struct ScaleServiceImpl {
catalog_manager: CatalogManagerRef,
stream_manager: GlobalStreamManagerRef,
barrier_manager: BarrierManagerRef,
scale_controller: ScaleControllerRef,
}

impl ScaleServiceImpl {
@@ -46,13 +50,20 @@ impl ScaleServiceImpl {
stream_manager: GlobalStreamManagerRef,
barrier_manager: BarrierManagerRef,
) -> Self {
let scale_controller = Arc::new(ScaleController::new(
fragment_manager.clone(),
cluster_manager.clone(),
source_manager.clone(),
stream_manager.env.clone(),
));
Self {
fragment_manager,
cluster_manager,
source_manager,
catalog_manager,
stream_manager,
barrier_manager,
scale_controller,
}
}
}
@@ -203,7 +214,7 @@ impl ScaleService for ScaleServiceImpl {
.policy
.ok_or_else(|| Status::invalid_argument("policy is required"))?;

let plan = self.stream_manager.get_reschedule_plan(policy).await?;
let plan = self.scale_controller.get_reschedule_plan(policy).await?;

let next_revision = self.fragment_manager.get_revision().await;

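For orientation, the `ScaleController` constructed here is the same type the barrier manager builds further down. The following is only a sketch of the surface this PR leans on: the method names come from call sites in this diff, while the signatures and internals are placeholders rather than the real definitions in `src/meta/src/stream/`.

```rust
// Sketch only: method names come from call sites in this PR; signatures and
// internals are placeholders, not the actual definitions.
use std::sync::Arc;

pub struct ScaleController {
    // Holds the fragment, cluster, and source managers plus the meta env,
    // mirroring the four arguments passed to ScaleController::new above.
}

pub type ScaleControllerRef = Arc<ScaleController>;

impl ScaleController {
    // get_reschedule_plan(policy)                 -- serves the scale-service RPC
    // generate_stable_resize_plan(policy, hints)  -- plans around excluded workers
    // prepare_reschedule_command(plan, options)   -- yields reschedules + revert info
    // post_apply_reschedule(&reschedules)         -- commits a reschedule and returns
    //                                                dropped actors per worker node
}
```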
65 changes: 11 additions & 54 deletions src/meta/src/barrier/command.rs
@@ -39,7 +39,9 @@ use crate::barrier::CommandChanges;
use crate::hummock::HummockManagerRef;
use crate::manager::{CatalogManagerRef, FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment};
use crate::stream::{
build_actor_connector_splits, ScaleControllerRef, SourceManagerRef, SplitAssignment,
};
use crate::MetaResult;

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
@@ -238,6 +240,8 @@ pub struct CommandContext {

source_manager: SourceManagerRef,

scale_controller: ScaleControllerRef,

/// The tracing span of this command.
///
/// Differs from [`TracedEpoch`], this span focuses on the lifetime of the corresponding
@@ -260,6 +264,7 @@ impl CommandContext {
command: Command,
kind: BarrierKind,
source_manager: SourceManagerRef,
scale_controller: ScaleControllerRef,
span: tracing::Span,
) -> Self {
Self {
@@ -274,6 +279,7 @@ impl CommandContext {
command,
kind,
source_manager,
scale_controller,
span,
}
}
@@ -760,60 +766,11 @@ impl CommandContext {
}

Command::RescheduleFragment { reschedules } => {
let mut node_dropped_actors = HashMap::new();
for table_fragments in self
.fragment_manager
.get_fragment_read_guard()
.await
.table_fragments()
.values()
{
for fragment_id in table_fragments.fragments.keys() {
if let Some(reschedule) = reschedules.get(fragment_id) {
for actor_id in &reschedule.removed_actors {
let node_id = table_fragments
.actor_status
.get(actor_id)
.unwrap()
.parallel_unit
.as_ref()
.unwrap()
.worker_node_id;
node_dropped_actors
.entry(node_id as WorkerId)
.or_insert(vec![])
.push(*actor_id as ActorId);
}
}
}
}
self.clean_up(node_dropped_actors).await?;

// Update fragment info after rescheduling in meta store.
self.fragment_manager
.post_apply_reschedules(reschedules.clone())
let node_dropped_actors = self
.scale_controller
.post_apply_reschedule(reschedules)
.await?;

let mut stream_source_actor_splits = HashMap::new();
let mut stream_source_dropped_actors = HashSet::new();

for (fragment_id, reschedule) in reschedules {
if !reschedule.actor_splits.is_empty() {
stream_source_actor_splits
.insert(*fragment_id as FragmentId, reschedule.actor_splits.clone());
stream_source_dropped_actors.extend(reschedule.removed_actors.clone());
}
}

if !stream_source_actor_splits.is_empty() {
self.source_manager
.apply_source_change(
None,
Some(stream_source_actor_splits),
Some(stream_source_dropped_actors),
)
.await;
}
self.clean_up(node_dropped_actors).await?;
}

Command::ReplaceTable {
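The inline logic deleted above (grouping dropped actors by worker, calling `post_apply_reschedules` on the fragment manager, and pushing split changes to the source manager) is what the new `ScaleController::post_apply_reschedule` call is expected to cover. Below is a condensed sketch of that flow, reconstructed from the deleted lines with simplified stand-in types; it is not the controller's real implementation.

```rust
use std::collections::{HashMap, HashSet};

// Stand-in types for illustration only.
type WorkerId = u32;
type ActorId = u32;
type FragmentId = u32;

struct Reschedule {
    removed_actors: Vec<ActorId>,
    actor_splits: HashMap<ActorId, Vec<String>>, // simplified split assignment
}

/// Condensed sketch of what the removed code did and what
/// `post_apply_reschedule` is now expected to do:
/// 1. group dropped actors by the worker node they ran on,
/// 2. persist the reschedule in the fragment manager,
/// 3. push changed source splits (and dropped source actors) to the source manager,
/// then hand the dropped-actor map back so the caller can run `clean_up`.
fn post_apply_reschedule_sketch(
    reschedules: &HashMap<FragmentId, Reschedule>,
    actor_to_worker: &HashMap<ActorId, WorkerId>, // derived from actor_status
) -> HashMap<WorkerId, Vec<ActorId>> {
    let mut node_dropped_actors: HashMap<WorkerId, Vec<ActorId>> = HashMap::new();
    let mut source_actor_splits = HashMap::new();
    let mut source_dropped_actors = HashSet::new();

    for (fragment_id, reschedule) in reschedules {
        for actor_id in &reschedule.removed_actors {
            if let Some(worker) = actor_to_worker.get(actor_id) {
                node_dropped_actors.entry(*worker).or_default().push(*actor_id);
            }
        }
        if !reschedule.actor_splits.is_empty() {
            source_actor_splits.insert(*fragment_id, reschedule.actor_splits.clone());
            source_dropped_actors.extend(reschedule.removed_actors.iter().copied());
        }
    }

    // In the real controller these are async manager calls, roughly:
    //   fragment_manager.post_apply_reschedules(reschedules.clone()).await?;
    //   source_manager.apply_source_change(None, Some(source_actor_splits),
    //                                      Some(source_dropped_actors)).await;
    let _ = (source_actor_splits, source_dropped_actors);

    node_dropped_actors
}
```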
12 changes: 11 additions & 1 deletion src/meta/src/barrier/mod.rs
@@ -60,7 +60,7 @@ use crate::manager::{
};
use crate::model::{ActorId, BarrierManagerState, TableFragments};
use crate::rpc::metrics::MetaMetrics;
use crate::stream::SourceManagerRef;
use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

mod command;
@@ -176,6 +176,8 @@ pub struct GlobalBarrierManager {

source_manager: SourceManagerRef,

scale_controller: ScaleControllerRef,

sink_manager: SinkCoordinatorManager,

metrics: Arc<MetaMetrics>,
@@ -529,6 +531,12 @@ impl GlobalBarrierManager {
let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

let tracker = CreateMviewProgressTracker::new();
let scale_controller = Arc::new(ScaleController::new(
fragment_manager.clone(),
cluster_manager.clone(),
source_manager.clone(),
env.clone(),
));
Self {
enable_recovery,
status: Mutex::new(BarrierManagerStatus::Starting),
@@ -539,6 +547,7 @@
fragment_manager,
hummock_manager,
source_manager,
scale_controller,
sink_manager,
metrics,
env,
@@ -733,6 +742,7 @@ impl GlobalBarrierManager {
command,
kind,
self.source_manager.clone(),
self.scale_controller.clone(),
span.clone(),
));

105 changes: 98 additions & 7 deletions src/meta/src/barrier/recovery.rs
@@ -21,6 +21,7 @@ use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::get_reschedule_plan_request::{PbWorkerChanges, StableResizePolicy};
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation};
use risingwave_pb::stream_plan::AddMutation;
@@ -40,7 +41,7 @@ use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager};
use crate::manager::WorkerId;
use crate::model::{BarrierManagerState, MigrationPlan};
use crate::stream::build_actor_connector_splits;
use crate::stream::{build_actor_connector_splits, RescheduleOptions};
use crate::MetaResult;

impl GlobalBarrierManager {
@@ -254,12 +255,21 @@
// following steps will be no-op, while the compute nodes will still be reset.
let mut info = self.resolve_actor_info_for_recovery().await;

// Migrate actors in expired CN to newly joined one.
let migrated = self.migrate_actors(&info).await.inspect_err(|err| {
warn!(err = ?err, "migrate actors failed");
})?;
if migrated {
info = self.resolve_actor_info_for_recovery().await;
if self.env.opts.enable_scale_in_when_recovery {
let scaled = self.scale_actors(&info).await.inspect_err(|err| {
warn!(err = ?err, "scale actors failed");
})?;
if scaled {
info = self.resolve_actor_info_for_recovery().await;
}
} else {
// Migrate actors in expired CN to newly joined one.
let migrated = self.migrate_actors(&info).await.inspect_err(|err| {
warn!(err = ?err, "migrate actors failed");
})?;
if migrated {
info = self.resolve_actor_info_for_recovery().await;
}
}

// Reset all compute nodes, stop and drop existing actors.
@@ -301,6 +311,7 @@
command,
BarrierKind::Initial,
self.source_manager.clone(),
self.scale_controller.clone(),
tracing::Span::current(), // recovery span
));

@@ -386,6 +397,86 @@
Ok(true)
}

async fn scale_actors(&self, info: &BarrierActorInfo) -> MetaResult<bool> {
debug!("start scaling-in offline actors.");

let expired_workers: HashSet<WorkerId> = info
.actor_map
.iter()
.filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker))
.map(|(&worker, _)| worker)
.collect();

if expired_workers.is_empty() {
debug!("no expired workers, skipping.");
return Ok(false);
}

let all_worker_parallel_units = self.fragment_manager.all_worker_parallel_units().await;

let expired_worker_parallel_units: HashMap<_, _> = all_worker_parallel_units
.into_iter()
.filter(|(worker, _)| expired_workers.contains(worker))
.collect();

let fragment_worker_changes = {
let guard = self.fragment_manager.get_fragment_read_guard().await;
let mut policy = HashMap::new();
for table_fragments in guard.table_fragments().values() {
for fragment_id in table_fragments.fragment_ids() {
policy.insert(
fragment_id,
PbWorkerChanges {
exclude_worker_ids: expired_workers.iter().cloned().collect(),
..Default::default()
},
);
}
}
policy
};

let plan = self
.scale_controller
.generate_stable_resize_plan(
StableResizePolicy {
fragment_worker_changes,
},
Some(expired_worker_parallel_units),
)
.await?;

let (reschedule_fragment, applied_reschedules) = self
.scale_controller
.prepare_reschedule_command(
plan,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
)
.await?;

if let Err(e) = self
.scale_controller
.post_apply_reschedule(&reschedule_fragment)
.await
{
tracing::error!(
"failed to apply reschedule for offline scaling in recovery: {}",
e.to_string()
);

self.fragment_manager
.cancel_apply_reschedules(applied_reschedules)
.await;

return Err(e);
}

debug!("scaling-in actors succeed.");
Ok(true)
}

/// This function will generate a migration plan, which includes the mapping from each expired,
/// in-use parallel unit to a new one.
async fn generate_migration_plan(
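In short, when `enable_scale_in_when_recovery` is on, recovery scales in around the expired workers instead of migrating their actors: it generates a stable resize plan that excludes those workers, prepares the reschedule, commits it, and cancels the prepared changes if the commit fails, so the error is propagated without leaving a half-applied reschedule behind. A minimal, heavily simplified sketch of that sequence follows; every type and helper in it is a stand-in, not the meta service's real API.

```rust
// Stand-ins only; the real calls are generate_stable_resize_plan,
// prepare_reschedule_command, post_apply_reschedule and
// cancel_apply_reschedules, as shown in the recovery.rs hunk above.
struct Plan;
struct Prepared {
    reschedules: Vec<String>, // what to apply after the plan is prepared
    applied: Vec<String>,     // bookkeeping needed to revert on failure
}
#[derive(Debug)]
struct MetaError(String);

fn generate_plan(exclude_workers: &[u32]) -> Result<Plan, MetaError> {
    let _ = exclude_workers; // the plan excludes every expired worker
    Ok(Plan)
}
fn prepare(_plan: Plan) -> Result<Prepared, MetaError> {
    Ok(Prepared { reschedules: vec![], applied: vec![] })
}
fn commit(_reschedules: &[String]) -> Result<(), MetaError> {
    Ok(())
}
fn cancel(_applied: Vec<String>) {}

/// Returns Ok(true) if a scale-in was applied, Ok(false) if there was nothing to do.
fn scale_in_on_recovery(expired_workers: &[u32]) -> Result<bool, MetaError> {
    if expired_workers.is_empty() {
        return Ok(false); // no expired workers: recovery proceeds as before
    }
    let plan = generate_plan(expired_workers)?;     // generate_stable_resize_plan
    let prepared = prepare(plan)?;                  // prepare_reschedule_command
    if let Err(e) = commit(&prepared.reschedules) { // post_apply_reschedule
        cancel(prepared.applied);                   // cancel_apply_reschedules: roll back
        return Err(e);
    }
    Ok(true)
}
```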
3 changes: 3 additions & 0 deletions src/meta/src/manager/env.rs
@@ -81,6 +81,8 @@ pub struct MetaOpts {
/// Whether to enable the recovery of the cluster. If disabled, the meta service will exit on
/// abnormal cases.
pub enable_recovery: bool,
/// Whether to enable the scale-in feature when a compute node is removed.
pub enable_scale_in_when_recovery: bool,
/// The maximum number of barriers in-flight in the compute nodes.
pub in_flight_barrier_nums: usize,
/// After specified seconds of idle (no mview or flush), the process will be exited.
@@ -174,6 +176,7 @@ impl MetaOpts {
pub fn test(enable_recovery: bool) -> Self {
Self {
enable_recovery,
enable_scale_in_when_recovery: false,
in_flight_barrier_nums: 40,
max_idle_ms: 0,
compaction_deterministic_test: false,
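Since `MetaOpts::test` leaves the new flag off by default, a test that wants to exercise the recovery scale-in path presumably has to flip it explicitly. A hypothetical snippet; only the field name and the `test` constructor come from this diff, the surrounding setup is assumed:

```rust
// Hypothetical test setup: enable recovery via the existing test constructor,
// then opt into scale-in during recovery through the new public field.
let mut opts = MetaOpts::test(true);
opts.enable_scale_in_when_recovery = true;
// ...pass `opts` to whatever builds the meta node for the test...
```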