From 36809447d20716e260d9174de7ead108fb789aed Mon Sep 17 00:00:00 2001 From: August Date: Thu, 28 Dec 2023 18:14:31 +0800 Subject: [PATCH] feat: introduce rw_streaming_parallelism system view --- .../src/catalog/system_catalog/mod.rs | 1 + .../catalog/system_catalog/rw_catalog/mod.rs | 2 + .../rw_catalog/rw_streaming_parallelism.rs | 68 +++++++++++++++++++ 3 files changed, 71 insertions(+) create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 607b909392427..8f74fb9380c39 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -439,6 +439,7 @@ prepare_sys_catalog! { { BuiltinCatalog::Table(&RW_RELATION_INFO), read_relation_info await }, { BuiltinCatalog::Table(&RW_SYSTEM_TABLES), read_system_table_info }, { BuiltinCatalog::View(&RW_RELATIONS) }, + { BuiltinCatalog::View(&RW_STREAMING_PARALLELISM) }, { BuiltinCatalog::Table(&RW_COLUMNS), read_rw_columns_info }, { BuiltinCatalog::Table(&RW_TYPES), read_rw_types }, { BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_VERSIONS), read_hummock_pinned_versions await }, diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 401ce0b3b6c24..a271093eb1ab7 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -40,6 +40,7 @@ mod rw_relations; mod rw_schemas; mod rw_sinks; mod rw_sources; +mod rw_streaming_parallelism; mod rw_system_tables; mod rw_table_fragments; mod rw_table_stats; @@ -78,6 +79,7 @@ pub use rw_relations::*; pub use rw_schemas::*; pub use rw_sinks::*; pub use rw_sources::*; +pub use rw_streaming_parallelism::*; pub use rw_system_tables::*; pub use rw_table_fragments::*; pub use rw_table_stats::*; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs new file mode 100644 index 0000000000000..27767e07e74ea --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_streaming_parallelism.rs @@ -0,0 +1,68 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::LazyLock; + +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::types::DataType; + +use crate::catalog::system_catalog::{BuiltinView, SystemCatalogColumnsDef}; + +pub static RW_STREAMING_PARALLELISM_COLUMNS: LazyLock>> = + LazyLock::new(|| { + vec![ + (DataType::Int32, "id"), + (DataType::Varchar, "name"), + (DataType::Varchar, "relation_type"), + (DataType::Int32, "fragment_id"), + (DataType::Varchar, "distribution_type"), + (DataType::List(Box::new(DataType::Int32)), "state_table_ids"), + ( + DataType::List(Box::new(DataType::Int32)), + "upstream_fragment_ids", + ), + (DataType::List(Box::new(DataType::Varchar)), "flags"), + (DataType::Int32, "parallelism"), + ] + }); +pub static RW_STREAMING_PARALLELISM: LazyLock = LazyLock::new(|| BuiltinView { + name: "rw_streaming_parallelism", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &RW_STREAMING_PARALLELISM_COLUMNS, + sql: "WITH all_streaming_jobs AS ( \ + SELECT id, name, 'table' as relation_type FROM rw_tables \ + UNION ALL \ + SELECT id, name, 'materialized view' as relation_type FROM rw_materialized_views \ + UNION ALL \ + SELECT id, name, 'sink' as relation_type FROM rw_sinks \ + UNION ALL \ + SELECT id, name, 'index' as relation_type FROM rw_indexes \ + ) \ + SELECT \ + job.id, \ + job.name, \ + job.relation_type, \ + f.fragment_id, \ + f.distribution_type, \ + f.state_table_ids, \ + f.upstream_fragment_ids, \ + f.flags, \ + f.parallelism \ + FROM all_streaming_jobs job \ + INNER JOIN rw_fragments f ON job.id = f.table_id \ + WHERE job.relation_type in ('table', 'materialized view', 'sink', 'index') \ + ORDER BY job.id\ + " + .to_string(), +});