Skip to content

Commit

Permalink
Merge branch 'release-1.6' into auto-release-1.6-0523b41bcd07906b5511…
Browse files Browse the repository at this point in the history
…7483bdd30c6e64fb534c
  • Loading branch information
yezizp2012 authored Jan 5, 2024
2 parents 2b1d16b + 024d9a8 commit ed4d033
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 132 deletions.
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ prepare_sys_catalog! {
{ BuiltinCatalog::Table(&RW_RELATION_INFO), read_relation_info await },
{ BuiltinCatalog::Table(&RW_SYSTEM_TABLES), read_system_table_info },
{ BuiltinCatalog::View(&RW_RELATIONS) },
{ BuiltinCatalog::View(&RW_STREAMING_PARALLELISM) },
{ BuiltinCatalog::Table(&RW_COLUMNS), read_rw_columns_info },
{ BuiltinCatalog::Table(&RW_TYPES), read_rw_types },
{ BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_VERSIONS), read_hummock_pinned_versions await },
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod rw_relations;
mod rw_schemas;
mod rw_sinks;
mod rw_sources;
mod rw_streaming_parallelism;
mod rw_system_tables;
mod rw_table_fragments;
mod rw_table_stats;
Expand Down Expand Up @@ -78,6 +79,7 @@ pub use rw_relations::*;
pub use rw_schemas::*;
pub use rw_sinks::*;
pub use rw_sources::*;
pub use rw_streaming_parallelism::*;
pub use rw_system_tables::*;
pub use rw_table_fragments::*;
pub use rw_table_stats::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::types::DataType;

use crate::catalog::system_catalog::{BuiltinView, SystemCatalogColumnsDef};

pub static RW_STREAMING_PARALLELISM_COLUMNS: LazyLock<Vec<SystemCatalogColumnsDef<'_>>> =
LazyLock::new(|| {
vec![
(DataType::Int32, "id"),
(DataType::Varchar, "name"),
(DataType::Varchar, "relation_type"),
(DataType::Int32, "fragment_id"),
(DataType::Varchar, "distribution_type"),
(DataType::List(Box::new(DataType::Int32)), "state_table_ids"),
(
DataType::List(Box::new(DataType::Int32)),
"upstream_fragment_ids",
),
(DataType::List(Box::new(DataType::Varchar)), "flags"),
(DataType::Int32, "parallelism"),
]
});
/// The `rw_streaming_parallelism` builtin view, registered under
/// `RW_CATALOG_SCHEMA_NAME`.
///
/// The view unions tables, materialized views, sinks and indexes into one
/// list of streaming jobs, inner-joins that list with `rw_fragments` on
/// `job.id = f.table_id`, and orders the result by job id. Its output columns
/// are described by [`RW_STREAMING_PARALLELISM_COLUMNS`].
pub static RW_STREAMING_PARALLELISM: LazyLock<BuiltinView> = LazyLock::new(|| {
    // The query is assembled from fragments; every fragment except the last
    // ends with exactly one space so the pieces join into a single-line
    // statement.
    let query: &'static str = concat!(
        "WITH all_streaming_jobs AS ( ",
        "SELECT id, name, 'table' as relation_type FROM rw_tables ",
        "UNION ALL ",
        "SELECT id, name, 'materialized view' as relation_type FROM rw_materialized_views ",
        "UNION ALL ",
        "SELECT id, name, 'sink' as relation_type FROM rw_sinks ",
        "UNION ALL ",
        "SELECT id, name, 'index' as relation_type FROM rw_indexes ",
        ") ",
        "SELECT ",
        "job.id, ",
        "job.name, ",
        "job.relation_type, ",
        "f.fragment_id, ",
        "f.distribution_type, ",
        "f.state_table_ids, ",
        "f.upstream_fragment_ids, ",
        "f.flags, ",
        "f.parallelism ",
        "FROM all_streaming_jobs job ",
        "INNER JOIN rw_fragments f ON job.id = f.table_id ",
        "WHERE job.relation_type in ('table', 'materialized view', 'sink', 'index') ",
        "ORDER BY job.id",
    );

    BuiltinView {
        name: "rw_streaming_parallelism",
        schema: RW_CATALOG_SCHEMA_NAME,
        columns: &RW_STREAMING_PARALLELISM_COLUMNS,
        sql: String::from(query),
    }
});
193 changes: 65 additions & 128 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ impl<T: SplitMetaData + Clone> Eq for ActorSplitsAssignment<T> {}

impl<T: SplitMetaData + Clone> PartialEq<Self> for ActorSplitsAssignment<T> {
fn eq(&self, other: &Self) -> bool {
self.splits.len() == other.splits.len() && self.actor_id == other.actor_id
self.splits.len() == other.splits.len()
}
}

Expand All @@ -407,12 +407,7 @@ impl<T: SplitMetaData + Clone> PartialOrd<Self> for ActorSplitsAssignment<T> {
impl<T: SplitMetaData + Clone> Ord for ActorSplitsAssignment<T> {
fn cmp(&self, other: &Self) -> Ordering {
// Note: this is reversed order, to make BinaryHeap a min heap.
other
.splits
.len()
.cmp(&self.splits.len())
// To make the BinaryHeap have a deterministic order
.then(other.actor_id.cmp(&self.actor_id))
other.splits.len().cmp(&self.splits.len())
}
}

Expand Down Expand Up @@ -508,6 +503,12 @@ where
for split_id in new_discovered_splits {
// ActorSplitsAssignment's Ord is reversed, so this is min heap, i.e.,
// we get the assignment with the least splits here.

// Note: if multiple actors have the same number of splits, one of them is picked
// arbitrarily. When the number of source actors is larger than the number of splits,
// it's possible that the assignment is uneven,
// e.g., https://github.com/risingwavelabs/risingwave/issues/14324#issuecomment-1875033158
// TODO: We should make the assignment rack-aware to make sure it's even.
let mut peek_ref = heap.peek_mut().unwrap();
peek_ref
.splits
Expand Down Expand Up @@ -1073,46 +1074,29 @@ mod tests {

#[test]
fn test_reassign_splits() {
fn check(
actor_splits: HashMap<ActorId, Vec<TestSplit>>,
discovered_splits: BTreeMap<SplitId, TestSplit>,
expected: expect_test::Expect,
) {
let diff = reassign_splits(
FragmentId::default(),
actor_splits,
&discovered_splits,
Default::default(),
)
.map(BTreeMap::from_iter); // ensure deterministic debug string
expected.assert_debug_eq(&diff);
}

let actor_splits = HashMap::new();
let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
check(
assert!(reassign_splits(
FragmentId::default(),
actor_splits,
discovered_splits,
expect_test::expect![[r#"
None
"#]],
);
&discovered_splits,
Default::default()
)
.is_none());

let actor_splits = (0..3).map(|i| (i, vec![])).collect();
let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
check(
let diff = reassign_splits(
FragmentId::default(),
actor_splits,
discovered_splits,
expect_test::expect![[r#"
Some(
{
0: [],
1: [],
2: [],
},
)
"#]],
);
&discovered_splits,
Default::default(),
)
.unwrap();
assert_eq!(diff.len(), 3);
for splits in diff.values() {
assert!(splits.is_empty())
}

let actor_splits = (0..3).map(|i| (i, vec![])).collect();
let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..3)
Expand All @@ -1121,31 +1105,20 @@ mod tests {
(split.id(), split)
})
.collect();
check(

let diff = reassign_splits(
FragmentId::default(),
actor_splits,
discovered_splits,
expect_test::expect![[r#"
Some(
{
0: [
TestSplit {
id: 0,
},
],
1: [
TestSplit {
id: 1,
},
],
2: [
TestSplit {
id: 2,
},
],
},
)
"#]],
);
&discovered_splits,
Default::default(),
)
.unwrap();
assert_eq!(diff.len(), 3);
for splits in diff.values() {
assert_eq!(splits.len(), 1);
}

check_all_splits(&discovered_splits, &diff);

let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
Expand All @@ -1154,82 +1127,46 @@ mod tests {
(split.id(), split)
})
.collect();
check(

let diff = reassign_splits(
FragmentId::default(),
actor_splits,
discovered_splits,
expect_test::expect![[r#"
Some(
{
0: [
TestSplit {
id: 0,
},
TestSplit {
id: 3,
},
],
1: [
TestSplit {
id: 1,
},
TestSplit {
id: 4,
},
],
2: [
TestSplit {
id: 2,
},
],
},
)
"#]],
);
&discovered_splits,
Default::default(),
)
.unwrap();
assert_eq!(diff.len(), 3);
for splits in diff.values() {
let len = splits.len();
assert!(len == 1 || len == 2);
}

check_all_splits(&discovered_splits, &diff);

let mut actor_splits: HashMap<ActorId, Vec<TestSplit>> =
(0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
actor_splits.insert(3, vec![]);
actor_splits.insert(4, vec![]);

let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
.map(|i| {
let split = TestSplit { id: i };
(split.id(), split)
})
.collect();
check(

let diff = reassign_splits(
FragmentId::default(),
actor_splits,
discovered_splits,
expect_test::expect![[r#"
Some(
{
0: [
TestSplit {
id: 0,
},
],
1: [
TestSplit {
id: 1,
},
],
2: [
TestSplit {
id: 2,
},
],
3: [
TestSplit {
id: 3,
},
],
4: [
TestSplit {
id: 4,
},
],
},
)
"#]],
);
&discovered_splits,
Default::default(),
)
.unwrap();
assert_eq!(diff.len(), 5);
for splits in diff.values() {
assert_eq!(splits.len(), 1);
}

check_all_splits(&discovered_splits, &diff);
}
}
Loading

0 comments on commit ed4d033

Please sign in to comment.