From 91d97acbfaae47f95b9ae40984a74ab14b948d49 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Fri, 23 Feb 2024 11:30:38 +0800 Subject: [PATCH] refactor(frontend): use `#[derive(Fields)]` in statement handlers (#15130) Signed-off-by: Runji Wang --- src/common/fields-derive/src/lib.rs | 58 +++- src/frontend/src/handler/cancel_job.rs | 24 +- src/frontend/src/handler/describe.rs | 118 +++---- src/frontend/src/handler/explain.rs | 30 +- src/frontend/src/handler/mod.rs | 44 ++- src/frontend/src/handler/show.rs | 314 +++++++++++------- src/frontend/src/handler/transaction.rs | 21 +- src/frontend/src/handler/util.rs | 65 +--- src/frontend/src/handler/variable.rs | 104 +++--- src/frontend/src/session.rs | 40 +-- src/frontend/src/utils/infer_stmt_row_desc.rs | 253 -------------- src/frontend/src/utils/mod.rs | 1 - 12 files changed, 449 insertions(+), 623 deletions(-) delete mode 100644 src/frontend/src/utils/infer_stmt_row_desc.rs diff --git a/src/common/fields-derive/src/lib.rs b/src/common/fields-derive/src/lib.rs index 86fa229a5adc..b38f57975168 100644 --- a/src/common/fields-derive/src/lib.rs +++ b/src/common/fields-derive/src/lib.rs @@ -16,7 +16,7 @@ use proc_macro2::TokenStream; use quote::quote; use syn::{Data, DeriveInput, Result}; -#[proc_macro_derive(Fields, attributes(primary_key))] +#[proc_macro_derive(Fields, attributes(primary_key, fields))] pub fn fields(tokens: proc_macro::TokenStream) -> proc_macro::TokenStream { inner(tokens.into()).into() } @@ -46,6 +46,16 @@ fn gen(tokens: TokenStream) -> Result { )); }; + let style = get_style(&input); + if let Some(style) = &style { + if !["Title Case", "TITLE CASE", "snake_case"].contains(&style.value().as_str()) { + return Err(syn::Error::new_spanned( + style, + "only `Title Case`, `TITLE CASE`, and `snake_case` are supported", + )); + } + } + let fields_rw: Vec = struct_ .fields .iter() @@ -55,6 +65,12 @@ fn gen(tokens: TokenStream) -> Result { if name.starts_with("r#") { name = name[2..].to_string(); } + // cast style + match style.as_ref().map_or(String::new(), |f| f.value()).as_str() { + "Title Case" => name = to_title_case(&name), + "TITLE CASE" => name = to_title_case(&name).to_uppercase(), + _ => {} + } let ty = &field.ty; quote! { (#name, <#ty as ::risingwave_common::types::WithDataType>::default_data_type()) @@ -132,6 +148,46 @@ fn get_primary_key(input: &syn::DeriveInput) -> Option> { None } +/// Get name style from `#[fields(style = "xxx")]` attribute. +fn get_style(input: &syn::DeriveInput) -> Option { + let style = input.attrs.iter().find_map(|attr| match &attr.meta { + syn::Meta::List(list) if list.path.is_ident("fields") => { + let name_value: syn::MetaNameValue = syn::parse2(list.tokens.clone()).ok()?; + if name_value.path.is_ident("style") { + Some(name_value.value) + } else { + None + } + } + _ => None, + })?; + match style { + syn::Expr::Lit(lit) => match lit.lit { + syn::Lit::Str(s) => Some(s), + _ => None, + }, + _ => None, + } +} + +/// Convert `snake_case` to `Title Case`. +fn to_title_case(s: &str) -> String { + let mut title = String::new(); + let mut next_upper = true; + for c in s.chars() { + if c == '_' { + title.push(' '); + next_upper = true; + } else if next_upper { + title.push(c.to_uppercase().next().unwrap()); + next_upper = false; + } else { + title.push(c); + } + } + title +} + #[cfg(test)] mod tests { use indoc::indoc; diff --git a/src/frontend/src/handler/cancel_job.rs b/src/frontend/src/handler/cancel_job.rs index f124a2a030bd..278e01e3e1bc 100644 --- a/src/frontend/src/handler/cancel_job.rs +++ b/src/frontend/src/handler/cancel_job.rs @@ -12,14 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use itertools::Itertools; -use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; -use pgwire::types::Row; -use risingwave_common::types::DataType; +use risingwave_common::types::Fields; use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs}; use risingwave_sqlparser::ast::JobIdents; +use super::RwPgResponseBuilderExt; use crate::error::Result; use crate::handler::{HandlerArgs, RwPgResponse}; @@ -36,16 +34,14 @@ pub(super) async fn handle_cancel( .await?; let rows = canceled_jobs .into_iter() - .map(|id| Row::new(vec![Some(id.to_string().into())])) - .collect_vec(); + .map(|id| CancelRow { id: id.to_string() }); Ok(PgResponse::builder(StatementType::CANCEL_COMMAND) - .values( - rows.into(), - vec![PgFieldDescriptor::new( - "Id".to_string(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - )], - ) + .rows(rows) .into()) } + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct CancelRow { + id: String, +} diff --git a/src/frontend/src/handler/describe.rs b/src/frontend/src/handler/describe.rs index ef1a601cca59..36cff2e20e2b 100644 --- a/src/frontend/src/handler/describe.rs +++ b/src/frontend/src/handler/describe.rs @@ -17,17 +17,16 @@ use std::fmt::Display; use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; -use pgwire::types::Row; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc}; -use risingwave_common::types::DataType; +use risingwave_common::types::Fields; use risingwave_sqlparser::ast::{display_comma_separated, ObjectName}; -use super::RwPgResponse; +use super::show::ShowColumnRow; +use super::{fields_to_descriptors, RwPgResponse}; use crate::binder::{Binder, Relation}; use crate::catalog::CatalogError; use crate::error::Result; -use crate::handler::util::col_descs_to_rows; -use crate::handler::HandlerArgs; +use crate::handler::{HandlerArgs, RwPgResponseBuilderExt}; pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Result { let session = handler_args.session; @@ -156,7 +155,10 @@ pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Re }; // Convert all column descs to rows - let mut rows = col_descs_to_rows(columns); + let mut rows = columns + .into_iter() + .flat_map(ShowColumnRow::from_catalog) + .collect_vec(); fn concat(display_elems: impl IntoIterator) -> String where @@ -170,96 +172,68 @@ pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Re // Convert primary key to rows if !pk_columns.is_empty() { - rows.push(Row::new(vec![ - Some("primary key".into()), - Some(concat(pk_columns.iter().map(|x| &x.name)).into()), - None, // Is Hidden - None, // Description - ])); + rows.push(ShowColumnRow { + name: "primary key".into(), + r#type: concat(pk_columns.iter().map(|x| &x.name)), + is_hidden: None, + description: None, + }); } // Convert distribution keys to rows if !dist_columns.is_empty() { - rows.push(Row::new(vec![ - Some("distribution key".into()), - Some(concat(dist_columns.iter().map(|x| &x.name)).into()), - None, // Is Hidden - None, // Description - ])); + rows.push(ShowColumnRow { + name: "distribution key".into(), + r#type: concat(dist_columns.iter().map(|x| &x.name)), + is_hidden: None, + description: None, + }); } // Convert all indexes to rows rows.extend(indices.iter().map(|index| { let index_display = index.display(); - Row::new(vec![ - Some(index.name.clone().into()), - if index_display.include_columns.is_empty() { - Some( - format!( - "index({}) distributed by({})", - display_comma_separated(&index_display.index_columns_with_ordering), - display_comma_separated(&index_display.distributed_by_columns), - ) - .into(), + ShowColumnRow { + name: index.name.clone(), + r#type: if index_display.include_columns.is_empty() { + format!( + "index({}) distributed by({})", + display_comma_separated(&index_display.index_columns_with_ordering), + display_comma_separated(&index_display.distributed_by_columns), ) } else { - Some( - format!( - "index({}) include({}) distributed by({})", - display_comma_separated(&index_display.index_columns_with_ordering), - display_comma_separated(&index_display.include_columns), - display_comma_separated(&index_display.distributed_by_columns), - ) - .into(), + format!( + "index({}) include({}) distributed by({})", + display_comma_separated(&index_display.index_columns_with_ordering), + display_comma_separated(&index_display.include_columns), + display_comma_separated(&index_display.distributed_by_columns), ) }, - // Is Hidden - None, - // Description + is_hidden: None, // TODO: index description - None, - ]) + description: None, + } })); - rows.push(Row::new(vec![ - Some("table description".into()), - Some(relname.into()), - None, // Is Hidden - description.map(Into::into), // Description - ])); + rows.push(ShowColumnRow { + name: "table description".into(), + r#type: relname, + is_hidden: None, + description: description.map(Into::into), + }); // TODO: table name and description as title of response // TODO: recover the original user statement Ok(PgResponse::builder(StatementType::DESCRIBE) - .values( - rows.into(), - vec![ - PgFieldDescriptor::new( - "Name".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Type".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Is Hidden".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Description".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ], - ) + .rows(rows) .into()) } +pub fn infer_describe() -> Vec { + fields_to_descriptors(ShowColumnRow::fields()) +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index c25bf7678bd0..b966cca8f50c 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use itertools::Itertools; -use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; -use pgwire::types::Row; use risingwave_common::bail_not_implemented; -use risingwave_common::types::DataType; +use risingwave_common::types::Fields; use risingwave_sqlparser::ast::{ExplainOptions, ExplainType, Statement}; use thiserror_ext::AsReport; @@ -27,7 +24,7 @@ use super::create_sink::{gen_sink_plan, get_partition_compute_info}; use super::create_table::ColumnIdGenerator; use super::query::gen_batch_plan_by_statement; use super::util::SourceSchemaCompatExt; -use super::RwPgResponse; +use super::{RwPgResponse, RwPgResponseBuilderExt}; use crate::error::{ErrorCode, Result}; use crate::handler::create_table::handle_create_table_plan; use crate::handler::HandlerArgs; @@ -254,20 +251,17 @@ pub async fn handle_explain( } } - let rows = blocks - .iter() - .flat_map(|b| b.lines().map(|l| l.to_owned())) - .map(|l| Row::new(vec![Some(l.into())])) - .collect_vec(); + let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow { + query_plan: l.into(), + }); Ok(PgResponse::builder(StatementType::EXPLAIN) - .values( - rows.into(), - vec![PgFieldDescriptor::new( - "QUERY PLAN".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - )], - ) + .rows(rows) .into()) } + +#[derive(Fields)] +#[fields(style = "TITLE CASE")] +struct ExplainRow { + query_plan: String, +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 3cdc4b191da9..827f28f87319 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -18,11 +18,15 @@ use std::task::{Context, Poll}; use futures::stream::{self, BoxStream}; use futures::{Stream, StreamExt}; +use itertools::Itertools; +use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, START_TRANSACTION}; use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult}; use pgwire::pg_server::BoxedError; use pgwire::types::{Format, Row}; use risingwave_common::bail_not_implemented; +use risingwave_common::types::Fields; +use risingwave_common::util::iter_util::ZipEqFast; use risingwave_sqlparser::ast::*; use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt}; @@ -59,7 +63,7 @@ pub mod create_table; pub mod create_table_as; pub mod create_user; pub mod create_view; -mod describe; +pub mod describe; mod drop_connection; mod drop_database; pub mod drop_function; @@ -78,7 +82,7 @@ pub mod handle_privilege; mod kill_process; pub mod privilege; pub mod query; -mod show; +pub mod show; mod transaction; pub mod util; pub mod variable; @@ -90,6 +94,42 @@ pub type RwPgResponseBuilder = PgResponseBuilder; /// The [`PgResponse`] used by RisingWave. pub type RwPgResponse = PgResponse; +#[easy_ext::ext(RwPgResponseBuilderExt)] +impl RwPgResponseBuilder { + /// Append rows to the response. + pub fn rows(self, rows: impl IntoIterator) -> Self { + let fields = T::fields(); + self.values( + rows.into_iter() + .map(|row| { + Row::new( + row.into_owned_row() + .into_iter() + .zip_eq_fast(&fields) + .map(|(datum, (_, ty))| { + datum.map(|scalar| { + scalar.as_scalar_ref_impl().text_format(ty).into() + }) + }) + .collect(), + ) + }) + .collect_vec() + .into(), + fields_to_descriptors(fields), + ) + } +} + +pub fn fields_to_descriptors( + fields: Vec<(&str, risingwave_common::types::DataType)>, +) -> Vec { + fields + .iter() + .map(|(name, ty)| PgFieldDescriptor::new(name.to_string(), ty.to_oid(), ty.type_len())) + .collect() +} + pub enum PgResponseStream { LocalQuery(DataChunkToRowSetAdapter), DistributedQuery(DataChunkToRowSetAdapter), diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 4a98b6c7cd33..226a219a1188 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -19,27 +19,24 @@ use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_protocol::truncated_fmt; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::pg_server::Session; -use pgwire::types::Row; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, DEFAULT_SCHEMA_NAME}; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, Fields}; use risingwave_common::util::addr::HostAddr; use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION; use risingwave_expr::scalar::like::{i_like_default, like_default}; use risingwave_pb::catalog::connection; use risingwave_sqlparser::ast::{ - Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, + display_comma_separated, Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, }; use serde_json; -use super::RwPgResponse; +use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt}; use crate::binder::{Binder, Relation}; use crate::catalog::{CatalogError, IndexCatalog}; use crate::error::Result; -use crate::handler::util::{col_descs_to_rows, indexes_to_rows}; use crate::handler::HandlerArgs; use crate::session::SessionImpl; -use crate::utils::infer_stmt_row_desc::infer_show_object; pub fn get_columns_from_table( session: &SessionImpl, @@ -109,6 +106,136 @@ fn schema_or_default(schema: &Option) -> String { .map_or_else(|| DEFAULT_SCHEMA_NAME.to_string(), |s| s.real_value()) } +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowObjectRow { + name: String, +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +pub struct ShowColumnRow { + pub name: String, + pub r#type: String, + pub is_hidden: Option, + pub description: Option, +} + +impl ShowColumnRow { + pub fn from_catalog(col: ColumnCatalog) -> Vec { + col.column_desc + .flatten() + .into_iter() + .map(|c| { + let type_name = if let DataType::Struct { .. } = c.data_type { + c.type_name.clone() + } else { + c.data_type.to_string() + }; + ShowColumnRow { + name: c.name, + r#type: type_name, + is_hidden: Some(col.is_hidden.to_string()), + description: c.description, + } + }) + .collect() + } +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowConnectionRow { + name: String, + r#type: String, + properties: String, +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowFunctionRow { + name: String, + arguments: String, + return_type: String, + language: String, + link: Option, +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowIndexRow { + name: String, + on: String, + key: String, + include: String, + distributed_by: String, +} + +impl From> for ShowIndexRow { + fn from(index: Arc) -> Self { + let index_display = index.display(); + ShowIndexRow { + name: index.name.clone(), + on: index.primary_table.name.clone(), + key: display_comma_separated(&index_display.index_columns_with_ordering).to_string(), + include: display_comma_separated(&index_display.include_columns).to_string(), + distributed_by: display_comma_separated(&index_display.distributed_by_columns) + .to_string(), + } + } +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowClusterRow { + addr: String, + state: String, + parallel_units: String, + is_streaming: String, + is_serving: String, + is_unschedulable: String, +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowJobRow { + id: i64, + statement: String, + progress: String, +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowProcessListRow { + id: String, + user: String, + host: String, + database: String, + time: Option, + info: Option, +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowCreateObjectRow { + name: String, + create_sql: String, +} + +/// Infer the row description for different show objects. +pub fn infer_show_object(objects: &ShowObject) -> Vec { + fields_to_descriptors(match objects { + ShowObject::Columns { .. } => ShowColumnRow::fields(), + ShowObject::Connection { .. } => ShowConnectionRow::fields(), + ShowObject::Function { .. } => ShowFunctionRow::fields(), + ShowObject::Indexes { .. } => ShowIndexRow::fields(), + ShowObject::Cluster => ShowClusterRow::fields(), + ShowObject::Jobs => ShowJobRow::fields(), + ShowObject::ProcessList => ShowProcessListRow::fields(), + _ => ShowObjectRow::fields(), + }) +} + pub async fn handle_show_object( handler_args: HandlerArgs, command: ShowObject, @@ -119,7 +246,6 @@ pub async fn handle_show_object( if let Some(ShowStatementFilter::Where(..)) = filter { bail_not_implemented!("WHERE clause in SHOW statement"); } - let row_desc = infer_show_object(&command); let catalog_reader = session.env().catalog_reader(); @@ -178,18 +304,15 @@ pub async fn handle_show_object( .into()); }; - let rows = col_descs_to_rows(columns); - return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .values(rows.into(), row_desc) + .rows(columns.into_iter().flat_map(ShowColumnRow::from_catalog)) .into()); } ShowObject::Indexes { table } => { let indexes = get_indexes_from_table(&session, table)?; - let rows = indexes_to_rows(indexes); return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .values(rows.into(), row_desc) + .rows(indexes.into_iter().map(ShowIndexRow::from)) .into()); } ShowObject::Connection { schema } => { @@ -200,7 +323,7 @@ pub async fn handle_show_object( .iter_connections() .map(|c| { let name = c.name.clone(); - let conn_type = match &c.info { + let r#type = match &c.info { connection::Info::PrivateLinkService(_) => { PRIVATELINK_CONNECTION.to_string() }, @@ -230,105 +353,81 @@ pub async fn handle_show_object( ) } }; - Row::new(vec![ - Some(name.into()), - Some(conn_type.into()), - Some(properties.into()), - ]) - }) - .collect_vec(); + ShowConnectionRow { + name, + r#type, + properties, + } + }); return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .values(rows.into(), row_desc) + .rows(rows) .into()); } ShowObject::Function { schema } => { - let rows = catalog_reader - .read_guard() + let reader = catalog_reader.read_guard(); + let rows = reader .get_schema_by_name(session.database(), &schema_or_default(&schema))? .iter_function() - .map(|t| { - Row::new(vec![ - Some(t.name.clone().into()), - Some(t.arg_types.iter().map(|t| t.to_string()).join(", ").into()), - Some(t.return_type.to_string().into()), - Some(t.language.clone().into()), - t.link.clone().map(Into::into), - ]) - }) - .collect_vec(); + .map(|t| ShowFunctionRow { + name: t.name.clone(), + arguments: t.arg_types.iter().map(|t| t.to_string()).join(", "), + return_type: t.return_type.to_string(), + language: t.language.clone(), + link: t.link.clone(), + }); return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .values(rows.into(), row_desc) + .rows(rows) .into()); } ShowObject::Cluster => { let workers = session.env().worker_node_manager().list_worker_nodes(); - let rows = workers - .into_iter() - .map(|worker| { - let addr: HostAddr = worker.host.as_ref().unwrap().into(); - let property = worker.property.as_ref().unwrap(); - Row::new(vec![ - Some(addr.to_string().into()), - Some(worker.get_state().unwrap().as_str_name().into()), - Some( - worker - .parallel_units - .into_iter() - .map(|pu| pu.id) - .join(", ") - .into(), - ), - Some(property.is_streaming.to_string().into()), - Some(property.is_serving.to_string().into()), - Some(property.is_unschedulable.to_string().into()), - ]) - }) - .collect_vec(); + let rows = workers.into_iter().map(|worker| { + let addr: HostAddr = worker.host.as_ref().unwrap().into(); + let property = worker.property.as_ref().unwrap(); + ShowClusterRow { + addr: addr.to_string(), + state: worker.get_state().unwrap().as_str_name().to_string(), + parallel_units: worker.parallel_units.into_iter().map(|pu| pu.id).join(", "), + is_streaming: property.is_streaming.to_string(), + is_serving: property.is_serving.to_string(), + is_unschedulable: property.is_unschedulable.to_string(), + } + }); return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .values(rows.into(), row_desc) + .rows(rows) .into()); } ShowObject::Jobs => { let resp = session.env().meta_client().list_ddl_progress().await?; - let rows = resp - .into_iter() - .map(|job| { - Row::new(vec![ - Some(job.id.to_string().into()), - Some(job.statement.into()), - Some(job.progress.into()), - ]) - }) - .collect_vec(); + let rows = resp.into_iter().map(|job| ShowJobRow { + id: job.id as i64, + statement: job.statement, + progress: job.progress, + }); return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .values(rows.into(), row_desc) + .rows(rows) .into()); } ShowObject::ProcessList => { - let rows = { - let sessions_map = session.env().sessions_map(); - sessions_map - .read() - .values() - .map(|s| { - Row::new(vec![ - // Since process id and the secret id in the session id are the same in RisingWave, just display the process id. - Some(format!("{}", s.id().0).into()), - Some(s.user_name().to_owned().into()), - Some(format!("{}", s.peer_addr()).into()), - Some(s.database().to_owned().into()), - s.elapse_since_running_sql() - .map(|mills| format!("{}ms", mills).into()), - s.running_sql().map(|sql| { - format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024)).into() - }), - ]) - }) - .collect_vec() - }; + let sessions_map = session.env().sessions_map().read(); + let rows = sessions_map.values().map(|s| { + ShowProcessListRow { + // Since process id and the secret id in the session id are the same in RisingWave, just display the process id. + id: format!("{}", s.id().0), + user: s.user_name().to_owned(), + host: format!("{}", s.peer_addr()), + database: s.database().to_owned(), + time: s + .elapse_since_running_sql() + .map(|mills| format!("{}ms", mills)), + info: s + .running_sql() + .map(|sql| format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024))), + } + }); return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .values(rows.into(), row_desc) + .rows(rows) .into()); } }; @@ -341,21 +440,17 @@ pub async fn handle_show_object( Some(ShowStatementFilter::Where(..)) => unreachable!(), None => true, }) - .map(|n| Row::new(vec![Some(n.into())])) - .collect_vec(); + .map(|name| ShowObjectRow { name }); Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .values( - rows.into(), - vec![PgFieldDescriptor::new( - "Name".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - )], - ) + .rows(rows) .into()) } +pub fn infer_show_create_object() -> Vec { + fields_to_descriptors(ShowCreateObjectRow::fields()) +} + pub fn handle_show_create_object( handle_args: HandlerArgs, show_create_type: ShowCreateType, @@ -415,21 +510,10 @@ pub fn handle_show_create_object( let name = format!("{}.{}", schema_name, object_name); Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .values( - vec![Row::new(vec![Some(name.into()), Some(sql.into())])].into(), - vec![ - PgFieldDescriptor::new( - "Name".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Create Sql".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ], - ) + .rows([ShowCreateObjectRow { + name, + create_sql: sql, + }]) .into()) } diff --git a/src/frontend/src/handler/transaction.rs b/src/frontend/src/handler/transaction.rs index 452cfe0ed929..8ab7af36c29c 100644 --- a/src/frontend/src/handler/transaction.rs +++ b/src/frontend/src/handler/transaction.rs @@ -13,14 +13,13 @@ // limitations under the License. use pgwire::pg_response::StatementType; -use pgwire::types::Row; use risingwave_common::bail_not_implemented; +use risingwave_common::types::Fields; use risingwave_sqlparser::ast::{TransactionAccessMode, TransactionMode, Value}; -use super::{HandlerArgs, RwPgResponse}; +use super::{HandlerArgs, RwPgResponse, RwPgResponseBuilderExt}; use crate::error::Result; use crate::session::transaction::AccessMode; -use crate::utils::infer_stmt_row_desc::infer_show_variable; macro_rules! not_impl { ($body:expr) => { @@ -118,16 +117,20 @@ pub async fn handle_set( .into()) } +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowVariableRow { + name: String, +} + pub fn handle_show_isolation_level(handler_args: HandlerArgs) -> Result { let config_reader = handler_args.session.config(); - let parameter_name = "transaction_isolation"; - let row_desc = infer_show_variable(parameter_name); - let rows = vec![Row::new(vec![Some( - config_reader.get(parameter_name)?.into(), - )])]; + let rows = [ShowVariableRow { + name: config_reader.get("transaction_isolation")?, + }]; Ok(RwPgResponse::builder(StatementType::SHOW_VARIABLE) - .values(rows.into(), row_desc) + .rows(rows) .into()) } diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 6e91cf53f0b3..1e49ee8baf54 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -27,14 +27,13 @@ use pgwire::pg_server::BoxedError; use pgwire::types::{Format, FormatIterator, Row}; use pin_project_lite::pin_project; use risingwave_common::array::DataChunk; -use risingwave_common::catalog::{ColumnCatalog, Field}; +use risingwave_common::catalog::Field; use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl, Timestamptz}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::source::KAFKA_CONNECTOR; -use risingwave_sqlparser::ast::{display_comma_separated, CompatibleSourceSchema, ConnectorSchema}; +use risingwave_sqlparser::ast::{CompatibleSourceSchema, ConnectorSchema}; -use crate::catalog::IndexCatalog; use crate::error::{ErrorCode, Result as RwResult}; use crate::handler::create_source::UPSTREAM_SOURCE_KEY; use crate::session::{current, SessionImpl}; @@ -172,66 +171,6 @@ fn to_pg_rows( .try_collect() } -/// Convert column descs to rows which conclude name and type -pub fn col_descs_to_rows(columns: Vec) -> Vec { - columns - .iter() - .flat_map(|col| { - col.column_desc - .flatten() - .into_iter() - .map(|c| { - let type_name = if let DataType::Struct { .. } = c.data_type { - c.type_name.clone() - } else { - c.data_type.to_string() - }; - Row::new(vec![ - Some(c.name.into()), - Some(type_name.into()), - Some(col.is_hidden.to_string().into()), - c.description.map(Into::into), - ]) - }) - .collect_vec() - }) - .collect_vec() -} - -pub fn indexes_to_rows(indexes: Vec>) -> Vec { - indexes - .iter() - .map(|index| { - let index_display = index.display(); - Row::new(vec![ - Some(index.name.clone().into()), - Some(index.primary_table.name.clone().into()), - Some( - format!( - "{}", - display_comma_separated(&index_display.index_columns_with_ordering) - ) - .into(), - ), - Some( - format!( - "{}", - display_comma_separated(&index_display.include_columns) - ) - .into(), - ), - Some( - format!( - "{}", - display_comma_separated(&index_display.distributed_by_columns) - ) - .into(), - ), - ]) - }) - .collect_vec() -} - /// Convert from [`Field`] to [`PgFieldDescriptor`]. pub fn to_pg_field(f: &Field) -> PgFieldDescriptor { PgFieldDescriptor::new( diff --git a/src/frontend/src/handler/variable.rs b/src/frontend/src/handler/variable.rs index 9b4828b23283..96fd232215cc 100644 --- a/src/frontend/src/handler/variable.rs +++ b/src/frontend/src/handler/variable.rs @@ -14,19 +14,18 @@ use anyhow::Context; use itertools::Itertools; +use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_protocol::ParameterStatus; use pgwire::pg_response::{PgResponse, StatementType}; -use pgwire::types::Row; use risingwave_common::session_config::{ConfigReporter, SESSION_CONFIG_LIST_SEP}; use risingwave_common::system_param::reader::SystemParamsRead; -use risingwave_common::types::{DataType, ScalarRefImpl}; +use risingwave_common::types::Fields; use risingwave_sqlparser::ast::{Ident, SetTimeZoneValue, SetVariableValue, Value}; use risingwave_sqlparser::keywords::Keyword; -use super::RwPgResponse; +use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt}; use crate::error::Result; use crate::handler::HandlerArgs; -use crate::utils::infer_stmt_row_desc::infer_show_variable; /// convert `SetVariableValue` to string while remove the quotes on literals. pub(crate) fn set_var_to_param_str(value: &SetVariableValue) -> Option { @@ -117,40 +116,36 @@ pub(super) async fn handle_show( ) -> Result { // TODO: Verify that the name used in `show` command is indeed always case-insensitive. let name = variable.iter().map(|e| e.real_value()).join(" "); - let row_desc = infer_show_variable(&name); - let rows = if name.eq_ignore_ascii_case("PARAMETERS") { - handle_show_system_params(handler_args).await? + if name.eq_ignore_ascii_case("PARAMETERS") { + handle_show_system_params(handler_args).await } else if name.eq_ignore_ascii_case("ALL") { - handle_show_all(handler_args.clone())? + handle_show_all(handler_args.clone()) } else { let config_reader = handler_args.session.config(); - vec![Row::new(vec![Some(config_reader.get(&name)?.into())])] - }; - - Ok(PgResponse::builder(StatementType::SHOW_VARIABLE) - .values(rows.into(), row_desc) - .into()) + Ok(PgResponse::builder(StatementType::SHOW_VARIABLE) + .rows([ShowVariableRow { + name: config_reader.get(&name)?, + }]) + .into()) + } } -fn handle_show_all(handler_args: HandlerArgs) -> Result> { +fn handle_show_all(handler_args: HandlerArgs) -> Result { let config_reader = handler_args.session.config(); let all_variables = config_reader.show_all(); - let rows = all_variables - .iter() - .map(|info| { - Row::new(vec![ - Some(info.name.clone().into()), - Some(info.setting.clone().into()), - Some(info.description.clone().into()), - ]) - }) - .collect_vec(); - Ok(rows) + let rows = all_variables.iter().map(|info| ShowVariableAllRow { + name: info.name.clone(), + setting: info.setting.clone(), + description: info.description.clone(), + }); + Ok(PgResponse::builder(StatementType::SHOW_VARIABLE) + .rows(rows) + .into()) } -async fn handle_show_system_params(handler_args: HandlerArgs) -> Result> { +async fn handle_show_system_params(handler_args: HandlerArgs) -> Result { let params = handler_args .session .env() @@ -160,17 +155,46 @@ async fn handle_show_system_params(handler_args: HandlerArgs) -> Result let rows = params .get_all() .into_iter() - .map(|info| { - let is_mutable_bytes = ScalarRefImpl::Bool(info.mutable) - .text_format(&DataType::Boolean) - .into(); - Row::new(vec![ - Some(info.name.into()), - Some(info.value.into()), - Some(info.description.into()), - Some(is_mutable_bytes), - ]) - }) - .collect_vec(); - Ok(rows) + .map(|info| ShowVariableParamsRow { + name: info.name.into(), + value: info.value, + description: info.description.into(), + mutable: info.mutable, + }); + Ok(PgResponse::builder(StatementType::SHOW_VARIABLE) + .rows(rows) + .into()) +} + +pub fn infer_show_variable(name: &str) -> Vec { + fields_to_descriptors(if name.eq_ignore_ascii_case("ALL") { + ShowVariableAllRow::fields() + } else if name.eq_ignore_ascii_case("PARAMETERS") { + ShowVariableParamsRow::fields() + } else { + ShowVariableRow::fields() + }) +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowVariableRow { + name: String, +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowVariableAllRow { + name: String, + setting: String, + description: String, +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowVariableParamsRow { + name: String, + value: String, + description: String, + mutable: bool, } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 460c97853583..67a5da01e121 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -81,11 +81,14 @@ use crate::catalog::{ check_schema_writable, CatalogError, DatabaseId, OwnedByUserCatalog, SchemaId, }; use crate::error::{ErrorCode, Result, RwError}; +use crate::handler::describe::infer_describe; use crate::handler::extended_handle::{ handle_bind, handle_execute, handle_parse, Portal, PrepareStatement, }; use crate::handler::privilege::ObjectCheckItem; +use crate::handler::show::{infer_show_create_object, infer_show_object}; use crate::handler::util::to_pg_field; +use crate::handler::variable::infer_show_variable; use crate::handler::{handle, RwPgResponse}; use crate::health_service::HealthServiceImpl; use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl}; @@ -102,7 +105,6 @@ use crate::user::user_authentication::md5_hash_with_salt; use crate::user::user_manager::UserInfoManager; use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl}; use crate::user::UserId; -use crate::utils::infer_stmt_row_desc::{infer_show_object, infer_show_variable}; use crate::{FrontendOpts, PgResponseStream}; pub(crate) mod current; @@ -1242,18 +1244,7 @@ fn infer(bound: Option, stmt: Statement) -> Result Ok(infer_show_object(&show_object)), - Statement::ShowCreateObject { .. } => Ok(vec![ - PgFieldDescriptor::new( - "Name".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Create Sql".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ]), + Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()), Statement::ShowTransactionIsolationLevel => { let name = "transaction_isolation"; Ok(infer_show_variable(name)) @@ -1262,28 +1253,7 @@ fn infer(bound: Option, stmt: Statement) -> Result Ok(vec![ - PgFieldDescriptor::new( - "Name".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Type".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Is Hidden".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Description".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ]), + Statement::Describe { name: _ } => Ok(infer_describe()), Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new( "QUERY PLAN".to_owned(), DataType::Varchar.to_oid(), diff --git a/src/frontend/src/utils/infer_stmt_row_desc.rs b/src/frontend/src/utils/infer_stmt_row_desc.rs deleted file mode 100644 index 690b2bf81872..000000000000 --- a/src/frontend/src/utils/infer_stmt_row_desc.rs +++ /dev/null @@ -1,253 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use pgwire::pg_field_descriptor::PgFieldDescriptor; -use risingwave_common::types::DataType; -use risingwave_sqlparser::ast::ShowObject; - -/// `infer_stmt_row_desc` is used to infer the row description for different show objects. -pub fn infer_show_object(objects: &ShowObject) -> Vec { - match objects { - ShowObject::Columns { .. } => vec![ - PgFieldDescriptor::new( - "Name".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Type".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Is Hidden".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Description".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ], - ShowObject::Connection { .. } => vec![ - PgFieldDescriptor::new( - "Name".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Type".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Properties".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ], - ShowObject::Function { .. } => vec![ - PgFieldDescriptor::new( - "Name".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Arguments".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Return Type".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Language".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Link".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ], - ShowObject::Indexes { .. } => vec![ - PgFieldDescriptor::new( - "Name".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "On".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Key".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Include".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Distributed By".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ], - ShowObject::Cluster => vec![ - PgFieldDescriptor::new( - "Addr".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "State".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Parallel Units".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Is Streaming".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Is Serving".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Is Unschedulable".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ], - ShowObject::Jobs => vec![ - PgFieldDescriptor::new( - "Id".to_owned(), - DataType::Int64.to_oid(), - DataType::Int64.type_len(), - ), - PgFieldDescriptor::new( - "Statement".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Progress".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ], - ShowObject::ProcessList => vec![ - PgFieldDescriptor::new( - "Id".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "User".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Host".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Database".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Time".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Info".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ], - _ => vec![PgFieldDescriptor::new( - "Name".to_owned(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - )], - } -} - -pub fn infer_show_variable(name: &str) -> Vec { - if name.eq_ignore_ascii_case("ALL") { - vec![ - PgFieldDescriptor::new( - "Name".to_string(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Setting".to_string(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Description".to_string(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ] - } else if name.eq_ignore_ascii_case("PARAMETERS") { - vec![ - PgFieldDescriptor::new( - "Name".to_string(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Value".to_string(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Description".to_string(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "Mutable".to_string(), - DataType::Boolean.to_oid(), - DataType::Boolean.type_len(), - ), - ] - } else { - vec![PgFieldDescriptor::new( - name.to_ascii_lowercase(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - )] - } -} diff --git a/src/frontend/src/utils/mod.rs b/src/frontend/src/utils/mod.rs index bfe7cb093aad..697b626fb339 100644 --- a/src/frontend/src/utils/mod.rs +++ b/src/frontend/src/utils/mod.rs @@ -30,7 +30,6 @@ pub use rewrite_index::*; mod index_set; pub use index_set::*; pub(crate) mod group_by; -pub mod infer_stmt_row_desc; pub mod overwrite_options; pub use group_by::*; pub use overwrite_options::*;