diff --git a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jImpl.java b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jImpl.java index 0f87988ab57d..9354a6043a56 100644 --- a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jImpl.java +++ b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jImpl.java @@ -27,6 +27,6 @@ public class TracingSlf4jImpl { public static final int TRACE = 4; public static void event(String name, int level, String message) { - Binding.tracingSlf4jEvent(name, level, message); + Binding.tracingSlf4jEvent(Thread.currentThread().getName(), name, level, message); } } diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index 3001a180a15d..ff490982ccbd 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -26,7 +26,8 @@ public class Binding { } } - public static native void tracingSlf4jEvent(String name, int level, String message); + public static native void tracingSlf4jEvent( + String threadName, String name, int level, String message); public static native int vnodeCount(); diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index b2f20ea6412a..e24c9ee50b28 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -70,7 +70,7 @@ impl BatchManager { builder.worker_threads(worker_threads_num); } builder - .thread_name("risingwave-batch-tasks") + .thread_name("rw-batch") .enable_all() .build() .unwrap() diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index b7693c6fa06a..27d24096487b 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -193,14 +193,16 @@ fn main() -> Result<()> { fn playground(opts: PlaygroundOpts) { let settings = risingwave_rt::LoggerSettings::new("playground") - .with_target("risingwave_storage", Level::WARN); + .with_target("risingwave_storage", Level::WARN) + .with_thread_name(true); risingwave_rt::init_risingwave_logger(settings); risingwave_rt::main_okk(risingwave_cmd_all::playground(opts)).unwrap(); } fn standalone(opts: StandaloneOpts) { let settings = risingwave_rt::LoggerSettings::new("standalone") - .with_target("risingwave_storage", Level::WARN); + .with_target("risingwave_storage", Level::WARN) + .with_thread_name(true); risingwave_rt::init_risingwave_logger(settings); risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap(); } diff --git a/src/connector/src/source/data_gen_util.rs b/src/connector/src/source/data_gen_util.rs index 7d990cf0ff8c..001a726f9301 100644 --- a/src/connector/src/source/data_gen_util.rs +++ b/src/connector/src/source/data_gen_util.rs @@ -31,7 +31,7 @@ pub fn spawn_data_generation_stream( ) -> impl Stream + Send + 'static { static RUNTIME: LazyLock = LazyLock::new(|| { tokio::runtime::Builder::new_multi_thread() - .thread_name("risingwave-data-generation") + .thread_name("rw-datagen") .enable_all() .build() .expect("failed to build data-generation runtime") diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index e68cb3ff80d2..ba786e2bb34a 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -436,7 +436,7 @@ impl FrontendEnv { Arc::new(BackgroundShutdownRuntime::from( Builder::new_multi_thread() .worker_threads(4) - .thread_name("frontend-compute-threads") + .thread_name("rw-batch-local") .enable_all() .build() .unwrap(), diff --git a/src/jni_core/src/macros.rs b/src/jni_core/src/macros.rs index 89b10b98b9d4..97a9e6049284 100644 --- a/src/jni_core/src/macros.rs +++ b/src/jni_core/src/macros.rs @@ -442,7 +442,7 @@ macro_rules! for_all_plain_native_methods { ($macro:path $(,$args:tt)*) => { $macro! { { - public static native void tracingSlf4jEvent(String name, int level, String string); + public static native void tracingSlf4jEvent(String threadName, String name, int level, String string); public static native int vnodeCount(); @@ -882,7 +882,7 @@ mod tests { // This test shows the signature of all native methods let expected = expect_test::expect![[r#" [ - tracingSlf4jEvent (Ljava/lang/String;ILjava/lang/String;)V, + tracingSlf4jEvent (Ljava/lang/String;Ljava/lang/String;ILjava/lang/String;)V, vnodeCount ()I, iteratorNewHummock ([B)J, iteratorNewStreamChunk (J)J, diff --git a/src/jni_core/src/tracing_slf4j.rs b/src/jni_core/src/tracing_slf4j.rs index ce410b9bcb00..8f7222d11c64 100644 --- a/src/jni_core/src/tracing_slf4j.rs +++ b/src/jni_core/src/tracing_slf4j.rs @@ -23,11 +23,15 @@ use crate::{execute_and_catch, EnvParam}; #[no_mangle] pub(crate) extern "system" fn Java_com_risingwave_java_binding_Binding_tracingSlf4jEvent( env: EnvParam<'_>, + thread_name: JString<'_>, class_name: JString<'_>, level: jint, message: JString<'_>, ) { execute_and_catch(env, move |env| { + let thread_name = env.get_string(&thread_name)?; + let thread_name: Cow<'_, str> = (&thread_name).into(); + let class_name = env.get_string(&class_name)?; let class_name: Cow<'_, str> = (&class_name).into(); @@ -39,7 +43,8 @@ pub(crate) extern "system" fn Java_com_risingwave_java_binding_Binding_tracingSl tracing::event!( target: "risingwave_connector_node", $lvl, - class = class_name.as_ref(), + thread = &*thread_name, + class = &*class_name, "{message}", ) }; diff --git a/src/storage/src/hummock/compactor/compaction_executor.rs b/src/storage/src/hummock/compactor/compaction_executor.rs index d3086709a33d..426245a89a39 100644 --- a/src/storage/src/hummock/compactor/compaction_executor.rs +++ b/src/storage/src/hummock/compactor/compaction_executor.rs @@ -30,7 +30,7 @@ impl CompactionExecutor { let mut worker_num = resource_util::cpu::total_cpu_available() as usize; let runtime = { let mut builder = tokio::runtime::Builder::new_multi_thread(); - builder.thread_name("risingwave-compaction"); + builder.thread_name("rw-compaction"); if let Some(worker_threads_num) = worker_threads_num { builder.worker_threads(worker_threads_num); worker_num = worker_threads_num; diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 013873f17697..579cb7810c49 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -427,7 +427,7 @@ impl LocalStreamManagerCore { builder.worker_threads(worker_threads_num); } builder - .thread_name("risingwave-streaming-actor") + .thread_name("rw-streaming") .enable_all() .build() .unwrap() diff --git a/src/utils/runtime/src/lib.rs b/src/utils/runtime/src/lib.rs index 343550246e24..50577c8df156 100644 --- a/src/utils/runtime/src/lib.rs +++ b/src/utils/runtime/src/lib.rs @@ -74,7 +74,7 @@ where } tokio::runtime::Builder::new_multi_thread() - .thread_name("risingwave-main") + .thread_name("rw-main") .enable_all() .build() .unwrap() diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index 18794b257f79..e8abb0aef65e 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -38,6 +38,8 @@ pub struct LoggerSettings { colorful: bool, /// Output to `stderr` instead of `stdout`. stderr: bool, + /// Whether to include thread name in the log. + with_thread_name: bool, /// Override target settings. targets: Vec<(String, tracing::metadata::LevelFilter)>, /// Override the default level. @@ -57,6 +59,7 @@ impl LoggerSettings { enable_tokio_console: false, colorful: console::colors_enabled_stderr() && console::colors_enabled(), stderr: false, + with_thread_name: false, targets: vec![], default_level: None, } @@ -74,6 +77,12 @@ impl LoggerSettings { self } + /// Whether to include thread name in the log. + pub fn with_thread_name(mut self, enabled: bool) -> Self { + self.with_thread_name = enabled; + self + } + /// Overrides the default target settings. pub fn with_target( mut self, @@ -210,6 +219,7 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { // fmt layer (formatting and logging to `stdout` or `stderr`) { let fmt_layer = tracing_subscriber::fmt::layer() + .with_thread_names(settings.with_thread_name) .with_timer(default_timer.clone()) .with_ansi(settings.colorful) .with_writer(move || { @@ -359,7 +369,7 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { let otel_tracer = { let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() - .thread_name("risingwave-otel") + .thread_name("rw-otel") .worker_threads(2) .build() .unwrap();