Skip to content

Commit

Permalink
feat: add rw_streaming_graph and remove graph info in rw_relation_info
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed May 16, 2024
1 parent 224ac39 commit fc01c92
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 121 deletions.
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod rw_relations;
mod rw_schemas;
mod rw_sinks;
mod rw_sources;
mod rw_streaming_graph;
mod rw_streaming_parallelism;
mod rw_subscriptions;
mod rw_system_tables;
Expand Down
181 changes: 60 additions & 121 deletions src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@

use risingwave_common::types::{Fields, Timestamptz};
use risingwave_frontend_macro::system_catalog;
use serde_json::json;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;

// TODO: `rw_relation_info` contains some extra streaming meta info that's only meaningful for
// streaming jobs, we'd better query relation infos from `rw_relations` and move these streaming
// infos into anther system table.
#[derive(Fields)]
#[primary_key(schemaname, relationname)]
struct RwRelationInfo {
Expand All @@ -31,8 +26,6 @@ struct RwRelationInfo {
definition: String,
relationtype: String,
relationid: i32,
relationtimezone: String, // The timezone used to interpret ambiguous dates/timestamps as tstz
fragments: Option<String>, // fragments is json encoded fragment infos.
initialized_at: Option<Timestamptz>,
created_at: Option<Timestamptz>,
initialized_at_cluster_version: Option<String>,
Expand All @@ -41,120 +34,72 @@ struct RwRelationInfo {

#[system_catalog(table, "rw_catalog.rw_relation_info")]
async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelationInfo>> {
let mut table_ids = Vec::new();
{
let catalog_reader = reader.catalog_reader.read_guard();
let schemas = catalog_reader.get_all_schema_names(&reader.auth_context.database)?;
for schema in &schemas {
let schema_catalog =
catalog_reader.get_schema_by_name(&reader.auth_context.database, schema)?;

schema_catalog.iter_mv().for_each(|t| {
table_ids.push(t.id.table_id);
});

schema_catalog.iter_table().for_each(|t| {
table_ids.push(t.id.table_id);
});

schema_catalog.iter_sink().for_each(|t| {
table_ids.push(t.id.sink_id);
});

schema_catalog.iter_index().for_each(|t| {
table_ids.push(t.index_table.id.table_id);
});

schema_catalog.iter_subscription().for_each(|t| {
table_ids.push(t.id.subscription_id);
});
}
}

let table_fragments = reader.meta_client.list_table_fragments(&table_ids).await?;
let mut rows = Vec::new();
let catalog_reader = reader.catalog_reader.read_guard();
let schemas = catalog_reader.get_all_schema_names(&reader.auth_context.database)?;
for schema in &schemas {
let schema_catalog =
catalog_reader.get_schema_by_name(&reader.auth_context.database, schema)?;
schema_catalog.iter_mv().for_each(|t| {
if let Some(fragments) = table_fragments.get(&t.id.table_id) {
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner as i32,
definition: t.definition.clone(),
relationtype: "MATERIALIZED VIEW".into(),
relationid: t.id.table_id as i32,
relationtimezone: fragments.get_ctx().unwrap().get_timezone().clone(),
fragments: Some(json!(fragments.get_fragments()).to_string()),
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
}
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner as i32,
definition: t.definition.clone(),
relationtype: "MATERIALIZED VIEW".into(),
relationid: t.id.table_id as i32,
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
});

schema_catalog.iter_table().for_each(|t| {
if let Some(fragments) = table_fragments.get(&t.id.table_id) {
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner as i32,
definition: t.definition.clone(),
relationtype: "TABLE".into(),
relationid: t.id.table_id as i32,
relationtimezone: fragments.get_ctx().unwrap().get_timezone().clone(),
fragments: Some(json!(fragments.get_fragments()).to_string()),
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
}
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner as i32,
definition: t.definition.clone(),
relationtype: "TABLE".into(),
relationid: t.id.table_id as i32,
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
});

schema_catalog.iter_sink().for_each(|t| {
if let Some(fragments) = table_fragments.get(&t.id.sink_id) {
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner.user_id as i32,
definition: t.definition.clone(),
relationtype: "SINK".into(),
relationid: t.id.sink_id as i32,
relationtimezone: fragments.get_ctx().unwrap().get_timezone().clone(),
fragments: Some(json!(fragments.get_fragments()).to_string()),
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
}
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner.user_id as i32,
definition: t.definition.clone(),
relationtype: "SINK".into(),
relationid: t.id.sink_id as i32,
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
});

schema_catalog.iter_index().for_each(|t| {
if let Some(fragments) = table_fragments.get(&t.index_table.id.table_id) {
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.index_table.owner as i32,
definition: t.index_table.definition.clone(),
relationtype: "INDEX".into(),
relationid: t.index_table.id.table_id as i32,
relationtimezone: fragments.get_ctx().unwrap().get_timezone().clone(),
fragments: Some(json!(fragments.get_fragments()).to_string()),
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
}
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.index_table.owner as i32,
definition: t.index_table.definition.clone(),
relationtype: "INDEX".into(),
relationid: t.index_table.id.table_id as i32,
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
});

// Sources have no fragments.
schema_catalog.iter_source().for_each(|t| {
rows.push(RwRelationInfo {
schemaname: schema.clone(),
Expand All @@ -163,8 +108,6 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
definition: t.definition.clone(),
relationtype: "SOURCE".into(),
relationid: t.id as i32,
relationtimezone: "".into(),
fragments: None,
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
Expand All @@ -173,22 +116,18 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
});

schema_catalog.iter_subscription().for_each(|t| {
if let Some(fragments) = table_fragments.get(&t.id.subscription_id) {
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner.user_id as i32,
definition: t.definition.clone(),
relationtype: "SUBSCRIPTION".into(),
relationid: t.id.subscription_id as i32,
relationtimezone: fragments.get_ctx().unwrap().get_timezone().clone(),
fragments: Some(json!(fragments.get_fragments()).to_string()),
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
}
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner.user_id as i32,
definition: t.definition.clone(),
relationtype: "SUBSCRIPTION".into(),
relationid: t.id.subscription_id as i32,
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Fields, JsonbVal};
use risingwave_frontend_macro::system_catalog;
use serde_json::json;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;

#[derive(Fields)]
struct RwStreamingGraph {
#[primary_key]
id: i32,
name: String,
relation_type: String,
definition: String,
timezone: String, // The timezone used to interpret ambiguous dates/timestamps as tstz
graph: JsonbVal,
}

#[system_catalog(table, "rw_catalog.rw_streaming_graph")]
async fn read_streaming_graph(reader: &SysCatalogReaderImpl) -> Result<Vec<RwStreamingGraph>> {
let mut job_ids = vec![];
{
let catalog_reader = reader.catalog_reader.read_guard();
let schemas = catalog_reader.get_all_schema_names(&reader.auth_context.database)?;
for schema in &schemas {
let schema_catalog =
catalog_reader.get_schema_by_name(&reader.auth_context.database, schema)?;
job_ids.extend(schema_catalog.iter_mv().map(|mv| mv.id.table_id));
job_ids.extend(schema_catalog.iter_table().map(|t| t.id.table_id));
job_ids.extend(schema_catalog.iter_sink().map(|s| s.id.sink_id));
job_ids.extend(
schema_catalog
.iter_source()
.filter_map(|s| s.info.is_shared().then(|| s.id)),
);
job_ids.extend(
schema_catalog
.iter_index()
.map(|idx| idx.index_table.id.table_id),
);
}
}

let mut table_fragments = reader.meta_client.list_table_fragments(&job_ids).await?;
let mut rows = Vec::new();
let catalog_reader = reader.catalog_reader.read_guard();
let schemas = catalog_reader.get_all_schema_names(&reader.auth_context.database)?;
for schema in &schemas {
let schema_catalog =
catalog_reader.get_schema_by_name(&reader.auth_context.database, schema)?;
schema_catalog.iter_mv().for_each(|t| {
if let Some(fragments) = table_fragments.remove(&t.id.table_id) {
rows.push(RwStreamingGraph {
id: t.id.table_id as i32,
name: t.name.clone(),
relation_type: "MATERIALIZED VIEW".into(),
definition: t.definition.clone(),
timezone: fragments.ctx.unwrap().timezone,
graph: json!(fragments.fragments).into(),
});
}
});

schema_catalog.iter_table().for_each(|t| {
if let Some(fragments) = table_fragments.remove(&t.id.table_id) {
rows.push(RwStreamingGraph {
id: t.id.table_id as i32,
name: t.name.clone(),
relation_type: "TABLE".into(),
definition: t.definition.clone(),
timezone: fragments.ctx.unwrap().timezone,
graph: json!(fragments.fragments).into(),
});
}
});

schema_catalog.iter_sink().for_each(|t| {
if let Some(fragments) = table_fragments.remove(&t.id.sink_id) {
rows.push(RwStreamingGraph {
id: t.id.sink_id as i32,
name: t.name.clone(),
relation_type: "SINK".into(),
definition: t.definition.clone(),
timezone: fragments.ctx.unwrap().timezone,
graph: json!(fragments.fragments).into(),
});
}
});

schema_catalog.iter_index().for_each(|t| {
if let Some(fragments) = table_fragments.remove(&t.index_table.id.table_id) {
rows.push(RwStreamingGraph {
id: t.index_table.id.table_id as i32,
name: t.name.clone(),
relation_type: "INDEX".into(),
definition: t.index_table.definition.clone(),
timezone: fragments.ctx.unwrap().timezone,
graph: json!(fragments.fragments).into(),
});
}
});

schema_catalog
.iter_source()
.filter(|s| s.info.is_shared())
.for_each(|t| {
if let Some(fragments) = table_fragments.remove(&t.id) {
rows.push(RwStreamingGraph {
id: t.id as i32,
name: t.name.clone(),
relation_type: "SOURCE".into(),
definition: t.definition.clone(),
timezone: fragments.ctx.unwrap().timezone,
graph: json!(fragments.fragments).into(),
});
}
});
}

Ok(rows)
}

0 comments on commit fc01c92

Please sign in to comment.