Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for tracing::Span::recorded fields in metrics-tracing-context #408

Merged
merged 16 commits into from
Nov 29, 2023
4 changes: 4 additions & 0 deletions metrics-tracing-context/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - ReleaseDate

### Added

- Support for dynamism using `tracing::Span::record` to add label values. ([#408](https://github.com/metrics-rs/metrics/pull/408))

## [0.14.0] - 2023-04-16

### Changed
Expand Down
6 changes: 5 additions & 1 deletion metrics-tracing-context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ itoa = { version = "1", default-features = false }
metrics = { version = "^0.21", path = "../metrics" }
metrics-util = { version = "^0.15", path = "../metrics-util" }
lockfree-object-pool = { version = "0.1.3", default-features = false }
indexmap = { version = "2.1", default-features = false, features = ["std"] }
once_cell = { version = "1", default-features = false, features = ["std"] }
tracing = { version = "0.1.29", default-features = false }
tracing-core = { version = "0.1.21", default-features = false }
tracing-subscriber = { version = "0.3.1", default-features = false, features = ["std"] }
tracing-test = "0.2.4"
zohnannor marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
criterion = { version = "=0.3.3", default-features = false }
parking_lot = { version = "0.12.1", default-features = false }
tracing = { version = "0.1.29", default-features = false, features = ["std"] }
tracing-subscriber = { version = "0.3.1", default-features = false, features = ["registry"] }
tracing-subscriber = { version = "0.3.1", default-features = false, features = ["registry", "fmt"] }
pretty_assertions = "*"
itertools = "*"
11 changes: 7 additions & 4 deletions metrics-tracing-context/benches/visit.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use indexmap::IndexMap;
use lockfree_object_pool::LinearObjectPool;
use metrics::Label;
use metrics::SharedString;
use metrics_tracing_context::Labels;
use once_cell::sync::OnceCell;
use tracing::Metadata;
Expand All @@ -13,9 +14,11 @@ use tracing_core::{
Callsite, Interest,
};

fn get_pool() -> &'static Arc<LinearObjectPool<Vec<Label>>> {
static POOL: OnceCell<Arc<LinearObjectPool<Vec<Label>>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(|| Vec::new(), |vec| vec.clear())))
type Map = IndexMap<SharedString, SharedString>;

fn get_pool() -> &'static Arc<LinearObjectPool<Map>> {
static POOL: OnceCell<Arc<LinearObjectPool<Map>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(Map::new, Map::clear)))
}

const BATCH_SIZE: usize = 1000;
Expand Down
83 changes: 41 additions & 42 deletions metrics-tracing-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,25 @@
//!
//! # Implementation
//!
//! The integration layer works by capturing all fields present when a span is created and storing
//! them as an extension to the span. If a metric is emitted while a span is entered, we check that
//! span to see if it has any fields in the extension data, and if it does, we add those fields as
//! labels to the metric key.
//!
//! There are two important behaviors to be aware of:
//! - we only capture the fields present when the span is created
//! - we store all fields that a span has, including the fields of its parent span(s)
//!
//! ## Lack of dynamism
//!
//! This means that if you use [`Span::record`][tracing::Span::record] to add fields to a span after
//! it has been created, those fields will not be captured and added to your metric key.
//! The integration layer works by capturing all fields that are present when a span is created,
//! as well as fields recorded after the fact, and storing them as an extension to the span. If
//! a metric is emitted while a span is entered, any fields captured for that span will be added
//! to the metric as additional labels.
//!
//! Be aware that we recursively capture the fields of a span, including fields from
//! parent spans, and use them when generating metric labels. This means that if a metric is being
//! emitted in span B, which is a child of span A, and span A has field X, and span B has field Y,
//! then the metric labels will include both field X and Y. This applies regardless of how many
//! nested spans are currently entered.
//!
//! ## Duplicate span fields
//!
//! When span fields are captured, they are deduplicated such that only the most recent value is kept.
//! For merging parent span fields into the current span fields, the fields from the current span have
//! the highest priority. Additionally, when using [`Span::record`][tracing::Span::record] to add fields
//! to a span after it has been created, the same behavior applies. This means that recording a field
//! multiple times only keeps the most recently recorded value, including if a field was already present
//! from a parent span and is then recorded dynamically in the current span.
//!
//! ## Span fields and ancestry
//!
Expand Down Expand Up @@ -104,7 +110,7 @@ pub mod label_filter;
mod tracing_integration;

pub use label_filter::LabelFilter;
use tracing_integration::WithContext;
use tracing_integration::Map;
pub use tracing_integration::{Labels, MetricsLayer};

/// [`TracingContextLayer`] provides an implementation of a [`Layer`] for [`TracingContext`].
Expand Down Expand Up @@ -169,34 +175,27 @@ where
// doing the iteration would likely exceed the cost of simply constructing the new key.
tracing::dispatcher::get_default(|dispatch| {
let current = dispatch.current_span();
if let Some(id) = current.id() {
// We're currently within a live tracing span, so see if we have an available
// metrics context to grab any fields/labels out of.
if let Some(ctx) = dispatch.downcast_ref::<WithContext>() {
let mut f = |new_labels: &[Label]| {
if !new_labels.is_empty() {
let (name, mut labels) = key.clone().into_parts();

let filtered_labels = new_labels
.iter()
.filter(|label| {
self.label_filter.should_include_label(&name, label)
})
.cloned();
labels.extend(filtered_labels);

Some(Key::from_parts(name, labels))
} else {
None
}
};

// Pull in the span's fields/labels if they exist.
return ctx.with_labels(dispatch, id, &mut f);
}
}

None
let id = current.id()?;
let ctx = dispatch.downcast_ref::<MetricsLayer>()?;

let mut f = |mut span_labels: Map| {
(!span_labels.is_empty()).then(|| {
let (name, labels) = key.clone().into_parts();

span_labels.extend(labels.into_iter().map(Label::into_parts));

let labels = span_labels
.into_iter()
.map(|(key, value)| Label::new(key, value))
.filter(|label| self.label_filter.should_include_label(&name, label))
.collect::<Vec<_>>();
zohnannor marked this conversation as resolved.
Show resolved Hide resolved

Key::from_parts(name, labels)
})
};

// Pull in the span's fields/labels if they exist.
ctx.with_labels(dispatch, id, &mut f)
})
}
}
Expand Down
152 changes: 76 additions & 76 deletions metrics-tracing-context/src/tracing_integration.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,50 @@
//! The code that integrates with the `tracing` crate.

use indexmap::IndexMap;
use lockfree_object_pool::{LinearObjectPool, LinearOwnedReusable};
use metrics::{Key, Label};
use metrics::{Key, SharedString};
use once_cell::sync::OnceCell;
use std::cmp;
use std::sync::Arc;
use std::{any::TypeId, marker::PhantomData};
use tracing::span;
use tracing_core::span::{Attributes, Id, Record};
use tracing_core::{field::Visit, Dispatch, Field, Subscriber};
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};

fn get_pool() -> &'static Arc<LinearObjectPool<Vec<Label>>> {
static POOL: OnceCell<Arc<LinearObjectPool<Vec<Label>>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(Vec::new, Vec::clear)))
pub(crate) type Map = IndexMap<SharedString, SharedString>;

fn get_pool() -> &'static Arc<LinearObjectPool<Map>> {
static POOL: OnceCell<Arc<LinearObjectPool<Map>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(Map::new, Map::clear)))
}

/// Span fields mapped as metrics labels.
///
/// Hidden from documentation as there is no need for end users to ever touch this type, but it must
/// be public in order to be pulled in by external benchmark code.
#[doc(hidden)]
pub struct Labels(pub LinearOwnedReusable<Vec<Label>>);
pub struct Labels(pub LinearOwnedReusable<Map>);

impl Labels {
pub(crate) fn extend_from_labels(&mut self, other: &Labels) {
self.0.extend_from_slice(other.as_ref());
fn extend(&mut self, other: &Labels, f: impl Fn(&mut Map, &SharedString, &SharedString)) {
let new_len = cmp::max(self.as_ref().len(), other.as_ref().len());
let additional = new_len - self.as_ref().len();
self.0.reserve(additional);
for (k, v) in other.as_ref() {
f(&mut self.0, k, v);
}
}

fn extend_from_labels(&mut self, other: &Labels) {
self.extend(other, |map, k, v| {
map.entry(k.clone()).or_insert_with(|| v.clone());
});
}

fn extend_from_labels_overwrite(&mut self, other: &Labels) {
self.extend(other, |map, k, v| {
map.insert(k.clone(), v.clone());
});
}
}

Expand All @@ -34,108 +56,91 @@ impl Default for Labels {

impl Visit for Labels {
fn record_str(&mut self, field: &Field, value: &str) {
let label = Label::new(field.name(), value.to_string());
self.0.push(label);
self.0.insert(field.name().into(), value.to_owned().into());
}

fn record_bool(&mut self, field: &Field, value: bool) {
let label = Label::from_static_parts(field.name(), if value { "true" } else { "false" });
self.0.push(label);
self.0.insert(field.name().into(), if value { "true" } else { "false" }.into());
}

fn record_i64(&mut self, field: &Field, value: i64) {
let mut buf = itoa::Buffer::new();
let s = buf.format(value);
let label = Label::new(field.name(), s.to_string());
self.0.push(label);
self.0.insert(field.name().into(), s.to_owned().into());
}

fn record_u64(&mut self, field: &Field, value: u64) {
let mut buf = itoa::Buffer::new();
let s = buf.format(value);
let label = Label::new(field.name(), s.to_string());
self.0.push(label);
self.0.insert(field.name().into(), s.to_owned().into());
}

fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
let value_string = format!("{:?}", value);
let label = Label::new(field.name(), value_string);
self.0.push(label);
self.0.insert(field.name().into(), format!("{value:?}").into());
}
}

impl Labels {
fn from_attributes(attrs: &Attributes<'_>) -> Labels {
fn from_record(record: &Record) -> Labels {
let mut labels = Labels::default();
let record = Record::new(attrs.values());
record.record(&mut labels);
labels
}
}

impl AsRef<[Label]> for Labels {
fn as_ref(&self) -> &[Label] {
impl AsRef<Map> for Labels {
fn as_ref(&self) -> &Map {
&self.0
}
}

pub struct WithContext {
with_labels: fn(&Dispatch, &Id, f: &mut dyn FnMut(&Labels) -> Option<Key>) -> Option<Key>,
}

impl WithContext {
pub fn with_labels(
&self,
dispatch: &Dispatch,
id: &Id,
f: &mut dyn FnMut(&[Label]) -> Option<Key>,
) -> Option<Key> {
let mut ff = |labels: &Labels| f(labels.as_ref());
(self.with_labels)(dispatch, id, &mut ff)
}
}

/// [`MetricsLayer`] is a [`tracing_subscriber::Layer`] that captures the span
/// fields and allows them to be later on used as metrics labels.
pub struct MetricsLayer<S> {
ctx: WithContext,
_subscriber: PhantomData<fn(S)>,
#[derive(Default)]
pub struct MetricsLayer {
with_labels:
Option<fn(&Dispatch, &Id, f: &mut dyn FnMut(&Labels) -> Option<Key>) -> Option<Key>>,
}

impl<S> MetricsLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
/// Create a new `MetricsLayer`.
impl MetricsLayer {
/// Create a new [`MetricsLayer`].
pub fn new() -> Self {
let ctx = WithContext { with_labels: Self::with_labels };

Self { ctx, _subscriber: PhantomData }
Self::default()
}

fn with_labels(
pub(crate) fn with_labels(
&self,
dispatch: &Dispatch,
id: &Id,
f: &mut dyn FnMut(&Labels) -> Option<Key>,
f: &mut dyn FnMut(Map) -> Option<Key>,
) -> Option<Key> {
let subscriber = dispatch
.downcast_ref::<S>()
.expect("subscriber should downcast to expected type; this is a bug!");
let span = subscriber.span(id).expect("registry should have a span for the current ID");

let result =
if let Some(labels) = span.extensions().get::<Labels>() { f(labels) } else { None };
result
let mut ff = |labels: &Labels| f(labels.0.clone());
(self.with_labels?)(dispatch, id, &mut ff)
}
}

impl<S> Layer<S> for MetricsLayer<S>
impl<S> Layer<S> for MetricsLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_layer(&mut self, _: &mut S) {
self.with_labels = Some(
|dispatch: &Dispatch, id: &span::Id, f: &mut dyn FnMut(&Labels) -> Option<Key>| {
let subscriber = dispatch
.downcast_ref::<S>()
.expect("subscriber should downcast to expected type; this is a bug!");
let span =
subscriber.span(id).expect("registry should have a span for the current ID");

let ext = span.extensions();
f(ext.get::<Labels>()?)
},
);
}

fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, cx: Context<'_, S>) {
let span = cx.span(id).expect("span must already exist!");
let mut labels = Labels::from_attributes(attrs);
let mut labels = Labels::from_record(&Record::new(attrs.values()));

if let Some(parent) = span.parent() {
if let Some(parent_labels) = parent.extensions().get::<Labels>() {
Expand All @@ -146,20 +151,15 @@ where
span.extensions_mut().insert(labels);
}

unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
match id {
id if id == TypeId::of::<Self>() => Some(self as *const _ as *const ()),
id if id == TypeId::of::<WithContext>() => Some(&self.ctx as *const _ as *const ()),
_ => None,
}
}
}
fn on_record(&self, id: &Id, values: &Record<'_>, cx: Context<'_, S>) {
let span = cx.span(id).expect("span must already exist!");
let labels = Labels::from_record(values);

impl<S> Default for MetricsLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn default() -> Self {
MetricsLayer::new()
let ext = &mut span.extensions_mut();
if let Some(existing) = ext.get_mut::<Labels>() {
existing.extend_from_labels_overwrite(&labels);
} else {
ext.insert(labels);
}
}
}
Loading
Loading