diff --git a/risedev.yml b/risedev.yml
index ad6eb7531e351..ec485a7fdc35e 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -355,6 +355,19 @@ profile:
       - use: compute-node
       - use: frontend
 
+  meta-1cn-1fe-sqlite-with-recovery:
+    config-path: src/config/ci-recovery.toml
+    steps:
+      - use: minio
+      - use: sqlite
+      - use: meta-node
+        port: 5690
+        dashboard-port: 5691
+        exporter-port: 1250
+      - use: compactor
+      - use: compute-node
+      - use: frontend
+
   java-binding-demo:
     steps:
       - use: minio
@@ -1370,4 +1383,5 @@ template:
     port: 6379
 
     # address of redis
-    address: "127.0.0.1"
\ No newline at end of file
+    address: "127.0.0.1"
+
diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs
index c07d76827c7b6..974cdf4fd2671 100644
--- a/src/meta/model_v2/src/lib.rs
+++ b/src/meta/model_v2/src/lib.rs
@@ -232,6 +232,7 @@ derive_from_json_struct!(
 pub enum StreamingParallelism {
     Adaptive,
     Fixed(usize),
+    Custom,
 }
 
 impl Eq for StreamingParallelism {}
diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs
index ddedd1ed4fc96..6c2d40e2c581d 100644
--- a/src/meta/service/src/stream_service.rs
+++ b/src/meta/service/src/stream_service.rs
@@ -284,6 +284,7 @@ impl StreamManagerService for StreamServiceImpl {
                     .map(|(table_id, state, parallelism)| {
                         let parallelism = match parallelism {
                             StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
+                            StreamingParallelism::Custom => model::TableParallelism::Custom,
                             StreamingParallelism::Fixed(n) => {
                                 model::TableParallelism::Fixed(n as _)
                             }
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 32b3700755674..e862f828e2041 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -49,7 +49,7 @@ use crate::controller::catalog::ReleaseContext;
 use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId};
 use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
-use crate::MetaResult;
+use crate::{model, MetaResult};
 
 impl GlobalBarrierManager {
     // Retry base interval in milliseconds.
@@ -705,24 +705,52 @@ impl GlobalBarrierManagerContext {
         let mgr = self.metadata_manager.as_v2_ref();
         debug!("start resetting actors distribution");
 
+        let available_parallelism = active_nodes
+            .current()
+            .values()
+            .flat_map(|worker_node| worker_node.parallel_units.iter())
+            .count();
+
         let table_parallelisms: HashMap<_, _> = {
             let streaming_parallelisms = mgr
                 .catalog_controller
                 .get_all_created_streaming_parallelisms()
                 .await?;
 
-            streaming_parallelisms
-                .into_iter()
-                .map(|(table_id, parallelism)| {
-                    // no custom for sql backend
-                    let table_parallelism = match parallelism {
-                        StreamingParallelism::Adaptive => TableParallelism::Adaptive,
-                        StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
-                    };
+            let mut result = HashMap::new();
 
-                    (table_id as u32, table_parallelism)
-                })
-                .collect()
+            for (object_id, streaming_parallelism) in streaming_parallelisms {
+                let actual_fragment_parallelism = mgr
+                    .catalog_controller
+                    .get_actual_job_fragment_parallelism(object_id)
+                    .await?;
+
+                let table_parallelism = match streaming_parallelism {
+                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
+                    StreamingParallelism::Custom => model::TableParallelism::Custom,
+                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
+                };
+
+                let target_parallelism = Self::derive_target_parallelism(
+                    available_parallelism,
+                    table_parallelism,
+                    actual_fragment_parallelism,
+                    self.env.opts.default_parallelism,
+                );
+
+                if target_parallelism != table_parallelism {
+                    tracing::info!(
+                        "resetting table {} parallelism from {:?} to {:?}",
+                        object_id,
+                        table_parallelism,
+                        target_parallelism
+                    );
+                }
+
+                result.insert(object_id as u32, target_parallelism);
+            }
+
+            result
         };
 
         let schedulable_worker_ids = active_nodes
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index c9beada284d5d..2e9f6a480299f 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -337,6 +337,7 @@ impl CatalogController {
             ctx: Some(ctx.unwrap_or_default()),
             parallelism: Some(
                 match parallelism {
+                    StreamingParallelism::Custom => TableParallelism::Custom,
                     StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                     StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
                 }
@@ -1284,6 +1285,42 @@ impl CatalogController {
 
         Ok(Self::compose_fragment(fragment, actors, actor_dispatchers)?.0)
     }
+
+    /// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
+    ///
+    /// Returns `None` if the job has no such fragment, or if more than one fragment matches:
+    /// the `at_most_one` error is dropped via `.ok()` instead of being reported.
+    pub async fn get_actual_job_fragment_parallelism(
+        &self,
+        job_id: ObjectId,
+    ) -> MetaResult<Option<usize>> {
+        let inner = self.inner.read().await;
+        let mut fragments: Vec<(FragmentId, i32, i64)> = Fragment::find()
+            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
+            .select_only()
+            .columns([
+                fragment::Column::FragmentId,
+                fragment::Column::FragmentTypeMask,
+            ])
+            .column_as(actor::Column::ActorId.count(), "count")
+            .filter(fragment::Column::JobId.eq(job_id))
+            .group_by(fragment::Column::FragmentId)
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+
+        fragments.retain(|(_, mask, _)| {
+            *mask & PbFragmentTypeFlag::Mview as i32 != 0
+                || *mask & PbFragmentTypeFlag::Sink as i32 != 0
+        });
+
+        Ok(fragments
+            .into_iter()
+            .at_most_one()
+            .ok()
+            .flatten()
+            .map(|(_, _, count)| count as usize))
+    }
 }
 
 #[cfg(test)]
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 2b05f5f54ef2e..a4aeb1182d6c2 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -1284,9 +1284,7 @@ impl CatalogController {
             streaming_job.parallelism = Set(match parallelism {
                 TableParallelism::Adaptive => StreamingParallelism::Adaptive,
                 TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n as _),
-                TableParallelism::Custom => {
-                    unreachable!("sql backend doesn't support custom parallelism")
-                }
+                TableParallelism::Custom => StreamingParallelism::Custom,
             });
             streaming_job.update(&txn).await?;
 
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 8aae5c95c5552..19a0d8942b5b6 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -2772,10 +2772,10 @@ impl GlobalStreamManager {
                     streaming_parallelisms
                         .into_iter()
                         .map(|(table_id, parallelism)| {
-                            // no custom for sql backend
                             let table_parallelism = match parallelism {
                                 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                                 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
+                                StreamingParallelism::Custom => TableParallelism::Custom,
                             };
 
                             (table_id as u32, table_parallelism)