diff --git a/Cargo.lock b/Cargo.lock index e92b392161a7b..f95850e15524c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5227,6 +5227,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_rt", "risingwave_storage", + "risingwave_tracing", "serde", "toml", "tracing", @@ -5410,6 +5411,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_storage", "risingwave_stream", + "risingwave_tracing", "serde_json", "size", "tracing", @@ -6056,6 +6058,8 @@ dependencies = [ "futures", "madsim-tokio", "minitrace", + "minitrace-jaeger", + "rand 0.8.5", "tracing", "workspace-hack", ] diff --git a/src/cmd/src/bin/compute_node.rs b/src/cmd/src/bin/compute_node.rs index 9df107a984497..e3ce5e12df81c 100644 --- a/src/cmd/src/bin/compute_node.rs +++ b/src/cmd/src/bin/compute_node.rs @@ -25,10 +25,7 @@ fn main() { let opts = risingwave_compute::ComputeNodeOpts::parse(); - risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new( - opts.enable_jaeger_tracing, - false, - )); + risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(false)); risingwave_rt::main_okk(risingwave_compute::start(opts)) } diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index 54ee5fec37b91..879df2872692d 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -42,10 +42,7 @@ fn main() -> Result<()> { let opts = risingwave_compute::ComputeNodeOpts::parse_from(args); - risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new( - opts.enable_jaeger_tracing, - false, - )); + risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(false)); risingwave_rt::main_okk(risingwave_compute::start(opts)); diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index ede5fc285e1a6..61f2e616738a7 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -120,6 +120,16 @@ pub async fn compute_node_serve( state_store_metrics.clone(), object_store_metrics, TieredCacheMetricsBuilder::new(registry.clone()), + if opts.enable_jaeger_tracing { + Arc::new( + risingwave_tracing::RwTracingService::new(risingwave_tracing::TracingConfig::new( + "127.0.0.1:6831".to_string(), + )) + .unwrap(), + ) + } else { + Arc::new(risingwave_tracing::RwTracingService::disabled()) + }, ) .await .unwrap(); diff --git a/src/ctl/Cargo.toml b/src/ctl/Cargo.toml index df62662857ae1..2e60a5a00e07e 100644 --- a/src/ctl/Cargo.toml +++ b/src/ctl/Cargo.toml @@ -26,6 +26,7 @@ risingwave_pb = { path = "../prost" } risingwave_rpc_client = { path = "../rpc_client" } risingwave_storage = { path = "../storage" } risingwave_stream = { path = "../stream" } +risingwave_tracing = { path = "../tracing" } serde_json = "1" size = "0.4" tokio = { version = "0.2", package = "madsim-tokio", features = [ diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 9f8be3c8c430a..cfaa91e91a115 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -119,6 +119,7 @@ risectl requires a full persistent cluster to operate. Please make sure you're n metrics.state_store_metrics.clone(), metrics.object_store_metrics.clone(), TieredCacheMetricsBuilder::unused(), + Arc::new(risingwave_tracing::RwTracingService::disabled()), ) .await?; diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index c98dfe24bb1df..3b462ce5aae29 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -295,7 +295,7 @@ pub async fn register_leader_for_meta( pub async fn rpc_serve_with_store( meta_store: Arc, - mut address_info: AddressInfo, + address_info: AddressInfo, max_heartbeat_interval: Duration, lease_interval_secs: u64, opts: MetaOpts, @@ -336,9 +336,9 @@ pub async fn rpc_serve_with_store( ); #[cfg(not(madsim))] - if let Some(dashboard_addr) = address_info.dashboard_addr.take() { + if let Some(ref dashboard_addr) = address_info.dashboard_addr { let dashboard_service = crate::dashboard::DashboardService { - dashboard_addr, + dashboard_addr: *dashboard_addr, cluster_manager: cluster_manager.clone(), fragment_manager: fragment_manager.clone(), meta_store: env.meta_store_ref(), diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index bd61a66dc5419..631ee4139c4ba 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -52,6 +52,7 @@ risingwave_hummock_sdk = { path = "../storage/hummock_sdk" } risingwave_object_store = { path = "../object_store" } risingwave_pb = { path = "../prost" } risingwave_rpc_client = { path = "../rpc_client" } +risingwave_tracing = { path = "../tracing" } scopeguard = "1" # rocksdb = { git = "https://github.com/tikv/rust-rocksdb.git", rev = "fa83ff19", features = [ # "encryption", @@ -83,7 +84,6 @@ twox-hash = "1" zstd = "0.11.2" [target.'cfg(not(madsim))'.dependencies] -risingwave_tracing = { path = "../tracing" } workspace-hack = { path = "../workspace-hack" } [dev-dependencies] diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index e301d013e830e..8359c8be2b852 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -131,7 +131,7 @@ async fn get_local_hummock_storage( event_tx.clone(), MemoryLimiter::unlimit(), #[cfg(not(madsim))] - Arc::new(risingwave_tracing::RwTracingService::new()), + Arc::new(risingwave_tracing::RwTracingService::disabled()), ) } diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index c1f6cec0316b0..9aeb6d1519747 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -285,6 +285,7 @@ async fn test_snapshot_backward_range_scan_inner(enable_sync: bool, enable_commi mock_hummock_meta_client.clone(), get_test_notification_client(env, hummock_manager_ref, worker_node), Arc::new(StateStoreMetrics::unused()), + Arc::new(risingwave_tracing::RwTracingService::disabled()), ) .await .unwrap(); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 79a628d0f95d2..6becab1777e2f 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -507,6 +507,7 @@ async fn test_reload_storage() { worker_node.clone(), ), Arc::new(StateStoreMetrics::unused()), + Arc::new(risingwave_tracing::RwTracingService::disabled()), ) .await .unwrap(); diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index afa2334a3c258..9d19482b5649d 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -347,6 +347,7 @@ pub(crate) async fn with_hummock_storage_v1() -> (HummockStorageV1, Arc, - #[cfg(not(madsim))] tracing: Arc, } @@ -148,6 +147,7 @@ impl HummockStorage { notification_client: impl NotificationClient, // TODO: separate `HummockStats` from `StateStoreMetrics`. stats: Arc, + tracing: Arc, ) -> HummockResult { let sstable_id_manager = Arc::new(SstableIdManager::new( hummock_meta_client.clone(), @@ -176,9 +176,6 @@ impl HummockStorage { hummock_meta_client.clone(), )); - #[cfg(not(madsim))] - let tracing = Arc::new(risingwave_tracing::RwTracingService::new()); - let compactor_context = Arc::new(Context::new_local_compact_context( options.clone(), sstable_store.clone(), @@ -207,7 +204,6 @@ impl HummockStorage { shutdown_sender: event_tx, }), read_version_mapping: hummock_event_handler.read_version_mapping(), - #[cfg(not(madsim))] tracing, }; @@ -233,7 +229,6 @@ impl HummockStorage { self.hummock_version_reader.clone(), self.hummock_event_sender.clone(), self.buffer_tracker.get_memory_limiter().clone(), - #[cfg(not(madsim))] self.tracing.clone(), ) } @@ -308,6 +303,7 @@ impl HummockStorage { hummock_meta_client, notification_client, Arc::new(StateStoreMetrics::unused()), + Arc::new(risingwave_tracing::RwTracingService::disabled()), ) .await } @@ -472,7 +468,6 @@ pub struct HummockStorageV1 { version_update_notifier_tx: Arc>, - #[cfg(not(madsim))] tracing: Arc, } @@ -485,6 +480,7 @@ impl HummockStorageV1 { notification_client: impl NotificationClient, // TODO: separate `HummockStats` from `StateStoreMetrics`. stats: Arc, + tracing: Arc, ) -> HummockResult { // For conflict key detection. Enabled by setting `write_conflict_detection_enabled` to // true in `StorageConfig` @@ -515,9 +511,6 @@ impl HummockStorageV1 { hummock_meta_client.clone(), )); - #[cfg(not(madsim))] - let tracing = Arc::new(risingwave_tracing::RwTracingService::new()); - let compactor_context = Arc::new(Context::new_local_compact_context( options.clone(), sstable_store.clone(), @@ -571,7 +564,6 @@ impl HummockStorageV1 { }), version_update_notifier_tx: epoch_update_tx_clone, hummock_event_sender: event_tx, - #[cfg(not(madsim))] tracing, }; diff --git a/src/storage/src/hummock/state_store_v1.rs b/src/storage/src/hummock/state_store_v1.rs index f8c9e373fc499..f04fd0d66d56a 100644 --- a/src/storage/src/hummock/state_store_v1.rs +++ b/src/storage/src/hummock/state_store_v1.rs @@ -485,10 +485,7 @@ impl StateStoreRead for HummockStorageV1 { let iter = self.iter_inner::(epoch, map_table_key_range(key_range), read_options); - #[cfg(not(madsim))] - return iter.in_span(self.tracing.new_tracer("hummock_iter")); - #[cfg(madsim)] - iter + iter.in_span(self.tracing.new_tracer("hummock_iter")) } } diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index 4d4ddfc57e6e5..0e06829511751 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -17,7 +17,6 @@ use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; -#[cfg(not(madsim))] use minitrace::future::FutureExt; use parking_lot::RwLock; use risingwave_common::catalog::TableId; @@ -65,8 +64,6 @@ pub struct HummockStorageCore { pub struct LocalHummockStorage { core: Arc, - - #[cfg(not(madsim))] tracing: Arc, } @@ -76,7 +73,6 @@ impl Clone for LocalHummockStorage { fn clone(&self) -> Self { Self { core: self.core.clone(), - #[cfg(not(madsim))] tracing: self.tracing.clone(), } } @@ -169,10 +165,7 @@ impl StateStoreRead for LocalHummockStorage { let iter = self .core .iter_inner(map_table_key_range(key_range), epoch, read_options); - #[cfg(not(madsim))] - return iter.in_span(self.tracing.new_tracer("hummock_iter")); - #[cfg(madsim)] - iter + iter.in_span(self.tracing.new_tracer("hummock_iter")) } } @@ -225,7 +218,7 @@ impl LocalHummockStorage { hummock_version_reader: HummockVersionReader, event_sender: mpsc::UnboundedSender, memory_limiter: Arc, - #[cfg(not(madsim))] tracing: Arc, + tracing: Arc, ) -> Self { let storage_core = HummockStorageCore::new( instance_guard, @@ -237,7 +230,6 @@ impl LocalHummockStorage { Self { core: Arc::new(storage_core), - #[cfg(not(madsim))] tracing, } } diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index f42dbe5773298..9eed43b84c55a 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -437,6 +437,7 @@ pub mod verify { impl StateStoreImpl { #[cfg_attr(not(target_os = "linux"), expect(unused_variables))] + #[allow(clippy::too_many_arguments)] pub async fn new( s: &str, file_cache_dir: &str, @@ -445,6 +446,7 @@ impl StateStoreImpl { state_store_stats: Arc, object_store_metrics: Arc, tiered_cache_metrics_builder: TieredCacheMetricsBuilder, + tracing: Arc, ) -> StorageResult { #[cfg(not(target_os = "linux"))] let tiered_cache = TieredCache::none(); @@ -512,6 +514,7 @@ impl StateStoreImpl { hummock_meta_client.clone(), notification_client, state_store_stats.clone(), + tracing, ) .await?; @@ -523,6 +526,7 @@ impl StateStoreImpl { hummock_meta_client.clone(), notification_client, state_store_stats.clone(), + tracing, ) .await?; diff --git a/src/tests/compaction_test/Cargo.toml b/src/tests/compaction_test/Cargo.toml index 8c6416d5cd2be..8c6f651e64f5a 100644 --- a/src/tests/compaction_test/Cargo.toml +++ b/src/tests/compaction_test/Cargo.toml @@ -23,6 +23,7 @@ risingwave_pb = { path = "../../prost" } risingwave_rpc_client = { path = "../../rpc_client" } risingwave_rt = { path = "../../utils/runtime" } risingwave_storage = { path = "../../storage", features = ["test"] } +risingwave_tracing = { path = "../../tracing" } serde = { version = "1", features = ["derive"] } tokio = { version = "0.2", package = "madsim-tokio", features = [ "fs", diff --git a/src/tests/compaction_test/src/runner.rs b/src/tests/compaction_test/src/runner.rs index 6d6081df30d17..71482058d049a 100644 --- a/src/tests/compaction_test/src/runner.rs +++ b/src/tests/compaction_test/src/runner.rs @@ -659,6 +659,7 @@ pub async fn create_hummock_store_with_metrics( metrics.state_store_metrics.clone(), metrics.object_store_metrics.clone(), TieredCacheMetricsBuilder::unused(), + Arc::new(risingwave_tracing::RwTracingService::disabled()), ) .await?; diff --git a/src/tracing/Cargo.toml b/src/tracing/Cargo.toml index 0d5a3d1d50892..e2bcceb139ff4 100644 --- a/src/tracing/Cargo.toml +++ b/src/tracing/Cargo.toml @@ -12,6 +12,8 @@ repository = { workspace = true } anyhow = "1" futures = { version = "0.3", default-features = false, features = ["alloc", "executor"] } minitrace = "0.4" +minitrace-jaeger = "0.4" +rand = "0.8" tokio = { version = "0.2", package = "madsim-tokio", features = [ "sync", "macros", diff --git a/src/tracing/src/lib.rs b/src/tracing/src/lib.rs index a59e521ae978f..bdca10f3e6179 100644 --- a/src/tracing/src/lib.rs +++ b/src/tracing/src/lib.rs @@ -13,36 +13,75 @@ // limitations under the License. use std::env; +use std::net::SocketAddr; use std::thread::JoinHandle; -use anyhow::Error; +use anyhow::{Error, Result}; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures::StreamExt; use minitrace::prelude::*; pub struct RwTracingService { tx: UnboundedSender, - _join_handle: JoinHandle<()>, + _join_handle: Option>, enabled: bool, } -impl RwTracingService { - /// Create a new tracing service instance. Spawn a background thread to observe slow requests. - #[allow(clippy::new_without_default)] - pub fn new() -> Self { - let (tx, rx) = unbounded(); +pub struct TracingConfig { + pub jaeger_endpoint: Option, + pub print_to_console: bool, + pub slow_request_threshold_ms: u64, +} + +impl TracingConfig { + pub fn new(jaeger_endpoint: String) -> Self { let slow_request_threshold_ms: u64 = env::var("RW_TRACE_SLOW_REQUEST_THRESHOLD_MS") .ok() .map_or_else(|| 100, |v| v.parse().unwrap()); - - let join_handle = Self::start_tracing_listener(rx, slow_request_threshold_ms); + let print_to_console = env::var("RW_TRACE_SLOW_REQUEST") + .ok() + .map_or_else(|| false, |v| v == "true"); Self { + jaeger_endpoint: Some(jaeger_endpoint), + print_to_console, + slow_request_threshold_ms, + } + } +} + +impl RwTracingService { + /// Create a new tracing service instance. Spawn a background thread to observe slow requests. + pub fn new(config: TracingConfig) -> Result { + let (tx, rx) = unbounded(); + + let jaeger_addr: Option = + config.jaeger_endpoint.map(|x| x.parse()).transpose()?; + + let join_handle = if cfg!(madsim) { + None + } else { + Some(Self::start_tracing_listener( + rx, + config.print_to_console, + config.slow_request_threshold_ms, + jaeger_addr, + )) + }; + + let tr = Self { tx, _join_handle: join_handle, - enabled: env::var("RW_TRACE_SLOW_REQUEST") - .ok() - .map_or_else(|| false, |v| v == "true"), + enabled: jaeger_addr.is_some(), + }; + Ok(tr) + } + + pub fn disabled() -> Self { + let (tx, _) = unbounded(); + Self { + tx, + _join_handle: None, + enabled: false, } } @@ -57,27 +96,66 @@ impl RwTracingService { } } + #[cfg(madsim)] + fn start_tracing_listener( + _rx: UnboundedReceiver, + _print_to_console: bool, + _slow_request_threshold_ms: u64, + _jaeger_addr: Option, + ) -> JoinHandle<()> { + unreachable!() + } + + #[cfg(not(madsim))] fn start_tracing_listener( rx: UnboundedReceiver, + print_to_console: bool, slow_request_threshold_ms: u64, + jaeger_addr: Option, ) -> JoinHandle<()> { + use futures::StreamExt; + use rand::Rng; + tracing::info!( - "tracing service started with slow_request_threshold_ms={slow_request_threshold_ms}" + "tracing service started with slow_request_threshold_ms={slow_request_threshold_ms}, print_to_console={print_to_console}" ); + std::thread::Builder::new() .name("minitrace_listener".to_string()) .spawn(move || { let func = move || { - let stream = rx.for_each_concurrent(None, |collector| async move { + let rt = tokio::runtime::Builder::new_current_thread().build()?; + let stream = rx.for_each_concurrent(None, |collector| async { let spans = collector.collect().await; - if let Some(span) = spans.first() { - // print requests > 100ms - if span.duration_ns >= slow_request_threshold_ms * 1_000_000 { - tracing::info!("{:?}", spans); + if !spans.is_empty() { + // print slow requests + if print_to_console { + // print requests > 100ms + if spans[0].duration_ns >= slow_request_threshold_ms * 1_000_000 { + tracing::info!("{:?}", spans); + } + } + // report spans to jaeger + if let Some(ref jaeger_addr) = jaeger_addr { + let trace_id = rand::thread_rng().gen::(); + let span_id = rand::thread_rng().gen::(); + let encoded = minitrace_jaeger::encode( + "risingwave".to_string(), + trace_id, + 0, + span_id, + &spans, + ) + .unwrap(); + if let Err(err) = + minitrace_jaeger::report(*jaeger_addr, &encoded).await + { + tracing::warn!("failed to report spans to jaeger: {}", err); + } } } }); - futures::executor::block_on(stream); + rt.block_on(stream); Ok::<_, Error>(()) }; if let Err(err) = func() { diff --git a/src/utils/runtime/src/lib.rs b/src/utils/runtime/src/lib.rs index 09af6321c3435..909c8abd1fb53 100644 --- a/src/utils/runtime/src/lib.rs +++ b/src/utils/runtime/src/lib.rs @@ -53,8 +53,6 @@ fn configure_risingwave_targets_fmt(targets: filter::Targets) -> filter::Targets } pub struct LoggerSettings { - /// Enable Jaeger tracing. - enable_jaeger_tracing: bool, /// Enable tokio console output. enable_tokio_console: bool, /// Enable colorful output in console. @@ -63,12 +61,11 @@ pub struct LoggerSettings { impl LoggerSettings { pub fn new_default() -> Self { - Self::new(false, false) + Self::new(false) } - pub fn new(enable_jaeger_tracing: bool, enable_tokio_console: bool) -> Self { + pub fn new(enable_tokio_console: bool) -> Self { Self { - enable_jaeger_tracing, enable_tokio_console, colorful: console::colors_enabled_stderr(), } @@ -123,10 +120,6 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { fmt_layer.with_filter(filter) }; - if settings.enable_jaeger_tracing { - todo!("jaeger tracing is not supported for now, and it will be replaced with minitrace jaeger tracing. Tracking issue: https://github.com/risingwavelabs/risingwave/issues/4120"); - } - let tokio_console_layer = if settings.enable_tokio_console { let (console_layer, server) = console_subscriber::ConsoleLayer::builder() .with_default_env()