-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement event tracking telemetry and submission to Mixpanel
- Loading branch information
Showing
10 changed files
with
460 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
mod proto_event; | ||
|
||
use crate::http::{self, Http}; | ||
use crate::telemetry::Event; | ||
use crate::time; | ||
use core::fmt; | ||
use embassy_time::Instant; | ||
use log::{debug, warn}; | ||
|
||
/// Mixpanel API base URL | ||
const BASE_URL: &str = "https://api-eu.mixpanel.com"; | ||
|
||
/// Mixpanel API error | ||
#[derive(Debug)] | ||
pub enum Error { | ||
/// Current time is required but not set | ||
CurrentTimeNotSet, | ||
/// Failed to connect to API server | ||
Connect(http::Error), | ||
/// Failed to submit events to API server | ||
Submit(http::Error), | ||
} | ||
|
||
impl fmt::Display for Error { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
match self { | ||
Self::CurrentTimeNotSet => write!(f, "Unknown current time"), | ||
Self::Connect(err) => write!(f, "MP connect failed ({err})"), | ||
Self::Submit(err) => write!(f, "MP submit failed ({err})"), | ||
} | ||
} | ||
} | ||
|
||
/// Mixpanel API client | ||
#[derive(Debug)] | ||
pub struct Mixpanel<'a> { | ||
token: &'a str, | ||
device_id: &'a str, | ||
} | ||
|
||
impl<'a> Mixpanel<'a> { | ||
/// Create new Mixpanel API client using the given project token | ||
pub fn new(token: &'a str, device_id: &'a str) -> Self { | ||
Self { token, device_id } | ||
} | ||
|
||
/// Connect to API server | ||
pub async fn connect<'conn>( | ||
&'conn mut self, | ||
http: &'conn mut Http<'_>, | ||
) -> Result<Connection<'conn>, Error> { | ||
Connection::new(self, http).await | ||
} | ||
} | ||
|
||
/// Mixpanel API client connection | ||
#[derive(Debug)] | ||
pub struct Connection<'a> { | ||
http: http::Connection<'a>, | ||
token: &'a str, | ||
device_id: &'a str, | ||
} | ||
|
||
impl Connection<'_> { | ||
/// Submit tracked events | ||
pub async fn submit(&mut self, events: &[(Instant, Event)]) -> Result<(), Error> { | ||
use proto_event::{TrackRequest, TrackResponse}; | ||
|
||
if time::now().is_none() { | ||
warn!("Mixpanel: Not submitting events. No current time set."); | ||
return Err(Error::CurrentTimeNotSet); | ||
} | ||
|
||
debug!("Mixpanel: Submitting {} events...", events.len()); | ||
let response: TrackResponse = self | ||
.http | ||
.post( | ||
"track?verbose=1", | ||
&TrackRequest { | ||
token: self.token, | ||
device_id: self.device_id, | ||
events, | ||
}, | ||
) | ||
.await | ||
.map_err(Error::Submit)?; | ||
debug!( | ||
"Mixpanel: Submit successul, status {} {}", | ||
response.status, response.error | ||
); | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl<'a> Connection<'a> { | ||
/// Connect to API server | ||
async fn new(mp: &'a Mixpanel<'_>, http: &'a mut Http<'_>) -> Result<Self, Error> { | ||
// Connect to API server | ||
let connection = http.connect(BASE_URL).await.map_err(Error::Connect)?; | ||
|
||
Ok(Self { | ||
http: connection, | ||
token: mp.token, | ||
device_id: mp.device_id, | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
use crate::json::{self, FromJsonObject, ToJson}; | ||
use crate::telemetry; | ||
use crate::time::DateTimeExt; | ||
use alloc::string::String; | ||
use embassy_time::Instant; | ||
use embedded_io_async::{BufRead, Write}; | ||
|
||
/// `track` request | ||
#[derive(Debug)] | ||
pub struct TrackRequest<'a> { | ||
pub token: &'a str, | ||
pub device_id: &'a str, | ||
pub events: &'a [(Instant, telemetry::Event)], | ||
} | ||
|
||
impl ToJson for TrackRequest<'_> { | ||
async fn to_json<W: Write>( | ||
&self, | ||
json: &mut json::Writer<W>, | ||
) -> Result<(), json::Error<W::Error>> { | ||
json.write_array(self.events.iter().map(|(time, event)| Event { | ||
token: self.token, | ||
device_id: self.device_id, | ||
time, | ||
telemetry: event, | ||
})) | ||
.await | ||
} | ||
} | ||
|
||
/// `track` response | ||
#[derive(Debug, Default)] | ||
pub struct TrackResponse { | ||
pub error: String, | ||
pub status: u32, | ||
} | ||
|
||
impl FromJsonObject for TrackResponse { | ||
type Context<'ctx> = (); | ||
|
||
async fn read_next<R: BufRead>( | ||
&mut self, | ||
_key: String, | ||
json: &mut json::Reader<R>, | ||
_context: &Self::Context<'_>, | ||
) -> Result<(), json::Error<R::Error>> { | ||
_ = json.read_any().await?; | ||
Ok(()) | ||
} | ||
} | ||
|
||
/// Event | ||
#[derive(Debug)] | ||
struct Event<'a> { | ||
token: &'a str, | ||
device_id: &'a str, | ||
time: &'a Instant, | ||
telemetry: &'a telemetry::Event, | ||
} | ||
|
||
impl ToJson for Event<'_> { | ||
async fn to_json<W: Write>( | ||
&self, | ||
json: &mut json::Writer<W>, | ||
) -> Result<(), json::Error<W::Error>> { | ||
json.write_object() | ||
.await? | ||
.field("event", self.telemetry.event_name()) | ||
.await? | ||
.field("properties", EventProperties { event: self }) | ||
.await? | ||
.finish() | ||
.await | ||
} | ||
} | ||
|
||
/// Event properties | ||
#[derive(Debug)] | ||
struct EventProperties<'a> { | ||
event: &'a Event<'a>, | ||
} | ||
|
||
impl ToJson for EventProperties<'_> { | ||
async fn to_json<W: Write>( | ||
&self, | ||
json: &mut json::Writer<W>, | ||
) -> Result<(), json::Error<W::Error>> { | ||
// Convert relative `Instant` time to absolute `DateTime` (needs current time set) | ||
let time = self | ||
.event | ||
.time | ||
.to_datetime() | ||
.ok_or(json::Error::InvalidType)?; | ||
|
||
let mut object = json.write_object().await?; | ||
|
||
// Reserved properties, see https://docs.mixpanel.com/docs/data-structure/property-reference/reserved-properties | ||
object | ||
.field("token", self.event.token) | ||
.await? | ||
.field("time", time.timestamp_millis()) | ||
.await?; | ||
// Use user id as distinct id if event is associated with a user, use device id otherwise | ||
match self.event.telemetry.user_id() { | ||
Some(user_id) => object.field("distinct_id", user_id).await?, | ||
None => object.field("distinct_id", self.event.device_id).await?, | ||
}; | ||
|
||
// Global custom properties | ||
object | ||
.field("firmware_version", crate::VERSION_STR) | ||
.await? | ||
.field("firmware_git_sha", crate::GIT_SHA_STR) | ||
.await? | ||
.field("device_id", self.event.device_id) | ||
.await?; | ||
// Event-specific custom properties | ||
self.event | ||
.telemetry | ||
.add_event_attributes(&mut object) | ||
.await?; | ||
|
||
object.finish().await | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.