From 84bcca9117ba606da2192092206304885b1dc457 Mon Sep 17 00:00:00 2001 From: yuanbohan Date: Tue, 31 Oct 2023 10:41:47 +0800 Subject: [PATCH] fix: events or links to string (#2667) --- src/servers/src/otlp/trace.rs | 138 +++++++++++++++++++++++----------- 1 file changed, 95 insertions(+), 43 deletions(-) diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index 20ba773f3db1..48d3975c7a2f 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -26,7 +26,7 @@ use opentelemetry_proto::tonic::common::v1::{ }; use opentelemetry_proto::tonic::trace::v1::span::{Event, Link}; use opentelemetry_proto::tonic::trace::v1::{Span, Status}; -use serde_json::json; +use serde::{Deserialize, Serialize}; use super::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use crate::error::Result; @@ -36,7 +36,6 @@ const APPROXIMATE_COLUMN_COUNT: usize = 24; pub const TRACE_TABLE_NAME: &str = "traces_preview_v01"; #[derive(Debug, Clone)] -#[allow(dead_code)] pub struct TraceSpan { // the following are tags pub trace_id: String, @@ -64,6 +63,42 @@ pub struct TraceSpan { pub type TraceSpans = Vec; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TraceLink { + pub trace_id: String, + pub span_id: String, + pub trace_state: String, + pub attributes: String, // TODO(yuanbohan): Map in the future +} + +impl From<&Link> for TraceLink { + fn from(link: &Link) -> Self { + Self { + trace_id: bytes_to_hex_string(&link.trace_id), + span_id: bytes_to_hex_string(&link.span_id), + trace_state: link.trace_state.clone(), + attributes: vec_kv_to_string(&link.attributes), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SpanEvent { + pub name: String, + pub time: String, + pub attributes: String, // TODO(yuanbohan): Map in the future +} + +impl From<&Event> for SpanEvent { + fn from(event: &Event) -> Self { + Self { + name: event.name.clone(), + time: Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(), + attributes: vec_kv_to_string(&event.attributes), + } + } +} + /// Convert SpanTraces to GreptimeDB row insert requests. /// Returns `InsertRequests` and total number of rows to ingest pub fn to_grpc_insert_requests( @@ -261,32 +296,13 @@ pub fn any_value_to_string(val: AnyValue) -> Option { }) } -pub fn event_to_string(event: &Event) -> String { - json!({ - "name": event.name, - "time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(), - "attrs": vec_kv_to_string(&event.attributes), - }) - .to_string() -} - pub fn events_to_string(events: &[Event]) -> String { - let v: Vec = events.iter().map(event_to_string).collect(); + let v: Vec = events.iter().map(SpanEvent::from).collect(); serde_json::to_string(&v).unwrap_or_else(|_| "[]".into()) } -pub fn link_to_string(link: &Link) -> String { - json!({ - "trace_id": link.trace_id, - "span_id": link.span_id, - "trace_state": link.trace_state, - "attributes": vec_kv_to_string(&link.attributes), - }) - .to_string() -} - pub fn links_to_string(links: &[Link]) -> String { - let v: Vec = links.iter().map(link_to_string).collect(); + let v: Vec = links.iter().map(TraceLink::from).collect(); serde_json::to_string(&v).unwrap_or_else(|_| "[]".into()) } @@ -303,13 +319,13 @@ mod tests { use opentelemetry_proto::tonic::common::v1::{ any_value, AnyValue, ArrayValue, KeyValue, KeyValueList, }; - use opentelemetry_proto::tonic::trace::v1::span::Event; + use opentelemetry_proto::tonic::trace::v1::span::{Event, Link}; use opentelemetry_proto::tonic::trace::v1::Status; use serde_json::json; use crate::otlp::trace::{ - arr_vals_to_string, bytes_to_hex_string, event_to_string, kvlist_to_string, - status_to_string, vec_kv_to_string, + arr_vals_to_string, bytes_to_hex_string, events_to_string, kvlist_to_string, + links_to_string, status_to_string, vec_kv_to_string, SpanEvent, TraceLink, }; #[test] @@ -369,30 +385,66 @@ mod tests { } #[test] - fn test_event_to_string() { - let attributes = vec![KeyValue { + fn test_links_to_string() { + let trace_id = vec![ + 36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141, + ]; + let span_id = vec![186, 255, 238, 221, 123, 141, 235, 192]; + let trace_state = "OK".to_string(); + let link_attributes = vec![KeyValue { + key: "str_key".into(), + value: Some(AnyValue { + value: Some(any_value::Value::StringValue("val1".into())), + }), + }]; + + let trace_links = vec![TraceLink { + trace_id: bytes_to_hex_string(&trace_id), + span_id: bytes_to_hex_string(&span_id), + trace_state: trace_state.clone(), + attributes: vec_kv_to_string(&link_attributes), + }]; + let expect_string = serde_json::to_string(&trace_links).unwrap_or_default(); + + let links = vec![Link { + trace_id, + span_id, + trace_state, + attributes: link_attributes, + dropped_attributes_count: 0, + }]; + let links_string = links_to_string(&links); + + assert_eq!(expect_string, links_string); + } + + #[test] + fn test_events_to_string() { + let time_unix_nano = 1697620662450128000_u64; + let event_name = "event_name".to_string(); + let event_attributes = vec![KeyValue { key: "str_key".into(), value: Some(AnyValue { value: Some(any_value::Value::StringValue("val1".into())), }), }]; - let event = Event { - time_unix_nano: 1697620662450128000_u64, - name: "event_name".into(), - attributes, + + let span_events = vec![SpanEvent { + name: event_name.clone(), + time: Time::new_nanosecond(time_unix_nano as i64).to_iso8601_string(), + attributes: vec_kv_to_string(&event_attributes), + }]; + let expect_string = serde_json::to_string(&span_events).unwrap_or_default(); + + let events = vec![Event { + time_unix_nano, + name: event_name, + attributes: event_attributes, dropped_attributes_count: 0, - }; - let event_string = event_to_string(&event); - let expect = json!({ - "name": event.name, - "time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(), - "attrs": vec_kv_to_string(&event.attributes), - }); + }]; + let events_string = events_to_string(&events); - assert_eq!( - expect, - serde_json::from_str::(event_string.as_str()).unwrap() - ); + assert_eq!(expect_string, events_string); } #[test]