Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for tracing::Span::recorded fields in metrics-tracing-context #408

Merged
merged 16 commits into from
Nov 29, 2023
4 changes: 4 additions & 0 deletions metrics-tracing-context/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - ReleaseDate

### Added

- Support for dynamism using `tracing::Span::record` to add label values. ([#408](https://github.com/metrics-rs/metrics/pull/408))

## [0.14.0] - 2023-04-16

### Changed
Expand Down
1 change: 1 addition & 0 deletions metrics-tracing-context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ itoa = { version = "1", default-features = false }
metrics = { version = "^0.21", path = "../metrics" }
metrics-util = { version = "^0.15", path = "../metrics-util" }
lockfree-object-pool = { version = "0.1.3", default-features = false }
indexmap = { version = "2.1", default-features = false, features = ["std"] }
once_cell = { version = "1", default-features = false, features = ["std"] }
tracing = { version = "0.1.29", default-features = false }
tracing-core = { version = "0.1.21", default-features = false }
Expand Down
9 changes: 6 additions & 3 deletions metrics-tracing-context/benches/visit.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use indexmap::IndexMap;
use lockfree_object_pool::LinearObjectPool;
use metrics::Label;
use metrics_tracing_context::Labels;
Expand All @@ -13,9 +14,11 @@ use tracing_core::{
Callsite, Interest,
};

fn get_pool() -> &'static Arc<LinearObjectPool<Vec<Label>>> {
static POOL: OnceCell<Arc<LinearObjectPool<Vec<Label>>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(|| Vec::new(), |vec| vec.clear())))
type Map = IndexMap<&'static str, Label>;

fn get_pool() -> &'static Arc<LinearObjectPool<Map>> {
static POOL: OnceCell<Arc<LinearObjectPool<Map>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(Map::new, Map::clear)))
}

const BATCH_SIZE: usize = 1000;
Expand Down
50 changes: 26 additions & 24 deletions metrics-tracing-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,25 @@
//!
//! # Implementation
//!
//! The integration layer works by capturing all fields present when a span is created and storing
//! them as an extension to the span. If a metric is emitted while a span is entered, we check that
//! span to see if it has any fields in the extension data, and if it does, we add those fields as
//! labels to the metric key.
//!
//! There are two important behaviors to be aware of:
//! - we only capture the fields present when the span is created
//! - we store all fields that a span has, including the fields of its parent span(s)
//!
//! ## Lack of dynamism
//!
//! This means that if you use [`Span::record`][tracing::Span::record] to add fields to a span after
//! it has been created, those fields will not be captured and added to your metric key.
//! The integration layer works by capturing all fields that are present when a span is created,
//! as well as fields recorded after the fact, and storing them as an extension to the span. If
//! a metric is emitted while a span is entered, any fields captured for that span will be added
//! to the metric as additional labels.
//!
//! Be aware that we recursively capture the fields of a span, including fields from
//! parent spans, and use them when generating metric labels. This means that if a metric is being
//! emitted in span B, which is a child of span A, and span A has field X, and span B has field Y,
//! then the metric labels will include both field X and Y. This applies regardless of how many
//! nested spans are currently entered.
//!
//! ## Duplicate span fields
//!
//! When span fields are captured, they are deduplicated such that only the most recent value is kept.
//! For merging parent span fields into the current span fields, the fields from the current span have
//! the highest priority. Additionally, when using [`Span::record`][tracing::Span::record] to add fields
//! to a span after it has been created, the same behavior applies. This means that recording a field
//! multiple times only keeps the most recently recorded value, including if a field was already present
//! from a parent span and is then recorded dynamically in the current span.
//!
//! ## Span fields and ancestry
//!
Expand Down Expand Up @@ -95,17 +101,15 @@
#![deny(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg), deny(rustdoc::broken_intra_doc_links))]

use metrics::{
Counter, Gauge, Histogram, Key, KeyName, Label, Metadata, Recorder, SharedString, Unit,
};
use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};
use metrics_util::layers::Layer;

pub mod label_filter;
mod tracing_integration;

pub use label_filter::LabelFilter;
use tracing_integration::WithContext;
pub use tracing_integration::{Labels, MetricsLayer};
use tracing_integration::{Map, WithContext};

/// [`TracingContextLayer`] provides an implementation of a [`Layer`] for [`TracingContext`].
pub struct TracingContextLayer<F> {
Expand Down Expand Up @@ -173,22 +177,20 @@ where
// We're currently within a live tracing span, so see if we have an available
// metrics context to grab any fields/labels out of.
if let Some(ctx) = dispatch.downcast_ref::<WithContext>() {
let mut f = |new_labels: &[Label]| {
if !new_labels.is_empty() {
let mut f = |new_labels: &Map| {
(!new_labels.is_empty()).then(|| {
let (name, mut labels) = key.clone().into_parts();

let filtered_labels = new_labels
.iter()
.values()
.filter(|label| {
self.label_filter.should_include_label(&name, label)
})
.cloned();
labels.extend(filtered_labels);

Some(Key::from_parts(name, labels))
} else {
None
}
Key::from_parts(name, labels)
})
};

// Pull in the span's fields/labels if they exist.
Expand Down
61 changes: 40 additions & 21 deletions metrics-tracing-context/src/tracing_integration.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
//! The code that integrates with the `tracing` crate.

use indexmap::IndexMap;
use lockfree_object_pool::{LinearObjectPool, LinearOwnedReusable};
use metrics::{Key, Label};
use once_cell::sync::OnceCell;
use std::sync::Arc;
use std::{any::TypeId, marker::PhantomData};
use std::{any::TypeId, cmp, marker::PhantomData};
use tracing_core::span::{Attributes, Id, Record};
use tracing_core::{field::Visit, Dispatch, Field, Subscriber};
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};

fn get_pool() -> &'static Arc<LinearObjectPool<Vec<Label>>> {
static POOL: OnceCell<Arc<LinearObjectPool<Vec<Label>>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(Vec::new, Vec::clear)))
pub(crate) type Map = IndexMap<&'static str, Label>;

fn get_pool() -> &'static Arc<LinearObjectPool<Map>> {
static POOL: OnceCell<Arc<LinearObjectPool<Map>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(Map::new, Map::clear)))
}

/// Span fields mapped as metrics labels.
///
/// Hidden from documentation as there is no need for end users to ever touch this type, but it must
/// be public in order to be pulled in by external benchmark code.
#[doc(hidden)]
pub struct Labels(pub LinearOwnedReusable<Vec<Label>>);
pub struct Labels(pub LinearOwnedReusable<Map>);

impl Labels {
pub(crate) fn extend_from_labels(&mut self, other: &Labels) {
self.0.extend_from_slice(other.as_ref());
let new_len = cmp::max(self.as_ref().len(), other.as_ref().len());
let additional = new_len - self.as_ref().len();
self.0.reserve(additional);
for (k, v) in other.as_ref() {
self.0.insert(k, v.clone());
}
zohnannor marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -35,46 +44,45 @@ impl Default for Labels {
impl Visit for Labels {
fn record_str(&mut self, field: &Field, value: &str) {
let label = Label::new(field.name(), value.to_string());
self.0.push(label);
self.0.insert(field.name(), label);
}

fn record_bool(&mut self, field: &Field, value: bool) {
let label = Label::from_static_parts(field.name(), if value { "true" } else { "false" });
self.0.push(label);
self.0.insert(field.name(), label);
}

fn record_i64(&mut self, field: &Field, value: i64) {
let mut buf = itoa::Buffer::new();
let s = buf.format(value);
let label = Label::new(field.name(), s.to_string());
self.0.push(label);
self.0.insert(field.name(), label);
}

fn record_u64(&mut self, field: &Field, value: u64) {
let mut buf = itoa::Buffer::new();
let s = buf.format(value);
let label = Label::new(field.name(), s.to_string());
self.0.push(label);
self.0.insert(field.name(), label);
}

fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
let value_string = format!("{:?}", value);
let value_string = format!("{value:?}");
let label = Label::new(field.name(), value_string);
self.0.push(label);
self.0.insert(field.name(), label);
}
}

impl Labels {
fn from_attributes(attrs: &Attributes<'_>) -> Labels {
fn from_record(record: &Record) -> Labels {
let mut labels = Labels::default();
let record = Record::new(attrs.values());
record.record(&mut labels);
labels
}
}

impl AsRef<[Label]> for Labels {
fn as_ref(&self) -> &[Label] {
impl AsRef<Map> for Labels {
fn as_ref(&self) -> &Map {
&self.0
}
}
Expand All @@ -88,7 +96,7 @@ impl WithContext {
&self,
dispatch: &Dispatch,
id: &Id,
f: &mut dyn FnMut(&[Label]) -> Option<Key>,
f: &mut dyn FnMut(&Map) -> Option<Key>,
) -> Option<Key> {
let mut ff = |labels: &Labels| f(labels.as_ref());
(self.with_labels)(dispatch, id, &mut ff)
Expand Down Expand Up @@ -123,9 +131,8 @@ where
.expect("subscriber should downcast to expected type; this is a bug!");
let span = subscriber.span(id).expect("registry should have a span for the current ID");

let result =
if let Some(labels) = span.extensions().get::<Labels>() { f(labels) } else { None };
result
let ext = span.extensions();
ext.get::<Labels>().and_then(f)
}
}

Expand All @@ -135,7 +142,7 @@ where
{
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, cx: Context<'_, S>) {
let span = cx.span(id).expect("span must already exist!");
let mut labels = Labels::from_attributes(attrs);
let mut labels = Labels::from_record(&Record::new(attrs.values()));

if let Some(parent) = span.parent() {
if let Some(parent_labels) = parent.extensions().get::<Labels>() {
Expand All @@ -146,6 +153,18 @@ where
span.extensions_mut().insert(labels);
}

fn on_record(&self, id: &Id, values: &Record<'_>, cx: Context<'_, S>) {
let span = cx.span(id).expect("span must already exist!");
let labels = Labels::from_record(values);

let ext = &mut span.extensions_mut();
if let Some(existing) = ext.get_mut::<Labels>() {
existing.extend_from_labels(&labels);
} else {
ext.insert(labels);
}
}

unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
zohnannor marked this conversation as resolved.
Show resolved Hide resolved
match id {
id if id == TypeId::of::<Self>() => Some(self as *const _ as *const ()),
Expand Down
Loading
Loading