diff --git a/Cargo.lock b/Cargo.lock index b373eabede8..3b97d7ec8ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3845,6 +3845,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry-semantic-conventions", "serde", + "serde_json", "tonic 0.10.2", "tracing 0.1.40", "tracing-opentelemetry", diff --git a/core/notifications/src/job/send_push_notification.rs b/core/notifications/src/job/send_push_notification.rs index 2a0d33aa458..95fa35ab0f7 100644 --- a/core/notifications/src/job/send_push_notification.rs +++ b/core/notifications/src/job/send_push_notification.rs @@ -10,7 +10,7 @@ use crate::{executor::Executor, notification_event::NotificationEventPayload}; pub(super) struct SendPushNotificationData { payload: NotificationEventPayload, #[serde(flatten)] - pub(super) tracing_data: HashMap, + pub(super) tracing_data: HashMap, } impl From for SendPushNotificationData { @@ -30,3 +30,61 @@ pub async fn execute( executor.notify(&data.payload).await?; Ok(data) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{notification_event::*, primitives::*}; + use serde::{Deserialize, Serialize}; + + #[derive(Default, Debug, Serialize, Deserialize)] + struct JobMeta { + attempts: u32, + tracing_data: Option>, + } + + #[derive(Debug, Deserialize, Serialize)] + struct JobData { + #[serde(rename = "_job_meta", default)] + job_meta: JobMeta, + #[serde(flatten)] + data: Option, + #[serde(flatten)] + tracing_data: HashMap, + } + + #[test] + fn job_data_round_trip() { + let tracing_data = vec![ + ( + "tracestate".to_string(), + serde_json::Value::String("".to_string()), + ), + ( + "traceparent".to_string(), + serde_json::Value::String( + "00-2da747b31646aadf7aa11efacba7aad1-f1d9a1f51961dee0-01".to_string(), + ), + ), + ] + .into_iter() + .collect(); + + let payload = NotificationEventPayload::CircleGrew(CircleGrew { + user_id: GaloyUserId::from("172437af-e8c3-4df7-9859-148dea00bf33".to_string()), + circle_type: CircleType::Outer, + this_month_circle_size: 1, + all_time_circle_size: 1, + }); + let raw = SendPushNotificationData { + payload, + tracing_data, + }; + let data = serde_json::to_string(&raw).unwrap(); + dbg!(&data); + let job_data: JobData = serde_json::from_str(&data).unwrap(); + assert!(job_data.data.is_some()); + dbg!(&job_data); + assert_eq!(job_data.tracing_data.len(), 3); + } +} diff --git a/lib/job-executor-rs/src/lib.rs b/lib/job-executor-rs/src/lib.rs index 8405a5b2a77..4e49b40b419 100644 --- a/lib/job-executor-rs/src/lib.rs +++ b/lib/job-executor-rs/src/lib.rs @@ -184,7 +184,7 @@ struct JobData { #[serde(flatten)] data: Option, #[serde(flatten)] - tracing_data: HashMap, + tracing_data: HashMap, } impl<'a, T: Deserialize<'a>> JobData { @@ -207,7 +207,7 @@ struct JobMeta { attempts: u32, #[serde_as(as = "serde_with::DurationSeconds")] wait_till_next_attempt: Duration, - tracing_data: Option>, + tracing_data: Option>, } impl Default for JobMeta { fn default() -> Self { diff --git a/lib/tracing-rs/BUCK b/lib/tracing-rs/BUCK index 54022ab0261..96c79f39999 100644 --- a/lib/tracing-rs/BUCK +++ b/lib/tracing-rs/BUCK @@ -5,6 +5,7 @@ galoy_rust_lib( deps = [ "//third-party/rust:anyhow", "//third-party/rust:serde", + "//third-party/rust:serde_json", "//third-party/rust:opentelemetry-otlp", "//third-party/rust:opentelemetry-semantic-conventions", "//third-party/rust:opentelemetry", diff --git a/lib/tracing-rs/Cargo.toml b/lib/tracing-rs/Cargo.toml index a724ca7ae9a..73b09f8b1a8 100644 --- a/lib/tracing-rs/Cargo.toml +++ b/lib/tracing-rs/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] anyhow = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } tracing-opentelemetry = { workspace = true } diff --git a/lib/tracing-rs/src/lib.rs b/lib/tracing-rs/src/lib.rs index 0b94e063944..1fdcdaa4aca 100644 --- a/lib/tracing-rs/src/lib.rs +++ b/lib/tracing-rs/src/lib.rs @@ -73,17 +73,30 @@ pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display) Span::current().record("error.message", &tracing::field::display(error)); } -pub fn extract_tracing_data() -> HashMap { +pub fn extract_tracing_data() -> HashMap { let mut tracing_data = HashMap::new(); let propagator = TraceContextPropagator::new(); let context = Span::current().context(); propagator.inject_context(&context, &mut tracing_data); tracing_data + .into_iter() + .map(|(k, v)| (k, serde_json::Value::String(v))) + .collect() } -pub fn inject_tracing_data(span: &Span, tracing_data: &HashMap) { +pub fn inject_tracing_data(span: &Span, tracing_data: &HashMap) { + let data = tracing_data + .iter() + .filter_map(|(k, v)| { + if let serde_json::Value::String(v) = v { + Some((k.clone(), v.clone())) + } else { + None + } + }) + .collect::>(); let propagator = TraceContextPropagator::new(); - let context = propagator.extract(tracing_data); + let context = propagator.extract(&data); span.set_parent(context); }