From b39aef33c44a2223e8efee8b6322929ea8f5fda8 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 17 Oct 2023 12:49:59 +0800 Subject: [PATCH] feat: expose hidden and distribution keys of columns in sql command and rw_columns (#12839) --- e2e_test/ddl/show.slt | 35 +++-- e2e_test/extended_mode/basic.slt | 23 +-- .../information_schema/columns.rs | 3 +- .../system_catalog/pg_catalog/pg_attribute.rs | 3 +- .../system_catalog/rw_catalog/rw_columns.rs | 53 +++++-- src/frontend/src/handler/describe.rs | 144 ++++++++++-------- src/frontend/src/handler/show.rs | 19 ++- src/frontend/src/handler/util.rs | 13 +- src/frontend/src/session.rs | 10 ++ 9 files changed, 193 insertions(+), 110 deletions(-) diff --git a/e2e_test/ddl/show.slt b/e2e_test/ddl/show.slt index 787aacddb0330..5ae7575668645 100644 --- a/e2e_test/ddl/show.slt +++ b/e2e_test/ddl/show.slt @@ -7,20 +7,23 @@ create materialized view mv3 as select sum(v1) as sum_v1 from t3; statement ok create view v3 as select sum(v2) as sum_v2 from t3; -query TT +query TTT describe t3; ---- -v1 integer -v2 integer -v3 integer -primary key _row_id +v1 integer false +v2 integer false +v3 integer false +_row_id serial true +primary key _row_id NULL +distribution key _row_id NULL -query TT +query TTT show columns from t3; ---- -v1 integer -v2 integer -v3 integer +v1 integer false +v2 integer false +v3 integer false +_row_id serial true statement ok create index idx1 on t3 (v1,v2); @@ -30,14 +33,16 @@ show indexes from t3; ---- idx1 t3 v1 ASC, v2 ASC v3 v1 -query TT +query TTT describe t3; ---- -v1 integer -v2 integer -v3 integer -primary key _row_id -idx1 index(v1 ASC, v2 ASC) include(v3) distributed by(v1) +v1 integer false +v2 integer false +v3 integer false +_row_id serial true +primary key _row_id NULL +distribution key _row_id NULL +idx1 index(v1 ASC, v2 ASC) include(v3) distributed by(v1) NULL query TT show create index idx1; diff --git a/e2e_test/extended_mode/basic.slt b/e2e_test/extended_mode/basic.slt index 51513a444ec79..7869494979e47 100644 --- a/e2e_test/extended_mode/basic.slt +++ b/e2e_test/extended_mode/basic.slt @@ -39,20 +39,23 @@ values(round(42.4382)); statement ok create table t3 (v1 int, v2 int, v3 int); -query TT +query TTT describe t3; ---- -v1 integer -v2 integer -v3 integer -primary key _row_id - -query TT +v1 integer false +v2 integer false +v3 integer false +_row_id serial true +primary key _row_id NULL +distribution key _row_id NULL + +query TTT show columns from t3; ---- -v1 integer -v2 integer -v3 integer +v1 integer false +v2 integer false +v3 integer false +_row_id serial true statement ok drop table t3; diff --git a/src/frontend/src/catalog/system_catalog/information_schema/columns.rs b/src/frontend/src/catalog/system_catalog/information_schema/columns.rs index 3f8c6726f529c..136451237b52d 100644 --- a/src/frontend/src/catalog/system_catalog/information_schema/columns.rs +++ b/src/frontend/src/catalog/system_catalog/information_schema/columns.rs @@ -63,7 +63,8 @@ pub static INFORMATION_SCHEMA_COLUMNS: LazyLock = LazyLock::new(|| c.udt_type AS udt_name \ FROM rw_catalog.rw_columns c \ LEFT JOIN rw_catalog.rw_relations r ON c.relation_id = r.id \ - JOIN rw_catalog.rw_schemas s ON s.id = r.schema_id\ + JOIN rw_catalog.rw_schemas s ON s.id = r.schema_id \ + WHERE c.is_hidden = false\ " .to_string(), }); diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_attribute.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_attribute.rs index 4ac5ccbe4de03..ff75778182d0e 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_attribute.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_attribute.rs @@ -53,7 +53,8 @@ pub static PG_ATTRIBUTE: LazyLock = LazyLock::new(|| BuiltinView { ''::varchar AS attidentity, \ ''::varchar AS attgenerated, \ -1 AS atttypmod \ - FROM rw_catalog.rw_columns c\ + FROM rw_catalog.rw_columns c \ + WHERE c.is_hidden = false\ " .to_string(), }); diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs index a1b867f15eded..dd8cc72ed9bf5 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs @@ -29,6 +29,9 @@ pub static RW_COLUMNS: LazyLock = LazyLock::new(|| BuiltinTable { (DataType::Int32, "relation_id"), // belonged relation id (DataType::Varchar, "name"), // column name (DataType::Int32, "position"), // 1-indexed position + (DataType::Boolean, "is_hidden"), + (DataType::Boolean, "is_primary_key"), + (DataType::Boolean, "is_distribution_key"), (DataType::Varchar, "data_type"), (DataType::Int32, "type_oid"), (DataType::Int16, "type_len"), @@ -50,6 +53,9 @@ impl SysCatalogReaderImpl { Some(ScalarImpl::Int32(view.id as i32)), Some(ScalarImpl::Utf8(column.name.clone().into())), Some(ScalarImpl::Int32(index as i32 + 1)), + Some(ScalarImpl::Bool(false)), + Some(ScalarImpl::Bool(false)), + Some(ScalarImpl::Bool(false)), Some(ScalarImpl::Utf8(column.data_type().to_string().into())), Some(ScalarImpl::Int32(column.data_type().to_oid())), Some(ScalarImpl::Int16(column.data_type().type_len())), @@ -58,24 +64,49 @@ impl SysCatalogReaderImpl { }) }); + let rows = schema + .iter_system_tables() + .flat_map(|table| { + table + .columns + .iter() + .enumerate() + .map(move |(index, column)| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int32(table.id.table_id as i32)), + Some(ScalarImpl::Utf8(column.name().into())), + Some(ScalarImpl::Int32(index as i32 + 1)), + Some(ScalarImpl::Bool(column.is_hidden)), + Some(ScalarImpl::Bool(table.pk.contains(&index))), + Some(ScalarImpl::Bool(false)), + Some(ScalarImpl::Utf8(column.data_type().to_string().into())), + Some(ScalarImpl::Int32(column.data_type().to_oid())), + Some(ScalarImpl::Int16(column.data_type().type_len())), + Some(ScalarImpl::Utf8(column.data_type().pg_name().into())), + ]) + }) + }) + .chain(view_rows); + schema .iter_valid_table() - .map(|table| (table.id.table_id(), table.columns())) - .chain( - schema - .iter_system_tables() - .map(|table| (table.id.table_id(), table.columns())), - ) - .flat_map(|(id, columns)| { - columns + .flat_map(|table| { + table + .columns .iter() .enumerate() - .filter(|(_, column)| !column.is_hidden()) .map(move |(index, column)| { OwnedRow::new(vec![ - Some(ScalarImpl::Int32(id as i32)), + Some(ScalarImpl::Int32(table.id.table_id as i32)), Some(ScalarImpl::Utf8(column.name().into())), Some(ScalarImpl::Int32(index as i32 + 1)), + Some(ScalarImpl::Bool(column.is_hidden)), + Some(ScalarImpl::Bool( + table.pk().iter().any(|idx| idx.column_index == index), + )), + Some(ScalarImpl::Bool( + table.distribution_key().contains(&index), + )), Some(ScalarImpl::Utf8(column.data_type().to_string().into())), Some(ScalarImpl::Int32(column.data_type().to_oid())), Some(ScalarImpl::Int16(column.data_type().type_len())), @@ -83,7 +114,7 @@ impl SysCatalogReaderImpl { ]) }) }) - .chain(view_rows) + .chain(rows) }) .collect_vec()) } diff --git a/src/frontend/src/handler/describe.rs b/src/frontend/src/handler/describe.rs index f2fb89d02a7dc..4100b9a20be02 100644 --- a/src/frontend/src/handler/describe.rs +++ b/src/frontend/src/handler/describe.rs @@ -18,7 +18,7 @@ use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Row; -use risingwave_common::catalog::ColumnDesc; +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc}; use risingwave_common::error::Result; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{display_comma_separated, ObjectName}; @@ -34,65 +34,66 @@ pub fn handle_describe(handler_args: HandlerArgs, table_name: ObjectName) -> Res let mut binder = Binder::new_for_system(&session); let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?; // For Source, it doesn't have table catalog so use get source to get column descs. - let (columns, pk_columns, indices): (Vec, Vec, Vec>) = { - let (column_catalogs, pk_column_catalogs, indices) = match relation { - Relation::Source(s) => { - let pk_column_catalogs = s - .catalog - .pk_col_ids - .iter() - .map(|&column_id| { - s.catalog - .columns - .iter() - .filter(|x| x.column_id() == column_id) - .exactly_one() - .unwrap() - .clone() - }) - .collect_vec(); - (s.catalog.columns, pk_column_catalogs, vec![]) - } - Relation::BaseTable(t) => { - let pk_column_catalogs = t - .table_catalog - .pk() - .iter() - .map(|x| t.table_catalog.columns[x.column_index].clone()) - .collect_vec(); - (t.table_catalog.columns, pk_column_catalogs, t.table_indexes) - } - Relation::SystemTable(t) => { - let pk_column_catalogs = t - .sys_table_catalog - .pk - .iter() - .map(|idx| t.sys_table_catalog.columns[*idx].clone()) - .collect_vec(); - ( - t.sys_table_catalog.columns.clone(), - pk_column_catalogs, - vec![], - ) - } - _ => { - return Err( - CatalogError::NotFound("table or source", table_name.to_string()).into(), - ); - } - }; - ( - column_catalogs - .into_iter() - .filter(|c| !c.is_hidden) - .map(|c| c.column_desc) - .collect(), - pk_column_catalogs - .into_iter() - .map(|c| c.column_desc) - .collect(), - indices, - ) + let (columns, pk_columns, dist_columns, indices): ( + Vec, + Vec, + Vec, + Vec>, + ) = match relation { + Relation::Source(s) => { + let pk_column_catalogs = s + .catalog + .pk_col_ids + .iter() + .map(|&column_id| { + s.catalog + .columns + .iter() + .filter(|x| x.column_id() == column_id) + .map(|x| x.column_desc.clone()) + .exactly_one() + .unwrap() + }) + .collect_vec(); + (s.catalog.columns, pk_column_catalogs, vec![], vec![]) + } + Relation::BaseTable(t) => { + let pk_column_catalogs = t + .table_catalog + .pk() + .iter() + .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone()) + .collect_vec(); + let dist_columns = t + .table_catalog + .distribution_key() + .iter() + .map(|idx| t.table_catalog.columns[*idx].column_desc.clone()) + .collect_vec(); + ( + t.table_catalog.columns, + pk_column_catalogs, + dist_columns, + t.table_indexes, + ) + } + Relation::SystemTable(t) => { + let pk_column_catalogs = t + .sys_table_catalog + .pk + .iter() + .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone()) + .collect_vec(); + ( + t.sys_table_catalog.columns.clone(), + pk_column_catalogs, + vec![], + vec![], + ) + } + _ => { + return Err(CatalogError::NotFound("table or source", table_name.to_string()).into()); + } }; // Convert all column descs to rows @@ -109,6 +110,22 @@ pub fn handle_describe(handler_args: HandlerArgs, table_name: ObjectName) -> Res ) .into(), ), + None, + ])); + } + + // Convert distribution keys to rows + if !dist_columns.is_empty() { + rows.push(Row::new(vec![ + Some("distribution key".into()), + Some( + display_comma_separated( + &dist_columns.into_iter().map(|col| col.name).collect_vec(), + ) + .to_string() + .into(), + ), + None, ])); } @@ -138,6 +155,7 @@ pub fn handle_describe(handler_args: HandlerArgs, table_name: ObjectName) -> Res .into(), ) }, + None, ]) })); @@ -156,6 +174,11 @@ pub fn handle_describe(handler_args: HandlerArgs, table_name: ObjectName) -> Res DataType::Varchar.to_oid(), DataType::Varchar.type_len(), ), + PgFieldDescriptor::new( + "Is Hidden".to_owned(), + DataType::Varchar.to_oid(), + DataType::Varchar.type_len(), + ), ], ) .into()) @@ -208,6 +231,7 @@ mod tests { "v3".into() => "integer".into(), "v4".into() => "integer".into(), "primary key".into() => "v3".into(), + "distribution key".into() => "v3".into(), "idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)".into(), }; diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 3e06fbbc62a81..15b423e259299 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -18,7 +18,7 @@ use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Row; -use risingwave_common::catalog::{ColumnDesc, DEFAULT_SCHEMA_NAME}; +use risingwave_common::catalog::{ColumnCatalog, DEFAULT_SCHEMA_NAME}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; @@ -40,10 +40,10 @@ use crate::session::SessionImpl; pub fn get_columns_from_table( session: &SessionImpl, table_name: ObjectName, -) -> Result> { +) -> Result> { let mut binder = Binder::new_for_system(session); let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?; - let catalogs = match relation { + let column_catalogs = match relation { Relation::Source(s) => s.catalog.columns, Relation::BaseTable(t) => t.table_catalog.columns, Relation::SystemTable(t) => t.sys_table_catalog.columns.clone(), @@ -52,11 +52,7 @@ pub fn get_columns_from_table( } }; - Ok(catalogs - .into_iter() - .filter(|c| !c.is_hidden) - .map(|c| c.column_desc) - .collect()) + Ok(column_catalogs) } pub fn get_indexes_from_table( @@ -159,6 +155,11 @@ pub async fn handle_show_object( DataType::Varchar.to_oid(), DataType::Varchar.type_len(), ), + PgFieldDescriptor::new( + "Is Hidden".to_owned(), + DataType::Varchar.to_oid(), + DataType::Varchar.type_len(), + ), ], ) .into()); @@ -583,6 +584,8 @@ mod tests { "country.city.zipcode".into() => "character varying".into(), "rate".into() => "real".into(), "country".into() => "test.Country".into(), + "_rw_kafka_timestamp".into() => "timestamp with time zone".into(), + "_row_id".into() => "serial".into(), }; assert_eq!(columns, expected_columns); diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index a5645b38b1c48..66494be928d42 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -26,7 +26,7 @@ use pgwire::pg_server::BoxedError; use pgwire::types::{Format, FormatIterator, Row}; use pin_project_lite::pin_project; use risingwave_common::array::DataChunk; -use risingwave_common::catalog::{ColumnDesc, Field}; +use risingwave_common::catalog::{ColumnCatalog, Field}; use risingwave_common::error::{ErrorCode, Result as RwResult}; use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl, Timestamptz}; @@ -170,11 +170,12 @@ fn to_pg_rows( } /// Convert column descs to rows which conclude name and type -pub fn col_descs_to_rows(columns: Vec) -> Vec { +pub fn col_descs_to_rows(columns: Vec) -> Vec { columns .iter() .flat_map(|col| { - col.flatten() + col.column_desc + .flatten() .into_iter() .map(|c| { let type_name = if let DataType::Struct { .. } = c.data_type { @@ -182,7 +183,11 @@ pub fn col_descs_to_rows(columns: Vec) -> Vec { } else { c.data_type.to_string() }; - Row::new(vec![Some(c.name.into()), Some(type_name.into())]) + Row::new(vec![ + Some(c.name.into()), + Some(type_name.into()), + Some(col.is_hidden.to_string().into()), + ]) }) .collect_vec() }) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 9f9390cb629c2..f257a06521f33 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -1102,6 +1102,11 @@ fn infer(bound: Option, stmt: Statement) -> Result Ok(vec![PgFieldDescriptor::new( "Name".to_owned(), @@ -1160,6 +1165,11 @@ fn infer(bound: Option, stmt: Statement) -> Result Ok(vec![PgFieldDescriptor::new( "QUERY PLAN".to_owned(),