Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: organize tracing on query path #3310

Merged
merged 6 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -125,7 +126,12 @@ impl RegionRequester {
let metrics_str = Arc::new(ArcSwapOption::from(None));
let ref_str = metrics_str.clone();

let tracing_context = TracingContext::from_current_span();

let stream = Box::pin(stream!({
let _span = tracing_context.attach(common_telemetry::tracing::info_span!(
"poll_flight_data_stream"
));
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
Expand Down
1 change: 1 addition & 0 deletions src/common/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
parking_lot = { version = "0.12" }
prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing = "0.1"
tracing-appender = "0.2"
Expand Down
13 changes: 13 additions & 0 deletions src/common/telemetry/src/tracing_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,17 @@ impl TracingContext {
let context = Propagator::new().extract(fields);
Self(context)
}

/// Convert the tracing context to a JSON string in W3C trace context format.
pub fn to_json(&self) -> String {
serde_json::to_string(&self.to_w3c()).unwrap()
}

/// Create a new tracing context from a JSON string in W3C trace context format.
///
/// Illegal json string will produce an empty tracing context and no error will be reported.
pub fn from_json(json: &str) -> Self {
let fields: W3cTrace = serde_json::from_str(json).unwrap_or_default();
Self::from_w3c(&fields)
}
}
3 changes: 2 additions & 1 deletion src/datanode/src/heartbeat/handler/close_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
use common_error::ext::ErrorExt;
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_meta::RegionIdent;
use common_telemetry::warn;
use common_telemetry::{tracing, warn};
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionCloseRequest, RegionRequest};

use crate::error;
use crate::heartbeat::handler::HandlerContext;

impl HandlerContext {
#[tracing::instrument(skip_all)]
pub(crate) fn handle_close_region_instruction(
self,
region_ident: RegionIdent,
Expand Down
8 changes: 2 additions & 6 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl RegionServer {
})
}

#[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
pub async fn handle_request(
&self,
region_id: RegionId,
Expand Down Expand Up @@ -226,6 +227,7 @@ impl RegionServerHandler for RegionServer {
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
let tracing_context = TracingContext::from_current_span();

let results = if is_parallel {
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
Expand Down Expand Up @@ -488,17 +490,11 @@ impl RegionServerInner {
CurrentEngine::EarlyReturn(rows) => return Ok(rows),
};

let engine_type = engine.name();

// Sets corresponding region status to registering/deregistering before the operation.
self.set_region_status_not_ready(region_id, &engine, &region_change);

match engine
.handle_request(region_id, request)
.trace(info_span!(
"RegionEngine::handle_region_request",
engine_type
))
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })
{
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::LogStore;
Expand Down Expand Up @@ -281,6 +282,7 @@ impl RegionEngine for MitoEngine {
MITO_ENGINE_NAME
}

#[tracing::instrument(skip_all)]
async fn handle_request(
&self,
region_id: RegionId,
Expand All @@ -293,6 +295,7 @@ impl RegionEngine for MitoEngine {
}

/// Handle substrait query and return a stream of record batches
#[tracing::instrument(skip_all)]
async fn handle_query(
&self,
region_id: RegionId,
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::{debug, error};
use common_telemetry::{debug, error, tracing};
use common_time::range::TimestampRange;
use snafu::ResultExt;
use table::predicate::Predicate;
Expand Down Expand Up @@ -286,6 +286,7 @@ impl SeqScan {
}

/// Fetch a batch from the reader and convert it into a record batch.
#[tracing::instrument(skip_all, level = "trace")]
waynexia marked this conversation as resolved.
Show resolved Hide resolved
async fn fetch_record_batch(
reader: &mut dyn BatchReader,
mapper: &ProjectionMapper,
Expand Down
7 changes: 2 additions & 5 deletions src/query/src/dist_plan/merge_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{tracing, warn};
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
};
Expand Down Expand Up @@ -158,7 +157,6 @@ impl MergeScanExec {
})
}

#[tracing::instrument(skip_all)]
pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
let substrait_plan = self.substrait_plan.to_vec();
let regions = self.regions.clone();
Expand All @@ -167,8 +165,7 @@ impl MergeScanExec {
let schema = Self::arrow_schema_to_schema(self.schema())?;

let dbname = context.task_id().unwrap_or_default();

let tracing_context = TracingContext::from_current_span().to_w3c();
let tracing_context = TracingContext::from_json(context.session_id().as_str());

let stream = Box::pin(stream!({
METRIC_MERGE_SCAN_REGIONS.observe(regions.len() as f64);
Expand All @@ -179,7 +176,7 @@ impl MergeScanExec {
for region_id in regions {
let request = QueryRequest {
header: Some(RegionRequestHeader {
tracing_context: tracing_context.clone(),
tracing_context: tracing_context.to_w3c().clone(),
waynexia marked this conversation as resolved.
Show resolved Hide resolved
dbname: dbname.clone(),
}),
region_id: region_id.into(),
Expand Down
8 changes: 7 additions & 1 deletion src/query/src/query_engine/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::Arc;

use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::context::{SessionState, TaskContext};
use session::context::QueryContextRef;

Expand Down Expand Up @@ -41,9 +42,14 @@ impl QueryEngineContext {
pub fn build_task_ctx(&self) -> Arc<TaskContext> {
let dbname = self.query_ctx.get_db_string();
let state = &self.state;
let tracing_context = TracingContext::from_current_span();

// pass tracing context in session_id
let session_id = tracing_context.to_json();

Arc::new(TaskContext::new(
Some(dbname),
state.session_id().to_string(),
session_id,
state.config().clone(),
state.scalar_functions().clone(),
state.aggregate_functions().clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/grpc/flight/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::task::{Context, Poll};
use arrow_flight::FlightData;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing::info_span;
use common_telemetry::tracing::{info_span, Instrument};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::warn;
use futures::channel::mpsc;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl FlightRecordBatchStream {
return;
}

while let Some(batch_or_err) = recordbatches.next().await {
while let Some(batch_or_err) = recordbatches.next().in_current_span().await {
match batch_or_err {
Ok(recordbatch) => {
if let Err(e) = tx.send(Ok(FlightMessage::Recordbatch(recordbatch))).await {
Expand Down
3 changes: 2 additions & 1 deletion src/servers/src/mysql/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl MysqlInstanceShim {
}
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, name = "mysql::do_query")]
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
if let Some(output) =
crate::mysql::federated::check(query, query_ctx.clone(), self.session.clone())
Expand Down Expand Up @@ -335,6 +335,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
let _ = guard.remove(&stmt_id);
}

#[tracing::instrument(skip_all)]
async fn on_query<'a>(
&'a mut self,
query: &'a str,
Expand Down
10 changes: 9 additions & 1 deletion src/table/src/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use common_query::error::Result as QueryResult;
use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_expr::PhysicalSortExpr;
Expand Down Expand Up @@ -97,14 +99,18 @@ impl PhysicalPlan for StreamScanAdapter {
fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter"));

let mut stream = self.stream.lock().unwrap();
let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {
stream,
metric: mem_usage_metrics,
span,
}))
}

Expand All @@ -116,13 +122,15 @@ impl PhysicalPlan for StreamScanAdapter {
pub struct StreamWithMetricWrapper {
stream: SendableRecordBatchStream,
metric: MemoryUsageMetrics,
span: Span,
}

impl Stream for StreamWithMetricWrapper {
type Item = RecordBatchResult<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let _enter = this.span.enter();
let poll = this.stream.poll_next_unpin(cx);
if let Poll::Ready(Some(Ok(record_batch))) = &poll {
let batch_mem_size = record_batch
Expand Down
Loading