Skip to content

Commit

Permalink
feat(frontend): report slow query in the log
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 28, 2022
1 parent e6c9116 commit b1ddf6d
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,10 +769,8 @@ impl Session<PgResponseStream> for SessionImpl {
format: bool,
) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
// Parse sql.
let mut stmts = Parser::parse_sql(sql).map_err(|e| {
tracing::error!("failed to parse sql:\n{}:\n{}", sql, e);
e
})?;
let mut stmts = Parser::parse_sql(sql)
.inspect_err(|e| tracing::error!("failed to parse sql:\n{}:\n{}", sql, e))?;
if stmts.is_empty() {
return Ok(PgResponse::empty_result(
pgwire::pg_response::StatementType::EMPTY,
Expand All @@ -785,10 +783,24 @@ impl Session<PgResponseStream> for SessionImpl {
));
}
let stmt = stmts.swap_remove(0);
let rsp = handle(self, stmt, sql, format).await.map_err(|e| {
tracing::error!("failed to handle sql:\n{}:\n{}", sql, e);
e
})?;
let rsp = {
// Report the SQL in the log periodically if the query is slow.
const SLOW_QUERY_LOG_PERIOD: Duration = Duration::from_secs(60);
let mut ticker = tokio::time::interval(SLOW_QUERY_LOG_PERIOD);
ticker.reset();
let mut handle_fut = Box::pin(handle(self, stmt, sql, format));
loop {
tokio::select! {
_ = ticker.tick() => {
tracing::warn!(sql, "slow query has been running for another {SLOW_QUERY_LOG_PERIOD:?}");
}
result = &mut handle_fut => {
break result;
}
}
}
}
.inspect_err(|e| tracing::error!("failed to handle sql:\n{}:\n{}", sql, e))?;
Ok(rsp)
}

Expand All @@ -797,10 +809,8 @@ impl Session<PgResponseStream> for SessionImpl {
sql: &str,
) -> std::result::Result<Vec<PgFieldDescriptor>, BoxedError> {
// Parse sql.
let mut stmts = Parser::parse_sql(sql).map_err(|e| {
tracing::error!("failed to parse sql:\n{}:\n{}", sql, e);
e
})?;
let mut stmts = Parser::parse_sql(sql)
.inspect_err(|e| tracing::error!("failed to parse sql:\n{}:\n{}", sql, e))?;
if stmts.is_empty() {
return Ok(vec![]);
}
Expand All @@ -814,10 +824,8 @@ impl Session<PgResponseStream> for SessionImpl {
// This part refers from src/frontend/handler/ so the Vec<PgFieldDescriptor> is same as
// result of run_statement().
let rsp = match stmt {
Statement::Query(_) => infer(self, stmt, sql).map_err(|e| {
tracing::error!("failed to handle sql:\n{}:\n{}", sql, e);
e
})?,
Statement::Query(_) => infer(self, stmt, sql)
.inspect_err(|e| tracing::error!("failed to handle sql:\n{}:\n{}", sql, e))?,
Statement::ShowObjects(show_object) => match show_object {
ShowObject::Columns { table: _ } => {
vec![
Expand Down

0 comments on commit b1ddf6d

Please sign in to comment.