External Logging #500

Open · wants to merge 12 commits into base: main
374 changes: 268 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions crates/cdk-mintd/Cargo.toml
@@ -37,6 +37,9 @@ url = "2.3"
utoipa = { version = "4", optional = true }
utoipa-swagger-ui = { version = "4", features = ["axum"], optional = true }
rand = "0.8.5"
chrono = "0.4.39"
serde_json = "1.0.133"
reqwest = "0.12.9"
Collaborator comment on lines +40 to +42:

Suggested change:
- chrono = "0.4.39"
- serde_json = "1.0.133"
- reqwest = "0.12.9"
+ chrono = "0.4.39"
+ serde_json = "1"
+ reqwest = "0.12"

Since for MSRV we need 24.3 of reqwest, we should not pin the patch version; cargo will resolve the latest version we can use.

This change, plus rebasing onto main, should fix the MSRV CI errors.


[features]
swagger = ["cdk-axum/swagger", "dep:utoipa", "dep:utoipa-swagger-ui"]
12 changes: 12 additions & 0 deletions crates/cdk-mintd/src/config.rs
@@ -186,6 +186,18 @@ pub struct Settings {
pub lnd: Option<Lnd>,
pub fake_wallet: Option<FakeWallet>,
pub database: Database,
pub elasticsearch: Option<ElasticSearchLogger>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ElasticSearchLogger {
/// authenticated URL endpoint of the ElasticSearch instance
/// (e.g. https://elastic:password@localhost:9200)
pub elasticsearch_url: String,
/// API Key as an alternative, more secure authentication
pub api_key: Option<String>,
/// index under which to store the logs
pub index: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
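For orientation (not part of the diff): because ElasticSearchLogger derives serde's Deserialize, a config section whose keys match the struct fields should map straight onto it. A minimal sketch follows, assuming the toml crate for parsing and using placeholder values; how cdk-mintd actually names and loads this section is not shown in this hunk.

```rust
// Hypothetical sketch only: keys mirror the ElasticSearchLogger fields above.
// The toml crate and the concrete values are assumptions for illustration.
use cdk_mintd::config::ElasticSearchLogger;

fn parse_example() {
    let snippet = r#"
        elasticsearch_url = "https://elastic:password@localhost:9200"
        index = "cdk-mintd-logs"
    "#;
    let logger: ElasticSearchLogger =
        toml::from_str(snippet).expect("section with matching keys");
    assert!(logger.api_key.is_none()); // optional api_key omitted, so it stays None
}
```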
51 changes: 49 additions & 2 deletions crates/cdk-mintd/src/env_vars.rs
@@ -6,8 +6,8 @@ use anyhow::{anyhow, bail, Result};
use cdk::nuts::CurrencyUnit;

use crate::config::{
Cln, Database, DatabaseEngine, FakeWallet, Info, LNbits, Ln, LnBackend, Lnd, MintInfo,
Phoenixd, Settings, Strike,
Cln, Database, DatabaseEngine, ElasticSearchLogger, FakeWallet, Info, LNbits, Ln, LnBackend,
Lnd, MintInfo, Phoenixd, Settings, Strike,
};

pub const DATABASE_ENV_VAR: &str = "CDK_MINTD_DATABASE";
@@ -68,6 +68,10 @@ pub const ENV_FAKE_WALLET_FEE_PERCENT: &str = "CDK_MINTD_FAKE_WALLET_FEE_PERCENT
pub const ENV_FAKE_WALLET_RESERVE_FEE_MIN: &str = "CDK_MINTD_FAKE_WALLET_RESERVE_FEE_MIN";
pub const ENV_FAKE_WALLET_MIN_DELAY: &str = "CDK_MINTD_FAKE_WALLET_MIN_DELAY";
pub const ENV_FAKE_WALLET_MAX_DELAY: &str = "CDK_MINTD_FAKE_WALLET_MAX_DELAY";
// Elastic Search
pub const ENV_ELASTIC_SEARCH_URL: &str = "CDK_MINTD_ELASTIC_SEARCH_URL";
pub const ENV_ELASTIC_SEARCH_INDEX: &str = "CDK_MINTD_ELASTIC_SEARCH_INDEX";
pub const ENV_ELASTIC_SEARCH_API_KEY_BASE64: &str = "CDK_MINTD_ELASTIC_SEARCH_API_KEY_BASE64";

impl Settings {
pub fn from_env(&mut self) -> Result<Self> {
@@ -80,6 +84,8 @@ impl Settings {
self.mint_info = self.mint_info.clone().from_env();
self.ln = self.ln.clone().from_env();

self.external_loggers_from_env();

match self.ln.ln_backend {
LnBackend::Cln => {
self.cln = Some(self.cln.clone().unwrap_or_default().from_env());
@@ -104,6 +110,27 @@

Ok(self.clone())
}

fn external_loggers_from_env(&mut self) {
// ElasticSearch
if env::var(ENV_ELASTIC_SEARCH_URL).is_ok() {
if self.elasticsearch.is_none() {
let extlogger = ElasticSearchLogger {
elasticsearch_url: "".to_string(),
api_key: None,
index: "".to_string(),
};
self.elasticsearch = Some(extlogger.from_env());
} else {
self.elasticsearch = Some(
self.elasticsearch
.clone()
.expect("Expected ElasticSearchLogger")
.from_env(),
);
}
}
}
}

impl Info {
@@ -433,3 +460,23 @@ impl FakeWallet {
self
}
}

impl ElasticSearchLogger {
pub fn from_env(mut self) -> Self {
if let Ok(elasticsearch_url) = env::var(ENV_ELASTIC_SEARCH_URL) {
self.elasticsearch_url = elasticsearch_url;
}

if let Ok(api_key) = env::var(ENV_ELASTIC_SEARCH_API_KEY_BASE64) {
self.api_key = Some(api_key);
} else {
self.api_key = None;
}

if let Ok(index) = env::var(ENV_ELASTIC_SEARCH_INDEX) {
self.index = index;
}

self
}
}
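A quick sketch (not part of the diff) of what this buys: exported environment variables fill in, or override, the corresponding config fields, matching the "ENV VARS will take priority" comment in main.rs further down. The values below are placeholders.

```rust
// Hypothetical illustration: with these variables exported in the environment,
//   CDK_MINTD_ELASTIC_SEARCH_URL=https://elastic:password@localhost:9200
//   CDK_MINTD_ELASTIC_SEARCH_INDEX=mintd-logs
// a default (empty) ElasticSearchLogger gets filled in from the environment.
use cdk_mintd::config::ElasticSearchLogger;

fn env_example() {
    let logger = ElasticSearchLogger::default().from_env();
    assert_eq!(logger.index, "mintd-logs");
    assert!(logger.api_key.is_none()); // CDK_MINTD_ELASTIC_SEARCH_API_KEY_BASE64 not set
}
```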
1 change: 1 addition & 0 deletions crates/cdk-mintd/src/lib.rs
@@ -5,6 +5,7 @@ use std::path::PathBuf;
pub mod cli;
pub mod config;
pub mod env_vars;
pub mod loggers;
pub mod setup;

fn expand_path(path: &str) -> Option<PathBuf> {
117 changes: 117 additions & 0 deletions crates/cdk-mintd/src/loggers.rs
@@ -0,0 +1,117 @@
use chrono::Utc;
use reqwest::Client;
use serde::Serialize;
use std::fmt::Write;
use tokio::sync::mpsc::{self, Sender};
use tokio::task::JoinHandle;
use tracing::field::{Field, Visit};
use tracing::Event;
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::registry::LookupSpan;
Collaborator comment on lines +1 to +10:

Suggested change:
- use chrono::Utc;
- use reqwest::Client;
- use serde::Serialize;
- use std::fmt::Write;
- use tokio::sync::mpsc::{self, Sender};
- use tokio::task::JoinHandle;
- use tracing::field::{Field, Visit};
- use tracing::Event;
- use tracing_subscriber::layer::{Context, Layer};
- use tracing_subscriber::registry::LookupSpan;
+ use std::fmt::Write;
+ use chrono::Utc;
+ use reqwest::Client;
+ use serde::Serialize;
+ use tokio::sync::mpsc::{self, Sender};
+ use tokio::task::JoinHandle;
+ use tracing::field::{Field, Visit};
+ use tracing::Event;
+ use tracing_subscriber::layer::{Context, Layer};
+ use tracing_subscriber::registry::LookupSpan;

Std imports should be first; this will fix the formatting CI.


pub const BACKLOG: usize = 10_000;

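/// Collects an event's fields into a single "name=value" style string for the log message.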
struct Visitor<'a> {
output: &'a mut String,
}

impl<'a> Visit for Visitor<'a> {
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
let _ = write!(self.output, "{}={:?} ", field.name(), value);
}
}

#[derive(Serialize)]
struct LogMessage {
timestamp: String,
level: String,
message: String,
target: String,
}

pub struct ElasticsearchLayer {
sender: Sender<LogMessage>,
handle: JoinHandle<()>,
}

impl ElasticsearchLayer {
/// Create a new ElasticsearchLayer.
///
/// # Arguments
///
/// - `elasticsearch_url`: The base URL of the Elasticsearch instance.
/// - `index`: The name of the index where logs will be sent.
/// - `api_key`: Optional API key for authentication.
pub fn new(elasticsearch_url: &str, index: &str, api_key: Option<&str>) -> Self {
let (sender, mut receiver) = mpsc::channel(BACKLOG);
let client = Client::new();
let base_url = format!("{}/{}/_doc", elasticsearch_url.trim_end_matches('/'), index);
let api_key_header = api_key.map(|key| format!("ApiKey {}", key));

// Spawn an async task to process logs and send them to Elasticsearch.
let handle = tokio::spawn(async move {
while let Some(log) = receiver.recv().await {
let mut request = client.post(&base_url).json(&log);

// Add the Authorization header if an API key is provided.
if let Some(ref key) = api_key_header {
request = request.header("Authorization", key);
}

let response = request.send().await;

match response {
Ok(res) if !res.status().is_success() => {
eprintln!("Failed to send log to Elasticsearch: HTTP {}", res.status());
}
Ok(_) => {}
Err(e) => {
eprintln!("Failed to send log to Elasticsearch: {:?}", e);
}
}
}
});

Self { sender, handle }
}
}

impl<S> Layer<S> for ElasticsearchLayer
where
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
let timestamp = Utc::now().to_rfc3339();
let level = event.metadata().level().to_string();
let target = event.metadata().target().to_string();

let mut message = String::new();
let mut visitor = Visitor {
output: &mut message,
};
event.record(&mut visitor);

let log = LogMessage {
timestamp,
level: level.clone(),
message: message.clone(),
target,
};

// Non-blocking send to the channel
let result = self.sender.try_send(log);
match result {
Err(e) => {
eprintln!("Failed to write log message to channel. Likely exceeded backlog capacity: {:?}", e);
eprintln!("{}\n{}", level, message)
}
Ok(_) => {}
}
}
}

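// Abort the background sender task when the layer is dropped; any logs still queued in the channel are discarded.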
impl Drop for ElasticsearchLayer {
fn drop(&mut self) {
self.handle.abort()
}
}
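For anyone setting up the target index, a rough sketch (not part of the diff) of the document shape that gets POSTed to <elasticsearch_url>/<index>/_doc. LogMessage is private to this module, so this is purely illustrative, and the sample values are made up.

```rust
// Hypothetical illustration (would have to live inside this module, since LogMessage is private).
// Serializing a LogMessage with serde_json yields a document roughly like:
//   {"timestamp":"2024-12-17T12:00:00+00:00","level":"INFO",
//    "message":"message=\"mint started\" ","target":"cdk_mintd"}
fn example_payload() -> String {
    let log = LogMessage {
        timestamp: chrono::Utc::now().to_rfc3339(),
        level: "INFO".to_string(),
        message: "message=\"mint started\" ".to_string(),
        target: "cdk_mintd".to_string(),
    };
    serde_json::to_string(&log).expect("LogMessage is Serialize")
}
```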
43 changes: 28 additions & 15 deletions crates/cdk-mintd/src/main.rs
@@ -21,13 +21,15 @@ use cdk::nuts::{ContactInfo, CurrencyUnit, MintVersion, PaymentMethod};
use cdk::types::LnKey;
use cdk_mintd::cli::CLIArgs;
use cdk_mintd::config::{self, DatabaseEngine, LnBackend};
use cdk_mintd::loggers::ElasticsearchLayer;
use cdk_mintd::setup::LnBackendSetup;
use cdk_redb::MintRedbDatabase;
use cdk_sqlite::MintSqliteDatabase;
use clap::Parser;
use tokio::sync::Notify;
use tower_http::cors::CorsLayer;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{EnvFilter, Layer, Registry};
#[cfg(feature = "swagger")]
use utoipa::OpenApi;

@@ -38,18 +40,6 @@ const DEFAULT_CACHE_TTI_SECS: u64 = 1800;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let default_filter = "debug";

let sqlx_filter = "sqlx=warn";
let hyper_filter = "hyper=warn";

let env_filter = EnvFilter::new(format!(
"{},{},{}",
default_filter, sqlx_filter, hyper_filter
));

tracing_subscriber::fmt().with_env_filter(env_filter).init();

let args = CLIArgs::parse();

let work_dir = match args.work_dir {
@@ -78,6 +68,31 @@ async fn main() -> anyhow::Result<()> {
// ENV VARS will take **priority** over those in the config
let settings = settings.from_env()?;

let default_filter = "debug";
let sqlx_filter = "sqlx=warn";
let hyper_filter = "hyper=warn";

let env_filter = format!("{},{},{}", default_filter, sqlx_filter, hyper_filter,);

if let Some(extlogger) = settings.elasticsearch.clone() {
// Export logs to elastic search
let elasticsearch_layer = ElasticsearchLayer::new(
&extlogger.elasticsearch_url,
&extlogger.index,
extlogger.api_key.as_deref(),
);
let subscriber = Registry::default()
.with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::new(&env_filter)))
.with(elasticsearch_layer.with_filter(EnvFilter::new(&env_filter)));

tracing::subscriber::set_global_default(subscriber).expect("Failed to set subscriber");
} else {
let subscriber = Registry::default()
.with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::new(&env_filter)));

tracing::subscriber::set_global_default(subscriber).expect("Failed to set subscriber");
}

let localstore: Arc<dyn MintDatabase<Err = cdk_database::Error> + Send + Sync> =
match settings.database.engine {
DatabaseEngine::Sqlite => {
@@ -140,8 +155,6 @@
melt_max: settings.ln.max_melt,
};

println!("{:?}", settings);

match settings.ln.ln_backend {
LnBackend::Cln => {
let cln_settings = settings
1 change: 1 addition & 0 deletions crates/cdk/Cargo.toml
@@ -58,6 +58,7 @@ uuid = { version = "1", features = ["v4", "serde"] }
# -Z minimal-versions
sync_wrapper = "0.1.2"
bech32 = "0.9.1"
chrono = "0.4.39"
Collaborator (@thesimplekid, Dec 17, 2024):

Imports under -Z minimal-versions are for the case where a dep of ours does not properly define its minimum deps; I don't think that is the case here.

Do we need the dep in cdk? I don't see where it is used.

Contributor Author:

No, I forgot to remove it; I was previously working on the cdk crate, not cdk-mintd, so it's left over from that.


[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.21", features = [
Expand Down