Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support tracing rule sampler #3405

Merged
merged 2 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,11 @@ intermediate_path = ""
# enable_otlp_tracing = false
# tracing exporter endpoint with format `ip:port`, we use grpc oltp as exporter, default endpoint is `localhost:4317`
# otlp_endpoint = "localhost:4317"
# The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0
# tracing_sample_ratio = 1.0
# Whether to append logs to stdout. Defaults to true.
# append_stdout = true
# The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0
# [logging.tracing_sample_ratio]
# default_ratio = 0.0

# Standalone export the metrics generated by itself
# encoded to Prometheus remote-write format
Expand Down
3 changes: 3 additions & 0 deletions src/common/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

pub mod logging;
mod macros;
pub mod metric;
mod panic_hook;
pub mod tracing_context;
mod tracing_sampler;

pub use logging::{init_default_ut_logging, init_global_logging};
pub use metric::dump_metrics;
Expand Down
9 changes: 6 additions & 3 deletions src/common/telemetry/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter, EnvFilter, Registry};

use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
pub use crate::{debug, error, info, trace, warn};

const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
Expand All @@ -42,7 +43,7 @@ pub struct LoggingOptions {
pub level: Option<String>,
pub enable_otlp_tracing: bool,
pub otlp_endpoint: Option<String>,
pub tracing_sample_ratio: Option<f64>,
pub tracing_sample_ratio: Option<TracingSampleOptions>,
pub append_stdout: bool,
}

Expand Down Expand Up @@ -176,8 +177,10 @@ pub fn init_global_logging(
.expect("error parsing log level string");
let sampler = opts
.tracing_sample_ratio
.map(Sampler::TraceIdRatioBased)
.unwrap_or(Sampler::AlwaysOn);
.as_ref()
.map(create_sampler)
.map(Sampler::ParentBased)
.unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
// Must enable 'tokio_unstable' cfg to use this feature.
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
#[cfg(feature = "tokio-console")]
Expand Down
176 changes: 176 additions & 0 deletions src/common/telemetry/src/tracing_sampler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use opentelemetry::trace::{
Link, SamplingDecision, SamplingResult, SpanKind, TraceContextExt, TraceId, TraceState,
};
use opentelemetry::KeyValue;
use opentelemetry_sdk::trace::{Sampler, ShouldSample};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct TracingSampleOptions {
pub default_ratio: f64,
pub rules: Vec<TracingSampleRule>,
}

impl Default for TracingSampleOptions {
fn default() -> Self {
Self {
default_ratio: 1.0,
waynexia marked this conversation as resolved.
Show resolved Hide resolved
rules: vec![],
}
}
}

/// Determine the sampling rate of a span according to the `rules` provided in `RuleSampler`.
/// For spans that do not hit any `rules`, the `default_ratio` is used.
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct TracingSampleRule {
pub protocol: String,
pub request_types: HashSet<String>,
pub ratio: f64,
}

impl TracingSampleRule {
pub fn match_rule(&self, protocol: &str, request_type: Option<&str>) -> Option<f64> {
if protocol == self.protocol {
if self.request_types.is_empty() {
Some(self.ratio)
} else if let Some(t) = request_type
&& self.request_types.contains(t)
{
Some(self.ratio)
} else {
None
}
} else {
None
}
}
}

impl PartialEq for TracingSampleOptions {
fn eq(&self, other: &Self) -> bool {
self.default_ratio == other.default_ratio && self.rules == other.rules
}
}
impl PartialEq for TracingSampleRule {
fn eq(&self, other: &Self) -> bool {
self.protocol == other.protocol
&& self.request_types == other.request_types
&& self.ratio == other.ratio
}
}

impl Eq for TracingSampleOptions {}
impl Eq for TracingSampleRule {}

pub fn create_sampler(opt: &TracingSampleOptions) -> Box<dyn ShouldSample> {
if opt.rules.is_empty() {
Box::new(Sampler::TraceIdRatioBased(opt.default_ratio))
} else {
Box::new(opt.clone())
}
}

impl ShouldSample for TracingSampleOptions {
fn should_sample(
&self,
parent_context: Option<&opentelemetry::Context>,
trace_id: TraceId,
_name: &str,
_span_kind: &SpanKind,
attributes: &[KeyValue],
_links: &[Link],
) -> SamplingResult {
let (mut protocol, mut request_type) = (None, None);
for kv in attributes {
match kv.key.as_str() {
"protocol" => protocol = Some(kv.value.as_str()),
"request_type" => request_type = Some(kv.value.as_str()),
_ => (),
}
}
let ratio = protocol
.and_then(|p| {
self.rules
.iter()
.find_map(|rule| rule.match_rule(p.as_ref(), request_type.as_deref()))
})
.unwrap_or(self.default_ratio);
SamplingResult {
decision: sample_based_on_probability(ratio, trace_id),
// No extra attributes ever set by the SDK samplers.
attributes: Vec::new(),
// all sampler in SDK will not modify trace state.
trace_state: match parent_context {
Some(ctx) => ctx.span().span_context().trace_state().clone(),
None => TraceState::default(),
},
}
}
}

/// The code here mainly refers to the relevant implementation of
/// [opentelemetry](https://github.com/open-telemetry/opentelemetry-rust/blob/ef4701055cc39d3448d5e5392812ded00cdd4476/opentelemetry-sdk/src/trace/sampler.rs#L229),
/// and determines whether the span needs to be collected based on the `TraceId` and sampling rate (i.e. `prob`).
fn sample_based_on_probability(prob: f64, trace_id: TraceId) -> SamplingDecision {
if prob >= 1.0 {
SamplingDecision::RecordAndSample
} else {
let prob_upper_bound = (prob.max(0.0) * (1u64 << 63) as f64) as u64;
let bytes = trace_id.to_bytes();
let (_, low) = bytes.split_at(8);
let trace_id_low = u64::from_be_bytes(low.try_into().unwrap());
let rnd_from_trace_id = trace_id_low >> 1;

if rnd_from_trace_id < prob_upper_bound {
SamplingDecision::RecordAndSample
} else {
SamplingDecision::Drop
}
}
}

#[cfg(test)]
mod test {
use std::collections::HashSet;

use crate::tracing_sampler::TracingSampleRule;

#[test]
fn test_rule() {
let rule = TracingSampleRule {
protocol: "http".to_string(),
request_types: HashSet::new(),
ratio: 1.0,
};
assert_eq!(rule.match_rule("not_http", None), None);
assert_eq!(rule.match_rule("http", None), Some(1.0));
assert_eq!(rule.match_rule("http", Some("abc")), Some(1.0));
let rule1 = TracingSampleRule {
protocol: "http".to_string(),
request_types: HashSet::from(["mysql".to_string()]),
ratio: 1.0,
};
assert_eq!(rule1.match_rule("http", None), None);
assert_eq!(rule1.match_rule("http", Some("abc")), None);
assert_eq!(rule1.match_rule("http", Some("mysql")), Some(1.0));
}
}
5 changes: 4 additions & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::error;
use common_telemetry::logging::info;
use common_telemetry::{error, tracing};
use log_store::raft_engine::RaftEngineBackend;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
Expand Down Expand Up @@ -275,6 +275,7 @@ impl Instance {
impl SqlQueryHandler for Instance {
type Error = Error;

#[tracing::instrument(skip_all)]
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let _timer = metrics::METRIC_HANDLE_SQL_ELAPSED.start_timer();
let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
Expand Down Expand Up @@ -345,6 +346,7 @@ impl SqlQueryHandler for Instance {
.context(ExecLogicalPlanSnafu)
}

#[tracing::instrument(skip_all)]
async fn do_promql_query(
&self,
query: &PromQuery,
Expand Down Expand Up @@ -400,6 +402,7 @@ impl SqlQueryHandler for Instance {

#[async_trait]
impl PrometheusHandler for Instance {
#[tracing::instrument(skip_all)]
async fn do_query(
&self,
query: &PromQuery,
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
Expand Down Expand Up @@ -173,6 +174,7 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
}

impl Instance {
#[tracing::instrument(skip_all)]
pub async fn handle_inserts(
&self,
requests: InsertRequests,
Expand All @@ -184,6 +186,7 @@ impl Instance {
.context(TableOperationSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn handle_row_inserts(
&self,
requests: RowInsertRequests,
Expand All @@ -195,6 +198,7 @@ impl Instance {
.context(TableOperationSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn handle_metric_row_inserts(
&self,
requests: RowInsertRequests,
Expand All @@ -207,6 +211,7 @@ impl Instance {
.context(TableOperationSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn handle_deletes(
&self,
requests: DeleteRequests,
Expand All @@ -218,6 +223,7 @@ impl Instance {
.context(TableOperationSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn handle_row_deletes(
&self,
requests: RowDeleteRequests,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/instance/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use servers::error as server_error;
use servers::error::AuthSnafu;
use servers::opentsdb::codec::DataPoint;
Expand All @@ -27,6 +28,7 @@ use crate::instance::Instance;

#[async_trait]
impl OpentsdbProtocolHandler for Instance {
#[tracing::instrument(skip_all, fields(protocol = "opentsdb"))]
async fn exec(
&self,
data_points: Vec<DataPoint>,
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
Expand All @@ -33,6 +34,7 @@ use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};

#[async_trait]
impl OpenTelemetryProtocolHandler for Instance {
#[tracing::instrument(skip_all)]
async fn metrics(
&self,
request: ExportMetricsServiceRequest,
Expand All @@ -59,6 +61,7 @@ impl OpenTelemetryProtocolHandler for Instance {
Ok(resp)
}

#[tracing::instrument(skip_all)]
async fn traces(
&self,
request: ExportTraceServiceRequest,
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_error::ext::BoxedError;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
use common_telemetry::{logging, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
Expand Down Expand Up @@ -87,6 +87,7 @@ async fn to_query_result(table_name: &str, output: Output) -> ServerResult<Query
}

impl Instance {
#[tracing::instrument(skip_all)]
async fn handle_remote_query(
&self,
ctx: &QueryContextRef,
Expand Down Expand Up @@ -126,6 +127,7 @@ impl Instance {
.context(ExecLogicalPlanSnafu)
}

#[tracing::instrument(skip_all)]
async fn handle_remote_queries(
&self,
ctx: QueryContextRef,
Expand Down
2 changes: 2 additions & 0 deletions src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl StatementExecutor {
self.create_table_inner(create_expr, None, &ctx).await
}

#[tracing::instrument(skip_all)]
pub async fn create_table_inner(
&self,
create_table: &mut CreateTableExpr,
Expand Down Expand Up @@ -367,6 +368,7 @@ impl StatementExecutor {
self.alter_table_inner(expr).await
}

#[tracing::instrument(skip_all)]
pub async fn alter_table_inner(&self, expr: AlterExpr) -> Result<Output> {
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME
Expand Down
Loading
Loading