diff --git a/e2e_test/sink/sink_into_table/parallelism.slt b/e2e_test/sink/sink_into_table/parallelism.slt new file mode 100644 index 0000000000000..5aede570384b5 --- /dev/null +++ b/e2e_test/sink/sink_into_table/parallelism.slt @@ -0,0 +1,87 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +SET STREAMING_PARALLELISM TO 2; + +statement ok +create table t_simple (v1 int, v2 int); + +statement ok +create table m_simple (v1 int primary key, v2 int); + +statement ok +SET STREAMING_PARALLELISM TO 3; + +statement ok +create sink s_simple_1 into m_simple as select v1, v2 from t_simple; + +query I +select distinct parallelism from rw_fragment_parallelism where name in ('t_simple', 'm_simple', 's_simple_1'); +---- +2 + +statement ok +insert into t_simple select * from generate_series(1, 100); + +statement ok +flush; + +query I +select count(*) from m_simple; +---- +100 + +statement ok +drop sink s_simple_1; + +statement ok +drop table t_simple; + +statement ok +drop table m_simple; + +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +SET STREAMING_PARALLELISM TO 2; + +statement ok +create table t_simple (v1 int, v2 int); + +statement ok +create table m_simple (v1 int primary key, v2 int); + +statement ok +SET STREAMING_PARALLELISM TO 3; + +# multi fragment sink +statement ok +create sink s_multi into m_simple as select v1, count(*)::int from t_simple group by v1; + +query I +select parallelism from rw_fragment_parallelism where array_position(flags, 'SINK') is not null and name = 's_multi'; +---- +3 + +statement ok +insert into t_simple select * from generate_series(1, 100); + +statement ok +flush; + +query I +select count(*) from m_simple; +---- +100 + +statement ok +drop sink s_multi; + +statement ok +drop table t_simple; + +statement ok +drop table m_simple; + diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index d2fbab5acaf1c..65f94e550da00 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1800,7 +1800,6 @@ impl DdlController { dummy_table_id: TableId, ) -> MetaResult<(ReplaceTableContext, TableFragments)> { let id = stream_job.id(); - let default_parallelism = fragment_graph.default_parallelism(); let expr_context = stream_ctx.to_expr_context(); let old_table_fragments = self @@ -1848,7 +1847,9 @@ impl DdlController { // 2. Build the actor graph. let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; - let parallelism = self.resolve_stream_parallelism(default_parallelism, &cluster_info)?; + let parallelism = NonZeroUsize::new(original_table_fragment.get_actors().len()) + .expect("The number of actors in the original table fragment should be greater than 0"); + let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?; @@ -1863,11 +1864,6 @@ impl DdlController { .await?; assert!(dispatchers.is_empty()); - let table_parallelism = match default_parallelism { - None => TableParallelism::Adaptive, - Some(parallelism) => TableParallelism::Fixed(parallelism.get()), - }; - // 3. Build the table fragments structure that will be persisted in the stream manager, and // the context that contains all information needed for building the actors on the compute // nodes. @@ -1876,8 +1872,7 @@ impl DdlController { graph, &building_locations.actor_locations, stream_ctx, - // todo: shall we use the old table fragments' parallelism - table_parallelism, + old_table_fragments.assigned_parallelism, ); let ctx = ReplaceTableContext {