Skip to content

Commit

Permalink
feat(meta): add Custom to StreamingParallelism (#16033)
Browse files Browse the repository at this point in the history
Co-authored-by: August <[email protected]>
  • Loading branch information
shanicky and yezizp2012 authored Apr 1, 2024
1 parent 077dbb8 commit 246c940
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 17 deletions.
16 changes: 15 additions & 1 deletion risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,19 @@ profile:
- use: compute-node
- use: frontend

meta-1cn-1fe-sqlite-with-recovery:
config-path: src/config/ci-recovery.toml
steps:
- use: minio
- use: sqlite
- use: meta-node
port: 5690
dashboard-port: 5691
exporter-port: 1250
- use: compactor
- use: compute-node
- use: frontend

java-binding-demo:
steps:
- use: minio
Expand Down Expand Up @@ -1370,4 +1383,5 @@ template:
port: 6379

# address of redis
address: "127.0.0.1"
address: "127.0.0.1"

1 change: 1 addition & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ derive_from_json_struct!(
// Persisted parallelism setting for a streaming job (SQL-backend metadata model).
// NOTE(review): hunk context suggests this enum is an argument to
// `derive_from_json_struct!(...)`, i.e. it is serialized to JSON for storage —
// variant names and shapes are part of the on-disk format; do not rename/reorder.
pub enum StreamingParallelism {
    // Scheduler picks the parallelism adaptively from available compute resources.
    Adaptive,
    // Pinned to exactly this number of parallel units.
    Fixed(usize),
    // Added in #16033: job parallelism was customized per-fragment (e.g. via manual
    // reschedule), so no single job-level number applies.
    Custom,
}

// Marker impl promoting the type's `PartialEq` (presumably derived/implemented
// elsewhere in this file — not visible here) to a full equivalence relation,
// so `StreamingParallelism` can be used where `Eq` is required.
impl Eq for StreamingParallelism {}
1 change: 1 addition & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl StreamManagerService for StreamServiceImpl {
.map(|(table_id, state, parallelism)| {
let parallelism = match parallelism {
StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
StreamingParallelism::Custom => model::TableParallelism::Custom,
StreamingParallelism::Fixed(n) => {
model::TableParallelism::Fixed(n as _)
}
Expand Down
52 changes: 40 additions & 12 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::controller::catalog::ReleaseContext;
use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId};
use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
use crate::MetaResult;
use crate::{model, MetaResult};

impl GlobalBarrierManager {
// Retry base interval in milliseconds.
Expand Down Expand Up @@ -705,24 +705,52 @@ impl GlobalBarrierManagerContext {
let mgr = self.metadata_manager.as_v2_ref();
debug!("start resetting actors distribution");

let available_parallelism = active_nodes
.current()
.values()
.flat_map(|worker_node| worker_node.parallel_units.iter())
.count();

let table_parallelisms: HashMap<_, _> = {
let streaming_parallelisms = mgr
.catalog_controller
.get_all_created_streaming_parallelisms()
.await?;

streaming_parallelisms
.into_iter()
.map(|(table_id, parallelism)| {
// no custom for sql backend
let table_parallelism = match parallelism {
StreamingParallelism::Adaptive => TableParallelism::Adaptive,
StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
};
let mut result = HashMap::new();

(table_id as u32, table_parallelism)
})
.collect()
for (object_id, streaming_parallelism) in streaming_parallelisms {
let actual_fragment_parallelism = mgr
.catalog_controller
.get_actual_job_fragment_parallelism(object_id)
.await?;

let table_parallelism = match streaming_parallelism {
StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
StreamingParallelism::Custom => model::TableParallelism::Custom,
StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
};

let target_parallelism = Self::derive_target_parallelism(
available_parallelism,
table_parallelism,
actual_fragment_parallelism,
self.env.opts.default_parallelism,
);

if target_parallelism != table_parallelism {
tracing::info!(
"resetting table {} parallelism from {:?} to {:?}",
object_id,
table_parallelism,
target_parallelism
);
}

result.insert(object_id as u32, target_parallelism);
}

result
};

let schedulable_worker_ids = active_nodes
Expand Down
34 changes: 34 additions & 0 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ impl CatalogController {
ctx: Some(ctx.unwrap_or_default()),
parallelism: Some(
match parallelism {
StreamingParallelism::Custom => TableParallelism::Custom,
StreamingParallelism::Adaptive => TableParallelism::Adaptive,
StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
}
Expand Down Expand Up @@ -1284,6 +1285,39 @@ impl CatalogController {

Ok(Self::compose_fragment(fragment, actors, actor_dispatchers)?.0)
}

/// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
///
/// Returns `Ok(Some(count))` when the job has exactly one `Mview`/`Sink` fragment,
/// and `Ok(None)` otherwise (no matching fragment, or — via `at_most_one().ok()` —
/// more than one, which is silently collapsed to `None` rather than reported as
/// an error). Used during recovery to derive a job's actual runtime parallelism.
pub async fn get_actual_job_fragment_parallelism(
    &self,
    job_id: ObjectId,
) -> MetaResult<Option<usize>> {
    let inner = self.inner.read().await;
    // One row per fragment of this job: (fragment_id, type_mask, actor_count).
    // The actor count comes from joining `actor` and grouping by fragment.
    let mut fragments: Vec<(FragmentId, i32, i64)> = Fragment::find()
        .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
        ])
        .column_as(actor::Column::ActorId.count(), "count")
        .filter(fragment::Column::JobId.eq(job_id))
        .group_by(fragment::Column::FragmentId)
        .into_tuple()
        .all(&inner.db)
        .await?;

    // Keep only the job's "root" fragments: those flagged Materialize or Sink
    // in the fragment type bitmask.
    fragments.retain(|(_, mask, _)| {
        *mask & PbFragmentTypeFlag::Mview as i32 != 0
            || *mask & PbFragmentTypeFlag::Sink as i32 != 0
    });

    // `at_most_one()` yields Err if 2+ fragments remain; `.ok().flatten()` maps
    // both "none" and "ambiguous (2+)" to `None`.
    Ok(fragments
        .into_iter()
        .at_most_one()
        .ok()
        .flatten()
        .map(|(_, _, count)| count as usize))
}
}

#[cfg(test)]
Expand Down
4 changes: 1 addition & 3 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1284,9 +1284,7 @@ impl CatalogController {
streaming_job.parallelism = Set(match parallelism {
TableParallelism::Adaptive => StreamingParallelism::Adaptive,
TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n as _),
TableParallelism::Custom => {
unreachable!("sql backend doesn't support custom parallelism")
}
TableParallelism::Custom => StreamingParallelism::Custom,
});

streaming_job.update(&txn).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2772,10 +2772,10 @@ impl GlobalStreamManager {
streaming_parallelisms
.into_iter()
.map(|(table_id, parallelism)| {
// no custom for sql backend
let table_parallelism = match parallelism {
StreamingParallelism::Adaptive => TableParallelism::Adaptive,
StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
StreamingParallelism::Custom => TableParallelism::Custom,
};

(table_id as u32, table_parallelism)
Expand Down

0 comments on commit 246c940

Please sign in to comment.