Skip to content

Commit

Permalink
cherry-pick: feat: improve observability for subscription/cursor SHOW…
Browse files Browse the repository at this point in the history
… commands (#18896)
  • Loading branch information
xxhZs authored and hzxa21 committed Nov 8, 2024
1 parent 291589b commit fdeeb2e
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 67 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

122 changes: 106 additions & 16 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ use risingwave_sqlparser::ast::{
display_comma_separated, Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter,
};

use super::{fields_to_descriptors, PgResponseStream, RwPgResponse, RwPgResponseBuilderExt};
use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt};
use crate::binder::{Binder, Relation};
use crate::catalog::{CatalogError, IndexCatalog};
use crate::error::Result;
use crate::handler::HandlerArgs;
use crate::session::cursor_manager::SubscriptionCursor;
use crate::session::SessionImpl;

pub fn get_columns_from_table(
Expand Down Expand Up @@ -247,6 +248,36 @@ struct ShowCreateObjectRow {
create_sql: String,
}

#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionRow {
name: String,
retention_seconds: i64,
}

#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowCursorRow {
session_id: String,
user: String,
host: String,
database: String,
cursor_name: String,
}

#[derive(Fields)]
#[fields(style = "Title Case")]
struct ShowSubscriptionCursorRow {
session_id: String,
user: String,
host: String,
database: String,
cursor_name: String,
subscription_name: String,
state: String,
idle_duration_ms: i64,
}

/// Infer the row description for different show objects.
pub fn infer_show_object(objects: &ShowObject) -> Vec<PgFieldDescriptor> {
fields_to_descriptors(match objects {
Expand Down Expand Up @@ -326,12 +357,20 @@ pub async fn handle_show_object(
.iter_sink()
.map(|t| t.name.clone())
.collect(),
ShowObject::Subscription { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_subscription()
.map(|t| t.name.clone())
.collect(),
ShowObject::Subscription { schema } => {
let rows = catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_subscription()
.map(|t| ShowSubscriptionRow {
name: t.name.clone(),
retention_seconds: t.retention_seconds as i64,
})
.collect_vec();
return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.rows(rows)
.into());
}
ShowObject::Secret { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
Expand Down Expand Up @@ -482,20 +521,71 @@ pub async fn handle_show_object(
.into());
}
ShowObject::Cursor => {
let (rows, pg_descs) = session.get_cursor_manager().get_all_query_cursors().await;
let sessions = session
.env()
.sessions_map()
.read()
.values()
.cloned()
.collect_vec();
let mut rows = vec![];
for s in sessions {
let session_id = format!("{}", s.id().0);
let user = s.user_name().to_owned();
let host = format!("{}", s.peer_addr());
let database = s.database().to_owned();

s.get_cursor_manager()
.iter_query_cursors(|cursor_name: &String, _| {
rows.push(ShowCursorRow {
session_id: session_id.clone(),
user: user.clone(),
host: host.clone(),
database: database.clone(),
cursor_name: cursor_name.to_owned(),
});
})
.await;
}
return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.row_cnt_opt(Some(rows.len() as i32))
.values(PgResponseStream::from(rows), pg_descs)
.rows(rows)
.into());
}
ShowObject::SubscriptionCursor => {
let (rows, pg_descs) = session
.get_cursor_manager()
.get_all_subscription_cursors()
.await;
let sessions = session
.env()
.sessions_map()
.read()
.values()
.cloned()
.collect_vec();
let mut rows = vec![];
for s in sessions {
let ssession_id = format!("{}", s.id().0);
let user = s.user_name().to_owned();
let host = format!("{}", s.peer_addr());
let database = s.database().to_owned();

s.get_cursor_manager()
.iter_subscription_cursors(
|cursor_name: &String, cursor: &SubscriptionCursor| {
rows.push(ShowSubscriptionCursorRow {
session_id: ssession_id.clone(),
user: user.clone(),
host: host.clone(),
database: database.clone(),
cursor_name: cursor_name.to_owned(),
subscription_name: cursor.subscription_name().to_owned(),
state: cursor.state_info_string(),
idle_duration_ms: cursor.idle_duration().as_millis() as i64,
});
},
)
.await;
}

return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.row_cnt_opt(Some(rows.len() as i32))
.values(PgResponseStream::from(rows), pg_descs)
.rows(rows)
.into());
}
};
Expand Down
99 changes: 52 additions & 47 deletions src/frontend/src/session/cursor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use core::mem;
use core::time::Duration;
use std::collections::{HashMap, VecDeque};
use std::fmt::{Display, Formatter};
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -211,6 +212,34 @@ enum State {
Invalid,
}

impl Display for State {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
State::InitLogStoreQuery {
seek_timestamp,
expected_timestamp,
} => write!(
f,
"InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}",
seek_timestamp, expected_timestamp
),
State::Fetch {
from_snapshot,
rw_timestamp,
expected_timestamp,
remaining_rows,
init_query_timer,
..
} => write!(
f,
"Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query init at {}ms before }}",
from_snapshot, rw_timestamp, expected_timestamp, remaining_rows.len(), init_query_timer.elapsed().as_millis()
),
State::Invalid => write!(f, "Invalid"),
}
}
}

pub struct SubscriptionCursor {
cursor_name: String,
subscription: Arc<SubscriptionCatalog>,
Expand Down Expand Up @@ -674,6 +703,18 @@ impl SubscriptionCursor {
}
chunk_stream.init_row_stream(&fields, &formats, session);
}

pub fn idle_duration(&self) -> Duration {
self.last_fetch.elapsed()
}

pub fn subscription_name(&self) -> &str {
self.subscription.name.as_str()
}

pub fn state_info_string(&self) -> String {
format!("{}", self.state)
}
}

pub struct CursorManager {
Expand Down Expand Up @@ -820,63 +861,27 @@ impl CursorManager {
}
}

pub async fn get_all_query_cursors(&self) -> (Vec<Row>, Vec<PgFieldDescriptor>) {
let cursor_names = self
.cursor_map
pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) {
self.cursor_map
.lock()
.await
.iter()
.filter_map(|(currsor_name, cursor)| {
if let Cursor::Query(_cursor) = cursor {
let cursor_name = vec![Some(Bytes::from(currsor_name.clone().into_bytes()))];
Some(Row::new(cursor_name))
} else {
None
.for_each(|(cursor_name, cursor)| {
if let Cursor::Query(cursor) = cursor {
f(cursor_name, cursor)
}
})
.collect();
(
cursor_names,
vec![PgFieldDescriptor::new(
"Name".to_string(),
DataType::Varchar.to_oid(),
DataType::Varchar.type_len(),
)],
)
});
}

pub async fn get_all_subscription_cursors(&self) -> (Vec<Row>, Vec<PgFieldDescriptor>) {
let cursors = self
.cursor_map
pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) {
self.cursor_map
.lock()
.await
.iter()
.filter_map(|(cursor_name, cursor)| {
.for_each(|(cursor_name, cursor)| {
if let Cursor::Subscription(cursor) = cursor {
let cursors = vec![
Some(Bytes::from(cursor_name.clone().into_bytes())),
Some(Bytes::from(cursor.subscription.name.clone().into_bytes())),
];
Some(Row::new(cursors))
} else {
None
f(cursor_name, cursor)
}
})
.collect();
(
cursors,
vec![
PgFieldDescriptor::new(
"Name".to_string(),
DataType::Varchar.to_oid(),
DataType::Varchar.type_len(),
),
PgFieldDescriptor::new(
"SubscriptionName".to_string(),
DataType::Varchar.to_oid(),
DataType::Varchar.type_len(),
),
],
)
});
}
}

0 comments on commit fdeeb2e

Please sign in to comment.