diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs
index 2f2b5cb1f0cd2..8a47ad8b7c121 100644
--- a/src/common/src/util/mod.rs
+++ b/src/common/src/util/mod.rs
@@ -28,6 +28,7 @@ pub mod memcmp_encoding;
 pub mod panic;
 pub mod pretty_bytes;
 pub mod prost;
+pub mod query_log;
 pub mod resource_util;
 pub mod row_id;
 pub mod row_serde;
diff --git a/src/common/src/util/query_log.rs b/src/common/src/util/query_log.rs
new file mode 100644
index 0000000000000..3d8fcafd0a3b2
--- /dev/null
+++ b/src/common/src/util/query_log.rs
@@ -0,0 +1,32 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::LazyLock;
+use std::time::Duration;
+
+/// The target name for the root span while pgwire is handling a query.
+pub const PGWIRE_ROOT_SPAN_TARGET: &str = "pgwire_root_span";
+/// The target name for the event when pgwire finishes handling a query.
+pub const PGWIRE_QUERY_LOG: &str = "pgwire_query_log";
+/// The target name for the event when pgwire does not finish handling a query in time.
+pub const PGWIRE_SLOW_QUERY_LOG: &str = "pgwire_slow_query_log";
+
+/// The period of logging ongoing slow queries.
+pub static SLOW_QUERY_LOG_PERIOD: LazyLock<Duration> = LazyLock::new(|| {
+    std::env::var("RW_SLOW_QUERY_LOG_PERIOD_MS")
+        .ok()
+        .and_then(|s| s.parse().ok())
+        .map(Duration::from_millis)
+        .unwrap_or(Duration::from_secs(60))
+});
diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs
index 9bf6a17f59725..0b6f60108573b 100644
--- a/src/frontend/src/session.rs
+++ b/src/frontend/src/session.rs
@@ -825,27 +825,9 @@ impl SessionImpl {
             );
         }
         let stmt = stmts.swap_remove(0);
-        let rsp = {
-            let mut handle_fut = Box::pin(handle(self, stmt, sql.clone(), formats));
-            if cfg!(debug_assertions) {
-                // Report the SQL in the log periodically if the query is slow.
-                const SLOW_QUERY_LOG_PERIOD: Duration = Duration::from_secs(60);
-                const SLOW_QUERY_LOG: &str = "risingwave_frontend_slow_query_log";
-                loop {
-                    match tokio::time::timeout(SLOW_QUERY_LOG_PERIOD, &mut handle_fut).await {
-                        Ok(result) => break result,
-                        Err(_) => tracing::warn!(
-                            target: SLOW_QUERY_LOG,
-                            sql = sql.as_ref(),
-                            "slow query has been running for another {SLOW_QUERY_LOG_PERIOD:?}"
-                        ),
-                    }
-                }
-            } else {
-                handle_fut.await
-            }
-        }
-        .inspect_err(|e| tracing::error!(error = %e.as_report(), %sql, "failed to handle sql"))?;
+        let rsp = handle(self, stmt, sql.clone(), formats).await.inspect_err(
+            |e| tracing::error!(error = %e.as_report(), %sql, "failed to handle sql"),
+        )?;
         Ok(rsp)
     }
 
@@ -1033,25 +1015,11 @@ impl Session for SessionImpl {
         let string = stmt.to_string();
         let sql_str = string.as_str();
         let sql: Arc<str> = Arc::from(sql_str);
-        let rsp = {
-            let mut handle_fut = Box::pin(handle(self, stmt, sql.clone(), vec![format]));
-            if cfg!(debug_assertions) {
-                // Report the SQL in the log periodically if the query is slow.
-                const SLOW_QUERY_LOG_PERIOD: Duration = Duration::from_secs(60);
-                loop {
-                    match tokio::time::timeout(SLOW_QUERY_LOG_PERIOD, &mut handle_fut).await {
-                        Ok(result) => break result,
-                        Err(_) => tracing::warn!(
-                            sql_str,
-                            "slow query has been running for another {SLOW_QUERY_LOG_PERIOD:?}"
-                        ),
-                    }
-                }
-            } else {
-                handle_fut.await
-            }
-        }
-        .inspect_err(|e| tracing::error!(error = %e.as_report(), %sql, "failed to handle sql"))?;
+        let rsp = handle(self, stmt, sql.clone(), vec![format])
+            .await
+            .inspect_err(
+                |e| tracing::error!(error = %e.as_report(), %sql, "failed to handle sql"),
+            )?;
         Ok(rsp)
     }
 
@@ -1094,24 +1062,9 @@ impl Session for SessionImpl {
         self: Arc<Self>,
         portal: Portal,
     ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
-        let rsp = {
-            let mut handle_fut = Box::pin(handle_execute(self, portal));
-            if cfg!(debug_assertions) {
-                // Report the SQL in the log periodically if the query is slow.
-                const SLOW_QUERY_LOG_PERIOD: Duration = Duration::from_secs(60);
-                loop {
-                    match tokio::time::timeout(SLOW_QUERY_LOG_PERIOD, &mut handle_fut).await {
-                        Ok(result) => break result,
-                        Err(_) => tracing::warn!(
-                            "slow query has been running for another {SLOW_QUERY_LOG_PERIOD:?}"
-                        ),
-                    }
-                }
-            } else {
-                handle_fut.await
-            }
-        }
-        .inspect_err(|e| tracing::error!(error=%e.as_report(), "failed to handle execute"))?;
+        let rsp = handle_execute(self, portal)
+            .await
+            .inspect_err(|e| tracing::error!(error=%e.as_report(), "failed to handle execute"))?;
         Ok(rsp)
     }
 
diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs
index 81eb83d7c146f..fcf368aeafae9 100644
--- a/src/utils/pgwire/src/pg_protocol.rs
+++ b/src/utils/pgwire/src/pg_protocol.rs
@@ -19,22 +19,22 @@ use std::path::PathBuf;
 use std::pin::Pin;
 use std::str::Utf8Error;
 use std::sync::{Arc, LazyLock, Weak};
-use std::time::Instant;
+use std::time::{Duration, Instant};
 use std::{io, str};
 
 use bytes::{Bytes, BytesMut};
-use futures::future::Either;
 use futures::stream::StreamExt;
 use itertools::Itertools;
 use openssl::ssl::{SslAcceptor, SslContext, SslContextRef, SslMethod};
 use risingwave_common::types::DataType;
 use risingwave_common::util::panic::FutureCatchUnwindExt;
+use risingwave_common::util::query_log::*;
 use risingwave_sqlparser::ast::Statement;
 use risingwave_sqlparser::parser::Parser;
 use thiserror_ext::AsReport;
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
 use tokio_openssl::SslStream;
-use tracing::{error, warn, Instrument};
+use tracing::Instrument;
 
 use crate::error::{PsqlError, PsqlResult};
 use crate::net::AddressRef;
@@ -103,8 +103,6 @@ where
     peer_addr: AddressRef,
 }
 
-const PGWIRE_QUERY_LOG: &str = "pgwire_query_log";
-
 /// Configures TLS encryption for connections.
 #[derive(Debug, Clone)]
 pub struct TlsConfig {
@@ -160,6 +158,17 @@ pub fn cstr_to_str(b: &Bytes) -> Result<&str, Utf8Error> {
     std::str::from_utf8(without_null)
 }
 
+/// Record `sql` in the current tracing span.
+fn record_sql_in_span(sql: &str) {
+    tracing::Span::current().record(
+        "sql",
+        tracing::field::display(truncated_fmt::TruncatedFmt(
+            &sql,
+            *RW_QUERY_LOG_TRUNCATE_LEN,
+        )),
+    );
+}
+
 impl<S, SM> PgProtocol<S, SM>
 where
     S: AsyncWrite + AsyncRead + Unpin,
@@ -199,37 +208,122 @@ where
         self.do_process(msg).await.is_none() || self.is_terminate
     }
 
+    /// The root tracing span for processing a message. The target of the span is
+    /// [`PGWIRE_ROOT_SPAN_TARGET`].
+    ///
+    /// This is used to provide context for the (slow) query logs and traces.
+    ///
+    /// The span is only effective if there's a current session and the message is
+    /// query-related. Otherwise, `Span::none()` is returned.
+    fn root_span_for_msg(&self, msg: &FeMessage) -> tracing::Span {
+        let Some(session_id) = self.session.as_ref().map(|s| s.id().0) else {
+            return tracing::Span::none();
+        };
+
+        let mode = match msg {
+            FeMessage::Query(_) => "simple query",
+            FeMessage::Parse(_) => "extended query parse",
+            FeMessage::Execute(_) => "extended query execute",
+            _ => return tracing::Span::none(),
+        };
+
+        tracing::info_span!(
+            target: PGWIRE_ROOT_SPAN_TARGET,
+            "handle_query",
+            mode,
+            session_id,
+            sql = tracing::field::Empty, // record SQL later in each `process` call
+        )
+    }
+
     /// Return type `Option<()>` is essentially a bool, but allows `?` for early return.
     /// - `None` means to terminate the current connection
     /// - `Some(())` means to continue processing the next message
     async fn do_process(&mut self, msg: FeMessage) -> Option<()> {
-        let fut = {
-            // Set the current session as the context when processing the message, if exists.
-            let weak_session = self
-                .session
-                .as_ref()
-                .map(|s| Arc::downgrade(s) as Weak<dyn Any + Send + Sync>);
-
-            let fut = self.do_process_inner(msg);
-
+        let span = self.root_span_for_msg(&msg);
+        let weak_session = self
+            .session
+            .as_ref()
+            .map(|s| Arc::downgrade(s) as Weak<dyn Any + Send + Sync>);
+
+        // Processing the message itself.
+        //
+        // Note: pin the future to avoid stack overflow as we'll wrap it multiple times
+        // in the following code.
+        let fut = Box::pin(self.do_process_inner(msg));
+
+        // Set the current session as the context when processing the message, if exists.
+        let fut = async move {
             if let Some(session) = weak_session {
-                Either::Left(CURRENT_SESSION.scope(session, fut))
+                CURRENT_SESSION.scope(session, fut).await
             } else {
-                Either::Right(fut)
+                fut.await
             }
         };
 
-        let result = AssertUnwindSafe(fut)
-            .rw_catch_unwind()
-            .await
-            .unwrap_or_else(|payload| {
-                Err(PsqlError::Panic(
-                    panic_message::panic_message(&payload).to_owned(),
-                ))
-            })
-            .inspect_err(|error| error!(error = %error.as_report(), "error when process message"));
+        // Catch unwind.
+        let fut = async move {
+            AssertUnwindSafe(fut)
+                .rw_catch_unwind()
+                .await
+                .unwrap_or_else(|payload| {
+                    Err(PsqlError::Panic(
+                        panic_message::panic_message(&payload).to_owned(),
+                    ))
+                })
+        };
+
+        // Slow query log.
+        let fut = async move {
+            let period = *SLOW_QUERY_LOG_PERIOD;
+            let mut fut = std::pin::pin!(fut);
+            let mut elapsed = Duration::ZERO;
+
+            // Report the SQL in the log periodically if the query is slow.
+            loop {
+                match tokio::time::timeout(period, &mut fut).await {
+                    Ok(result) => break result,
+                    Err(_) => {
+                        elapsed += period;
+                        tracing::info!(
+                            target: PGWIRE_SLOW_QUERY_LOG,
+                            elapsed = %format_args!("{}ms", elapsed.as_millis()),
+                            "slow query"
+                        );
+                    }
+                }
+            }
+        };
+
+        // Query log.
+        let fut = async move {
+            let start = Instant::now();
+            let result = fut.await;
+            let elapsed = start.elapsed();
+
+            // Always log if an error occurs.
+            if let Err(error) = &result {
+                tracing::error!(error = %error.as_report(), "error when process message");
+            }
 
-        match result {
+            // Log to optionally-enabled target `PGWIRE_QUERY_LOG`.
+            // Only log if we're currently in a tracing span set in `span_for_msg`.
+            if !tracing::Span::current().is_none() {
+                tracing::info!(
+                    target: PGWIRE_QUERY_LOG,
+                    status = if result.is_ok() { "ok" } else { "err" },
+                    time = %format_args!("{}ms", elapsed.as_millis()),
+                );
+            }
+
+            result
+        };
+
+        // Tracing span.
+        let fut = fut.instrument(span);
+
+        // Execute the future and handle the error.
+        match fut.await {
             Ok(()) => Some(()),
             Err(e) => {
                 match e {
@@ -460,26 +554,12 @@ where
     async fn process_query_msg(&mut self, query_string: io::Result<&str>) -> PsqlResult<()> {
         let sql: Arc<str> =
            Arc::from(query_string.map_err(|err| PsqlError::SimpleQueryError(Box::new(err)))?);
-        let start = Instant::now();
+        record_sql_in_span(&sql);
 
         let session = self.session.clone().unwrap();
-        let session_id = session.id().0;
-        let _exec_context_guard = session.init_exec_context(sql.clone());
-        let result = self
-            .inner_process_query_msg(sql.clone(), session.clone())
-            .await;
-
-        let mills = start.elapsed().as_millis();
-        tracing::info!(
-            target: PGWIRE_QUERY_LOG,
-            mode = %"(simple query)",
-            session = %session_id,
-            status = %if result.is_ok() { "ok" } else { "err" },
-            time = %format_args!("{}ms", mills),
-            sql = format_args!("{}", truncated_fmt::TruncatedFmt(&sql, *RW_QUERY_LOG_TRUNCATE_LEN)),
-        );
-
-        result
+        let _exec_context_guard = session.init_exec_context(sql.clone());
+        self.inner_process_query_msg(sql.clone(), session.clone())
+            .await
     }
 
     async fn inner_process_query_msg(
@@ -497,17 +577,7 @@ where
 
         // Execute multiple statements in simple query. KISS later.
         for stmt in stmts {
-            let span = tracing::info_span!(
-                "process_query_msg_one_stmt",
-                session_id = session.id().0,
-                stmt = format_args!(
-                    "{}",
-                    truncated_fmt::TruncatedFmt(&stmt, *RW_QUERY_LOG_TRUNCATE_LEN)
-                ),
-            );
-
             self.inner_process_query_msg_one_stmt(stmt, session.clone())
-                .instrument(span)
                 .await?;
         }
         // Put this line inside the for loop above will lead to unfinished/stuck regress test...Not
@@ -592,24 +662,11 @@ where
 
     fn process_parse_msg(&mut self, msg: FeParseMessage) -> PsqlResult<()> {
         let sql = cstr_to_str(&msg.sql_bytes).unwrap();
+        record_sql_in_span(sql);
 
         let session = self.session.clone().unwrap();
-        let session_id = session.id().0;
         let statement_name = cstr_to_str(&msg.statement_name).unwrap().to_string();
-        let start = Instant::now();
-
-        let result = self.inner_process_parse_msg(session, sql, statement_name, msg.type_ids);
-        let mills = start.elapsed().as_millis();
-        tracing::info!(
-            target: PGWIRE_QUERY_LOG,
-            mode = %"(extended query parse)",
-            session = %session_id,
-            status = %if result.is_ok() { "ok" } else { "err" },
-            time = %format_args!("{}ms", mills),
-            sql = format_args!("{}", truncated_fmt::TruncatedFmt(&sql, *RW_QUERY_LOG_TRUNCATE_LEN)),
-        );
-
-        result
+        self.inner_process_parse_msg(session, sql, statement_name, msg.type_ids)
     }
 
     fn inner_process_parse_msg(
@@ -728,7 +785,6 @@ where
         let portal_name = cstr_to_str(&msg.portal_name).unwrap().to_string();
         let row_max = msg.max_rows as usize;
         let session = self.session.clone().unwrap();
-        let session_id = session.id().0;
 
         if let Some(mut result_cache) = self.result_cache.remove(&portal_name) {
             assert!(self.portal_store.contains_key(&portal_name));
@@ -739,24 +795,13 @@ where
                 self.result_cache.insert(portal_name, result_cache);
             }
         } else {
-            let start = Instant::now();
             let portal = self.get_portal(&portal_name)?;
             let sql: Arc<str> = Arc::from(format!("{}", portal));
 
+            record_sql_in_span(&sql);
+
             let _exec_context_guard = session.init_exec_context(sql.clone());
             let result = session.clone().execute(portal).await;
-            let mills = start.elapsed().as_millis();
-
-            tracing::info!(
-                target: PGWIRE_QUERY_LOG,
-                mode = %"(extended query execute)",
-                session = %session_id,
-                status = %if result.is_ok() { "ok" } else { "err" },
-                time = %format_args!("{}ms", mills),
-                sql =
-                    format_args!("{}", truncated_fmt::TruncatedFmt(&sql, *RW_QUERY_LOG_TRUNCATE_LEN)),
-            );
-
             let pg_response = result.map_err(PsqlError::ExtendedExecuteError)?;
             let mut result_cache = ResultCache::new(pg_response);
             let is_consume_completed = result_cache.consume::<S>(row_max, &mut self.stream).await?;
@@ -997,7 +1042,7 @@ where
     let ssl = openssl::ssl::Ssl::new(ssl_ctx).unwrap();
     let mut stream = tokio_openssl::SslStream::new(ssl, stream).unwrap();
     if let Err(e) = Pin::new(&mut stream).accept().await {
-        warn!("Unable to set up an ssl connection, reason: {}", e);
+        tracing::warn!("Unable to set up an ssl connection, reason: {}", e);
         let _ = stream.shutdown().await;
         return Err(e.into());
     }
diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs
index 62d7d0868c614..18794b257f79f 100644
--- a/src/utils/runtime/src/logger.rs
+++ b/src/utils/runtime/src/logger.rs
@@ -18,17 +18,17 @@ use std::path::PathBuf;
 use either::Either;
 use risingwave_common::metrics::MetricsLayer;
 use risingwave_common::util::deployment::Deployment;
+use risingwave_common::util::query_log::*;
 use thiserror_ext::AsReport;
 use tracing::level_filters::LevelFilter as Level;
 use tracing_subscriber::filter::{FilterFn, Targets};
+use tracing_subscriber::fmt::format::DefaultFields;
 use tracing_subscriber::fmt::time::OffsetTime;
+use tracing_subscriber::fmt::FormatFields;
 use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::prelude::*;
 use tracing_subscriber::{filter, EnvFilter};
 
-const PGWIRE_QUERY_LOG: &str = "pgwire_query_log";
-const SLOW_QUERY_LOG: &str = "risingwave_frontend_slow_query_log";
-
 pub struct LoggerSettings {
     /// The name of the service.
     name: String,
@@ -115,7 +115,7 @@ impl LoggerSettings {
 /// If it is set,
 /// - Dump logs of all SQLs, i.e., tracing target [`PGWIRE_QUERY_LOG`] to
 ///   `RW_QUERY_LOG_PATH/query.log`.
-/// - Dump slow queries, i.e., tracing target [`SLOW_QUERY_LOG`] to
+/// - Dump slow queries, i.e., tracing target [`PGWIRE_SLOW_QUERY_LOG`] to
 ///   `RW_QUERY_LOG_PATH/slow_query.log`.
 ///
 /// Note:
@@ -247,53 +247,73 @@ pub fn init_risingwave_logger(settings: LoggerSettings) {
                 e.as_report(),
             )
         });
-        let file = std::fs::OpenOptions::new()
-            .create(true)
-            .write(true)
-            .truncate(true)
-            .open(query_log_path.join("query.log"))
-            .unwrap_or_else(|e| {
-                panic!(
-                    "failed to create '{}/query.log': {}",
-                    query_log_path.display(),
-                    e.as_report(),
-                )
-            });
-        let layer = tracing_subscriber::fmt::layer()
-            .with_ansi(false)
-            .with_level(false)
-            .with_file(false)
-            .with_target(false)
-            .with_timer(default_timer.clone())
-            .with_thread_names(true)
-            .with_thread_ids(true)
-            .with_writer(std::sync::Mutex::new(file))
-            .with_filter(filter::Targets::new().with_target(PGWIRE_QUERY_LOG, Level::TRACE));
-        layers.push(layer.boxed());
-
-        let file = std::fs::OpenOptions::new()
-            .create(true)
-            .write(true)
-            .truncate(true)
-            .open(query_log_path.join("slow_query.log"))
-            .unwrap_or_else(|e| {
-                panic!(
-                    "failed to create '{}/slow_query.log': {}",
-                    query_log_path.display(),
-                    e.as_report(),
-                )
-            });
-        let layer = tracing_subscriber::fmt::layer()
-            .with_ansi(false)
-            .with_level(false)
-            .with_file(false)
-            .with_target(false)
-            .with_timer(default_timer)
-            .with_thread_names(true)
-            .with_thread_ids(true)
-            .with_writer(std::sync::Mutex::new(file))
-            .with_filter(filter::Targets::new().with_target(SLOW_QUERY_LOG, Level::TRACE));
-        layers.push(layer.boxed());
+
+        /// Newtype wrapper for `DefaultFields`.
+        ///
+        /// `fmt::Layer` will share the same `FormattedFields` extension for spans across
+        /// different layers, as long as the type of `N: FormatFields` is the same. This
+        /// will cause several problems:
+        ///
+        /// - `with_ansi(false)` does not take effect and it will follow the settings of
+        ///   the primary fmt layer installed above.
+        /// - `Span::record` will update the same `FormattedFields` multiple times,
+        ///   leading to duplicated fields.
+        ///
+        /// As a workaround, we use a newtype wrapper here to get a different type id.
+        /// The const generic parameter `SLOW` is further used to distinguish between the
+        /// query log and the slow query log.
+        #[derive(Default)]
+        struct FmtFields<const SLOW: bool>(DefaultFields);
+
+        impl<'writer, const SLOW: bool> FormatFields<'writer> for FmtFields<SLOW> {
+            fn format_fields<R: tracing_subscriber::field::RecordFields>(
+                &self,
+                writer: tracing_subscriber::fmt::format::Writer<'writer>,
+                fields: R,
+            ) -> std::fmt::Result {
+                self.0.format_fields(writer, fields)
+            }
+        }
+
+        for (file_name, target, is_slow) in [
+            ("query.log", PGWIRE_QUERY_LOG, false),
+            ("slow_query.log", PGWIRE_SLOW_QUERY_LOG, true),
+        ] {
+            let path = query_log_path.join(file_name);
+
+            let file = std::fs::OpenOptions::new()
+                .create(true)
+                .write(true)
+                .truncate(true)
+                .open(&path)
+                .unwrap_or_else(|e| {
+                    panic!("failed to create `{}`: {}", path.display(), e.as_report(),)
+                });
+
+            let layer = tracing_subscriber::fmt::layer()
+                .with_ansi(false)
+                .with_level(false)
+                .with_file(false)
+                .with_target(false)
+                .with_timer(default_timer.clone())
+                .with_thread_names(true)
+                .with_thread_ids(true)
+                .with_writer(file);
+
+            let layer = match is_slow {
+                true => layer.fmt_fields(FmtFields::<true>::default()).boxed(),
+                false => layer.fmt_fields(FmtFields::<false>::default()).boxed(),
+            };
+
+            let layer = layer.with_filter(
+                filter::Targets::new()
+                    // Root span must be enabled to provide common info like the SQL query.
+                    .with_target(PGWIRE_ROOT_SPAN_TARGET, Level::INFO)
+                    .with_target(target, Level::INFO),
+            );
+
+            layers.push(layer.boxed());
+        }
     }
 
     if settings.enable_tokio_console {
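
For reference, the slow-query watchdog that the patch builds into `do_process` reduces to one pattern: wrap a future in a timeout loop and emit an event on the `pgwire_slow_query_log` target each time a period elapses while the future is still pending. Below is a minimal, self-contained sketch of that pattern; it is not part of the patch, the helper name `log_if_slow` and the short durations are illustrative only, and the patch itself takes the period from `RW_SLOW_QUERY_LOG_PERIOD_MS` with a 60-second default.

// Sketch only: illustrates the timeout-loop watchdog used in `do_process` above.
// `log_if_slow` is a hypothetical helper name, not an API from the patch.
use std::time::Duration;

async fn log_if_slow<F: std::future::Future>(fut: F, period: Duration) -> F::Output {
    let mut fut = std::pin::pin!(fut);
    let mut elapsed = Duration::ZERO;
    loop {
        match tokio::time::timeout(period, &mut fut).await {
            // The future finished within the current period: return its output.
            Ok(output) => break output,
            // Still pending: report how long it has been running and keep polling.
            Err(_) => {
                elapsed += period;
                tracing::info!(
                    target: "pgwire_slow_query_log",
                    elapsed = %format_args!("{}ms", elapsed.as_millis()),
                    "slow query"
                );
            }
        }
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    // A 250 ms "query" checked against a 100 ms period logs "slow query" twice.
    log_if_slow(tokio::time::sleep(Duration::from_millis(250)), Duration::from_millis(100)).await;
}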