Skip to content

Commit

Permalink
Merge pull request #3 from fengys1996/otlp_tracing_plugin
Browse files Browse the repository at this point in the history
feat(otlp): trace parser plugin
  • Loading branch information
yuanbohan authored Oct 20, 2023
2 parents 4482abb + 7f87dd9 commit acb8aea
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 39 deletions.
11 changes: 8 additions & 3 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,16 @@ impl OpenTelemetryProtocolHandler for Instance {
.check_permission(ctx.current_user(), PermissionReq::Otlp)
.context(AuthSnafu)?;

let (requests, rows) = match self.plugins.get::<TraceParserRef>() {
Some(parser) => parser.parse(request)?,
None => otlp::trace::to_grpc_insert_requests(request)?,
let (table_name, spans) = match self.plugins.get::<TraceParserRef>() {
Some(parser) => (parser.table_name(), parser.parse(request)),
None => (
otlp::trace::TRACE_TABLE_NAME.to_string(),
otlp::trace::parse(request),
),
};

let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;

let _ = self
.handle_row_inserts(requests, ctx)
.await
Expand Down
6 changes: 3 additions & 3 deletions src/servers/src/otlp/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@

use std::sync::Arc;

use api::v1::RowInsertRequests;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;

use crate::error::Result;
use super::trace::TraceSpans;

/// Transformer helps to transform ExportTraceServiceRequest based on logic, like:
/// - uplift some fields from Attributes (Map type) to column
pub trait TraceParser: Send + Sync {
fn parse(&self, request: ExportTraceServiceRequest) -> Result<(RowInsertRequests, usize)>;
fn parse(&self, request: ExportTraceServiceRequest) -> TraceSpans;
fn table_name(&self) -> String;
}

pub type TraceParserRef = Arc<dyn TraceParser>;
82 changes: 49 additions & 33 deletions src/servers/src/otlp/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,37 @@ use crate::error::Result;
use crate::row_writer::{self, MultiTableData, TableData};

const APPROXIMATE_COLUMN_COUNT: usize = 16;
const TRACE_TABLE_NAME: &str = "traces_preview_v01";
pub const TRACE_TABLE_NAME: &str = "traces_preview_v01";

#[derive(Debug, Clone, Eq, PartialEq)]
struct TraceSpan {
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TraceSpan {
// the following are tags
trace_id: String,
span_id: String,
parent_span_id: String,
pub trace_id: String,
pub span_id: String,
pub parent_span_id: String,

// the following are fields
resource_attributes: String, // TODO(yuanbohan): Map in the future
scope_name: String,
scope_version: String,
scope_attributes: String, // TODO(yuanbohan): Map in the future
trace_state: String,
span_name: String,
span_kind: String,
span_status_code: String,
span_status_message: String,
span_attributes: String, // TODO(yuanbohan): Map in the future
span_events: String, // TODO(yuanbohan): List in the future
span_links: String, // TODO(yuanbohan): List in the future
start_in_nanosecond: u64, // this is also the Timestamp Index
end_in_nanosecond: u64,
pub resource_attributes: String, // TODO(yuanbohan): Map in the future
pub scope_name: String,
pub scope_version: String,
pub scope_attributes: String, // TODO(yuanbohan): Map in the future
pub trace_state: String,
pub span_name: String,
pub span_kind: String,
pub span_status_code: String,
pub span_status_message: String,
pub span_attributes: String, // TODO(yuanbohan): Map in the future
pub span_events: String, // TODO(yuanbohan): List in the future
pub span_links: String, // TODO(yuanbohan): List in the future
pub start_in_nanosecond: u64, // this is also the Timestamp Index
pub end_in_nanosecond: u64,

pub uplift_fields: Vec<(String, ColumnDataType, ValueData)>,
}

pub type TraceSpans = Vec<TraceSpan>;

/// Convert OpenTelemetry traces to GreptimeDB row insert requests
///
/// See
Expand All @@ -67,16 +72,16 @@ struct TraceSpan {
///
/// Returns `InsertRequests` and total number of rows to ingest
pub fn to_grpc_insert_requests(
request: ExportTraceServiceRequest,
table_name: String,
spans: TraceSpans,
) -> Result<(RowInsertRequests, usize)> {
let mut multi_table_writer = MultiTableData::default();
let one_table_writer = multi_table_writer.get_or_default_table_data(
TRACE_TABLE_NAME,
table_name,
APPROXIMATE_COLUMN_COUNT,
APPROXIMATE_COLUMN_COUNT,
);

let spans = parse_request_to_spans(request);
for span in spans {
let row = one_table_writer.alloc_one_row();
write_span_to_row(one_table_writer, row, span)?;
Expand All @@ -85,7 +90,11 @@ pub fn to_grpc_insert_requests(
Ok(multi_table_writer.into_row_insert_requests())
}

fn write_span_to_row(writer: &mut TableData, mut row: Vec<Value>, span: TraceSpan) -> Result<()> {
pub fn write_span_to_row(
writer: &mut TableData,
mut row: Vec<Value>,
span: TraceSpan,
) -> Result<()> {
{
// tags
let iter = vec![
Expand Down Expand Up @@ -137,6 +146,7 @@ fn write_span_to_row(writer: &mut TableData, mut row: Vec<Value>, span: TraceSpa

row_writer::write_fields(writer, str_fields_iter, &mut row)?;
row_writer::write_fields(writer, time_fields_iter, &mut row)?;
row_writer::write_fields(writer, span.uplift_fields.into_iter(), &mut row)?;
}

row_writer::write_f64(
Expand All @@ -158,7 +168,11 @@ fn write_span_to_row(writer: &mut TableData, mut row: Vec<Value>, span: TraceSpa
Ok(())
}

fn parse_span(resource_attrs: &[KeyValue], scope: &InstrumentationScope, span: Span) -> TraceSpan {
pub fn parse_span(
resource_attrs: &[KeyValue],
scope: &InstrumentationScope,
span: Span,
) -> TraceSpan {
let (code, message) = status_to_string(&span.status);
TraceSpan {
trace_id: bytes_to_hex_string(&span.trace_id),
Expand All @@ -180,10 +194,12 @@ fn parse_span(resource_attrs: &[KeyValue], scope: &InstrumentationScope, span: S

start_in_nanosecond: span.start_time_unix_nano,
end_in_nanosecond: span.end_time_unix_nano,

uplift_fields: vec![],
}
}

fn parse_request_to_spans(request: ExportTraceServiceRequest) -> Vec<TraceSpan> {
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
let mut spans = vec![];
for resource_spans in request.resource_spans {
let resource_attrs = resource_spans
Expand Down Expand Up @@ -248,9 +264,9 @@ pub fn any_value_to_string(val: AnyValue) -> Option<String> {

pub fn event_to_string(event: &Event) -> String {
json!({
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
})
.to_string()
}
Expand All @@ -262,10 +278,10 @@ pub fn events_to_string(events: &[Event]) -> String {

pub fn link_to_string(link: &Link) -> String {
json!({
"trace_id": link.trace_id,
"span_id": link.span_id,
"trace_state": link.trace_state,
"attributes": vec_kv_to_string(&link.attributes),
"trace_id": link.trace_id,
"span_id": link.span_id,
"trace_state": link.trace_state,
"attributes": vec_kv_to_string(&link.attributes),
})
.to_string()
}
Expand Down

0 comments on commit acb8aea

Please sign in to comment.