Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: events or links to string #2667

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 95 additions & 43 deletions src/servers/src/otlp/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use opentelemetry_proto::tonic::common::v1::{
};
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
use opentelemetry_proto::tonic::trace::v1::{Span, Status};
use serde_json::json;
use serde::{Deserialize, Serialize};

use super::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use crate::error::Result;
Expand All @@ -36,7 +36,6 @@ const APPROXIMATE_COLUMN_COUNT: usize = 24;
pub const TRACE_TABLE_NAME: &str = "traces_preview_v01";

#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TraceSpan {
// the following are tags
pub trace_id: String,
Expand Down Expand Up @@ -64,6 +63,42 @@ pub struct TraceSpan {

pub type TraceSpans = Vec<TraceSpan>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceLink {
pub trace_id: String,
pub span_id: String,
pub trace_state: String,
pub attributes: String, // TODO(yuanbohan): Map in the future
}

impl From<&Link> for TraceLink {
fn from(link: &Link) -> Self {
Self {
trace_id: bytes_to_hex_string(&link.trace_id),
span_id: bytes_to_hex_string(&link.span_id),
trace_state: link.trace_state.clone(),
attributes: vec_kv_to_string(&link.attributes),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanEvent {
pub name: String,
pub time: String,
pub attributes: String, // TODO(yuanbohan): Map in the future
}

impl From<&Event> for SpanEvent {
fn from(event: &Event) -> Self {
Self {
name: event.name.clone(),
time: Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
attributes: vec_kv_to_string(&event.attributes),
}
}
}

/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
pub fn to_grpc_insert_requests(
Expand Down Expand Up @@ -261,32 +296,13 @@ pub fn any_value_to_string(val: AnyValue) -> Option<String> {
})
}

pub fn event_to_string(event: &Event) -> String {
json!({
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
})
.to_string()
}

pub fn events_to_string(events: &[Event]) -> String {
let v: Vec<String> = events.iter().map(event_to_string).collect();
let v: Vec<SpanEvent> = events.iter().map(SpanEvent::from).collect();
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
}

pub fn link_to_string(link: &Link) -> String {
json!({
"trace_id": link.trace_id,
"span_id": link.span_id,
"trace_state": link.trace_state,
"attributes": vec_kv_to_string(&link.attributes),
})
.to_string()
}

pub fn links_to_string(links: &[Link]) -> String {
let v: Vec<String> = links.iter().map(link_to_string).collect();
let v: Vec<TraceLink> = links.iter().map(TraceLink::from).collect();
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
}

Expand All @@ -303,13 +319,13 @@ mod tests {
use opentelemetry_proto::tonic::common::v1::{
any_value, AnyValue, ArrayValue, KeyValue, KeyValueList,
};
use opentelemetry_proto::tonic::trace::v1::span::Event;
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
use opentelemetry_proto::tonic::trace::v1::Status;
use serde_json::json;

use crate::otlp::trace::{
arr_vals_to_string, bytes_to_hex_string, event_to_string, kvlist_to_string,
status_to_string, vec_kv_to_string,
arr_vals_to_string, bytes_to_hex_string, events_to_string, kvlist_to_string,
links_to_string, status_to_string, vec_kv_to_string, SpanEvent, TraceLink,
};

#[test]
Expand Down Expand Up @@ -369,30 +385,66 @@ mod tests {
}

#[test]
fn test_event_to_string() {
let attributes = vec![KeyValue {
fn test_links_to_string() {
let trace_id = vec![
36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
];
let span_id = vec![186, 255, 238, 221, 123, 141, 235, 192];
let trace_state = "OK".to_string();
let link_attributes = vec![KeyValue {
key: "str_key".into(),
value: Some(AnyValue {
value: Some(any_value::Value::StringValue("val1".into())),
}),
}];

let trace_links = vec![TraceLink {
trace_id: bytes_to_hex_string(&trace_id),
span_id: bytes_to_hex_string(&span_id),
trace_state: trace_state.clone(),
attributes: vec_kv_to_string(&link_attributes),
}];
let expect_string = serde_json::to_string(&trace_links).unwrap_or_default();

let links = vec![Link {
trace_id,
span_id,
trace_state,
attributes: link_attributes,
dropped_attributes_count: 0,
}];
let links_string = links_to_string(&links);

assert_eq!(expect_string, links_string);
}

#[test]
fn test_events_to_string() {
let time_unix_nano = 1697620662450128000_u64;
let event_name = "event_name".to_string();
let event_attributes = vec![KeyValue {
key: "str_key".into(),
value: Some(AnyValue {
value: Some(any_value::Value::StringValue("val1".into())),
}),
}];
let event = Event {
time_unix_nano: 1697620662450128000_u64,
name: "event_name".into(),
attributes,

let span_events = vec![SpanEvent {
name: event_name.clone(),
time: Time::new_nanosecond(time_unix_nano as i64).to_iso8601_string(),
attributes: vec_kv_to_string(&event_attributes),
}];
let expect_string = serde_json::to_string(&span_events).unwrap_or_default();

let events = vec![Event {
time_unix_nano,
name: event_name,
attributes: event_attributes,
dropped_attributes_count: 0,
};
let event_string = event_to_string(&event);
let expect = json!({
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
});
}];
let events_string = events_to_string(&events);

assert_eq!(
expect,
serde_json::from_str::<serde_json::value::Value>(event_string.as_str()).unwrap()
);
assert_eq!(expect_string, events_string);
}

#[test]
Expand Down