Skip to content

Commit

Permalink
fix(notification): use serde_json in flattened structs
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Feb 12, 2024
1 parent 7bb87c1 commit a1ad5bb
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 57 additions & 1 deletion core/notifications/src/job/send_push_notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{executor::Executor, notification_event::NotificationEventPayload};
pub(super) struct SendPushNotificationData {
payload: NotificationEventPayload,
#[serde(flatten)]
pub(super) tracing_data: HashMap<String, String>,
pub(super) tracing_data: HashMap<String, serde_json::Value>,
}

impl From<NotificationEventPayload> for SendPushNotificationData {
Expand All @@ -30,3 +30,59 @@ pub async fn execute(
executor.notify(&data.payload).await?;
Ok(data)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{notification_event::*, primitives::*};
use serde::{Deserialize, Serialize};

#[derive(Default, Debug, Serialize, Deserialize)]
struct JobMeta {
attempts: u32,
tracing_data: Option<HashMap<String, String>>,
}

#[derive(Debug, Deserialize, Serialize)]
struct JobData<T: std::fmt::Debug> {
#[serde(rename = "_job_meta", default)]
job_meta: JobMeta,
#[serde(flatten)]
data: Option<T>,
#[serde(flatten)]
tracing_data: HashMap<String, serde_json::Value>,
}

#[test]
fn job_data_round_trip() {
let tracing_data = vec![
(
"tracestate".to_string(),
serde_json::Value::String("".to_string()),
),
(
"traceparent".to_string(),
serde_json::Value::String(
"00-2da747b31646aadf7aa11efacba7aad1-f1d9a1f51961dee0-01".to_string(),
),
),
]
.into_iter()
.collect();

let payload = NotificationEventPayload::CircleGrew(CircleGrew {
user_id: GaloyUserId::from("172437af-e8c3-4df7-9859-148dea00bf33".to_string()),
circle_type: CircleType::Outer,
this_month_circle_size: 1,
all_time_circle_size: 1,
});
let raw = SendPushNotificationData {
payload,
tracing_data,
};
let data = serde_json::to_string(&raw).unwrap();
let job_data: JobData<SendPushNotificationData> = serde_json::from_str(&data).unwrap();
assert!(job_data.data.is_some());
assert_eq!(job_data.tracing_data.len(), 3);
}
}
4 changes: 2 additions & 2 deletions lib/job-executor-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ struct JobData<T> {
#[serde(flatten)]
data: Option<T>,
#[serde(flatten)]
tracing_data: HashMap<String, String>,
tracing_data: HashMap<String, serde_json::Value>,
}

impl<'a, T: Deserialize<'a>> JobData<T> {
Expand All @@ -207,7 +207,7 @@ struct JobMeta {
attempts: u32,
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
wait_till_next_attempt: Duration,
tracing_data: Option<HashMap<String, String>>,
tracing_data: Option<HashMap<String, serde_json::Value>>,
}
impl Default for JobMeta {
fn default() -> Self {
Expand Down
1 change: 1 addition & 0 deletions lib/tracing-rs/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ galoy_rust_lib(
deps = [
"//third-party/rust:anyhow",
"//third-party/rust:serde",
"//third-party/rust:serde_json",
"//third-party/rust:opentelemetry-otlp",
"//third-party/rust:opentelemetry-semantic-conventions",
"//third-party/rust:opentelemetry",
Expand Down
1 change: 1 addition & 0 deletions lib/tracing-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
anyhow = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-opentelemetry = { workspace = true }
Expand Down
19 changes: 16 additions & 3 deletions lib/tracing-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,30 @@ pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display)
Span::current().record("error.message", &tracing::field::display(error));
}

pub fn extract_tracing_data() -> HashMap<String, String> {
pub fn extract_tracing_data() -> HashMap<String, serde_json::Value> {
let mut tracing_data = HashMap::new();
let propagator = TraceContextPropagator::new();
let context = Span::current().context();
propagator.inject_context(&context, &mut tracing_data);
tracing_data
.into_iter()
.map(|(k, v)| (k, serde_json::Value::String(v)))
.collect()
}

pub fn inject_tracing_data(span: &Span, tracing_data: &HashMap<String, String>) {
pub fn inject_tracing_data(span: &Span, tracing_data: &HashMap<String, serde_json::Value>) {
let data = tracing_data
.iter()
.filter_map(|(k, v)| {
if let serde_json::Value::String(v) = v {
Some((k.clone(), v.clone()))
} else {
None
}
})
.collect::<HashMap<String, String>>();
let propagator = TraceContextPropagator::new();
let context = propagator.extract(tracing_data);
let context = propagator.extract(&data);
span.set_parent(context);
}

Expand Down

0 comments on commit a1ad5bb

Please sign in to comment.