From d4c7509593f38d4188930d5bd1abf20df61047dd Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 18 Feb 2024 16:18:05 +0800 Subject: [PATCH 1/4] feat: organize tracing on query path Signed-off-by: Ruihang Xia --- src/client/src/region.rs | 6 ++++++ src/datanode/src/heartbeat/handler/close_region.rs | 3 ++- src/datanode/src/region_server.rs | 8 ++------ src/mito2/src/engine.rs | 3 +++ src/mito2/src/read/seq_scan.rs | 3 ++- src/query/src/dist_plan/merge_scan.rs | 7 +++---- src/query/src/query_engine/context.rs | 7 ++++++- src/servers/src/grpc/flight/stream.rs | 4 ++-- src/servers/src/mysql/handler.rs | 3 ++- src/servers/src/mysql/writer.rs | 7 ++++++- src/table/Cargo.toml | 1 + src/table/src/table/scan.rs | 11 ++++++++++- 12 files changed, 45 insertions(+), 18 deletions(-) diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 7c05d9337230..7f1181ee59c9 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -28,6 +28,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult}; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::error; +use common_telemetry::tracing_context::TracingContext; use prost::Message; use snafu::{location, Location, OptionExt, ResultExt}; use tokio_stream::StreamExt; @@ -125,7 +126,12 @@ impl RegionRequester { let metrics_str = Arc::new(ArcSwapOption::from(None)); let ref_str = metrics_str.clone(); + let tracing_context = TracingContext::from_current_span(); + let stream = Box::pin(stream!({ + let _span = tracing_context.attach(common_telemetry::tracing::info_span!( + "poll_flight_data_stream" + )); while let Some(flight_message) = flight_message_stream.next().await { let flight_message = flight_message .map_err(BoxedError::new) diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index df72240789f3..eb0e9165d99c 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -15,7 +15,7 @@ use common_error::ext::ErrorExt; use common_meta::instruction::{InstructionReply, SimpleReply}; use common_meta::RegionIdent; -use common_telemetry::warn; +use common_telemetry::{tracing, warn}; use futures_util::future::BoxFuture; use store_api::region_request::{RegionCloseRequest, RegionRequest}; @@ -23,6 +23,7 @@ use crate::error; use crate::heartbeat::handler::HandlerContext; impl HandlerContext { + #[tracing::instrument(skip_all)] pub(crate) fn handle_close_region_instruction( self, region_ident: RegionIdent, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index e798f273e1af..ad5b6e13e430 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -123,6 +123,7 @@ impl RegionServer { }) } + #[tracing::instrument(skip_all, fields(request_type = request.request_type()))] pub async fn handle_request( &self, region_id: RegionId, @@ -226,6 +227,7 @@ impl RegionServerHandler for RegionServer { .map_err(BoxedError::new) .context(ExecuteGrpcRequestSnafu)?; let tracing_context = TracingContext::from_current_span(); + let results = if is_parallel { let join_tasks = requests.into_iter().map(|(region_id, req)| { let self_to_move = self.clone(); @@ -488,17 +490,11 @@ impl RegionServerInner { CurrentEngine::EarlyReturn(rows) => return Ok(rows), }; - let engine_type = engine.name(); - // Sets corresponding region status to registering/deregistering before the operation. self.set_region_status_not_ready(region_id, &engine, ®ion_change); match engine .handle_request(region_id, request) - .trace(info_span!( - "RegionEngine::handle_region_request", - engine_type - )) .await .with_context(|_| HandleRegionRequestSnafu { region_id }) { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index b84be68798c2..943acdc59348 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -51,6 +51,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::tracing; use object_store::manager::ObjectStoreManagerRef; use snafu::{ensure, OptionExt, ResultExt}; use store_api::logstore::LogStore; @@ -281,6 +282,7 @@ impl RegionEngine for MitoEngine { MITO_ENGINE_NAME } + #[tracing::instrument(skip_all)] async fn handle_request( &self, region_id: RegionId, @@ -293,6 +295,7 @@ impl RegionEngine for MitoEngine { } /// Handle substrait query and return a stream of record batches + #[tracing::instrument(skip_all)] async fn handle_query( &self, region_id: RegionId, diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index d9595673f483..209392194644 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -21,7 +21,7 @@ use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream}; -use common_telemetry::{debug, error}; +use common_telemetry::{debug, error, tracing}; use common_time::range::TimestampRange; use snafu::ResultExt; use table::predicate::Predicate; @@ -286,6 +286,7 @@ impl SeqScan { } /// Fetch a batch from the reader and convert it into a record batch. + #[tracing::instrument(skip_all, level = "trace")] async fn fetch_record_batch( reader: &mut dyn BatchReader, mapper: &ProjectionMapper, diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index f1d59a5181e8..b2e096aa7f6a 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -29,7 +29,6 @@ use common_recordbatch::{ DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream, }; use common_telemetry::tracing_context::TracingContext; -use common_telemetry::{tracing, warn}; use datafusion::physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, }; @@ -158,7 +157,6 @@ impl MergeScanExec { }) } - #[tracing::instrument(skip_all)] pub fn to_stream(&self, context: Arc) -> Result { let substrait_plan = self.substrait_plan.to_vec(); let regions = self.regions.clone(); @@ -168,7 +166,8 @@ impl MergeScanExec { let dbname = context.task_id().unwrap_or_default(); - let tracing_context = TracingContext::from_current_span().to_w3c(); + let tracing_context = + TracingContext::from_w3c(&serde_json::from_str(context.session_id().as_str()).unwrap()); let stream = Box::pin(stream!({ METRIC_MERGE_SCAN_REGIONS.observe(regions.len() as f64); @@ -179,7 +178,7 @@ impl MergeScanExec { for region_id in regions { let request = QueryRequest { header: Some(RegionRequestHeader { - tracing_context: tracing_context.clone(), + tracing_context: tracing_context.to_w3c().clone(), dbname: dbname.clone(), }), region_id: region_id.into(), diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs index d5361b4e5f2c..06187e50bf86 100644 --- a/src/query/src/query_engine/context.rs +++ b/src/query/src/query_engine/context.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use common_telemetry::tracing_context::TracingContext; use datafusion::execution::context::{SessionState, TaskContext}; use session::context::QueryContextRef; @@ -41,9 +42,13 @@ impl QueryEngineContext { pub fn build_task_ctx(&self) -> Arc { let dbname = self.query_ctx.get_db_string(); let state = &self.state; + let tracing_context = TracingContext::from_current_span(); + + let session_id = serde_json::to_string(&tracing_context.to_w3c()).unwrap(); + Arc::new(TaskContext::new( Some(dbname), - state.session_id().to_string(), + session_id, state.config().clone(), state.scalar_functions().clone(), state.aggregate_functions().clone(), diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index 472b287c13ee..c2d066a73b75 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -18,7 +18,7 @@ use std::task::{Context, Poll}; use arrow_flight::FlightData; use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::tracing::info_span; +use common_telemetry::tracing::{info_span, Instrument}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::warn; use futures::channel::mpsc; @@ -66,7 +66,7 @@ impl FlightRecordBatchStream { return; } - while let Some(batch_or_err) = recordbatches.next().await { + while let Some(batch_or_err) = recordbatches.next().in_current_span().await { match batch_or_err { Ok(recordbatch) => { if let Err(e) = tx.send(Ok(FlightMessage::Recordbatch(recordbatch))).await { diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 9c1540dcfa62..43a7dcdad93c 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -92,7 +92,7 @@ impl MysqlInstanceShim { } } - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, name = "mysql::do_query")] async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { if let Some(output) = crate::mysql::federated::check(query, query_ctx.clone(), self.session.clone()) @@ -335,6 +335,7 @@ impl AsyncMysqlShim for MysqlInstanceShi let _ = guard.remove(&stmt_id); } + #[tracing::instrument(skip_all)] async fn on_query<'a>( &'a mut self, query: &'a str, diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index fd5304ec3e76..14f0b5e40774 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -17,6 +17,8 @@ use std::ops::Deref; use common_error::ext::ErrorExt; use common_query::Output; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use common_telemetry::tracing::Span; +use common_telemetry::tracing_context::FutureExt; use common_telemetry::{debug, error}; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::SchemaRef; @@ -134,10 +136,13 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { ) -> Result<()> { match create_mysql_column_def(&query_result.schema) { Ok(column_def) => { + let tracing_span = Span::current(); // The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one() // to return a new QueryResultWriter. let mut row_writer = writer.start(&column_def).await?; - while let Some(record_batch) = query_result.stream.next().await { + while let Some(record_batch) = + query_result.stream.next().trace(tracing_span.clone()).await + { match record_batch { Ok(record_batch) => { Self::write_recordbatch( diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 467bad1b1354..01a792b1437f 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -32,6 +32,7 @@ humantime = "2.1" humantime-serde.workspace = true paste = "1.0" serde.workspace = true +serde_json.workspace = true snafu.workspace = true store-api.workspace = true tokio.workspace = true diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 3bc6ec77ee93..7f563392cd60 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -23,6 +23,8 @@ use common_query::error::Result as QueryResult; use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef}; use common_recordbatch::error::Result as RecordBatchResult; use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_telemetry::tracing::Span; +use common_telemetry::tracing_context::TracingContext; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_expr::PhysicalSortExpr; @@ -97,14 +99,19 @@ impl PhysicalPlan for StreamScanAdapter { fn execute( &self, partition: usize, - _context: Arc, + context: Arc, ) -> QueryResult { + let tracing_context = + TracingContext::from_w3c(&serde_json::from_str(context.session_id().as_str()).unwrap()); + let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter")); + let mut stream = self.stream.lock().unwrap(); let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?; let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition); Ok(Box::pin(StreamWithMetricWrapper { stream, metric: mem_usage_metrics, + span, })) } @@ -116,6 +123,7 @@ impl PhysicalPlan for StreamScanAdapter { pub struct StreamWithMetricWrapper { stream: SendableRecordBatchStream, metric: MemoryUsageMetrics, + span: Span, } impl Stream for StreamWithMetricWrapper { @@ -123,6 +131,7 @@ impl Stream for StreamWithMetricWrapper { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); + let _enter = this.span.enter(); let poll = this.stream.poll_next_unpin(cx); if let Poll::Ready(Some(Ok(record_batch))) = &poll { let batch_mem_size = record_batch From a5b62d093a989c5b6d01bc90bafc12a83f12f5aa Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 18 Feb 2024 17:06:26 +0800 Subject: [PATCH 2/4] warp json conversion to TracingContext's methods Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/common/telemetry/Cargo.toml | 1 + src/common/telemetry/src/tracing_context.rs | 13 +++++++++++++ src/query/src/dist_plan/merge_scan.rs | 4 +--- src/query/src/query_engine/context.rs | 3 ++- src/table/Cargo.toml | 1 - src/table/src/table/scan.rs | 3 +-- 7 files changed, 19 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0665afb7f17c..7c1c926d151f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2060,6 +2060,7 @@ dependencies = [ "parking_lot 0.12.1", "prometheus", "serde", + "serde_json", "tokio", "tracing", "tracing-appender", diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index 2d03aa45d1fb..c6b76a61e587 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -24,6 +24,7 @@ opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] } parking_lot = { version = "0.12" } prometheus.workspace = true serde.workspace = true +serde_json.workspace = true tokio.workspace = true tracing = "0.1" tracing-appender = "0.2" diff --git a/src/common/telemetry/src/tracing_context.rs b/src/common/telemetry/src/tracing_context.rs index b6aed81c858e..bf9c3dd91602 100644 --- a/src/common/telemetry/src/tracing_context.rs +++ b/src/common/telemetry/src/tracing_context.rs @@ -89,4 +89,17 @@ impl TracingContext { let context = Propagator::new().extract(fields); Self(context) } + + /// Convert the tracing context to a JSON string in W3C trace context format. + pub fn to_json(&self) -> String { + serde_json::to_string(&self.to_w3c()).unwrap() + } + + /// Create a new tracing context from a JSON string in W3C trace context format. + /// + /// Illegal json string will produce an empty tracing context and no error will be reported. + pub fn from_json(json: &str) -> Self { + let fields: W3cTrace = serde_json::from_str(json).unwrap_or_default(); + Self::from_w3c(&fields) + } } diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index b2e096aa7f6a..38f6c2d21f5c 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -165,9 +165,7 @@ impl MergeScanExec { let schema = Self::arrow_schema_to_schema(self.schema())?; let dbname = context.task_id().unwrap_or_default(); - - let tracing_context = - TracingContext::from_w3c(&serde_json::from_str(context.session_id().as_str()).unwrap()); + let tracing_context = TracingContext::from_json(context.session_id().as_str()); let stream = Box::pin(stream!({ METRIC_MERGE_SCAN_REGIONS.observe(regions.len() as f64); diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs index 06187e50bf86..b90b7d41f613 100644 --- a/src/query/src/query_engine/context.rs +++ b/src/query/src/query_engine/context.rs @@ -44,7 +44,8 @@ impl QueryEngineContext { let state = &self.state; let tracing_context = TracingContext::from_current_span(); - let session_id = serde_json::to_string(&tracing_context.to_w3c()).unwrap(); + // pass tracing context in session_id + let session_id = tracing_context.to_json(); Arc::new(TaskContext::new( Some(dbname), diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 01a792b1437f..467bad1b1354 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -32,7 +32,6 @@ humantime = "2.1" humantime-serde.workspace = true paste = "1.0" serde.workspace = true -serde_json.workspace = true snafu.workspace = true store-api.workspace = true tokio.workspace = true diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 7f563392cd60..45e27823153a 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -101,8 +101,7 @@ impl PhysicalPlan for StreamScanAdapter { partition: usize, context: Arc, ) -> QueryResult { - let tracing_context = - TracingContext::from_w3c(&serde_json::from_str(context.session_id().as_str()).unwrap()); + let tracing_context = TracingContext::from_json(context.session_id().as_str()); let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter")); let mut stream = self.stream.lock().unwrap(); From 1a2fcbd850841c0693fc89f629a82bdfb7b2ecb9 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 18 Feb 2024 17:19:03 +0800 Subject: [PATCH 3/4] remove unnecessary .trace() Signed-off-by: Ruihang Xia --- src/servers/src/mysql/writer.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 14f0b5e40774..fd5304ec3e76 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -17,8 +17,6 @@ use std::ops::Deref; use common_error::ext::ErrorExt; use common_query::Output; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; -use common_telemetry::tracing::Span; -use common_telemetry::tracing_context::FutureExt; use common_telemetry::{debug, error}; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::SchemaRef; @@ -136,13 +134,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { ) -> Result<()> { match create_mysql_column_def(&query_result.schema) { Ok(column_def) => { - let tracing_span = Span::current(); // The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one() // to return a new QueryResultWriter. let mut row_writer = writer.start(&column_def).await?; - while let Some(record_batch) = - query_result.stream.next().trace(tracing_span.clone()).await - { + while let Some(record_batch) = query_result.stream.next().await { match record_batch { Ok(record_batch) => { Self::write_recordbatch( From 184c90a86ad240ceb72065c54b5bd91524c35219 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 18 Feb 2024 17:28:43 +0800 Subject: [PATCH 4/4] Update src/query/src/dist_plan/merge_scan.rs Co-authored-by: Zhenchi --- src/query/src/dist_plan/merge_scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 38f6c2d21f5c..d4e4516d1b53 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -176,7 +176,7 @@ impl MergeScanExec { for region_id in regions { let request = QueryRequest { header: Some(RegionRequestHeader { - tracing_context: tracing_context.to_w3c().clone(), + tracing_context: tracing_context.to_w3c(), dbname: dbname.clone(), }), region_id: region_id.into(),