Skip to content

Commit

Permalink
feat(frontend): support show processlist (#13287)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Nov 8, 2023
1 parent 54390d3 commit 7b3f8fc
Show file tree
Hide file tree
Showing 16 changed files with 284 additions and 77 deletions.
19 changes: 12 additions & 7 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,12 @@ impl TestCase {
.chain(std::iter::once(self.sql()))
{
result = self
.run_sql(sql, session.clone(), do_check_result, result)
.run_sql(
Arc::from(sql.to_owned()),
session.clone(),
do_check_result,
result,
)
.await?;
}

Expand Down Expand Up @@ -326,7 +331,7 @@ impl TestCase {
);
let temp_file = create_proto_file(content.as_str());
self.run_sql(
&(sql + temp_file.path().to_str().unwrap() + "')"),
Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
session.clone(),
false,
None,
Expand Down Expand Up @@ -357,7 +362,7 @@ impl TestCase {
);
let temp_file = create_proto_file(content.as_str());
self.run_sql(
&(sql + temp_file.path().to_str().unwrap() + "')"),
Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
session.clone(),
false,
None,
Expand All @@ -376,15 +381,15 @@ impl TestCase {

async fn run_sql(
&self,
sql: &str,
sql: Arc<str>,
session: Arc<SessionImpl>,
do_check_result: bool,
mut result: Option<TestCaseResult>,
) -> Result<Option<TestCaseResult>> {
let statements = Parser::parse_sql(sql).unwrap();
let statements = Parser::parse_sql(&sql).unwrap();
for stmt in statements {
// TODO: `sql` may contain multiple statements here.
let handler_args = HandlerArgs::new(session.clone(), &stmt, sql)?;
let handler_args = HandlerArgs::new(session.clone(), &stmt, sql.clone())?;
let _guard = session.txn_begin_implicit();
match stmt.clone() {
Statement::Query(_)
Expand All @@ -399,7 +404,7 @@ impl TestCase {
..Default::default()
};
let context = OptimizerContext::new(
HandlerArgs::new(session.clone(), &stmt, sql)?,
HandlerArgs::new(session.clone(), &stmt, sql.clone())?,
explain_options,
);
let ret = self.apply_query(&stmt, context.into())?;
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow::Context;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
Expand Down Expand Up @@ -172,7 +174,7 @@ pub async fn handle_alter_table_column(
}

// Create handler args as if we're creating a new table with the altered definition.
let handler_args = HandlerArgs::new(session.clone(), &definition, "")?;
let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
let col_id_gen = ColumnIdGenerator::new_alter(&original_catalog);
let Statement::CreateTable {
columns,
Expand Down
28 changes: 17 additions & 11 deletions src/frontend/src/handler/extended_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,7 @@ impl std::fmt::Display for Portal {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match &self {
Portal::Empty => write!(f, "Empty"),
Portal::Portal(portal) => write!(
f,
"{}, params = {:?}",
portal.statement, portal.bound_result.parsed_params
),
Portal::Portal(portal) => portal.fmt(f),
Portal::PureStatement(stmt) => write!(f, "{}", stmt),
}
}
Expand All @@ -74,14 +70,24 @@ pub struct PortalResult {
pub result_formats: Vec<Format>,
}

impl std::fmt::Display for PortalResult {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"{}, params = {:?}",
self.statement, self.bound_result.parsed_params
)
}
}

pub fn handle_parse(
session: Arc<SessionImpl>,
statement: Statement,
specific_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
session.clear_cancel_query_flag();
let str_sql = statement.to_string();
let handler_args = HandlerArgs::new(session, &statement, &str_sql)?;
let sql: Arc<str> = Arc::from(statement.to_string());
let handler_args = HandlerArgs::new(session, &statement, sql)?;
match &statement {
Statement::Query(_)
| Statement::Insert { .. }
Expand Down Expand Up @@ -181,8 +187,8 @@ pub async fn handle_execute(session: Arc<SessionImpl>, portal: Portal) -> Result
Portal::Portal(portal) => {
session.clear_cancel_query_flag();
let _guard = session.txn_begin_implicit(); // TODO(bugen): is this behavior correct?
let str_sql = portal.statement.to_string();
let handler_args = HandlerArgs::new(session, &portal.statement, &str_sql)?;
let sql: Arc<str> = Arc::from(portal.statement.to_string());
let handler_args = HandlerArgs::new(session, &portal.statement, sql)?;
match &portal.statement {
Statement::Query(_)
| Statement::Insert { .. }
Expand All @@ -192,8 +198,8 @@ pub async fn handle_execute(session: Arc<SessionImpl>, portal: Portal) -> Result
}
}
Portal::PureStatement(stmt) => {
let sql = stmt.to_string();
handle(session, stmt, &sql, vec![]).await
let sql: Arc<str> = Arc::from(stmt.to_string());
handle(session, stmt, sql, vec![]).await
}
}
}
Expand Down
9 changes: 4 additions & 5 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ impl From<Vec<Row>> for PgResponseStream {
#[derive(Clone)]
pub struct HandlerArgs {
pub session: Arc<SessionImpl>,
pub sql: String,
pub sql: Arc<str>,
pub normalized_sql: String,
pub with_options: WithOptions,
}

impl HandlerArgs {
pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: &str) -> Result<Self> {
pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: Arc<str>) -> Result<Self> {
Ok(Self {
session,
sql: sql.into(),
sql,
with_options: WithOptions::try_from(stmt)?,
normalized_sql: Self::normalize_sql(stmt),
})
Expand Down Expand Up @@ -172,12 +172,11 @@ impl HandlerArgs {
pub async fn handle(
session: Arc<SessionImpl>,
stmt: Statement,
sql: &str,
sql: Arc<str>,
formats: Vec<Format>,
) -> Result<RwPgResponse> {
session.clear_cancel_query_flag();
let _guard = session.txn_begin_implicit();

let handler_args = HandlerArgs::new(session, &stmt, sql)?;

match stmt {
Expand Down
28 changes: 28 additions & 0 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use std::sync::Arc;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_protocol::truncated_fmt;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::pg_server::Session;
use pgwire::types::Row;
use risingwave_common::catalog::{ColumnCatalog, DEFAULT_SCHEMA_NAME};
use risingwave_common::error::{ErrorCode, Result};
Expand Down Expand Up @@ -267,6 +269,32 @@ pub async fn handle_show_object(
.values(rows.into(), row_desc)
.into());
}
ShowObject::ProcessList => {
let rows = {
let sessions_map = session.env().sessions_map();
sessions_map
.read()
.values()
.map(|s| {
Row::new(vec![
Some(format!("{}-{}", s.id().0, s.id().1).into()),
Some(s.user_name().to_owned().into()),
Some(format!("{}", s.peer_addr()).into()),
Some(s.database().to_owned().into()),
s.elapse_since_running_sql()
.map(|mills| format!("{}ms", mills).into()),
s.running_sql().map(|sql| {
format!("{}", truncated_fmt::TruncatedFmt(&sql, 1024)).into()
}),
])
})
.collect_vec()
};

return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.values(rows.into(), row_desc)
.into());
}
};

let rows = names
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/optimizer_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct OptimizerContext {
/// Store plan node id
next_plan_node_id: RefCell<i32>,
/// The original SQL string, used for debugging.
sql: String,
sql: Arc<str>,
/// Normalized SQL string. See [`HandlerArgs::normalize_sql`].
normalized_sql: String,
/// Explain options
Expand Down Expand Up @@ -97,7 +97,7 @@ impl OptimizerContext {
Self {
session_ctx: Arc::new(SessionImpl::mock()),
next_plan_node_id: RefCell::new(0),
sql: "".to_owned(),
sql: Arc::from(""),
normalized_sql: "".to_owned(),
explain_options: ExplainOptions::default(),
optimizer_trace: RefCell::new(vec![]),
Expand Down
Loading

0 comments on commit 7b3f8fc

Please sign in to comment.