diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index b5090537bc8c0..fe9aa22aaedab 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -31,11 +31,12 @@ use risingwave_sqlparser::ast::{ display_comma_separated, Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, }; -use super::{fields_to_descriptors, PgResponseStream, RwPgResponse, RwPgResponseBuilderExt}; +use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt}; use crate::binder::{Binder, Relation}; use crate::catalog::{CatalogError, IndexCatalog}; use crate::error::Result; use crate::handler::HandlerArgs; +use crate::session::cursor_manager::SubscriptionCursor; use crate::session::SessionImpl; pub fn get_columns_from_table( @@ -247,6 +248,36 @@ struct ShowCreateObjectRow { create_sql: String, } +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowSubscriptionRow { + name: String, + retention_seconds: i64, +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowCursorRow { + session_id: String, + user: String, + host: String, + database: String, + cursor_name: String, +} + +#[derive(Fields)] +#[fields(style = "Title Case")] +struct ShowSubscriptionCursorRow { + session_id: String, + user: String, + host: String, + database: String, + cursor_name: String, + subscription_name: String, + state: String, + idle_duration_ms: i64, +} + /// Infer the row description for different show objects. pub fn infer_show_object(objects: &ShowObject) -> Vec { fields_to_descriptors(match objects { @@ -326,12 +357,20 @@ pub async fn handle_show_object( .iter_sink() .map(|t| t.name.clone()) .collect(), - ShowObject::Subscription { schema } => catalog_reader - .read_guard() - .get_schema_by_name(session.database(), &schema_or_default(&schema))? - .iter_subscription() - .map(|t| t.name.clone()) - .collect(), + ShowObject::Subscription { schema } => { + let rows = catalog_reader + .read_guard() + .get_schema_by_name(session.database(), &schema_or_default(&schema))? + .iter_subscription() + .map(|t| ShowSubscriptionRow { + name: t.name.clone(), + retention_seconds: t.retention_seconds as i64, + }) + .collect_vec(); + return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) + .rows(rows) + .into()); + } ShowObject::Secret { schema } => catalog_reader .read_guard() .get_schema_by_name(session.database(), &schema_or_default(&schema))? @@ -482,20 +521,71 @@ pub async fn handle_show_object( .into()); } ShowObject::Cursor => { - let (rows, pg_descs) = session.get_cursor_manager().get_all_query_cursors().await; + let sessions = session + .env() + .sessions_map() + .read() + .values() + .cloned() + .collect_vec(); + let mut rows = vec![]; + for s in sessions { + let session_id = format!("{}", s.id().0); + let user = s.user_name().to_owned(); + let host = format!("{}", s.peer_addr()); + let database = s.database().to_owned(); + + s.get_cursor_manager() + .iter_query_cursors(|cursor_name: &String, _| { + rows.push(ShowCursorRow { + session_id: session_id.clone(), + user: user.clone(), + host: host.clone(), + database: database.clone(), + cursor_name: cursor_name.to_owned(), + }); + }) + .await; + } return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .row_cnt_opt(Some(rows.len() as i32)) - .values(PgResponseStream::from(rows), pg_descs) + .rows(rows) .into()); } ShowObject::SubscriptionCursor => { - let (rows, pg_descs) = session - .get_cursor_manager() - .get_all_subscription_cursors() - .await; + let sessions = session + .env() + .sessions_map() + .read() + .values() + .cloned() + .collect_vec(); + let mut rows = vec![]; + for s in sessions { + let ssession_id = format!("{}", s.id().0); + let user = s.user_name().to_owned(); + let host = format!("{}", s.peer_addr()); + let database = s.database().to_owned(); + + s.get_cursor_manager() + .iter_subscription_cursors( + |cursor_name: &String, cursor: &SubscriptionCursor| { + rows.push(ShowSubscriptionCursorRow { + session_id: ssession_id.clone(), + user: user.clone(), + host: host.clone(), + database: database.clone(), + cursor_name: cursor_name.to_owned(), + subscription_name: cursor.subscription_name().to_owned(), + state: cursor.state_info_string(), + idle_duration_ms: cursor.idle_duration().as_millis() as i64, + }); + }, + ) + .await; + } + return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) - .row_cnt_opt(Some(rows.len() as i32)) - .values(PgResponseStream::from(rows), pg_descs) + .rows(rows) .into()); } }; diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index db673892b5e14..cd223a5b9dff3 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -15,6 +15,7 @@ use core::mem; use core::time::Duration; use std::collections::{HashMap, VecDeque}; +use std::fmt::{Display, Formatter}; use std::rc::Rc; use std::sync::Arc; use std::time::Instant; @@ -211,6 +212,34 @@ enum State { Invalid, } +impl Display for State { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + State::InitLogStoreQuery { + seek_timestamp, + expected_timestamp, + } => write!( + f, + "InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}", + seek_timestamp, expected_timestamp + ), + State::Fetch { + from_snapshot, + rw_timestamp, + expected_timestamp, + remaining_rows, + init_query_timer, + .. + } => write!( + f, + "Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query init at {}ms before }}", + from_snapshot, rw_timestamp, expected_timestamp, remaining_rows.len(), init_query_timer.elapsed().as_millis() + ), + State::Invalid => write!(f, "Invalid"), + } + } +} + pub struct SubscriptionCursor { cursor_name: String, subscription: Arc, @@ -674,6 +703,18 @@ impl SubscriptionCursor { } chunk_stream.init_row_stream(&fields, &formats, session); } + + pub fn idle_duration(&self) -> Duration { + self.last_fetch.elapsed() + } + + pub fn subscription_name(&self) -> &str { + self.subscription.name.as_str() + } + + pub fn state_info_string(&self) -> String { + format!("{}", self.state) + } } pub struct CursorManager { @@ -820,63 +861,27 @@ impl CursorManager { } } - pub async fn get_all_query_cursors(&self) -> (Vec, Vec) { - let cursor_names = self - .cursor_map + pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) { + self.cursor_map .lock() .await .iter() - .filter_map(|(currsor_name, cursor)| { - if let Cursor::Query(_cursor) = cursor { - let cursor_name = vec![Some(Bytes::from(currsor_name.clone().into_bytes()))]; - Some(Row::new(cursor_name)) - } else { - None + .for_each(|(cursor_name, cursor)| { + if let Cursor::Query(cursor) = cursor { + f(cursor_name, cursor) } - }) - .collect(); - ( - cursor_names, - vec![PgFieldDescriptor::new( - "Name".to_string(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - )], - ) + }); } - pub async fn get_all_subscription_cursors(&self) -> (Vec, Vec) { - let cursors = self - .cursor_map + pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) { + self.cursor_map .lock() .await .iter() - .filter_map(|(cursor_name, cursor)| { + .for_each(|(cursor_name, cursor)| { if let Cursor::Subscription(cursor) = cursor { - let cursors = vec![ - Some(Bytes::from(cursor_name.clone().into_bytes())), - Some(Bytes::from(cursor.subscription.name.clone().into_bytes())), - ]; - Some(Row::new(cursors)) - } else { - None + f(cursor_name, cursor) } - }) - .collect(); - ( - cursors, - vec![ - PgFieldDescriptor::new( - "Name".to_string(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - PgFieldDescriptor::new( - "SubscriptionName".to_string(), - DataType::Varchar.to_oid(), - DataType::Varchar.type_len(), - ), - ], - ) + }); } }