From df1bff22dc7dd0e35e4dc11181aa46d69844222c Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 23 Jan 2024 19:30:17 +0800 Subject: [PATCH] add func Signed-off-by: Shanicky Chen --- src/meta/node/src/server.rs | 1 + src/meta/service/src/scale_service.rs | 33 +- src/meta/src/barrier/command.rs | 24 +- src/meta/src/barrier/mod.rs | 16 +- src/meta/src/controller/streaming_job.rs | 372 ++++++++++++++++++++++- src/meta/src/manager/metadata.rs | 23 +- src/meta/src/stream/scale.rs | 50 ++- src/meta/src/stream/stream_manager.rs | 3 +- 8 files changed, 452 insertions(+), 70 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 71486dbece5cb..8c1404d2d2f37 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -124,6 +124,7 @@ pub async fn rpc_serve( Some(backend) => { let mut options = sea_orm::ConnectOptions::new(backend.endpoint); options + .sqlx_logging(false) .max_connections(20) .connect_timeout(Duration::from_secs(10)) .idle_timeout(Duration::from_secs(30)); diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index acfca997d8040..82013790c5fb3 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -68,7 +68,7 @@ impl ScaleServiceImpl { async fn get_revision(&self) -> TableRevision { match &self.metadata_manager { MetadataManager::V1(mgr) => mgr.fragment_manager.get_revision().await, - MetadataManager::V2(_) => unimplemented!("support table revision in v2"), + MetadataManager::V2(_) => Default::default(), } } } @@ -141,10 +141,6 @@ impl ScaleService for ScaleServiceImpl { ) -> Result, Status> { self.barrier_manager.check_status_running().await?; - let MetadataManager::V1(mgr) = &self.metadata_manager else { - unimplemented!("only available in v1"); - }; - let RescheduleRequest { reschedules, revision, @@ -163,19 +159,24 @@ impl ScaleService for ScaleServiceImpl { } let table_parallelisms = { - let guard = mgr.fragment_manager.get_fragment_read_guard().await; - - let mut table_parallelisms = HashMap::new(); - for (table_id, table) in guard.table_fragments() { - if table - .fragment_ids() - .any(|fragment_id| reschedules.contains_key(&fragment_id)) - { - table_parallelisms.insert(*table_id, TableParallelism::Custom); + match &self.metadata_manager { + MetadataManager::V1(mgr) => { + let guard = mgr.fragment_manager.get_fragment_read_guard().await; + + let mut table_parallelisms = HashMap::new(); + for (table_id, table) in guard.table_fragments() { + if table + .fragment_ids() + .any(|fragment_id| reschedules.contains_key(&fragment_id)) + { + table_parallelisms.insert(*table_id, TableParallelism::Custom); + } + } + + table_parallelisms } + MetadataManager::V2(_) => HashMap::new(), } - - table_parallelisms }; self.stream_manager diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index de88c1ae17608..6463a2cb02225 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -23,6 +23,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorMapping; use risingwave_connector::source::SplitImpl; use risingwave_hummock_sdk::HummockEpoch; +use risingwave_pb::meta::table_fragments::PbActorStatus; use risingwave_pb::meta::PausedReason; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; use risingwave_pb::stream_plan::barrier::BarrierKind; @@ -31,8 +32,8 @@ use risingwave_pb::stream_plan::throttle_mutation::RateLimit; use risingwave_pb::stream_plan::update_mutation::*; use 
risingwave_pb::stream_plan::{ AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers, FragmentTypeFlag, - PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, ThrottleMutation, - UpdateMutation, + PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamActor, + ThrottleMutation, UpdateMutation, }; use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest}; use thiserror_ext::AsReport; @@ -76,6 +77,8 @@ pub struct Reschedule { /// Whether this fragment is injectable. The injectable means whether the fragment contains /// any executors that are able to receive barrier. pub injectable: bool, + + pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>, } #[derive(Debug, Clone)] @@ -479,18 +482,20 @@ impl CommandContext { ), Command::RescheduleFragment { reschedules, .. } => { - let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager - else { - unimplemented!("implement scale functions in v2"); - }; + // let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager + // else { + // unimplemented!("implement scale functions in v2"); + // }; + + let metadata_manager = &self.barrier_manager_context.metadata_manager; + let mut dispatcher_update = HashMap::new(); for reschedule in reschedules.values() { for &(upstream_fragment_id, dispatcher_id) in &reschedule.upstream_fragment_dispatcher_ids { // Find the actors of the upstream fragment. - let upstream_actor_ids = mgr - .fragment_manager + let upstream_actor_ids = metadata_manager .get_running_actors_of_fragment(upstream_fragment_id) .await?; @@ -528,8 +533,7 @@ impl CommandContext { for (&fragment_id, reschedule) in reschedules { for &downstream_fragment_id in &reschedule.downstream_fragment_ids { // Find the actors of the downstream fragment. 
- let downstream_actor_ids = mgr - .fragment_manager + let downstream_actor_ids = metadata_manager .get_running_actors_of_fragment(downstream_fragment_id) .await?; diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 0884560045398..1008b33004835 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -41,6 +41,7 @@ use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinHandle; +use tokio::time; use tracing::Instrument; use self::command::CommandContext; @@ -394,14 +395,12 @@ impl GlobalBarrierManager { let tracker = CreateMviewProgressTracker::new(); - let scale_controller = match &metadata_manager { - MetadataManager::V1(_) => Some(Arc::new(ScaleController::new( - &metadata_manager, - source_manager.clone(), - env.clone(), - ))), - MetadataManager::V2(_) => None, - }; + let scale_controller = Some(Arc::new(ScaleController::new( + &metadata_manager, + source_manager.clone(), + env.clone(), + ))); + let context = GlobalBarrierManagerContext { status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)), metadata_manager, @@ -742,6 +741,7 @@ impl GlobalBarrierManager { .await; self.context.set_status(BarrierManagerStatus::Running).await; } else { + time::sleep(Duration::from_secs(10)).await; panic!("failed to execute barrier: {}", err.as_report()); } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 576c5c3c40699..1a7a6973b55f8 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -12,41 +12,47 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use itertools::Itertools; +use risingwave_common::buffer::Bitmap; +use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; +use risingwave_meta_model_v2::actor_dispatcher::DispatcherType; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::{ Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Source, Table, }; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, index, object_dependency, sink, source, streaming_job, - table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, FragmentId, - I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode, TableId, TableVersion, - UserId, + table, ActorId, ActorUpstreamActors, ConnectorSplits, CreateType, DatabaseId, ExprContext, + ExprNodeArray, FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, + StreamNode, TableId, TableVersion, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; use risingwave_pb::catalog::{PbCreateType, PbTable}; use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ - Info as NotificationInfo, Operation as NotificationOperation, + Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, +}; +use risingwave_pb::meta::{ + FragmentParallelUnitMapping, PbRelation, PbRelationGroup, PbTableFragments, }; -use 
risingwave_pb::meta::{PbRelation, PbRelationGroup, PbTableFragments}; use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; -use risingwave_pb::stream_plan::{PbDispatcher, PbFragmentTypeFlag}; +use risingwave_pb::stream_plan::{PbDispatcher, PbFragmentTypeFlag, PbStreamActor}; use sea_orm::sea_query::SimpleExpr; use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, - ModelTrait, NotSet, QueryFilter, QuerySelect, TransactionTrait, + JoinType, ModelTrait, NotSet, QueryFilter, QuerySelect, RelationTrait, TransactionTrait, }; +use crate::barrier::Reschedule; use crate::controller::catalog::CatalogController; use crate::controller::rename::ReplaceTableExprRewriter; use crate::controller::utils::{ @@ -55,7 +61,7 @@ use crate::controller::utils::{ }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, SinkId, StreamingJob}; -use crate::model::StreamContext; +use crate::model::{StreamContext, TableParallelism}; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -808,4 +814,352 @@ impl CatalogController { Ok(fragment_actors) } + + pub async fn post_apply_reschedules( + &self, + mut reschedules: HashMap, + _table_parallelism_assignment: HashMap< + risingwave_common::catalog::TableId, + TableParallelism, + >, + ) -> MetaResult<()> { + fn update_actors( + actors: &mut Vec, + to_remove: &HashSet, + to_create: &Vec, + ) { + let actor_id_set: HashSet<_> = actors.iter().copied().collect(); + for actor_id in to_create { + debug_assert!(!actor_id_set.contains(actor_id)); + } + for actor_id in to_remove { + debug_assert!(actor_id_set.contains(actor_id)); + } + + actors.retain(|actor_id| !to_remove.contains(actor_id)); + actors.extend_from_slice(to_create); + } + + let new_created_actors: HashSet<_> = reschedules + .values() + .flat_map(|reschedule| { + reschedule + .added_actors + .values() + .flatten() + .map(|actor_id| *actor_id as ActorId) + }) + .collect(); + + let inner = self.inner.write().await; + + let txn = inner.db.begin().await?; + + let mut fragment_mapping_to_notify = vec![]; + + let mut assert_dispatcher_update_checker = HashSet::new(); + + for ( + fragment_id, + Reschedule { + removed_actors, + vnode_bitmap_updates, + actor_splits, + injectable: _, + newly_created_actors, + added_actors: _, + .. + }, + ) in &mut reschedules + { + // todo: batch delete + Actor::delete_many() + .filter( + actor::Column::ActorId + .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()), + ) + .exec(&txn) + .await?; + + // newly created actor + + let mut new_actors = vec![]; + let mut new_actor_dispatchers = vec![]; + + for ( + PbStreamActor { + actor_id, + fragment_id, + nodes, + dispatcher, + upstream_actor_id, + vnode_bitmap, + expr_context, + .. 
+ }, + actor_status, + ) in newly_created_actors + { + let mut actor_upstreams = BTreeMap::>::new(); + + if let Some(nodes) = nodes { + visit_stream_node(nodes, |node| { + if let PbNodeBody::Merge(node) = node { + actor_upstreams + .entry(node.upstream_fragment_id as FragmentId) + .or_default() + .extend(node.upstream_actor_id.iter().map(|id| *id as ActorId)); + } + }); + } + + let actor_upstreams: BTreeMap> = actor_upstreams + .into_iter() + .map(|(k, v)| (k, v.into_iter().collect())) + .collect(); + + assert_eq!( + actor_upstreams + .values() + .flatten() + .cloned() + .sorted() + .collect_vec(), + upstream_actor_id + .iter() + .map(|actor_id| *actor_id as i32) + .sorted() + .collect_vec() + ); + + let actor_upstreams = ActorUpstreamActors(actor_upstreams); + + let status = actor_status.get_state().unwrap().into(); + let parallel_unit_id = actor_status.get_parallel_unit().unwrap().id; + + let splits = actor_splits + .get(actor_id) + .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec()); + + new_actors.push(actor::ActiveModel { + actor_id: Set(*actor_id as _), + fragment_id: Set(*fragment_id as _), + status: Set(status), + splits: Set( + splits.map(|splits| ConnectorSplits::from(PbConnectorSplits { splits })) + ), + parallel_unit_id: Set(parallel_unit_id as _), + upstream_actor_ids: Set(actor_upstreams), + vnode_bitmap: Set(vnode_bitmap.clone().map(|bitmap| bitmap.into())), + expr_context: Set(ExprContext::from(expr_context.as_ref().unwrap().clone())), + }); + + for actor_dispatcher in dispatcher { + new_actor_dispatchers.push(actor_dispatcher::ActiveModel { + id: Set(actor_dispatcher.get_dispatcher_id() as _), + actor_id: Set(*actor_id as _), + dispatcher_type: Set(actor_dispatcher.get_type().unwrap().into()), + dist_key_indices: Set(actor_dispatcher + .get_dist_key_indices() + .clone() + .into()), + output_indices: Set(actor_dispatcher.get_output_indices().clone().into()), + hash_mapping: Set(Some( + actor_dispatcher.get_hash_mapping().unwrap().clone().into(), + )), + dispatcher_id: Set(actor_dispatcher.get_dispatcher_id() as _), + downstream_actor_ids: Set(actor_dispatcher + .get_downstream_actor_id() + .clone() + .into()), + }) + } + } + + if !new_actors.is_empty() { + Actor::insert_many(new_actors).exec(&txn).await?; + } + + if !new_actor_dispatchers.is_empty() { + ActorDispatcher::insert_many(new_actor_dispatchers) + .exec(&txn) + .await?; + } + + // actor update + for (actor_id, bitmap) in vnode_bitmap_updates { + let actor = Actor::find_by_id(*actor_id as ActorId) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("actor", *actor_id))?; + + let mut actor = actor.into_active_model(); + actor.vnode_bitmap = Set(Some(bitmap.clone().to_protobuf().into())); + + actor.update(&txn).await?; + } + + // fragment update + let fragment = Fragment::find_by_id(*fragment_id) + .one(&txn) + .await? 
+ .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?; + + let actors = fragment.find_related(Actor).all(&inner.db).await?; + + let mut actor_to_parallel_unit = HashMap::with_capacity(actors.len()); + let mut actor_to_vnode_bitmap = HashMap::with_capacity(actors.len()); + for actor in &actors { + actor_to_parallel_unit.insert(actor.actor_id as u32, actor.parallel_unit_id as _); + if let Some(vnode_bitmap) = &actor.vnode_bitmap { + let bitmap = Bitmap::from(vnode_bitmap.inner_ref()); + actor_to_vnode_bitmap.insert(actor.actor_id as u32, bitmap); + } + } + + let vnode_mapping = if actor_to_vnode_bitmap.is_empty() { + // If there's no `vnode_bitmap`, then the fragment must be a singleton fragment. + // We directly use the single parallel unit to construct the mapping. + // TODO: also fill `vnode_bitmap` for the actor of singleton fragment so that we + // don't need this branch. + let parallel_unit = *actor_to_parallel_unit.values().exactly_one().unwrap(); + ParallelUnitMapping::new_single(parallel_unit as ParallelUnitId) + } else { + // Generate the parallel unit mapping from the fragment's actor bitmaps. + assert_eq!(actor_to_vnode_bitmap.len(), actor_to_parallel_unit.len()); + ActorMapping::from_bitmaps(&actor_to_vnode_bitmap) + .to_parallel_unit(&actor_to_parallel_unit) + } + .to_protobuf(); + + let mut fragment = fragment.into_active_model(); + + fragment.vnode_mapping = Set(vnode_mapping.clone().into()); + + fragment.update(&txn).await?; + + let fragment_mapping = FragmentParallelUnitMapping { + fragment_id: *fragment_id as u32, + mapping: Some(vnode_mapping), + }; + + fragment_mapping_to_notify.push(fragment_mapping); + } + + // Second step, update fragment upstream & downstream + for ( + fragment_id, + Reschedule { + upstream_fragment_dispatcher_ids, + upstream_dispatcher_mapping, + downstream_fragment_ids, + added_actors, + removed_actors, + .. + }, + ) in &mut reschedules + { + let removed_actor_ids: HashSet<_> = removed_actors + .iter() + .map(|actor_id| *actor_id as ActorId) + .collect(); + let added_actor_ids = added_actors + .values() + .flatten() + .map(|actor_id| *actor_id as ActorId) + .collect_vec(); + + // first step, upstream fragment + for (upstream_fragment_id, dispatcher_id) in upstream_fragment_dispatcher_ids { + let upstream_fragment = Fragment::find_by_id(*upstream_fragment_id as FragmentId) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?; + + let all_dispatchers = actor_dispatcher::Entity::find() + .join(JoinType::InnerJoin, actor_dispatcher::Relation::Actor.def()) + .filter(actor::Column::FragmentId.eq(upstream_fragment.fragment_id)) + .filter(actor_dispatcher::Column::DispatcherId.eq(*dispatcher_id as i32)) + .all(&txn) + .await?; + + for dispatcher in all_dispatchers { + debug_assert!(assert_dispatcher_update_checker.insert(dispatcher.id)); + + let mut dispatcher = dispatcher.into_active_model(); + + if dispatcher.dispatcher_type.as_ref() == &DispatcherType::Hash { + dispatcher.hash_mapping = + Set(upstream_dispatcher_mapping.as_ref().map(|m| { + risingwave_meta_model_v2::ActorMapping::from(m.to_protobuf()) + })); + } else { + debug_assert!(upstream_dispatcher_mapping.is_none()); + } + + // notice: may panic? 
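+                    // `ActiveValue::as_ref` panics if the field is `NotSet`; this dispatcher model
+                    // was just loaded from the database, so `downstream_actor_ids` is always populated here.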
+ let mut new_downstream_actor_ids = + dispatcher.downstream_actor_ids.as_ref().inner_ref().clone(); + + update_actors( + new_downstream_actor_ids.as_mut(), + &removed_actor_ids, + &added_actor_ids, + ); + + dispatcher.downstream_actor_ids = Set(new_downstream_actor_ids.into()); + dispatcher.update(&txn).await?; + } + } + + // second step, downstream fragment + + for downstream_fragment_id in downstream_fragment_ids { + let actors = Actor::find() + .filter(actor::Column::FragmentId.eq(*downstream_fragment_id as FragmentId)) + .all(&txn) + .await?; + + for actor in actors { + if new_created_actors.contains(&actor.actor_id) { + continue; + } + + let mut actor = actor.into_active_model(); + + let mut new_upstream_actor_ids = + actor.upstream_actor_ids.as_ref().inner_ref().clone(); + + update_actors( + new_upstream_actor_ids.get_mut(fragment_id).unwrap(), + &removed_actor_ids, + &added_actor_ids, + ); + + actor.upstream_actor_ids = Set(new_upstream_actor_ids.into()); + + actor.update(&txn).await?; + } + } + } + + // for (table_id, parallelism) in table_parallelism_assignment { + // if let Some(mut table) = table_fragments.get_mut(table_id) { + // table.assigned_parallelism = parallelism; + // } + // } + // + // assert!(reschedules.is_empty(), "all reschedules must be applied"); + // + for mapping in fragment_mapping_to_notify { + self.env + .notification_manager() + .notify_frontend(Operation::Update, Info::ParallelUnitMapping(mapping)) + .await; + } + + txn.commit().await?; + + Ok(()) + } } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index ad07dd07815af..f94db9f2045ff 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -20,10 +20,8 @@ use risingwave_pb::catalog::{PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; -use risingwave_pb::meta::table_fragments::{Fragment, PbFragment}; -use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor}; -use risingwave_pb::meta::table_fragments::{ActorStatus}; -use risingwave_pb::stream_plan::{StreamActor}; +use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, PbFragment}; +use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor, StreamActor}; use crate::barrier::Reschedule; use crate::controller::catalog::CatalogControllerRef; @@ -31,10 +29,8 @@ use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo}; use crate::manager::{ CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, StreamingClusterInfo, WorkerId, }; -use crate::stream::SplitAssignment; - use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism}; - +use crate::stream::SplitAssignment; use crate::MetaResult; #[derive(Clone)] @@ -190,7 +186,9 @@ impl MetadataManager { .pre_apply_reschedules(created_actors) .await } - MetadataManager::V2(_) => todo!(), + + // V2 doesn't need to pre apply reschedules. 
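+            // With the SQL backend, newly created actors are persisted directly by
+            // `post_apply_reschedules`, so there is no in-memory fragment state to pre-apply.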
+ MetadataManager::V2(_) => HashMap::new(), } } @@ -205,7 +203,14 @@ impl MetadataManager { .post_apply_reschedules(reschedules, table_parallelism_assignment) .await } - MetadataManager::V2(_) => todo!(), + MetadataManager::V2(mgr) => { + // temp convert u32 to i32 + let reschedules = reschedules.into_iter().map(|(k, v)| (k as _, v)).collect(); + + mgr.catalog_controller + .post_apply_reschedules(reschedules, table_parallelism_assignment) + .await + } } } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index f228da0b136dc..ce6a3557ca146 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -60,7 +60,7 @@ use crate::storage::{MetaStore, MetaStoreError, MetaStoreRef, Transaction, DEFAU use crate::stream::{GlobalStreamManager, SourceManagerRef}; use crate::{model, MetaError, MetaResult}; -#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Default, Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub struct TableRevision(u64); const TABLE_REVISION_KEY: &[u8] = b"table_revision"; @@ -1284,6 +1284,7 @@ impl ScaleController { downstream_fragment_ids, actor_splits, injectable, + newly_created_actors: vec![], }, ); } @@ -1316,6 +1317,11 @@ impl ScaleController { fragment_created_actors.insert(*fragment_id, created_actors); } + for (fragment_id, to_create) in &fragment_created_actors { + let reschedule = reschedule_fragment.get_mut(fragment_id).unwrap(); + reschedule.newly_created_actors = to_create.values().cloned().collect(); + } + let applied_reschedules = self .metadata_manager .pre_apply_reschedules(fragment_created_actors) @@ -1371,11 +1377,22 @@ impl ScaleController { } for created_parallel_unit_id in added_parallel_units { - let id = self - .env - .id_gen_manager() - .generate::<{ IdCategory::Actor }>() - .await? as ActorId; + // self.env.sql_id_gen_manager_ref().map(|id_gen| id_gen.actors.generate_interval(1)) + // + + let id = match self.env.sql_id_gen_manager_ref() { + None => { + self.env + .id_gen_manager() + .generate::<{ IdCategory::Actor }>() + .await? 
as ActorId + } + Some(id_gen) => { + let id = id_gen.generate_interval::<{ IdCategory::Actor }>(1); + + id as ActorId + } + }; actors_to_create.insert(id, *created_parallel_unit_id); } @@ -2414,10 +2431,6 @@ impl GlobalStreamManager { options: RescheduleOptions, table_parallelism: Option>, ) -> MetaResult<()> { - let MetadataManager::V1(mgr) = &self.metadata_manager else { - unimplemented!("support reschedule in v2"); - }; - let mut table_parallelism = table_parallelism; let (reschedule_fragment, applied_reschedules) = self @@ -2434,13 +2447,18 @@ impl GlobalStreamManager { table_parallelism: table_parallelism.unwrap_or_default(), }; - let fragment_manager_ref = mgr.fragment_manager.clone(); + match &self.metadata_manager { + MetadataManager::V1(mgr) => { + let fragment_manager_ref = mgr.fragment_manager.clone(); - revert_funcs.push(Box::pin(async move { - fragment_manager_ref - .cancel_apply_reschedules(applied_reschedules) - .await; - })); + revert_funcs.push(Box::pin(async move { + fragment_manager_ref + .cancel_apply_reschedules(applied_reschedules) + .await; + })); + } + MetadataManager::V2(_) => {} + } let _source_pause_guard = self.source_manager.paused.lock().await; diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 1950a27013f3f..7d6caa5524bb5 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -213,12 +213,11 @@ impl GlobalStreamManager { hummock_manager: HummockManagerRef, ) -> MetaResult { let scale_controller = match &metadata_manager { - MetadataManager::V1(_) => { + MetadataManager::V1(_) | MetadataManager::V2(_) => { let scale_controller = ScaleController::new(&metadata_manager, source_manager.clone(), env.clone()); Some(Arc::new(scale_controller)) } - MetadataManager::V2(_) => None, }; Ok(Self { env,