Skip to content

Commit

Permalink
Merge pull request #264 from lmnr-ai/dev
Browse files Browse the repository at this point in the history
Events and landing
  • Loading branch information
dinmukhamedm authored Dec 5, 2024
2 parents 8b91d3a + a4efced commit 09c42ce
Show file tree
Hide file tree
Showing 382 changed files with 7,832 additions and 4,130 deletions.
5 changes: 3 additions & 2 deletions frontend/.vscode/settings.json → .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
},
"editor.codeActionsOnSave": {
"source.fixAll.eslint": "explicit"
}
}
},
"eslint.validate": ["javascript"]
}
5 changes: 0 additions & 5 deletions app-server/.vscode/settings.json

This file was deleted.

33 changes: 4 additions & 29 deletions app-server/src/api/v1/traces.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
use std::sync::Arc;

use actix_web::{get, post, web, HttpRequest, HttpResponse};
use actix_web::{post, web, HttpRequest, HttpResponse};
use bytes::Bytes;
use lapin::Connection;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::{
db::{
events::{self, EventObservation},
project_api_keys::ProjectApiKey,
spans::Span,
DB,
},
db::{events::Event, project_api_keys::ProjectApiKey, spans::Span, DB},
features::{is_feature_enabled, Feature},
opentelemetry::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest,
routes::types::ResponseResult,
traces::{limits::get_workspace_limit_exceeded_by_project_id, producer::push_spans_to_queue},
};
use prost::Message;

#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Clone)]
pub struct RabbitMqSpanMessage {
pub project_id: Uuid,
pub span: Span,
pub events: Vec<EventObservation>,
pub events: Vec<Event>,
}

#[post("traces")]
Expand Down Expand Up @@ -78,23 +73,3 @@ pub async fn process_traces(
Ok(HttpResponse::Ok().finish())
}
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetEventsForSessionRequest {
session_id: String,
}

#[get("session-events")]
pub async fn get_events_for_session(
request: web::Query<GetEventsForSessionRequest>,
project_api_key: ProjectApiKey,
db: web::Data<DB>,
) -> ResponseResult {
let project_id = project_api_key.project_id;
let session_id = request.session_id.clone();
let events = events::get_events_for_session(&db.pool, &session_id, &project_id)
.await
.map_err(|e| anyhow::anyhow!("Failed to get events for session: {}", e))?;
Ok(HttpResponse::Ok().json(events))
}
153 changes: 10 additions & 143 deletions app-server/src/ch/events.rs
Original file line number Diff line number Diff line change
@@ -1,90 +1,30 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use clickhouse::Row;
use serde::Serialize;
use serde_repr::Serialize_repr;
use uuid::Uuid;

use crate::{
db::{self, event_templates::EventTemplate},
features::{is_feature_enabled, Feature},
};
use crate::{db::events::Event, features::is_feature_enabled, Feature};

use super::{
modifiers::GroupByInterval,
utils::{
chrono_to_nanoseconds, group_by_time_absolute_statement, group_by_time_relative_statement,
},
MetricTimeValue,
};

#[derive(Debug, Serialize_repr)]
#[repr(u8)]
pub enum EventSource {
CODE = 0,
AUTO = 1,
MANUAL = 2,
}

impl From<db::events::EventSource> for EventSource {
fn from(source: db::events::EventSource) -> Self {
match source {
db::events::EventSource::CODE => EventSource::CODE,
db::events::EventSource::AUTO => EventSource::AUTO,
db::events::EventSource::MANUAL => EventSource::MANUAL,
}
}
}

#[derive(Debug, Serialize_repr)]
#[repr(u8)]
pub enum EventType {
BOOLEAN = 0,
NUMBER = 1,
STRING = 2,
}

impl From<db::event_templates::EventType> for EventType {
fn from(event_type: db::event_templates::EventType) -> Self {
match event_type {
db::event_templates::EventType::BOOLEAN => EventType::BOOLEAN,
db::event_templates::EventType::NUMBER => EventType::NUMBER,
db::event_templates::EventType::STRING => EventType::STRING,
}
}
}
use super::utils::chrono_to_nanoseconds;

#[derive(Row, Serialize)]
pub struct CHEvent {
#[serde(with = "clickhouse::serde::uuid")]
pub id: Uuid,
/// Timestamp in nanoseconds
pub timestamp: i64,
pub source: EventSource,
#[serde(with = "clickhouse::serde::uuid")]
pub template_id: Uuid,
pub template_name: String,
pub event_type: EventType,
#[serde(with = "clickhouse::serde::uuid")]
pub project_id: Uuid,
/// Timestamp in nanoseconds
pub timestamp: i64,
pub name: String,
}

impl CHEvent {
pub fn from_data(
id: Uuid,
timestamp: DateTime<Utc>,
event_template: EventTemplate,
source: EventSource,
project_id: Uuid,
) -> Self {
pub fn from_db_event(event: &Event) -> Self {
CHEvent {
id,
timestamp: chrono_to_nanoseconds(timestamp),
source,
template_id: event_template.id,
template_name: event_template.name,
event_type: event_template.event_type.into(),
project_id,
id: event.id,
timestamp: chrono_to_nanoseconds(event.timestamp),
name: event.name.clone(),
project_id: event.project_id,
}
}
}
Expand Down Expand Up @@ -120,76 +60,3 @@ pub async fn insert_events(clickhouse: clickhouse::Client, events: Vec<CHEvent>)
}
}
}

pub async fn get_total_event_count_metrics_relative(
clickhouse: clickhouse::Client,
group_by_interval: GroupByInterval,
project_id: Uuid,
template_id: Uuid,
past_hours: i64,
) -> Result<Vec<MetricTimeValue<i64>>> {
let ch_round_time = group_by_interval.to_ch_truncate_time();

let query_string = format!(
"
SELECT
{ch_round_time}(timestamp) AS time,
COUNT(DISTINCT id) AS value
FROM events
WHERE
project_id = ?
AND template_id = ?
AND timestamp >= now() - INTERVAL ? HOUR
{}",
group_by_time_relative_statement(past_hours, group_by_interval),
);

let rows: Vec<MetricTimeValue<i64>> = clickhouse
.query(&query_string)
.bind(project_id)
.bind(template_id)
.bind(past_hours)
.fetch_all::<MetricTimeValue<i64>>()
.await?;

Ok(rows)
}

pub async fn get_total_event_count_metrics_absolute(
clickhouse: clickhouse::Client,
group_by_interval: GroupByInterval,
project_id: Uuid,
template_id: Uuid,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> Result<Vec<MetricTimeValue<i64>>> {
let ch_round_time = group_by_interval.to_ch_truncate_time();
let ch_start_time = start_time.timestamp();
let ch_end_time = end_time.timestamp();

let query_string = format!(
"
SELECT
{ch_round_time}(timestamp) AS time,
COUNT(DISTINCT id) AS value
FROM events
WHERE
project_id = ?
AND template_id = ?
AND timestamp >= fromUnixTimestamp(?)
AND timestamp <= fromUnixTimestamp(?)
{}",
group_by_time_absolute_statement(start_time, end_time, group_by_interval)
);

let rows: Vec<MetricTimeValue<i64>> = clickhouse
.query(&query_string)
.bind(project_id)
.bind(template_id)
.bind(ch_start_time)
.bind(ch_end_time)
.fetch_all::<MetricTimeValue<i64>>()
.await?;

Ok(rows)
}
Loading

0 comments on commit 09c42ce

Please sign in to comment.