Skip to content

Commit

Permalink
Merge pull request #221 from lmnr-ai/dev
Browse files Browse the repository at this point in the history
Fix realtime and manual spans
  • Loading branch information
dinmukhamedm authored Nov 18, 2024
2 parents b30e993 + 13c7b5b commit 2bd68d8
Show file tree
Hide file tree
Showing 17 changed files with 3,487 additions and 436 deletions.
865 changes: 571 additions & 294 deletions app-server/Cargo.lock

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions app-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ tokio-stream = { version = "0.1", features = ["net"] }
futures = "0.3"
rayon = "1"
enum_dispatch = "0.3.12"
reqwest = { version = "0.12.7", default-features = false, features = ["rustls-tls", "json", "stream", "multipart"] }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "stream", "multipart"] }
serde = "1.0"
serde_json = "1.0.105"
log = "0.4.20"
Expand All @@ -35,11 +35,10 @@ actix-multipart = "0.6.1"
actix-web-httpauth = "0.8.1"
rand = "0.8.5"
itertools = "0.11.0"
unicode-segmentation = "1.10.1"
chrono = { version = "0.4.31", features = ["serde"] }
moka = { version = "0.12.1", features = ["sync", "future"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "uuid", "json", "chrono", "bigdecimal"] }
thiserror = "1.0.56"
thiserror = "2"
json_value_merge = "2.0.0"
serde-jsonlines = "0.5.0"
regex = "1.10.3"
Expand All @@ -55,7 +54,6 @@ handlebars_misc_helpers = { version = "0.16.3", features = ["json"] }
aws-sdk-bedrockruntime = "1.37.0"
aws-config = "1.5.5"
aws-credential-types = "1.2.0"
backoff = { version = "0.4.0", features = ["tokio"] }
lmnr-baml = { git = "https://github.com/lmnr-ai/lmnr-baml", branch = "rust" }
lapin = "2.5.0"
bytes = "1.7.1"
Expand Down
11 changes: 2 additions & 9 deletions app-server/src/db/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ impl FromStr for SpanType {
#[derive(Deserialize, Serialize, Clone, Debug, Default, FromRow)]
#[serde(rename_all = "camelCase")]
pub struct Span {
pub version: String,
pub span_id: Uuid,
pub trace_id: Uuid,
pub parent_span_id: Option<Uuid>,
Expand Down Expand Up @@ -78,8 +77,7 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
};
sqlx::query(
"INSERT INTO spans
(version,
span_id,
(span_id,
trace_id,
parent_span_id,
start_time,
Expand All @@ -106,10 +104,8 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
$10,
$11,
$12,
$13,
$14)
$13)
ON CONFLICT (span_id, project_id) DO UPDATE SET
version = EXCLUDED.version,
trace_id = EXCLUDED.trace_id,
parent_span_id = EXCLUDED.parent_span_id,
start_time = EXCLUDED.start_time,
Expand All @@ -123,7 +119,6 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
output_preview = EXCLUDED.output_preview
",
)
.bind(&span.version)
.bind(&span.span_id)
.bind(&span.trace_id)
.bind(&span.parent_span_id as &Option<Uuid>)
Expand Down Expand Up @@ -196,7 +191,6 @@ pub async fn get_trace_spans(
spans.span_id,
spans.start_time,
spans.end_time,
spans.version,
spans.trace_id,
spans.input,
spans.output,
Expand Down Expand Up @@ -245,7 +239,6 @@ pub async fn get_span(pool: &PgPool, id: Uuid, project_id: Uuid) -> Result<Span>
span_id,
start_time,
end_time,
version,
trace_id,
parent_span_id,
name,
Expand Down
72 changes: 15 additions & 57 deletions app-server/src/db/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ use super::{
utils::add_date_range_to_query,
};

pub const DEFAULT_VERSION: &str = "0.1.0";

/// Helper struct to pass current trace info, if exists, if pipeline is called from remote trace context
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -45,12 +43,6 @@ pub struct Trace {
start_time: Option<DateTime<Utc>>,
#[serde(default)]
end_time: Option<DateTime<Utc>>,
// Laminar trace format's version
version: String,
// Laminar customers' release version
release: Option<String>,
// User id of Laminar customers' user
user_id: Option<String>,
session_id: Option<String>,
metadata: Option<Value>,
input_token_count: i64,
Expand All @@ -59,7 +51,6 @@ pub struct Trace {
input_cost: f64,
output_cost: f64,
cost: f64,
success: bool,
project_id: Uuid,
}

Expand All @@ -69,12 +60,6 @@ pub struct TraceWithTopSpan {
id: Uuid,
start_time: DateTime<Utc>,
end_time: Option<DateTime<Utc>>,
// Laminar trace format's version
version: String,
// Laminar customers' release version
release: Option<String>,
// User id of Laminar customers' user
user_id: Option<String>,
session_id: Option<String>,
metadata: Option<Value>,
input_token_count: i64,
Expand All @@ -83,7 +68,6 @@ pub struct TraceWithTopSpan {
input_cost: f64,
output_cost: f64,
cost: f64,
success: bool,
project_id: Uuid,

top_span_input_preview: Option<String>,
Expand Down Expand Up @@ -114,13 +98,11 @@ pub async fn update_trace_attributes(
input_cost,
output_cost,
cost,
success,
start_time,
end_time,
version,
session_id,
user_id,
trace_type
trace_type,
metadata
)
VALUES (
$1,
Expand All @@ -131,13 +113,11 @@ pub async fn update_trace_attributes(
COALESCE($6, 0::float8),
COALESCE($7, 0::float8),
COALESCE($8, 0::float8),
COALESCE($9, true),
$9,
$10,
$11,
$12,
$13,
$14,
COALESCE($15, 'DEFAULT'::trace_type)
COALESCE($12, 'DEFAULT'::trace_type),
$13
)
ON CONFLICT(id) DO
UPDATE
Expand All @@ -148,12 +128,11 @@ pub async fn update_trace_attributes(
input_cost = traces.input_cost + COALESCE($6, 0),
output_cost = traces.output_cost + COALESCE($7, 0),
cost = traces.cost + COALESCE($8, 0),
success = CASE WHEN $9 IS NULL THEN traces.success ELSE $9 END,
start_time = CASE WHEN traces.start_time IS NULL OR traces.start_time > $10 THEN $10 ELSE traces.start_time END,
end_time = CASE WHEN traces.end_time IS NULL OR traces.end_time < $11 THEN $11 ELSE traces.end_time END,
session_id = CASE WHEN traces.session_id IS NULL THEN $13 ELSE traces.session_id END,
user_id = CASE WHEN traces.user_id IS NULL THEN $14 ELSE traces.user_id END,
trace_type = CASE WHEN $15 IS NULL THEN traces.trace_type ELSE COALESCE($15, 'DEFAULT'::trace_type) END
start_time = CASE WHEN traces.start_time IS NULL OR traces.start_time > $9 THEN $9 ELSE traces.start_time END,
end_time = CASE WHEN traces.end_time IS NULL OR traces.end_time < $10 THEN $10 ELSE traces.end_time END,
session_id = CASE WHEN traces.session_id IS NULL THEN $11 ELSE traces.session_id END,
trace_type = CASE WHEN $12 IS NULL THEN traces.trace_type ELSE COALESCE($12, 'DEFAULT'::trace_type) END,
metadata = COALESCE($13, traces.metadata)
"
)
.bind(attributes.id)
Expand All @@ -164,13 +143,11 @@ pub async fn update_trace_attributes(
.bind(attributes.input_cost)
.bind(attributes.output_cost)
.bind(attributes.cost)
.bind(attributes.success)
.bind(attributes.start_time)
.bind(attributes.end_time)
.bind(DEFAULT_VERSION)
.bind(&attributes.session_id)
.bind(&attributes.user_id)
.bind(&attributes.trace_type)
.bind(&serde_json::to_value(&attributes.metadata).unwrap())
.execute(pool)
.await?;
Ok(())
Expand All @@ -188,9 +165,6 @@ fn add_traces_info_expression(
id,
start_time,
end_time,
version,
release,
user_id,
session_id,
metadata,
project_id,
Expand All @@ -200,15 +174,13 @@ fn add_traces_info_expression(
input_cost,
output_cost,
cost,
success,
trace_type,
top_level_spans.input_preview top_span_input_preview,
top_level_spans.output_preview top_span_output_preview,
top_level_spans.path top_span_path,
top_level_spans.name top_span_name,
top_level_spans.span_type top_span_type,
EXTRACT(EPOCH FROM (end_time - start_time)) as latency,
CASE WHEN success = true THEN 'Success' ELSE 'Failed' END status
EXTRACT(EPOCH FROM (end_time - start_time)) as latency
FROM traces
JOIN (
SELECT
Expand Down Expand Up @@ -409,9 +381,6 @@ pub async fn get_traces(
id,
start_time,
end_time,
version,
release,
user_id,
session_id,
metadata,
project_id,
Expand All @@ -421,13 +390,11 @@ pub async fn get_traces(
input_cost,
output_cost,
cost,
success,
top_span_input_preview,
top_span_output_preview,
top_span_name,
top_span_type,
top_span_path,
status
top_span_path
FROM traces_info ",
);
if let Some(search) = text_search_filter {
Expand Down Expand Up @@ -466,9 +433,6 @@ pub async fn count_traces(
id,
start_time,
end_time,
version,
release,
user_id,
session_id,
metadata,
project_id,
Expand All @@ -478,10 +442,8 @@ pub async fn count_traces(
input_cost,
output_cost,
cost,
success,
trace_type,
EXTRACT(EPOCH FROM (end_time - start_time)) as latency,
CASE WHEN success = true THEN 'Success' ELSE 'Failed' END status
EXTRACT(EPOCH FROM (end_time - start_time)) as latency
FROM traces
WHERE start_time IS NOT NULL AND end_time IS NOT NULL AND trace_type = 'DEFAULT')",
);
Expand Down Expand Up @@ -516,9 +478,6 @@ pub async fn get_single_trace(pool: &PgPool, id: Uuid) -> Result<Trace> {
id,
start_time,
end_time,
version,
release,
user_id,
session_id,
metadata,
project_id,
Expand All @@ -527,8 +486,7 @@ pub async fn get_single_trace(pool: &PgPool, id: Uuid) -> Result<Trace> {
total_token_count,
input_cost,
output_cost,
cost,
success
cost
FROM traces
WHERE id = $1
AND start_time IS NOT NULL AND end_time IS NOT NULL",
Expand Down
14 changes: 8 additions & 6 deletions app-server/src/traces/attributes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use serde_json::Value;
use uuid::Uuid;

use crate::db::trace::TraceType;
Expand All @@ -16,10 +19,9 @@ pub struct TraceAttributes {
pub output_cost: Option<f64>,
/// Total costis not calculated on this struct and must be set manually
pub cost: Option<f64>,
pub success: Option<bool>,
pub session_id: Option<String>,
pub user_id: Option<String>,
pub trace_type: Option<TraceType>,
pub metadata: Option<HashMap<String, Value>>,
}

impl TraceAttributes {
Expand Down Expand Up @@ -69,11 +71,11 @@ impl TraceAttributes {
self.session_id = session_id;
}

pub fn update_user_id(&mut self, user_id: Option<String>) {
self.user_id = user_id;
}

pub fn update_trace_type(&mut self, trace_type: Option<TraceType>) {
self.trace_type = trace_type;
}

pub fn set_metadata(&mut self, metadata: Option<HashMap<String, Value>>) {
self.metadata = metadata;
}
}
Loading

0 comments on commit 2bd68d8

Please sign in to comment.