From b11c2c25af36ca57e43ab68589c1dc2c769fe1e0 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 3 Jan 2024 16:11:54 +0800 Subject: [PATCH 1/3] feat: introduce rw_streaming_parallelism system view (#14261) (#14326) Co-authored-by: August Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- .../src/catalog/system_catalog/mod.rs | 1 + .../catalog/system_catalog/rw_catalog/mod.rs | 2 + .../rw_catalog/rw_streaming_parallelism.rs | 82 +++++++++++++++++++ 3 files changed, 85 insertions(+) create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 7406b0d9f03e1..c5bed6c98796c 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -439,6 +439,7 @@ prepare_sys_catalog! { { BuiltinCatalog::Table(&RW_RELATION_INFO), read_relation_info await }, { BuiltinCatalog::Table(&RW_SYSTEM_TABLES), read_system_table_info }, { BuiltinCatalog::View(&RW_RELATIONS) }, + { BuiltinCatalog::View(&RW_STREAMING_PARALLELISM) }, { BuiltinCatalog::Table(&RW_COLUMNS), read_rw_columns_info }, { BuiltinCatalog::Table(&RW_TYPES), read_rw_types }, { BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_VERSIONS), read_hummock_pinned_versions await }, diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index fd6120477fab2..977840444a6e6 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -40,6 +40,7 @@ mod rw_relations; mod rw_schemas; mod rw_sinks; mod rw_sources; +mod rw_streaming_parallelism; mod rw_system_tables; mod rw_table_fragments; mod rw_table_stats; @@ -78,6 +79,7 @@ pub use rw_relations::*; pub use rw_schemas::*; pub use rw_sinks::*; pub use rw_sources::*; +pub use rw_streaming_parallelism::*; pub use rw_system_tables::*; pub use rw_table_fragments::*; pub use rw_table_stats::*; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs new file mode 100644 index 0000000000000..01cdb37bd15f4 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs @@ -0,0 +1,82 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::LazyLock; + +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::types::DataType; + +use crate::catalog::system_catalog::{BuiltinView, SystemCatalogColumnsDef}; + +pub static RW_STREAMING_PARALLELISM_COLUMNS: LazyLock>> = + LazyLock::new(|| { + vec![ + (DataType::Int32, "id"), + (DataType::Varchar, "name"), + (DataType::Varchar, "relation_type"), + (DataType::Int32, "fragment_id"), + (DataType::Varchar, "distribution_type"), + (DataType::List(Box::new(DataType::Int32)), "state_table_ids"), + ( + DataType::List(Box::new(DataType::Int32)), + "upstream_fragment_ids", + ), + (DataType::List(Box::new(DataType::Varchar)), "flags"), + (DataType::Int32, "parallelism"), + ] + }); +pub static RW_STREAMING_PARALLELISM: LazyLock = LazyLock::new(|| BuiltinView { + name: "rw_streaming_parallelism", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &RW_STREAMING_PARALLELISM_COLUMNS, + sql: "WITH all_streaming_jobs AS ( \ + SELECT id, name, 'table' as relation_type FROM rw_tables \ + UNION ALL \ + SELECT id, name, 'materialized view' as relation_type FROM rw_materialized_views \ + UNION ALL \ + SELECT id, name, 'sink' as relation_type FROM rw_sinks \ + UNION ALL \ + SELECT id, name, 'index' as relation_type FROM rw_indexes \ + ) \ + SELECT \ + job.id, \ + job.name, \ + job.relation_type, \ + f.fragment_id, \ + f.distribution_type, \ + f.state_table_ids, \ + f.upstream_fragment_ids, \ + f.flags, \ + f.parallelism \ + FROM all_streaming_jobs job \ + INNER JOIN rw_fragments f ON job.id = f.table_id \ + WHERE job.relation_type in ('table', 'materialized view', 'sink', 'index') \ + ORDER BY job.id\ + " + .to_string(), +}); From 3ee19828530a37c18b252d565648354a811dc9ec Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 3 Jan 2024 14:08:27 +0000 Subject: [PATCH 2/3] fix: revert to random source split assignment when there are too many actors (#14346) (#14349) Co-authored-by: xxchan --- src/meta/src/stream/source_manager.rs | 193 +++++++++----------------- 1 file changed, 65 insertions(+), 128 deletions(-) diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index b5a1286804408..f7637a9d0c046 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -394,7 +394,7 @@ impl Eq for ActorSplitsAssignment {} impl PartialEq for ActorSplitsAssignment { fn eq(&self, other: &Self) -> bool { - self.splits.len() == other.splits.len() && self.actor_id == other.actor_id + self.splits.len() == other.splits.len() } } @@ -407,12 +407,7 @@ impl PartialOrd for ActorSplitsAssignment { impl Ord for ActorSplitsAssignment { fn cmp(&self, other: &Self) -> Ordering { // Note: this is reversed order, to make BinaryHeap a min heap. - other - .splits - .len() - .cmp(&self.splits.len()) - // To make the BinaryHeap have a deterministic order - .then(other.actor_id.cmp(&self.actor_id)) + other.splits.len().cmp(&self.splits.len()) } } @@ -508,6 +503,12 @@ where for split_id in new_discovered_splits { // ActorSplitsAssignment's Ord is reversed, so this is min heap, i.e., // we get the assignment with the least splits here. + + // Note: If multiple actors have the same number of splits, it will be randomly picked. + // When the number of source actors is larger than the number of splits, + // It's possible that the assignment is uneven. + // e.g., https://github.com/risingwavelabs/risingwave/issues/14324#issuecomment-1875033158 + // TODO: We should make the assignment rack-aware to make sure it's even. let mut peek_ref = heap.peek_mut().unwrap(); peek_ref .splits @@ -1073,46 +1074,29 @@ mod tests { #[test] fn test_reassign_splits() { - fn check( - actor_splits: HashMap>, - discovered_splits: BTreeMap, - expected: expect_test::Expect, - ) { - let diff = reassign_splits( - FragmentId::default(), - actor_splits, - &discovered_splits, - Default::default(), - ) - .map(BTreeMap::from_iter); // ensure deterministic debug string - expected.assert_debug_eq(&diff); - } - let actor_splits = HashMap::new(); let discovered_splits: BTreeMap = BTreeMap::new(); - check( + assert!(reassign_splits( + FragmentId::default(), actor_splits, - discovered_splits, - expect_test::expect![[r#" - None - "#]], - ); + &discovered_splits, + Default::default() + ) + .is_none()); let actor_splits = (0..3).map(|i| (i, vec![])).collect(); let discovered_splits: BTreeMap = BTreeMap::new(); - check( + let diff = reassign_splits( + FragmentId::default(), actor_splits, - discovered_splits, - expect_test::expect![[r#" - Some( - { - 0: [], - 1: [], - 2: [], - }, - ) - "#]], - ); + &discovered_splits, + Default::default(), + ) + .unwrap(); + assert_eq!(diff.len(), 3); + for splits in diff.values() { + assert!(splits.is_empty()) + } let actor_splits = (0..3).map(|i| (i, vec![])).collect(); let discovered_splits: BTreeMap = (0..3) @@ -1121,31 +1105,20 @@ mod tests { (split.id(), split) }) .collect(); - check( + + let diff = reassign_splits( + FragmentId::default(), actor_splits, - discovered_splits, - expect_test::expect![[r#" - Some( - { - 0: [ - TestSplit { - id: 0, - }, - ], - 1: [ - TestSplit { - id: 1, - }, - ], - 2: [ - TestSplit { - id: 2, - }, - ], - }, - ) - "#]], - ); + &discovered_splits, + Default::default(), + ) + .unwrap(); + assert_eq!(diff.len(), 3); + for splits in diff.values() { + assert_eq!(splits.len(), 1); + } + + check_all_splits(&discovered_splits, &diff); let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect(); let discovered_splits: BTreeMap = (0..5) @@ -1154,82 +1127,46 @@ mod tests { (split.id(), split) }) .collect(); - check( + + let diff = reassign_splits( + FragmentId::default(), actor_splits, - discovered_splits, - expect_test::expect![[r#" - Some( - { - 0: [ - TestSplit { - id: 0, - }, - TestSplit { - id: 3, - }, - ], - 1: [ - TestSplit { - id: 1, - }, - TestSplit { - id: 4, - }, - ], - 2: [ - TestSplit { - id: 2, - }, - ], - }, - ) - "#]], - ); + &discovered_splits, + Default::default(), + ) + .unwrap(); + assert_eq!(diff.len(), 3); + for splits in diff.values() { + let len = splits.len(); + assert!(len == 1 || len == 2); + } + + check_all_splits(&discovered_splits, &diff); let mut actor_splits: HashMap> = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect(); actor_splits.insert(3, vec![]); actor_splits.insert(4, vec![]); + let discovered_splits: BTreeMap = (0..5) .map(|i| { let split = TestSplit { id: i }; (split.id(), split) }) .collect(); - check( + + let diff = reassign_splits( + FragmentId::default(), actor_splits, - discovered_splits, - expect_test::expect![[r#" - Some( - { - 0: [ - TestSplit { - id: 0, - }, - ], - 1: [ - TestSplit { - id: 1, - }, - ], - 2: [ - TestSplit { - id: 2, - }, - ], - 3: [ - TestSplit { - id: 3, - }, - ], - 4: [ - TestSplit { - id: 4, - }, - ], - }, - ) - "#]], - ); + &discovered_splits, + Default::default(), + ) + .unwrap(); + assert_eq!(diff.len(), 5); + for splits in diff.values() { + assert_eq!(splits.len(), 1); + } + + check_all_splits(&discovered_splits, &diff); } } From 024d9a8bc108e0c5fc1ba9ba9829a8dabeb3caa5 Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Thu, 4 Jan 2024 17:32:14 +0800 Subject: [PATCH 3/3] fix(stream): rowIdGen executor handle invisible delete record (#14237) (#14329) --- src/stream/src/executor/row_id_gen.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index 317a8d256106d..3bd6c0aca2167 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -65,15 +65,26 @@ impl RowIdGenExecutor { } /// Generate a row ID column according to ops. - fn gen_row_id_column_by_op(&mut self, column: &ArrayRef, ops: Ops<'_>) -> ArrayRef { + fn gen_row_id_column_by_op( + &mut self, + column: &ArrayRef, + ops: Ops<'_>, + vis: &Bitmap, + ) -> ArrayRef { let len = column.len(); let mut builder = SerialArrayBuilder::new(len); - for (datum, op) in column.iter().zip_eq_fast(ops) { + for ((datum, op), vis) in column.iter().zip_eq_fast(ops).zip_eq_fast(vis.iter()) { // Only refill row_id for insert operation. match op { Op::Insert => builder.append(Some(self.row_id_generator.next().into())), - _ => builder.append(Some(Serial::try_from(datum.unwrap()).unwrap())), + _ => { + if vis { + builder.append(Some(Serial::try_from(datum.unwrap()).unwrap())) + } else { + builder.append(None) + } + } } } @@ -97,7 +108,7 @@ impl RowIdGenExecutor { // For chunk message, we fill the row id column and then yield it. let (ops, mut columns, bitmap) = chunk.into_inner(); columns[self.row_id_index] = - self.gen_row_id_column_by_op(&columns[self.row_id_index], &ops); + self.gen_row_id_column_by_op(&columns[self.row_id_index], &ops, &bitmap); yield Message::Chunk(StreamChunk::with_visibility(ops, columns, bitmap)); } Message::Barrier(barrier) => {