Skip to content

Commit

Permalink
feat: support tracing rule sampler (#3405)
Browse files Browse the repository at this point in the history
* feat: support tracing rule sampler

* chore: simplify code
  • Loading branch information
Taylor-lagrange authored Mar 5, 2024
1 parent 4915786 commit 53f2a58
Show file tree
Hide file tree
Showing 21 changed files with 302 additions and 25 deletions.
5 changes: 3 additions & 2 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,11 @@ intermediate_path = ""
# enable_otlp_tracing = false
# Tracing exporter endpoint in the format `ip:port`. gRPC OTLP is used as the exporter; the default endpoint is `localhost:4317`.
# otlp_endpoint = "localhost:4317"
# The fraction of traces that will be sampled and exported. Valid range is `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled. The default value is 1. Ratios > 1 are treated as 1; ratios < 0 are treated as 0.
# tracing_sample_ratio = 1.0
# Whether to append logs to stdout. Defaults to true.
# append_stdout = true
# The fraction of traces that will be sampled and exported. Valid range is `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled. The default value is 1. Ratios > 1 are treated as 1; ratios < 0 are treated as 0.
# [logging.tracing_sample_ratio]
# default_ratio = 1.0

# Standalone export the metrics generated by itself
# encoded to Prometheus remote-write format
Expand Down
3 changes: 3 additions & 0 deletions src/common/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

pub mod logging;
mod macros;
pub mod metric;
mod panic_hook;
pub mod tracing_context;
mod tracing_sampler;

pub use logging::{init_default_ut_logging, init_global_logging};
pub use metric::dump_metrics;
Expand Down
9 changes: 6 additions & 3 deletions src/common/telemetry/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter, EnvFilter, Registry};

use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
pub use crate::{debug, error, info, trace, warn};

const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
Expand All @@ -42,7 +43,7 @@ pub struct LoggingOptions {
pub level: Option<String>,
pub enable_otlp_tracing: bool,
pub otlp_endpoint: Option<String>,
pub tracing_sample_ratio: Option<f64>,
pub tracing_sample_ratio: Option<TracingSampleOptions>,
pub append_stdout: bool,
}

Expand Down Expand Up @@ -176,8 +177,10 @@ pub fn init_global_logging(
.expect("error parsing log level string");
let sampler = opts
.tracing_sample_ratio
.map(Sampler::TraceIdRatioBased)
.unwrap_or(Sampler::AlwaysOn);
.as_ref()
.map(create_sampler)
.map(Sampler::ParentBased)
.unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
// Must enable 'tokio_unstable' cfg to use this feature.
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
#[cfg(feature = "tokio-console")]
Expand Down
176 changes: 176 additions & 0 deletions src/common/telemetry/src/tracing_sampler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use opentelemetry::trace::{
Link, SamplingDecision, SamplingResult, SpanKind, TraceContextExt, TraceId, TraceState,
};
use opentelemetry::KeyValue;
use opentelemetry_sdk::trace::{Sampler, ShouldSample};
use serde::{Deserialize, Serialize};

/// Options that decide which traces are sampled and exported.
///
/// The container-level `#[serde(default)]` means fields missing from the
/// deserialized input take the values of this struct's `Default` impl.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct TracingSampleOptions {
    /// Sampling ratio applied to spans that match none of the `rules`.
    pub default_ratio: f64,
    /// Sampling rules, consulted in order when a span is sampled; the first
    /// rule that matches supplies the ratio.
    pub rules: Vec<TracingSampleRule>,
}

impl Default for TracingSampleOptions {
    /// Samples every trace (`default_ratio` of `1.0`) and defines no rules.
    fn default() -> Self {
        let default_ratio = 1.0;
        let rules = Vec::new();
        Self {
            default_ratio,
            rules,
        }
    }
}

/// A single sampling rule: the `ratio` to use for spans of a given `protocol`,
/// optionally narrowed to a set of request types.
///
/// The sampling rate of a span is determined by the rules configured in
/// `TracingSampleOptions::rules`. Spans that do not hit any rule are sampled
/// with `TracingSampleOptions::default_ratio`.
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct TracingSampleRule {
    /// Protocol name this rule applies to; compared for exact equality.
    pub protocol: String,
    /// Request types this rule applies to. An empty set matches every request
    /// of `protocol`.
    pub request_types: HashSet<String>,
    /// Sampling ratio reported when the rule matches.
    pub ratio: f64,
}

impl TracingSampleRule {
pub fn match_rule(&self, protocol: &str, request_type: Option<&str>) -> Option<f64> {
if protocol == self.protocol {
if self.request_types.is_empty() {
Some(self.ratio)
} else if let Some(t) = request_type
&& self.request_types.contains(t)
{
Some(self.ratio)
} else {
None
}
} else {
None
}
}
}

impl PartialEq for TracingSampleOptions {
    /// Two option sets are equal when both their default ratios and their
    /// rule lists compare equal.
    fn eq(&self, other: &Self) -> bool {
        if self.default_ratio != other.default_ratio {
            return false;
        }
        self.rules == other.rules
    }
}
impl PartialEq for TracingSampleRule {
    /// Two rules are equal when protocol, request types and ratio all compare
    /// equal, checked in that order.
    fn eq(&self, other: &Self) -> bool {
        let Self {
            protocol,
            request_types,
            ratio,
        } = self;
        *protocol == other.protocol
            && *request_types == other.request_types
            && *ratio == other.ratio
    }
}

// NOTE(review): both types hold `f64` ratios and compare them with `==`, so a
// NaN ratio makes a value unequal to itself, which `Eq` is not supposed to
// allow. Presumably these marker impls exist so that containing types can be
// `Eq` — confirm, and keep NaN out of the ratios.
impl Eq for TracingSampleOptions {}
impl Eq for TracingSampleRule {}

/// Builds the sampler described by `opt`.
///
/// With at least one rule configured, a clone of `opt` itself is used as the
/// sampler (via its `ShouldSample` impl). With no rules, the SDK's
/// `Sampler::TraceIdRatioBased` is used with `opt.default_ratio`.
pub fn create_sampler(opt: &TracingSampleOptions) -> Box<dyn ShouldSample> {
    if !opt.rules.is_empty() {
        return Box::new(opt.clone());
    }
    Box::new(Sampler::TraceIdRatioBased(opt.default_ratio))
}

impl ShouldSample for TracingSampleOptions {
    /// Decides whether a span is sampled, based on its `protocol` and
    /// `request_type` attributes.
    ///
    /// The ratio comes from the first rule that matches those attributes. If
    /// the span has no `protocol` attribute, or no rule matches, the
    /// `default_ratio` is used. The decision itself is derived from the ratio
    /// and `trace_id` by `sample_based_on_probability`.
    fn should_sample(
        &self,
        parent_context: Option<&opentelemetry::Context>,
        trace_id: TraceId,
        _name: &str,
        _span_kind: &SpanKind,
        attributes: &[KeyValue],
        _links: &[Link],
    ) -> SamplingResult {
        // If a key appears more than once, the last occurrence wins.
        let mut protocol = None;
        let mut request_type = None;
        for attr in attributes {
            let key = attr.key.as_str();
            if key == "protocol" {
                protocol = Some(attr.value.as_str());
            } else if key == "request_type" {
                request_type = Some(attr.value.as_str());
            }
        }

        let ratio = match protocol.as_deref() {
            Some(proto) => self
                .rules
                .iter()
                .find_map(|rule| rule.match_rule(proto, request_type.as_deref()))
                .unwrap_or(self.default_ratio),
            None => self.default_ratio,
        };

        // Like the SDK samplers, the trace state of the parent (if any) is
        // passed through unmodified.
        let trace_state = parent_context
            .map(|ctx| ctx.span().span_context().trace_state().clone())
            .unwrap_or_default();

        SamplingResult {
            decision: sample_based_on_probability(ratio, trace_id),
            // No extra attributes are ever set by the SDK samplers.
            attributes: Vec::new(),
            trace_state,
        }
    }
}

/// Decides whether a span is collected, based on its `TraceId` and the
/// sampling rate `prob`.
///
/// A `prob` of `1.0` or more always samples. Otherwise `prob` (clamped to be
/// non-negative) is scaled to a 63-bit threshold, and the span is sampled when
/// the low 8 bytes of the trace id, shifted right by one bit, fall below it.
///
/// The logic follows the relevant implementation in
/// [opentelemetry](https://github.com/open-telemetry/opentelemetry-rust/blob/ef4701055cc39d3448d5e5392812ded00cdd4476/opentelemetry-sdk/src/trace/sampler.rs#L229).
fn sample_based_on_probability(prob: f64, trace_id: TraceId) -> SamplingDecision {
    if prob >= 1.0 {
        return SamplingDecision::RecordAndSample;
    }

    let threshold = (prob.max(0.0) * (1u64 << 63) as f64) as u64;

    // Only the low half (last 8 bytes, big-endian) of the trace id is used.
    let id_bytes = trace_id.to_bytes();
    let mut low_half = [0u8; 8];
    low_half.copy_from_slice(&id_bytes[8..]);
    // Drop one bit so the value lives in the same 63-bit range as `threshold`.
    let candidate = u64::from_be_bytes(low_half) >> 1;

    if candidate < threshold {
        SamplingDecision::RecordAndSample
    } else {
        SamplingDecision::Drop
    }
}

#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use crate::tracing_sampler::TracingSampleRule;

    /// Builds an `http` rule with ratio `1.0` limited to `request_types`.
    fn http_rule(request_types: HashSet<String>) -> TracingSampleRule {
        TracingSampleRule {
            protocol: "http".to_string(),
            request_types,
            ratio: 1.0,
        }
    }

    #[test]
    fn test_rule() {
        // Without request types the rule covers the whole protocol.
        let any_request = http_rule(HashSet::new());
        assert_eq!(any_request.match_rule("not_http", None), None);
        assert_eq!(any_request.match_rule("http", None), Some(1.0));
        assert_eq!(any_request.match_rule("http", Some("abc")), Some(1.0));

        // With request types only the listed ones match.
        let only_mysql = http_rule(HashSet::from(["mysql".to_string()]));
        assert_eq!(only_mysql.match_rule("http", None), None);
        assert_eq!(only_mysql.match_rule("http", Some("abc")), None);
        assert_eq!(only_mysql.match_rule("http", Some("mysql")), Some(1.0));
    }
}
5 changes: 4 additions & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::error;
use common_telemetry::logging::info;
use common_telemetry::{error, tracing};
use log_store::raft_engine::RaftEngineBackend;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
Expand Down Expand Up @@ -276,6 +276,7 @@ impl Instance {
impl SqlQueryHandler for Instance {
type Error = Error;

#[tracing::instrument(skip_all)]
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor_opt.as_ref();
Expand Down Expand Up @@ -344,6 +345,7 @@ impl SqlQueryHandler for Instance {
.context(ExecLogicalPlanSnafu)
}

#[tracing::instrument(skip_all)]
async fn do_promql_query(
&self,
query: &PromQuery,
Expand Down Expand Up @@ -412,6 +414,7 @@ pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {

#[async_trait]
impl PrometheusHandler for Instance {
#[tracing::instrument(skip_all)]
async fn do_query(
&self,
query: &PromQuery,
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
Expand Down Expand Up @@ -178,6 +179,7 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
}

impl Instance {
#[tracing::instrument(skip_all)]
pub async fn handle_inserts(
&self,
requests: InsertRequests,
Expand All @@ -189,6 +191,7 @@ impl Instance {
.context(TableOperationSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn handle_row_inserts(
&self,
requests: RowInsertRequests,
Expand All @@ -200,6 +203,7 @@ impl Instance {
.context(TableOperationSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn handle_metric_row_inserts(
&self,
requests: RowInsertRequests,
Expand All @@ -212,6 +216,7 @@ impl Instance {
.context(TableOperationSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn handle_deletes(
&self,
requests: DeleteRequests,
Expand All @@ -223,6 +228,7 @@ impl Instance {
.context(TableOperationSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn handle_row_deletes(
&self,
requests: RowDeleteRequests,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/instance/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use servers::error as server_error;
use servers::error::AuthSnafu;
use servers::opentsdb::codec::DataPoint;
Expand All @@ -27,6 +28,7 @@ use crate::instance::Instance;

#[async_trait]
impl OpentsdbProtocolHandler for Instance {
#[tracing::instrument(skip_all, fields(protocol = "opentsdb"))]
async fn exec(
&self,
data_points: Vec<DataPoint>,
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
Expand All @@ -33,6 +34,7 @@ use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};

#[async_trait]
impl OpenTelemetryProtocolHandler for Instance {
#[tracing::instrument(skip_all)]
async fn metrics(
&self,
request: ExportMetricsServiceRequest,
Expand All @@ -59,6 +61,7 @@ impl OpenTelemetryProtocolHandler for Instance {
Ok(resp)
}

#[tracing::instrument(skip_all)]
async fn traces(
&self,
request: ExportTraceServiceRequest,
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_error::ext::BoxedError;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
use common_telemetry::{logging, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
Expand Down Expand Up @@ -87,6 +87,7 @@ async fn to_query_result(table_name: &str, output: Output) -> ServerResult<Query
}

impl Instance {
#[tracing::instrument(skip_all)]
async fn handle_remote_query(
&self,
ctx: &QueryContextRef,
Expand Down Expand Up @@ -126,6 +127,7 @@ impl Instance {
.context(ExecLogicalPlanSnafu)
}

#[tracing::instrument(skip_all)]
async fn handle_remote_queries(
&self,
ctx: QueryContextRef,
Expand Down
2 changes: 2 additions & 0 deletions src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl StatementExecutor {
self.create_table_inner(create_expr, None, &ctx).await
}

#[tracing::instrument(skip_all)]
pub async fn create_table_inner(
&self,
create_table: &mut CreateTableExpr,
Expand Down Expand Up @@ -408,6 +409,7 @@ impl StatementExecutor {
self.alter_table_inner(expr).await
}

#[tracing::instrument(skip_all)]
pub async fn alter_table_inner(&self, expr: AlterExpr) -> Result<Output> {
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME
Expand Down
Loading

0 comments on commit 53f2a58

Please sign in to comment.