-
Notifications
You must be signed in to change notification settings - Fork 45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
External Logging #500
base: main
Are you sure you want to change the base?
External Logging #500
Changes from all commits
35f4764
a220ef6
1c9aba2
718842c
ff51cb3
a3e3fa4
01d61a8
f910971
d88b305
5efad2c
ebf55b9
40078c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,117 @@ | ||||||||||||||||||||||||||||||||||||||||||||
use chrono::Utc; | ||||||||||||||||||||||||||||||||||||||||||||
use reqwest::Client; | ||||||||||||||||||||||||||||||||||||||||||||
use serde::Serialize; | ||||||||||||||||||||||||||||||||||||||||||||
use std::fmt::Write; | ||||||||||||||||||||||||||||||||||||||||||||
use tokio::sync::mpsc::{self, Sender}; | ||||||||||||||||||||||||||||||||||||||||||||
use tokio::task::JoinHandle; | ||||||||||||||||||||||||||||||||||||||||||||
use tracing::field::{Field, Visit}; | ||||||||||||||||||||||||||||||||||||||||||||
use tracing::Event; | ||||||||||||||||||||||||||||||||||||||||||||
use tracing_subscriber::layer::{Context, Layer}; | ||||||||||||||||||||||||||||||||||||||||||||
use tracing_subscriber::registry::LookupSpan; | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+1
to
+10
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Std imports should be first, this will fix formatting ci |
||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
pub const BACKLOG: usize = 10_000; | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
struct Visitor<'a> { | ||||||||||||||||||||||||||||||||||||||||||||
output: &'a mut String, | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
impl<'a> Visit for Visitor<'a> { | ||||||||||||||||||||||||||||||||||||||||||||
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { | ||||||||||||||||||||||||||||||||||||||||||||
let _ = write!(self.output, "{}={:?} ", field.name(), value); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
#[derive(Serialize)] | ||||||||||||||||||||||||||||||||||||||||||||
struct LogMessage { | ||||||||||||||||||||||||||||||||||||||||||||
timestamp: String, | ||||||||||||||||||||||||||||||||||||||||||||
level: String, | ||||||||||||||||||||||||||||||||||||||||||||
message: String, | ||||||||||||||||||||||||||||||||||||||||||||
target: String, | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
pub struct ElasticsearchLayer { | ||||||||||||||||||||||||||||||||||||||||||||
sender: Sender<LogMessage>, | ||||||||||||||||||||||||||||||||||||||||||||
handle: JoinHandle<()>, | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
impl ElasticsearchLayer { | ||||||||||||||||||||||||||||||||||||||||||||
/// Create a new ElasticsearchLayer. | ||||||||||||||||||||||||||||||||||||||||||||
/// | ||||||||||||||||||||||||||||||||||||||||||||
/// # Arguments | ||||||||||||||||||||||||||||||||||||||||||||
/// | ||||||||||||||||||||||||||||||||||||||||||||
/// - `elasticsearch_url`: The base URL of the Elasticsearch instance. | ||||||||||||||||||||||||||||||||||||||||||||
/// - `index`: The name of the index where logs will be sent. | ||||||||||||||||||||||||||||||||||||||||||||
/// - `api_key`: Optional API key for authentication. | ||||||||||||||||||||||||||||||||||||||||||||
pub fn new(elasticsearch_url: &str, index: &str, api_key: Option<&str>) -> Self { | ||||||||||||||||||||||||||||||||||||||||||||
let (sender, mut receiver) = mpsc::channel(BACKLOG); | ||||||||||||||||||||||||||||||||||||||||||||
let client = Client::new(); | ||||||||||||||||||||||||||||||||||||||||||||
let base_url = format!("{}/{}/_doc", elasticsearch_url.trim_end_matches('/'), index); | ||||||||||||||||||||||||||||||||||||||||||||
let api_key_header = api_key.map(|key| format!("ApiKey {}", key)); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
// Spawn an async task to process logs and send them to Elasticsearch. | ||||||||||||||||||||||||||||||||||||||||||||
let handle = tokio::spawn(async move { | ||||||||||||||||||||||||||||||||||||||||||||
while let Some(log) = receiver.recv().await { | ||||||||||||||||||||||||||||||||||||||||||||
let mut request = client.post(&base_url).json(&log); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
// Add the Authorization header if an API key is provided. | ||||||||||||||||||||||||||||||||||||||||||||
if let Some(ref key) = api_key_header { | ||||||||||||||||||||||||||||||||||||||||||||
request = request.header("Authorization", key); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
let response = request.send().await; | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
match response { | ||||||||||||||||||||||||||||||||||||||||||||
Ok(res) if !res.status().is_success() => { | ||||||||||||||||||||||||||||||||||||||||||||
eprintln!("Failed to send log to Elasticsearch: HTTP {}", res.status()); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
Ok(_) => {} | ||||||||||||||||||||||||||||||||||||||||||||
Err(e) => { | ||||||||||||||||||||||||||||||||||||||||||||
eprintln!("Failed to send log to Elasticsearch: {:?}", e); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
Self { sender, handle } | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
impl<S> Layer<S> for ElasticsearchLayer | ||||||||||||||||||||||||||||||||||||||||||||
where | ||||||||||||||||||||||||||||||||||||||||||||
S: tracing::Subscriber + for<'a> LookupSpan<'a>, | ||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { | ||||||||||||||||||||||||||||||||||||||||||||
let timestamp = Utc::now().to_rfc3339(); | ||||||||||||||||||||||||||||||||||||||||||||
let level = event.metadata().level().to_string(); | ||||||||||||||||||||||||||||||||||||||||||||
let target = event.metadata().target().to_string(); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
let mut message = String::new(); | ||||||||||||||||||||||||||||||||||||||||||||
let mut visitor = Visitor { | ||||||||||||||||||||||||||||||||||||||||||||
output: &mut message, | ||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||
event.record(&mut visitor); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
let log = LogMessage { | ||||||||||||||||||||||||||||||||||||||||||||
timestamp, | ||||||||||||||||||||||||||||||||||||||||||||
level: level.clone(), | ||||||||||||||||||||||||||||||||||||||||||||
message: message.clone(), | ||||||||||||||||||||||||||||||||||||||||||||
target, | ||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
// Non-blocking send to the channel | ||||||||||||||||||||||||||||||||||||||||||||
let result = self.sender.try_send(log); | ||||||||||||||||||||||||||||||||||||||||||||
match result { | ||||||||||||||||||||||||||||||||||||||||||||
Err(e) => { | ||||||||||||||||||||||||||||||||||||||||||||
eprintln!("Failed to write log message to channel. Likely exceeded backlog capacity: {:?}", e); | ||||||||||||||||||||||||||||||||||||||||||||
eprintln!("{}\n{}", level, message) | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
Ok(_) => {} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
impl Drop for ElasticsearchLayer { | ||||||||||||||||||||||||||||||||||||||||||||
fn drop(&mut self) { | ||||||||||||||||||||||||||||||||||||||||||||
self.handle.abort() | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,7 @@ uuid = { version = "1", features = ["v4", "serde"] } | |
# -Z minimal-versions | ||
sync_wrapper = "0.1.2" | ||
bech32 = "0.9.1" | ||
chrono = "0.4.39" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Imports under the -Z minimal-versions are for the case where a dep of ours does not properly define their minimum deps I don't think that is the case here. Do we need the dep in cdk I don't see where it is used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No I forgot to remove it because previously was working on cdk crate not cdk-mintd so that's left over from that.. |
||
|
||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies] | ||
tokio = { version = "1.21", features = [ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since for MSRV we need 24.3 of reqwest we should not set the patch version, cargo will use the lastest one we can.
This change plus rebasing onto main should fix the MSRV CI errors