Skip to content

Commit

Permalink
fix: events or links to string (#2667)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanbohan authored Oct 31, 2023
1 parent d2f3793 commit 84bcca9
Showing 1 changed file with 95 additions and 43 deletions.
138 changes: 95 additions & 43 deletions src/servers/src/otlp/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use opentelemetry_proto::tonic::common::v1::{
};
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
use opentelemetry_proto::tonic::trace::v1::{Span, Status};
use serde_json::json;
use serde::{Deserialize, Serialize};

use super::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use crate::error::Result;
Expand All @@ -36,7 +36,6 @@ const APPROXIMATE_COLUMN_COUNT: usize = 24;
pub const TRACE_TABLE_NAME: &str = "traces_preview_v01";

#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TraceSpan {
// the following are tags
pub trace_id: String,
Expand Down Expand Up @@ -64,6 +63,42 @@ pub struct TraceSpan {

pub type TraceSpans = Vec<TraceSpan>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceLink {
pub trace_id: String,
pub span_id: String,
pub trace_state: String,
pub attributes: String, // TODO(yuanbohan): Map in the future
}

impl From<&Link> for TraceLink {
fn from(link: &Link) -> Self {
Self {
trace_id: bytes_to_hex_string(&link.trace_id),
span_id: bytes_to_hex_string(&link.span_id),
trace_state: link.trace_state.clone(),
attributes: vec_kv_to_string(&link.attributes),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanEvent {
pub name: String,
pub time: String,
pub attributes: String, // TODO(yuanbohan): Map in the future
}

impl From<&Event> for SpanEvent {
fn from(event: &Event) -> Self {
Self {
name: event.name.clone(),
time: Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
attributes: vec_kv_to_string(&event.attributes),
}
}
}

/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
pub fn to_grpc_insert_requests(
Expand Down Expand Up @@ -261,32 +296,13 @@ pub fn any_value_to_string(val: AnyValue) -> Option<String> {
})
}

pub fn event_to_string(event: &Event) -> String {
json!({
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
})
.to_string()
}

pub fn events_to_string(events: &[Event]) -> String {
let v: Vec<String> = events.iter().map(event_to_string).collect();
let v: Vec<SpanEvent> = events.iter().map(SpanEvent::from).collect();
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
}

pub fn link_to_string(link: &Link) -> String {
json!({
"trace_id": link.trace_id,
"span_id": link.span_id,
"trace_state": link.trace_state,
"attributes": vec_kv_to_string(&link.attributes),
})
.to_string()
}

pub fn links_to_string(links: &[Link]) -> String {
let v: Vec<String> = links.iter().map(link_to_string).collect();
let v: Vec<TraceLink> = links.iter().map(TraceLink::from).collect();
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
}

Expand All @@ -303,13 +319,13 @@ mod tests {
use opentelemetry_proto::tonic::common::v1::{
any_value, AnyValue, ArrayValue, KeyValue, KeyValueList,
};
use opentelemetry_proto::tonic::trace::v1::span::Event;
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
use opentelemetry_proto::tonic::trace::v1::Status;
use serde_json::json;

use crate::otlp::trace::{
arr_vals_to_string, bytes_to_hex_string, event_to_string, kvlist_to_string,
status_to_string, vec_kv_to_string,
arr_vals_to_string, bytes_to_hex_string, events_to_string, kvlist_to_string,
links_to_string, status_to_string, vec_kv_to_string, SpanEvent, TraceLink,
};

#[test]
Expand Down Expand Up @@ -369,30 +385,66 @@ mod tests {
}

#[test]
fn test_event_to_string() {
let attributes = vec![KeyValue {
fn test_links_to_string() {
let trace_id = vec![
36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
];
let span_id = vec![186, 255, 238, 221, 123, 141, 235, 192];
let trace_state = "OK".to_string();
let link_attributes = vec![KeyValue {
key: "str_key".into(),
value: Some(AnyValue {
value: Some(any_value::Value::StringValue("val1".into())),
}),
}];

let trace_links = vec![TraceLink {
trace_id: bytes_to_hex_string(&trace_id),
span_id: bytes_to_hex_string(&span_id),
trace_state: trace_state.clone(),
attributes: vec_kv_to_string(&link_attributes),
}];
let expect_string = serde_json::to_string(&trace_links).unwrap_or_default();

let links = vec![Link {
trace_id,
span_id,
trace_state,
attributes: link_attributes,
dropped_attributes_count: 0,
}];
let links_string = links_to_string(&links);

assert_eq!(expect_string, links_string);
}

#[test]
fn test_events_to_string() {
let time_unix_nano = 1697620662450128000_u64;
let event_name = "event_name".to_string();
let event_attributes = vec![KeyValue {
key: "str_key".into(),
value: Some(AnyValue {
value: Some(any_value::Value::StringValue("val1".into())),
}),
}];
let event = Event {
time_unix_nano: 1697620662450128000_u64,
name: "event_name".into(),
attributes,

let span_events = vec![SpanEvent {
name: event_name.clone(),
time: Time::new_nanosecond(time_unix_nano as i64).to_iso8601_string(),
attributes: vec_kv_to_string(&event_attributes),
}];
let expect_string = serde_json::to_string(&span_events).unwrap_or_default();

let events = vec![Event {
time_unix_nano,
name: event_name,
attributes: event_attributes,
dropped_attributes_count: 0,
};
let event_string = event_to_string(&event);
let expect = json!({
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
});
}];
let events_string = events_to_string(&events);

assert_eq!(
expect,
serde_json::from_str::<serde_json::value::Value>(event_string.as_str()).unwrap()
);
assert_eq!(expect_string, events_string);
}

#[test]
Expand Down

0 comments on commit 84bcca9

Please sign in to comment.