Skip to content

Commit

Permalink
feat: support scaling in sql backend (#14757)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored and shanicky committed Feb 1, 2024
1 parent 6acb999 commit 37c9fe7
Show file tree
Hide file tree
Showing 14 changed files with 792 additions and 195 deletions.
2 changes: 2 additions & 0 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(StreamingJob::JobStatus).string().not_null())
.col(ColumnDef::new(StreamingJob::CreateType).string().not_null())
.col(ColumnDef::new(StreamingJob::Timezone).string())
.col(ColumnDef::new(StreamingJob::Parallelism).json().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_streaming_job_object_id")
Expand Down Expand Up @@ -991,6 +992,7 @@ enum StreamingJob {
JobStatus,
Timezone,
CreateType,
Parallelism,
}

#[derive(DeriveIden)]
Expand Down
8 changes: 8 additions & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,11 @@ derive_from_json_struct!(
FragmentVnodeMapping,
risingwave_pb::common::ParallelUnitMapping
);

#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
pub enum StreamingParallelism {
Auto,
Fixed(usize),
}

impl Eq for StreamingParallelism {}
3 changes: 2 additions & 1 deletion src/meta/model_v2/src/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use sea_orm::entity::prelude::*;

use crate::{CreateType, JobStatus};
use crate::{CreateType, JobStatus, StreamingParallelism};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "streaming_job")]
Expand All @@ -24,6 +24,7 @@ pub struct Model {
pub job_status: JobStatus,
pub create_type: CreateType,
pub timezone: Option<String>,
pub parallelism: StreamingParallelism,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
71 changes: 41 additions & 30 deletions src/meta/service/src/scale_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::catalog;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{ScaleController, ScaleControllerRef, TableRevision};
use risingwave_meta_model_v2::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
Expand All @@ -38,7 +40,7 @@ pub struct ScaleServiceImpl {
source_manager: SourceManagerRef,
stream_manager: GlobalStreamManagerRef,
barrier_manager: BarrierManagerRef,
scale_controller: Option<ScaleControllerRef>,
scale_controller: ScaleControllerRef,
}

impl ScaleServiceImpl {
Expand All @@ -48,14 +50,12 @@ impl ScaleServiceImpl {
stream_manager: GlobalStreamManagerRef,
barrier_manager: BarrierManagerRef,
) -> Self {
let scale_controller = match &metadata_manager {
MetadataManager::V1(_) => Some(Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_manager.env.clone(),
))),
MetadataManager::V2(_) => None,
};
let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_manager.env.clone(),
));

Self {
metadata_manager,
source_manager,
Expand All @@ -68,7 +68,8 @@ impl ScaleServiceImpl {
async fn get_revision(&self) -> TableRevision {
match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr.fragment_manager.get_revision().await,
MetadataManager::V2(_) => unimplemented!("support table revision in v2"),
// todo, support table revision in meta model v2
MetadataManager::V2(_) => Default::default(),
}
}
}
Expand Down Expand Up @@ -141,10 +142,6 @@ impl ScaleService for ScaleServiceImpl {
) -> Result<Response<RescheduleResponse>, Status> {
self.barrier_manager.check_status_running().await?;

let MetadataManager::V1(mgr) = &self.metadata_manager else {
unimplemented!("only available in v1");
};

let RescheduleRequest {
reschedules,
revision,
Expand All @@ -163,19 +160,36 @@ impl ScaleService for ScaleServiceImpl {
}

let table_parallelisms = {
let guard = mgr.fragment_manager.get_fragment_read_guard().await;

let mut table_parallelisms = HashMap::new();
for (table_id, table) in guard.table_fragments() {
if table
.fragment_ids()
.any(|fragment_id| reschedules.contains_key(&fragment_id))
{
table_parallelisms.insert(*table_id, TableParallelism::Custom);
match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let guard = mgr.fragment_manager.get_fragment_read_guard().await;

let mut table_parallelisms = HashMap::new();
for (table_id, table) in guard.table_fragments() {
if table
.fragment_ids()
.any(|fragment_id| reschedules.contains_key(&fragment_id))
{
table_parallelisms.insert(*table_id, TableParallelism::Custom);
}
}

table_parallelisms
}
}
MetadataManager::V2(mgr) => {
let streaming_job_ids = mgr
.catalog_controller
.get_fragment_job_id(
reschedules.keys().map(|id| *id as FragmentId).collect(),
)
.await?;

table_parallelisms
streaming_job_ids
.into_iter()
.map(|id| (catalog::TableId::new(id as _), TableParallelism::Custom))
.collect()
}
}
};

self.stream_manager
Expand Down Expand Up @@ -240,11 +254,8 @@ impl ScaleService for ScaleServiceImpl {
.policy
.ok_or_else(|| Status::invalid_argument("policy is required"))?;

let Some(scale_controller) = &self.scale_controller else {
return Err(Status::unimplemented(
"reschedule plan is not supported in v2",
));
};
let scale_controller = &self.scale_controller;

let plan = scale_controller.get_reschedule_plan(policy).await?;

let next_revision = self.get_revision().await;
Expand Down
21 changes: 9 additions & 12 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::BarrierKind;
Expand All @@ -31,8 +32,8 @@ use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers, FragmentTypeFlag,
PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, ThrottleMutation,
UpdateMutation,
PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamActor,
ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -76,6 +77,8 @@ pub struct Reschedule {
/// Whether this fragment is injectable. The injectable means whether the fragment contains
/// any executors that are able to receive barrier.
pub injectable: bool,

pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -478,18 +481,15 @@ impl CommandContext {
),

Command::RescheduleFragment { reschedules, .. } => {
let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
else {
unimplemented!("implement scale functions in v2");
};
let metadata_manager = &self.barrier_manager_context.metadata_manager;

let mut dispatcher_update = HashMap::new();
for reschedule in reschedules.values() {
for &(upstream_fragment_id, dispatcher_id) in
&reschedule.upstream_fragment_dispatcher_ids
{
// Find the actors of the upstream fragment.
let upstream_actor_ids = mgr
.fragment_manager
let upstream_actor_ids = metadata_manager
.get_running_actors_of_fragment(upstream_fragment_id)
.await?;

Expand Down Expand Up @@ -527,8 +527,7 @@ impl CommandContext {
for (&fragment_id, reschedule) in reschedules {
for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
// Find the actors of the downstream fragment.
let downstream_actor_ids = mgr
.fragment_manager
let downstream_actor_ids = metadata_manager
.get_running_actors_of_fragment(downstream_fragment_id)
.await?;

Expand Down Expand Up @@ -968,8 +967,6 @@ impl CommandContext {
self.clean_up(removed_actors).await?;
self.barrier_manager_context
.scale_controller
.as_ref()
.unwrap()
.post_apply_reschedule(reschedules, table_parallelism)
.await?;
}
Expand Down
16 changes: 7 additions & 9 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub struct GlobalBarrierManagerContext {

source_manager: SourceManagerRef,

scale_controller: Option<ScaleControllerRef>,
scale_controller: ScaleControllerRef,

sink_manager: SinkCoordinatorManager,

Expand Down Expand Up @@ -394,14 +394,12 @@ impl GlobalBarrierManager {

let tracker = CreateMviewProgressTracker::new();

let scale_controller = match &metadata_manager {
MetadataManager::V1(_) => Some(Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
env.clone(),
))),
MetadataManager::V2(_) => None,
};
let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
env.clone(),
));

let context = GlobalBarrierManagerContext {
status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)),
metadata_manager,
Expand Down
Loading

0 comments on commit 37c9fe7

Please sign in to comment.