diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 0a9c0873ccf6..710e79c0a62e 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -254,10 +254,11 @@ intermediate_path = "" # enable_otlp_tracing = false # tracing exporter endpoint with format `ip:port`, we use grpc oltp as exporter, default endpoint is `localhost:4317` # otlp_endpoint = "localhost:4317" -# The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0 -# tracing_sample_ratio = 1.0 # Whether to append logs to stdout. Defaults to true. # append_stdout = true +# The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0 +# [logging.tracing_sample_ratio] +# default_ratio = 0.0 # Standalone export the metrics generated by itself # encoded to Prometheus remote-write format diff --git a/src/common/telemetry/src/lib.rs b/src/common/telemetry/src/lib.rs index f13cdf72dc68..12f0098a158c 100644 --- a/src/common/telemetry/src/lib.rs +++ b/src/common/telemetry/src/lib.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(let_chains)] + pub mod logging; mod macros; pub mod metric; mod panic_hook; pub mod tracing_context; +mod tracing_sampler; pub use logging::{init_default_ut_logging, init_global_logging}; pub use metric::dump_metrics; diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index f3f38489e809..e4e9479a699d 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -31,6 +31,7 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::prelude::*; use tracing_subscriber::{filter, EnvFilter, Registry}; +use crate::tracing_sampler::{create_sampler, TracingSampleOptions}; pub use crate::{debug, error, info, trace, warn}; const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317"; @@ -42,7 +43,7 @@ pub struct LoggingOptions { pub level: Option, pub enable_otlp_tracing: bool, pub otlp_endpoint: Option, - pub tracing_sample_ratio: Option, + pub tracing_sample_ratio: Option, pub append_stdout: bool, } @@ -176,8 +177,10 @@ pub fn init_global_logging( .expect("error parsing log level string"); let sampler = opts .tracing_sample_ratio - .map(Sampler::TraceIdRatioBased) - .unwrap_or(Sampler::AlwaysOn); + .as_ref() + .map(create_sampler) + .map(Sampler::ParentBased) + .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))); // Must enable 'tokio_unstable' cfg to use this feature. // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start` #[cfg(feature = "tokio-console")] diff --git a/src/common/telemetry/src/tracing_sampler.rs b/src/common/telemetry/src/tracing_sampler.rs new file mode 100644 index 000000000000..843603d73080 --- /dev/null +++ b/src/common/telemetry/src/tracing_sampler.rs @@ -0,0 +1,176 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use opentelemetry::trace::{ + Link, SamplingDecision, SamplingResult, SpanKind, TraceContextExt, TraceId, TraceState, +}; +use opentelemetry::KeyValue; +use opentelemetry_sdk::trace::{Sampler, ShouldSample}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct TracingSampleOptions { + pub default_ratio: f64, + pub rules: Vec, +} + +impl Default for TracingSampleOptions { + fn default() -> Self { + Self { + default_ratio: 1.0, + rules: vec![], + } + } +} + +/// Determine the sampling rate of a span according to the `rules` provided in `RuleSampler`. +/// For spans that do not hit any `rules`, the `default_ratio` is used. +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct TracingSampleRule { + pub protocol: String, + pub request_types: HashSet, + pub ratio: f64, +} + +impl TracingSampleRule { + pub fn match_rule(&self, protocol: &str, request_type: Option<&str>) -> Option { + if protocol == self.protocol { + if self.request_types.is_empty() { + Some(self.ratio) + } else if let Some(t) = request_type + && self.request_types.contains(t) + { + Some(self.ratio) + } else { + None + } + } else { + None + } + } +} + +impl PartialEq for TracingSampleOptions { + fn eq(&self, other: &Self) -> bool { + self.default_ratio == other.default_ratio && self.rules == other.rules + } +} +impl PartialEq for TracingSampleRule { + fn eq(&self, other: &Self) -> bool { + self.protocol == other.protocol + && self.request_types == other.request_types + && self.ratio == other.ratio + } +} + +impl Eq for TracingSampleOptions {} +impl Eq for TracingSampleRule {} + +pub fn create_sampler(opt: &TracingSampleOptions) -> Box { + if opt.rules.is_empty() { + Box::new(Sampler::TraceIdRatioBased(opt.default_ratio)) + } else { + Box::new(opt.clone()) + } +} + +impl ShouldSample for TracingSampleOptions { + fn should_sample( + &self, + parent_context: Option<&opentelemetry::Context>, + trace_id: TraceId, + _name: &str, + _span_kind: &SpanKind, + attributes: &[KeyValue], + _links: &[Link], + ) -> SamplingResult { + let (mut protocol, mut request_type) = (None, None); + for kv in attributes { + match kv.key.as_str() { + "protocol" => protocol = Some(kv.value.as_str()), + "request_type" => request_type = Some(kv.value.as_str()), + _ => (), + } + } + let ratio = protocol + .and_then(|p| { + self.rules + .iter() + .find_map(|rule| rule.match_rule(p.as_ref(), request_type.as_deref())) + }) + .unwrap_or(self.default_ratio); + SamplingResult { + decision: sample_based_on_probability(ratio, trace_id), + // No extra attributes ever set by the SDK samplers. + attributes: Vec::new(), + // all sampler in SDK will not modify trace state. + trace_state: match parent_context { + Some(ctx) => ctx.span().span_context().trace_state().clone(), + None => TraceState::default(), + }, + } + } +} + +/// The code here mainly refers to the relevant implementation of +/// [opentelemetry](https://github.com/open-telemetry/opentelemetry-rust/blob/ef4701055cc39d3448d5e5392812ded00cdd4476/opentelemetry-sdk/src/trace/sampler.rs#L229), +/// and determines whether the span needs to be collected based on the `TraceId` and sampling rate (i.e. `prob`). +fn sample_based_on_probability(prob: f64, trace_id: TraceId) -> SamplingDecision { + if prob >= 1.0 { + SamplingDecision::RecordAndSample + } else { + let prob_upper_bound = (prob.max(0.0) * (1u64 << 63) as f64) as u64; + let bytes = trace_id.to_bytes(); + let (_, low) = bytes.split_at(8); + let trace_id_low = u64::from_be_bytes(low.try_into().unwrap()); + let rnd_from_trace_id = trace_id_low >> 1; + + if rnd_from_trace_id < prob_upper_bound { + SamplingDecision::RecordAndSample + } else { + SamplingDecision::Drop + } + } +} + +#[cfg(test)] +mod test { + use std::collections::HashSet; + + use crate::tracing_sampler::TracingSampleRule; + + #[test] + fn test_rule() { + let rule = TracingSampleRule { + protocol: "http".to_string(), + request_types: HashSet::new(), + ratio: 1.0, + }; + assert_eq!(rule.match_rule("not_http", None), None); + assert_eq!(rule.match_rule("http", None), Some(1.0)); + assert_eq!(rule.match_rule("http", Some("abc")), Some(1.0)); + let rule1 = TracingSampleRule { + protocol: "http".to_string(), + request_types: HashSet::from(["mysql".to_string()]), + ratio: 1.0, + }; + assert_eq!(rule1.match_rule("http", None), None); + assert_eq!(rule1.match_rule("http", Some("abc")), None); + assert_eq!(rule1.match_rule("http", Some("mysql")), Some(1.0)); + } +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 7cb2e5afaf8b..115613ef01dd 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -39,8 +39,8 @@ use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_query::Output; -use common_telemetry::error; use common_telemetry::logging::info; +use common_telemetry::{error, tracing}; use log_store::raft_engine::RaftEngineBackend; use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOptions; @@ -276,6 +276,7 @@ impl Instance { impl SqlQueryHandler for Instance { type Error = Error; + #[tracing::instrument(skip_all)] async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { let query_interceptor_opt = self.plugins.get::>(); let query_interceptor = query_interceptor_opt.as_ref(); @@ -344,6 +345,7 @@ impl SqlQueryHandler for Instance { .context(ExecLogicalPlanSnafu) } + #[tracing::instrument(skip_all)] async fn do_promql_query( &self, query: &PromQuery, @@ -412,6 +414,7 @@ pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output { #[async_trait] impl PrometheusHandler for Instance { + #[tracing::instrument(skip_all)] async fn do_query( &self, query: &PromQuery, diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index eb6cc4f257eb..9f2a2de2a2a8 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_meta::table_name::TableName; use common_query::Output; +use common_telemetry::tracing; use query::parser::PromQuery; use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef}; use servers::query_handler::grpc::GrpcQueryHandler; @@ -178,6 +179,7 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte } impl Instance { + #[tracing::instrument(skip_all)] pub async fn handle_inserts( &self, requests: InsertRequests, @@ -189,6 +191,7 @@ impl Instance { .context(TableOperationSnafu) } + #[tracing::instrument(skip_all)] pub async fn handle_row_inserts( &self, requests: RowInsertRequests, @@ -200,6 +203,7 @@ impl Instance { .context(TableOperationSnafu) } + #[tracing::instrument(skip_all)] pub async fn handle_metric_row_inserts( &self, requests: RowInsertRequests, @@ -212,6 +216,7 @@ impl Instance { .context(TableOperationSnafu) } + #[tracing::instrument(skip_all)] pub async fn handle_deletes( &self, requests: DeleteRequests, @@ -223,6 +228,7 @@ impl Instance { .context(TableOperationSnafu) } + #[tracing::instrument(skip_all)] pub async fn handle_row_deletes( &self, requests: RowDeleteRequests, diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 47bb940a1bb2..4269e9233918 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -15,6 +15,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_error::ext::BoxedError; +use common_telemetry::tracing; use servers::error as server_error; use servers::error::AuthSnafu; use servers::opentsdb::codec::DataPoint; @@ -27,6 +28,7 @@ use crate::instance::Instance; #[async_trait] impl OpentsdbProtocolHandler for Instance { + #[tracing::instrument(skip_all, fields(protocol = "opentsdb"))] async fn exec( &self, data_points: Vec, diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 6dd73d407040..22b2d3307da5 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -15,6 +15,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_error::ext::BoxedError; +use common_telemetry::tracing; use opentelemetry_proto::tonic::collector::metrics::v1::{ ExportMetricsServiceRequest, ExportMetricsServiceResponse, }; @@ -33,6 +34,7 @@ use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS}; #[async_trait] impl OpenTelemetryProtocolHandler for Instance { + #[tracing::instrument(skip_all)] async fn metrics( &self, request: ExportMetricsServiceRequest, @@ -59,6 +61,7 @@ impl OpenTelemetryProtocolHandler for Instance { Ok(resp) } + #[tracing::instrument(skip_all)] async fn traces( &self, request: ExportTraceServiceRequest, diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 7b4ac281a000..88d350e4c50b 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -23,7 +23,7 @@ use common_error::ext::BoxedError; use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use common_query::Output; use common_recordbatch::RecordBatches; -use common_telemetry::logging; +use common_telemetry::{logging, tracing}; use operator::insert::InserterRef; use operator::statement::StatementExecutor; use prost::Message; @@ -87,6 +87,7 @@ async fn to_query_result(table_name: &str, output: Output) -> ServerResult Result { let catalog_name = if expr.catalog_name.is_empty() { DEFAULT_CATALOG_NAME diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index 5c66a9b65924..92cab91481b3 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -20,6 +20,7 @@ use std::time::{Duration, SystemTime}; use chrono::DateTime; use common_error::ext::{BoxedError, PlainError}; use common_error::status_code::StatusCode; +use common_telemetry::tracing; use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr}; use promql_parser::parser::Expr::Extension; use promql_parser::parser::{EvalStmt, Expr, ValueType}; @@ -132,6 +133,7 @@ impl QueryLanguageParser { } /// Try to parse PromQL, return the statement when success. + #[tracing::instrument(skip_all)] pub fn parse_promql(query: &PromQuery, _query_ctx: &QueryContextRef) -> Result { let _timer = PARSE_PROMQL_ELAPSED.start_timer(); diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 63c06d5eed36..0283780b5b5c 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -26,7 +26,8 @@ use arrow_flight::{ use async_trait::async_trait; use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_query::Output; -use common_telemetry::tracing_context::TracingContext; +use common_telemetry::tracing::info_span; +use common_telemetry::tracing_context::{FutureExt, TracingContext}; use futures::Stream; use prost::Message; use snafu::ResultExt; @@ -34,7 +35,7 @@ use tonic::{Request, Response, Status, Streaming}; use crate::error; pub use crate::grpc::flight::stream::FlightRecordBatchStream; -use crate::grpc::greptime_handler::GreptimeRequestHandler; +use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler}; use crate::grpc::TonicResult; pub type TonicStream = Pin> + Send + Sync + 'static>>; @@ -152,11 +153,20 @@ impl FlightCraft for GreptimeRequestHandler { let request = GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?; - let output = self.handle_request(request).await?; - - let stream: Pin> + Send + Sync>> = - to_flight_data_stream(output, TracingContext::new()); - Ok(Response::new(stream)) + // The Grpc protocol pass query by Flight. It needs to be wrapped under a span, in order to record stream + let span = info_span!( + "GreptimeRequestHandler::do_get", + protocol = "grpc", + request_type = get_request_type(&request) + ); + async { + let output = self.handle_request(request).await?; + let stream: Pin> + Send + Sync>> = + to_flight_data_stream(output, TracingContext::from_current_span()); + Ok(Response::new(stream)) + } + .trace(span) + .await } } diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 3b4bc803ed90..19a4e1d373e0 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -27,7 +27,8 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; use common_runtime::Runtime; -use common_telemetry::logging; +use common_telemetry::tracing_context::{FutureExt, TracingContext}; +use common_telemetry::{logging, tracing}; use common_time::timezone::parse_timezone; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; @@ -57,6 +58,7 @@ impl GreptimeRequestHandler { } } + #[tracing::instrument(skip_all, fields(protocol = "grpc", request_type = get_request_type(&request)))] pub(crate) async fn handle_request(&self, request: GreptimeRequest) -> Result { let query = request.request.context(InvalidQuerySnafu { reason: "Expecting non-empty GreptimeRequest.", @@ -79,16 +81,23 @@ impl GreptimeRequestHandler { // - Obtaining a `JoinHandle` to get the panic message (if there's any). // From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped. // 2. avoid the handler blocks the gRPC runtime incidentally. + let tracing_context = TracingContext::from_current_span(); let handle = self.runtime.spawn(async move { - handler.do_query(query, query_ctx).await.map_err(|e| { - if e.status_code().should_log_error() { - logging::error!(e; "Failed to handle request"); - } else { - // Currently, we still print a debug log. - logging::debug!("Failed to handle request, err: {:?}", e); - } - e - }) + handler + .do_query(query, query_ctx) + .trace(tracing_context.attach(tracing::info_span!( + "GreptimeRequestHandler::handle_request_runtime" + ))) + .await + .map_err(|e| { + if e.status_code().should_log_error() { + logging::error!(e; "Failed to handle request"); + } else { + // Currently, we still print a debug log. + logging::debug!("Failed to handle request, err: {:?}", e); + } + e + }) }); handle.await.context(JoinTaskSnafu).map_err(|e| { @@ -98,6 +107,14 @@ impl GreptimeRequestHandler { } } +pub fn get_request_type(request: &GreptimeRequest) -> &'static str { + request + .request + .as_ref() + .map(request_type) + .unwrap_or_default() +} + pub(crate) async fn auth( user_provider: Option, header: Option<&RequestHeader>, diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 89bfaf138353..f8a51d02a617 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -27,6 +27,7 @@ use common_plugins::GREPTIME_EXEC_PREFIX; use common_query::physical_plan::PhysicalPlan; use common_query::Output; use common_recordbatch::util; +use common_telemetry::tracing; use datafusion::physical_plan::metrics::MetricValue; use query::parser::PromQuery; use schemars::JsonSchema; @@ -66,6 +67,7 @@ pub struct SqlQuery { /// Handler to execute sql #[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))] pub async fn sql( State(state): State, Query(query_params): Query, @@ -250,6 +252,7 @@ impl From for PromQuery { /// Handler to execute promql #[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))] pub async fn promql( State(state): State, Query(params): Query, diff --git a/src/servers/src/http/influxdb.rs b/src/servers/src/http/influxdb.rs index de13085d8818..9ced1557bf0c 100644 --- a/src/servers/src/http/influxdb.rs +++ b/src/servers/src/http/influxdb.rs @@ -20,6 +20,7 @@ use axum::response::IntoResponse; use axum::Extension; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_grpc::writer::Precision; +use common_telemetry::tracing; use session::context::QueryContextRef; use crate::error::{Result, TimePrecisionSnafu}; @@ -39,6 +40,7 @@ pub async fn influxdb_health() -> Result { } #[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "influxdb", request_type = "write_v1"))] pub async fn influxdb_write_v1( State(handler): State, Query(mut params): Query>, @@ -58,6 +60,7 @@ pub async fn influxdb_write_v1( } #[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "influxdb", request_type = "write_v2"))] pub async fn influxdb_write_v2( State(handler): State, Query(mut params): Query>, diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 9fa552c32dad..b113ae951c52 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -16,6 +16,7 @@ use axum::extract::{RawBody, State}; use axum::http::header; use axum::response::IntoResponse; use axum::Extension; +use common_telemetry::tracing; use hyper::Body; use opentelemetry_proto::tonic::collector::metrics::v1::{ ExportMetricsServiceRequest, ExportMetricsServiceResponse, @@ -31,6 +32,7 @@ use crate::error::{self, Result}; use crate::query_handler::OpenTelemetryProtocolHandlerRef; #[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))] pub async fn metrics( State(handler): State, Extension(query_ctx): Extension, @@ -69,6 +71,7 @@ impl IntoResponse for OtlpMetricsResponse { } #[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))] pub async fn traces( State(handler): State, Extension(query_ctx): Extension, diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index a6b74d324b1b..9188755028b5 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -21,6 +21,7 @@ use axum::response::IntoResponse; use axum::Extension; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_query::prelude::GREPTIME_PHYSICAL_TABLE; +use common_telemetry::tracing; use hyper::Body; use prost::Message; use schemars::JsonSchema; @@ -75,6 +76,10 @@ pub async fn route_write_without_metric_engine( } #[axum_macros::debug_handler] +#[tracing::instrument( + skip_all, + fields(protocol = "prometheus", request_type = "remote_write") +)] pub async fn remote_write( State(handler): State, Query(params): Query, @@ -111,6 +116,10 @@ impl IntoResponse for PromStoreResponse { } #[axum_macros::debug_handler] +#[tracing::instrument( + skip_all, + fields(protocol = "prometheus", request_type = "remote_read") +)] pub async fn remote_read( State(handler): State, Query(params): Query, diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 89e86a526201..7cfa6d5715b5 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -25,6 +25,7 @@ use common_error::status_code::StatusCode; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::Output; use common_recordbatch::RecordBatches; +use common_telemetry::tracing; use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; @@ -87,6 +88,10 @@ pub struct FormatQuery { } #[axum_macros::debug_handler] +#[tracing::instrument( + skip_all, + fields(protocol = "prometheus", request_type = "format_query") +)] pub async fn format_query( State(_handler): State, Query(params): Query, @@ -115,6 +120,10 @@ pub struct InstantQuery { } #[axum_macros::debug_handler] +#[tracing::instrument( + skip_all, + fields(protocol = "prometheus", request_type = "instant_query") +)] pub async fn instant_query( State(handler): State, Query(params): Query, @@ -154,6 +163,10 @@ pub struct RangeQuery { } #[axum_macros::debug_handler] +#[tracing::instrument( + skip_all, + fields(protocol = "prometheus", request_type = "range_query") +)] pub async fn range_query( State(handler): State, Query(params): Query, @@ -222,6 +235,10 @@ impl<'de> Deserialize<'de> for Matches { } #[axum_macros::debug_handler] +#[tracing::instrument( + skip_all, + fields(protocol = "prometheus", request_type = "labels_query") +)] pub async fn labels_query( State(handler): State, Query(params): Query, @@ -486,6 +503,10 @@ pub struct LabelValueQuery { } #[axum_macros::debug_handler] +#[tracing::instrument( + skip_all, + fields(protocol = "prometheus", request_type = "label_values_query") +)] pub async fn label_values_query( State(handler): State, Path(label_name): Path, @@ -610,6 +631,10 @@ pub struct SeriesQuery { } #[axum_macros::debug_handler] +#[tracing::instrument( + skip_all, + fields(protocol = "prometheus", request_type = "series_query") +)] pub async fn series_query( State(handler): State, Query(params): Query, diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 43a7dcdad93c..0f5fce59f369 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -335,7 +335,7 @@ impl AsyncMysqlShim for MysqlInstanceShi let _ = guard.remove(&stmt_id); } - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, fields(protocol = "mysql"))] async fn on_query<'a>( &'a mut self, query: &'a str, diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs index 12a05531eb39..050ba1488c35 100644 --- a/src/servers/src/postgres/handler.rs +++ b/src/servers/src/postgres/handler.rs @@ -19,6 +19,7 @@ use common_error::ext::ErrorExt; use common_query::Output; use common_recordbatch::error::Result as RecordBatchResult; use common_recordbatch::RecordBatch; +use common_telemetry::tracing; use datatypes::schema::SchemaRef; use futures::{future, stream, Stream, StreamExt}; use pgwire::api::portal::{Format, Portal}; @@ -40,6 +41,7 @@ use crate::SqlPlan; #[async_trait] impl SimpleQueryHandler for PostgresServerHandler { + #[tracing::instrument(skip_all, fields(protocol = "postgres"))] async fn do_query<'a, C>( &self, _client: &mut C, diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index d4017cd4e107..f86d30781c82 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -23,6 +23,7 @@ use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; use api::v1::RowInsertRequests; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_recordbatch::{RecordBatch, RecordBatches}; +use common_telemetry::tracing; use common_time::timestamp::TimeUnit; use datafusion::prelude::{col, lit, regexp_match, Expr}; use datafusion_common::ScalarValue; @@ -62,6 +63,7 @@ pub fn table_name(q: &Query) -> Result { } /// Create a DataFrame from a remote Query +#[tracing::instrument(skip_all)] pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result { let DataFrame::DataFusion(dataframe) = dataframe;