From 2aedc09cc8ef9d4f5f0295ecc7835f571ccb78d4 Mon Sep 17 00:00:00 2001 From: yuanbohan Date: Thu, 2 Nov 2023 17:55:44 +0800 Subject: [PATCH] feat: json attributes via impl Serialeze trait --- src/servers/src/otlp/plugin.rs | 2 +- src/servers/src/otlp/trace.rs | 371 ++--------------------- src/servers/src/otlp/trace/attributes.rs | 225 ++++++++++++++ src/servers/src/otlp/trace/span.rs | 203 +++++++++++++ 4 files changed, 456 insertions(+), 345 deletions(-) create mode 100644 src/servers/src/otlp/trace/attributes.rs create mode 100644 src/servers/src/otlp/trace/span.rs diff --git a/src/servers/src/otlp/plugin.rs b/src/servers/src/otlp/plugin.rs index ddcb4375e6d8..1258fe167ea6 100644 --- a/src/servers/src/otlp/plugin.rs +++ b/src/servers/src/otlp/plugin.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; -use super::trace::TraceSpans; +use super::trace::span::TraceSpans; /// Transformer helps to transform ExportTraceServiceRequest based on logic, like: /// - uplift some fields from Attributes (Map type) to column diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index dee5bf04c5fe..16de53f5656d 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -12,22 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; use common_grpc::writer::Precision; -use common_time::timestamp::Timestamp; -use itertools::Itertools; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; -use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue; -use opentelemetry_proto::tonic::common::v1::{ - AnyValue, ArrayValue, InstrumentationScope, KeyValue, KeyValueList, -}; -use opentelemetry_proto::tonic::trace::v1::span::{Event, Link}; -use opentelemetry_proto::tonic::trace::v1::{Span, Status}; -use serde::{Deserialize, Serialize}; +use self::span::{parse_span, TraceSpan, TraceSpans}; use super::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use crate::error::Result; use crate::row_writer::{self, MultiTableData, TableData}; @@ -35,68 +25,29 @@ use crate::row_writer::{self, MultiTableData, TableData}; const APPROXIMATE_COLUMN_COUNT: usize = 24; pub const TRACE_TABLE_NAME: &str = "traces_preview_v01"; -#[derive(Debug, Clone)] -pub struct TraceSpan { - // the following are tags - pub trace_id: String, - pub span_id: String, - pub parent_span_id: String, - - // the following are fields - pub resource_attributes: String, // TODO(yuanbohan): Map in the future - pub scope_name: String, - pub scope_version: String, - pub scope_attributes: String, // TODO(yuanbohan): Map in the future - pub trace_state: String, - pub span_name: String, - pub span_kind: String, - pub span_status_code: String, - pub span_status_message: String, - pub span_attributes: String, // TODO(yuanbohan): Map in the future - pub span_events: String, // TODO(yuanbohan): List in the future - pub span_links: String, // TODO(yuanbohan): List in the future - pub start_in_nanosecond: u64, // this is also the Timestamp Index - pub end_in_nanosecond: u64, - - pub uplifted_fields: Vec<(String, ColumnDataType, ValueData)>, -} - -pub type TraceSpans = Vec; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TraceLink { - pub trace_id: String, - pub span_id: String, - pub trace_state: String, - pub attributes: String, // TODO(yuanbohan): Map in the future -} - -impl From<&Link> for TraceLink { - fn from(link: &Link) -> Self { - Self { - trace_id: bytes_to_hex_string(&link.trace_id), - span_id: bytes_to_hex_string(&link.span_id), - trace_state: link.trace_state.clone(), - attributes: vec_kv_to_string(&link.attributes), - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SpanEvent { - pub name: String, - pub time: String, - pub attributes: String, // TODO(yuanbohan): Map in the future -} +pub mod attributes; +pub mod span; -impl From<&Event> for SpanEvent { - fn from(event: &Event) -> Self { - Self { - name: event.name.clone(), - time: Timestamp::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(), - attributes: vec_kv_to_string(&event.attributes), +/// Convert OpenTelemetry traces to SpanTraces +/// +/// See +/// +/// for data structure of OTLP traces. +pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { + let mut spans = vec![]; + for resource_spans in request.resource_spans { + let resource_attrs = resource_spans + .resource + .map(|r| r.attributes) + .unwrap_or_default(); + for scope_spans in resource_spans.scope_spans { + let scope = scope_spans.scope.unwrap_or_default(); + for span in scope_spans.spans { + spans.push(parse_span(resource_attrs.clone(), scope.clone(), span)); + } } } + spans } /// Convert SpanTraces to GreptimeDB row insert requests. @@ -135,18 +86,18 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> { // fields let str_fields_iter = vec![ - ("resource_attributes", span.resource_attributes), + ("resource_attributes", span.resource_attributes.to_string()), ("scope_name", span.scope_name), ("scope_version", span.scope_version), - ("scope_attributes", span.scope_attributes), + ("scope_attributes", span.scope_attributes.to_string()), ("trace_state", span.trace_state), ("span_name", span.span_name), ("span_kind", span.span_kind), ("span_status_code", span.span_status_code), ("span_status_message", span.span_status_message), - ("span_attributes", span.span_attributes), - ("span_events", span.span_events), - ("span_links", span.span_links), + ("span_attributes", span.span_attributes.to_string()), + ("span_events", span.span_events.to_string()), + ("span_links", span.span_links.to_string()), ] .into_iter() .map(|(col, val)| { @@ -172,7 +123,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> row_writer::write_fields(writer, str_fields_iter, &mut row)?; row_writer::write_fields(writer, time_fields_iter, &mut row)?; - row_writer::write_fields(writer, span.uplifted_fields.into_iter(), &mut row)?; + row_writer::write_fields(writer, span.uplifted_span_attributes.into_iter(), &mut row)?; } row_writer::write_f64( @@ -193,271 +144,3 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> Ok(()) } - -pub fn parse_span( - resource_attrs: &[KeyValue], - scope: &InstrumentationScope, - span: Span, -) -> TraceSpan { - let (span_status_code, span_status_message) = status_to_string(&span.status); - let span_kind = span.kind().as_str_name().into(); - TraceSpan { - trace_id: bytes_to_hex_string(&span.trace_id), - span_id: bytes_to_hex_string(&span.span_id), - parent_span_id: bytes_to_hex_string(&span.parent_span_id), - - resource_attributes: vec_kv_to_string(resource_attrs), - trace_state: span.trace_state, - - scope_name: scope.name.clone(), - scope_version: scope.version.clone(), - scope_attributes: vec_kv_to_string(&scope.attributes), - - span_name: span.name, - span_kind, - span_status_code, - span_status_message, - span_attributes: vec_kv_to_string(&span.attributes), - span_events: events_to_string(&span.events), - span_links: links_to_string(&span.links), - - start_in_nanosecond: span.start_time_unix_nano, - end_in_nanosecond: span.end_time_unix_nano, - - uplifted_fields: vec![], - } -} - -/// Convert OpenTelemetry traces to SpanTraces -/// -/// See -/// -/// for data structure of OTLP traces. -pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { - let mut spans = vec![]; - for resource_spans in request.resource_spans { - let resource_attrs = resource_spans - .resource - .map(|r| r.attributes) - .unwrap_or_default(); - for scope_spans in resource_spans.scope_spans { - let scope = scope_spans.scope.unwrap_or_default(); - for span in scope_spans.spans { - spans.push(parse_span(&resource_attrs, &scope, span)); - } - } - } - spans -} - -pub fn bytes_to_hex_string(bs: &[u8]) -> String { - bs.iter().map(|b| format!("{:02x}", b)).join("") -} - -pub fn arr_vals_to_string(arr: &ArrayValue) -> String { - let vs: Vec = arr - .values - .iter() - .filter_map(|val| any_value_to_string(val.clone())) - .collect(); - - serde_json::to_string(&vs).unwrap_or_else(|_| "[]".into()) -} - -pub fn vec_kv_to_string(vec: &[KeyValue]) -> String { - let vs: HashMap = vec - .iter() - .map(|kv| { - let val = kv - .value - .clone() - .and_then(any_value_to_string) - .unwrap_or_default(); - (kv.key.clone(), val) - }) - .collect(); - - serde_json::to_string(&vs).unwrap_or_else(|_| "{}".into()) -} - -pub fn kvlist_to_string(kvlist: &KeyValueList) -> String { - vec_kv_to_string(&kvlist.values) -} - -pub fn any_value_to_string(val: AnyValue) -> Option { - val.value.map(|value| match value { - OtlpValue::StringValue(s) => s, - OtlpValue::BoolValue(b) => b.to_string(), - OtlpValue::IntValue(i) => i.to_string(), - OtlpValue::DoubleValue(d) => d.to_string(), - OtlpValue::ArrayValue(arr) => arr_vals_to_string(&arr), - OtlpValue::KvlistValue(kv) => kvlist_to_string(&kv), - OtlpValue::BytesValue(bs) => bytes_to_hex_string(&bs), - }) -} - -pub fn events_to_string(events: &[Event]) -> String { - let v: Vec = events.iter().map(SpanEvent::from).collect(); - serde_json::to_string(&v).unwrap_or_else(|_| "[]".into()) -} - -pub fn links_to_string(links: &[Link]) -> String { - let v: Vec = links.iter().map(TraceLink::from).collect(); - serde_json::to_string(&v).unwrap_or_else(|_| "[]".into()) -} - -pub fn status_to_string(status: &Option) -> (String, String) { - match status { - Some(status) => (status.code().as_str_name().into(), status.message.clone()), - None => ("".into(), "".into()), - } -} - -#[cfg(test)] -mod tests { - use common_time::timestamp::Timestamp; - use opentelemetry_proto::tonic::common::v1::{ - any_value, AnyValue, ArrayValue, KeyValue, KeyValueList, - }; - use opentelemetry_proto::tonic::trace::v1::span::{Event, Link}; - use opentelemetry_proto::tonic::trace::v1::Status; - use serde_json::json; - - use crate::otlp::trace::{ - arr_vals_to_string, bytes_to_hex_string, events_to_string, kvlist_to_string, - links_to_string, status_to_string, vec_kv_to_string, SpanEvent, TraceLink, - }; - - #[test] - fn test_bytes_to_hex_string() { - assert_eq!( - "24fe79948641b110a29bc27859307e8d", - bytes_to_hex_string(&[ - 36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141, - ]) - ); - - assert_eq!( - "baffeedd7b8debc0", - bytes_to_hex_string(&[186, 255, 238, 221, 123, 141, 235, 192,]) - ); - } - - #[test] - fn test_arr_vals_to_string() { - assert_eq!("[]", arr_vals_to_string(&ArrayValue { values: vec![] })); - - let arr = ArrayValue { - values: vec![ - AnyValue { - value: Some(any_value::Value::StringValue("string_value".into())), - }, - AnyValue { - value: Some(any_value::Value::BoolValue(true)), - }, - AnyValue { - value: Some(any_value::Value::IntValue(1)), - }, - AnyValue { - value: Some(any_value::Value::DoubleValue(1.2)), - }, - ], - }; - let expect = json!(["string_value", "true", "1", "1.2"]).to_string(); - assert_eq!(expect, arr_vals_to_string(&arr)); - } - - #[test] - fn test_kv_list_to_string() { - let kvlist = KeyValueList { - values: vec![KeyValue { - key: "str_key".into(), - value: Some(AnyValue { - value: Some(any_value::Value::StringValue("val1".into())), - }), - }], - }; - let expect = json!({ - "str_key": "val1", - }) - .to_string(); - assert_eq!(expect, kvlist_to_string(&kvlist)) - } - - #[test] - fn test_links_to_string() { - let trace_id = vec![ - 36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141, - ]; - let span_id = vec![186, 255, 238, 221, 123, 141, 235, 192]; - let trace_state = "OK".to_string(); - let link_attributes = vec![KeyValue { - key: "str_key".into(), - value: Some(AnyValue { - value: Some(any_value::Value::StringValue("val1".into())), - }), - }]; - - let trace_links = vec![TraceLink { - trace_id: bytes_to_hex_string(&trace_id), - span_id: bytes_to_hex_string(&span_id), - trace_state: trace_state.clone(), - attributes: vec_kv_to_string(&link_attributes), - }]; - let expect_string = serde_json::to_string(&trace_links).unwrap_or_default(); - - let links = vec![Link { - trace_id, - span_id, - trace_state, - attributes: link_attributes, - dropped_attributes_count: 0, - }]; - let links_string = links_to_string(&links); - - assert_eq!(expect_string, links_string); - } - - #[test] - fn test_events_to_string() { - let time_unix_nano = 1697620662450128000_u64; - let event_name = "event_name".to_string(); - let event_attributes = vec![KeyValue { - key: "str_key".into(), - value: Some(AnyValue { - value: Some(any_value::Value::StringValue("val1".into())), - }), - }]; - - let span_events = vec![SpanEvent { - name: event_name.clone(), - time: Timestamp::new_nanosecond(time_unix_nano as i64).to_iso8601_string(), - attributes: vec_kv_to_string(&event_attributes), - }]; - let expect_string = serde_json::to_string(&span_events).unwrap_or_default(); - - let events = vec![Event { - time_unix_nano, - name: event_name, - attributes: event_attributes, - dropped_attributes_count: 0, - }]; - let events_string = events_to_string(&events); - - assert_eq!(expect_string, events_string); - } - - #[test] - fn test_status_to_string() { - let message = String::from("status message"); - let status = Status { - code: 1, - message: message.clone(), - }; - - assert_eq!( - ("STATUS_CODE_OK".into(), message), - status_to_string(&Some(status)), - ); - } -} diff --git a/src/servers/src/otlp/trace/attributes.rs b/src/servers/src/otlp/trace/attributes.rs new file mode 100644 index 000000000000..efd75698b50a --- /dev/null +++ b/src/servers/src/otlp/trace/attributes.rs @@ -0,0 +1,225 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; + +use opentelemetry_proto::tonic::common::v1::any_value::Value::{ + ArrayValue, BoolValue, BytesValue, DoubleValue, IntValue, KvlistValue, StringValue, +}; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; +use serde::ser::{SerializeMap, SerializeSeq}; +use serde::Serialize; + +#[derive(Clone, Debug)] +pub struct OtlpAnyValue<'a>(&'a AnyValue); + +impl<'a> From<&'a AnyValue> for OtlpAnyValue<'a> { + fn from(any_val: &'a AnyValue) -> Self { + Self(any_val) + } +} + +impl Display for OtlpAnyValue<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", serde_json::to_string(self).unwrap_or_default()) + } +} + +impl Serialize for OtlpAnyValue<'_> { + fn serialize(&self, zer: S) -> Result + where + S: serde::Serializer, + { + match &self.0.value { + Some(val) => match &val { + StringValue(v) => zer.serialize_str(v), + BoolValue(v) => zer.serialize_bool(*v), + IntValue(v) => zer.serialize_i64(*v), + DoubleValue(v) => zer.serialize_f64(*v), + ArrayValue(v) => { + let mut seq = zer.serialize_seq(Some(v.values.len()))?; + for val in &v.values { + seq.serialize_element(&OtlpAnyValue::from(val))?; + } + seq.end() + } + KvlistValue(v) => { + let mut map = zer.serialize_map(Some(v.values.len()))?; + for kv in &v.values { + if let Some(val) = &kv.value { + map.serialize_entry(&kv.key, &OtlpAnyValue::from(val))?; + } + } + map.end() + } + BytesValue(v) => zer.serialize_bytes(v), + }, + None => zer.serialize_none(), + } + } +} + +#[derive(Debug, Clone)] +pub struct Attributes(Vec); + +impl Display for Attributes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", serde_json::to_string(self).unwrap_or_default()) + } +} + +impl Serialize for Attributes { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.0.len()))?; + for attr in &self.0 { + if let Some(val) = &attr.value { + map.serialize_entry(&attr.key, &OtlpAnyValue::from(val))?; + } + } + map.end() + } +} + +impl From> for Attributes { + fn from(attrs: Vec) -> Self { + Self(attrs) + } +} + +impl Attributes { + pub fn get_ref(&self) -> &Vec { + &self.0 + } + + pub fn get_mut(&mut self) -> &mut Vec { + &mut self.0 + } +} + +#[cfg(test)] +mod tests { + use opentelemetry_proto::tonic::common::v1::any_value::Value; + use opentelemetry_proto::tonic::common::v1::{AnyValue, ArrayValue, KeyValue, KeyValueList}; + + use crate::otlp::trace::attributes::OtlpAnyValue; + + #[test] + fn test_any_value_primitive_type_serialize() { + let values = vec![ + ( + r#""string value""#, + Value::StringValue(String::from("string value")), + ), + ("true", Value::BoolValue(true)), + ("1", Value::IntValue(1)), + ("1.1", Value::DoubleValue(1.1)), + ("[1,2,3]", Value::BytesValue(vec![1, 2, 3])), + ]; + + for (expect, val) in values { + let any_val = AnyValue { value: Some(val) }; + let otlp_value = OtlpAnyValue::from(&any_val); + assert_eq!( + expect, + serde_json::to_string(&otlp_value).unwrap_or_default() + ); + } + } + + #[test] + fn test_any_value_array_type_serialize() { + let values = vec![ + ("[]", vec![]), + ( + r#"["string1","string2","string3"]"#, + vec![ + Value::StringValue(String::from("string1")), + Value::StringValue(String::from("string2")), + Value::StringValue(String::from("string3")), + ], + ), + ( + "[1,2,3]", + vec![Value::IntValue(1), Value::IntValue(2), Value::IntValue(3)], + ), + ( + "[1.1,2.2,3.3]", + vec![ + Value::DoubleValue(1.1), + Value::DoubleValue(2.2), + Value::DoubleValue(3.3), + ], + ), + ( + "[true,false,true]", + vec![ + Value::BoolValue(true), + Value::BoolValue(false), + Value::BoolValue(true), + ], + ), + ]; + + for (expect, vals) in values { + let array_values: Vec = vals + .into_iter() + .map(|val| AnyValue { value: Some(val) }) + .collect(); + + let value = Value::ArrayValue(ArrayValue { + values: array_values, + }); + + let any_val = AnyValue { value: Some(value) }; + let otlp_value = OtlpAnyValue::from(&any_val); + assert_eq!( + expect, + serde_json::to_string(&otlp_value).unwrap_or_default() + ); + } + } + + #[test] + fn test_any_value_map_type_serialize() { + let cases = vec![ + ("{}", vec![]), + ( + r#"{"key1":"val1"}"#, + vec![("key1", Value::StringValue(String::from("val1")))], + ), + ]; + + for (expect, kv) in cases { + let kvlist: Vec = kv + .into_iter() + .map(|(k, v)| KeyValue { + key: k.into(), + value: Some(AnyValue { value: Some(v) }), + }) + .collect(); + + let value = Value::KvlistValue(KeyValueList { values: kvlist }); + + let any_val = AnyValue { value: Some(value) }; + let otlp_value = OtlpAnyValue::from(&any_val); + assert_eq!( + expect, + serde_json::to_string(&otlp_value).unwrap_or_default() + ); + } + } +} diff --git a/src/servers/src/otlp/trace/span.rs b/src/servers/src/otlp/trace/span.rs new file mode 100644 index 000000000000..3cf59b9b32da --- /dev/null +++ b/src/servers/src/otlp/trace/span.rs @@ -0,0 +1,203 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; + +use api::v1::value::ValueData; +use api::v1::ColumnDataType; +use common_time::timestamp::Timestamp; +use itertools::Itertools; +use opentelemetry_proto::tonic::common::v1::{InstrumentationScope, KeyValue}; +use opentelemetry_proto::tonic::trace::v1::span::{Event, Link}; +use opentelemetry_proto::tonic::trace::v1::{Span, Status}; +use serde::Serialize; + +use super::attributes::Attributes; + +#[derive(Debug, Clone)] +pub struct TraceSpan { + // the following are tags + pub trace_id: String, + pub span_id: String, + pub parent_span_id: String, + + // the following are fields + pub resource_attributes: Attributes, // TODO(yuanbohan): Map in the future + pub scope_name: String, + pub scope_version: String, + pub scope_attributes: Attributes, // TODO(yuanbohan): Map in the future + pub trace_state: String, + pub span_name: String, + pub span_kind: String, + pub span_status_code: String, + pub span_status_message: String, + pub span_attributes: Attributes, // TODO(yuanbohan): Map in the future + pub span_events: SpanEvents, // TODO(yuanbohan): List in the future + pub span_links: TraceLinks, // TODO(yuanbohan): List in the future + pub start_in_nanosecond: u64, // this is also the Timestamp Index + pub end_in_nanosecond: u64, + + pub uplifted_span_attributes: Vec<(String, ColumnDataType, ValueData)>, +} + +pub type TraceSpans = Vec; + +#[derive(Debug, Clone, Serialize)] +pub struct TraceLink { + pub trace_id: String, + pub span_id: String, + pub trace_state: String, + pub attributes: Attributes, // TODO(yuanbohan): Map in the future +} + +impl From for TraceLink { + fn from(link: Link) -> Self { + Self { + trace_id: bytes_to_hex_string(&link.trace_id), + span_id: bytes_to_hex_string(&link.span_id), + trace_state: link.trace_state, + attributes: Attributes::from(link.attributes), + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct TraceLinks(Vec); + +impl From> for TraceLinks { + fn from(value: Vec) -> Self { + let links = value.into_iter().map(TraceLink::from).collect_vec(); + Self(links) + } +} + +impl Display for TraceLinks { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", serde_json::to_string(self).unwrap_or_default()) + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct SpanEvent { + pub name: String, + pub time: String, + pub attributes: Attributes, // TODO(yuanbohan): Map in the future +} + +impl From for SpanEvent { + fn from(event: Event) -> Self { + Self { + name: event.name, + time: Timestamp::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(), + attributes: Attributes::from(event.attributes), + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct SpanEvents(Vec); + +impl From> for SpanEvents { + fn from(value: Vec) -> Self { + let events = value.into_iter().map(SpanEvent::from).collect_vec(); + Self(events) + } +} + +impl Display for SpanEvents { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", serde_json::to_string(self).unwrap_or_default()) + } +} + +pub fn parse_span( + resource_attrs: Vec, + scope: InstrumentationScope, + span: Span, +) -> TraceSpan { + let (span_status_code, span_status_message) = status_to_string(&span.status); + let span_kind = span.kind().as_str_name().into(); + TraceSpan { + trace_id: bytes_to_hex_string(&span.trace_id), + span_id: bytes_to_hex_string(&span.span_id), + parent_span_id: bytes_to_hex_string(&span.parent_span_id), + + resource_attributes: Attributes::from(resource_attrs), + trace_state: span.trace_state, + + scope_name: scope.name, + scope_version: scope.version, + scope_attributes: Attributes::from(scope.attributes), + + span_name: span.name, + span_kind, + span_status_code, + span_status_message, + span_attributes: Attributes::from(span.attributes), + span_events: SpanEvents::from(span.events), + span_links: TraceLinks::from(span.links), + + start_in_nanosecond: span.start_time_unix_nano, + end_in_nanosecond: span.end_time_unix_nano, + + uplifted_span_attributes: vec![], + } +} + +pub fn bytes_to_hex_string(bs: &[u8]) -> String { + bs.iter().map(|b| format!("{:02x}", b)).join("") +} + +pub fn status_to_string(status: &Option) -> (String, String) { + match status { + Some(status) => (status.code().as_str_name().into(), status.message.clone()), + None => ("".into(), "".into()), + } +} + +#[cfg(test)] +mod tests { + use opentelemetry_proto::tonic::trace::v1::Status; + + use crate::otlp::trace::span::{bytes_to_hex_string, status_to_string}; + + #[test] + fn test_bytes_to_hex_string() { + assert_eq!( + "24fe79948641b110a29bc27859307e8d", + bytes_to_hex_string(&[ + 36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141, + ]) + ); + + assert_eq!( + "baffeedd7b8debc0", + bytes_to_hex_string(&[186, 255, 238, 221, 123, 141, 235, 192,]) + ); + } + + #[test] + fn test_status_to_string() { + let message = String::from("status message"); + let status = Status { + code: 1, + message: message.clone(), + }; + + assert_eq!( + ("STATUS_CODE_OK".into(), message), + status_to_string(&Some(status)), + ); + } +}