diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index a60b24f583b03..2aab5720d0fdf 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -194,7 +194,7 @@ impl BatchManager { pb_task_output_id: &PbTaskOutputId, ) -> Result<()> { let task_id = TaskOutputId::try_from(pb_task_output_id)?; - tracing::trace!(target: "events::compute::exchange", peer_addr = %peer_addr, from = ?task_id, "serve exchange RPC"); + tracing::debug!(target: "events::compute::exchange", peer_addr = %peer_addr, from = ?task_id, "serve exchange RPC"); let mut task_output = self.take_output(pb_task_output_id)?; self.runtime.spawn(async move { let mut writer = GrpcExchangeWriter::new(tx.clone()); diff --git a/src/compute/src/rpc/service/exchange_service.rs b/src/compute/src/rpc/service/exchange_service.rs index c50322cc2c94f..e78ddb538add0 100644 --- a/src/compute/src/rpc/service/exchange_service.rs +++ b/src/compute/src/rpc/service/exchange_service.rs @@ -138,7 +138,7 @@ impl ExchangeServiceImpl { up_down_actor_ids: (u32, u32), up_down_fragment_ids: (u32, u32), ) { - tracing::trace!(target: "events::compute::exchange", peer_addr = %peer_addr, "serve stream exchange RPC"); + tracing::debug!(target: "events::compute::exchange", peer_addr = %peer_addr, "serve stream exchange RPC"); let up_actor_id = up_down_actor_ids.0.to_string(); let up_fragment_id = up_down_fragment_ids.0.to_string(); let down_fragment_id = up_down_fragment_ids.1.to_string(); diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index e59225c8de510..262da65f19b0c 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -796,9 +796,9 @@ impl GlobalBarrierManager { actor_ids_to_send, actor_ids_to_collect, }; - tracing::trace!( + tracing::debug!( target: "events::meta::barrier::inject_barrier", - "inject barrier request: {:?}", request + ?request, "inject barrier request" ); // This RPC returns only if this worker node has injected this barrier. @@ -838,9 +838,9 @@ impl GlobalBarrierManager { prev_epoch, tracing_context, }; - tracing::trace!( + tracing::debug!( target: "events::meta::barrier::barrier_complete", - "barrier complete request: {:?}", request + ?request, "barrier complete" ); // This RPC returns only if this worker node has collected this barrier. diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index da5b4fce20711..36123c6bd9d03 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -314,7 +314,7 @@ impl ClusterManager { worker_id: WorkerId, info: Vec, ) -> MetaResult<()> { - tracing::trace!(target: "events::meta::server_heartbeat", worker_id = worker_id, "receive heartbeat"); + tracing::debug!(target: "events::meta::server_heartbeat", worker_id, "receive heartbeat"); let mut core = self.core.write().await; for worker in core.workers.values_mut() { if worker.worker_id() == worker_id { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 35f69aaa74fbd..74a80f8e9f3e6 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -664,7 +664,7 @@ impl MetaClient { extra_info.push(info); } } - tracing::trace!(target: "events::meta::client_heartbeat", "heartbeat"); + tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat"); match tokio::time::timeout( // TODO: decide better min_interval for timeout min_interval * 3, diff --git a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs index edb6c372ba31c..3988d082177f8 100644 --- a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs @@ -235,7 +235,7 @@ impl SstableIterator { idx: usize, seek_key: Option>, ) -> HummockResult<()> { - tracing::trace!( + tracing::debug!( target: "events::storage::sstable::block_seek", "table iterator seek: sstable_object_id = {}, block_id = {}", self.sst.value().id, diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index d50e4ec0277ab..b6407528d5272 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -162,7 +162,7 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu vnode }; - tracing::trace!(target: "events::storage::storage_table", "compute vnode: {:?} key {:?} => {}", row, indices, vnode); + tracing::debug!(target: "events::storage::storage_table", "compute vnode: {:?} key {:?} => {}", row, indices, vnode); vnode } diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 9c0c931aaa022..414721c34efbf 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -651,7 +651,7 @@ impl Dispatcher for HashDataDispatcher { // get hash value of every line by its key let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys); - tracing::trace!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes); + tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes); let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity())) .take(num_outputs) diff --git a/src/stream/src/executor/lookup/impl_.rs b/src/stream/src/executor/lookup/impl_.rs index e0e62e7e04b7a..e7f39c0247bf9 100644 --- a/src/stream/src/executor/lookup/impl_.rs +++ b/src/stream/src/executor/lookup/impl_.rs @@ -322,7 +322,7 @@ impl LookupExecutor { .lookup_one_row(&row, self.last_barrier.as_ref().unwrap().epoch) .await? { - tracing::trace!(target: "events::stream::lookup::put", "{:?} {:?}", row, matched_row); + tracing::debug!(target: "events::stream::lookup::put", "{:?} {:?}", row, matched_row); if let Some(chunk) = builder.append_row(*op, row, &matched_row) { yield Message::Chunk(chunk); @@ -388,7 +388,7 @@ impl LookupExecutor { .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc(); - tracing::trace!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row); + tracing::debug!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row); let mut all_rows = VecWithKvSize::new(); // Drop the stream. @@ -427,7 +427,7 @@ impl LookupExecutor { } } - tracing::trace!(target: "events::stream::lookup::result", "{:?} => {:?}", lookup_row, all_rows.inner()); + tracing::debug!(target: "events::stream::lookup::result", "{:?} => {:?}", lookup_row, all_rows.inner()); self.lookup_cache.batch_update(lookup_row, all_rows.clone()); diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 9c71684f529b8..f2f7d84ca2a3d 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -138,7 +138,7 @@ impl MergeExecutor { .inc_by(chunk.cardinality() as _); } Message::Barrier(barrier) => { - tracing::trace!( + tracing::debug!( target: "events::stream::barrier::path", actor_id = actor_id, "receiver receives barrier from path: {:?}", diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index a99bbc2347a03..5b96cf6f9f8d8 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -144,7 +144,7 @@ impl Executor for ReceiverExecutor { .inc_by(chunk.cardinality() as _); } Message::Barrier(barrier) => { - tracing::trace!( + tracing::debug!( target: "events::stream::barrier::path", actor_id = actor_id, "receiver receives barrier from path: {:?}", diff --git a/src/stream/src/executor/wrapper/trace.rs b/src/stream/src/executor/wrapper/trace.rs index 5314d54ff1d89..fbf22c5d6d34b 100644 --- a/src/stream/src/executor/wrapper/trace.rs +++ b/src/stream/src/executor/wrapper/trace.rs @@ -59,7 +59,7 @@ pub async fn trace( .with_label_values(&[&actor_id_str, &fragment_id_str, &info.identity]) .inc_by(chunk.cardinality() as u64); } - tracing::trace!( + tracing::debug!( target: "events::stream::message::chunk", cardinality = chunk.cardinality(), capacity = chunk.capacity(), @@ -68,14 +68,14 @@ pub async fn trace( } } Message::Watermark(watermark) => { - tracing::trace!( + tracing::debug!( target: "events::stream::message::watermark", value = ?watermark.val, col_idx = watermark.col_idx, ); } Message::Barrier(barrier) => { - tracing::trace!( + tracing::debug!( target: "events::stream::message::barrier", prev_epoch = barrier.epoch.prev, curr_epoch = barrier.epoch.curr, diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 5581a8529c067..996881d3ff4b0 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -101,7 +101,7 @@ impl LocalBarrierManager { /// Register sender for source actors, used to send barriers. pub fn register_sender(&mut self, actor_id: ActorId, sender: UnboundedSender) { - tracing::trace!( + tracing::debug!( target: "events::stream::barrier::manager", actor_id = actor_id, "register sender" @@ -132,7 +132,7 @@ impl LocalBarrierManager { } }; let to_collect: HashSet = actor_ids_to_collect.into_iter().collect(); - trace!( + debug!( target: "events::stream::barrier::manager::send", "send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}", barrier, @@ -172,7 +172,7 @@ impl LocalBarrierManager { // Actors to stop should still accept this barrier, but won't get sent to in next times. if let Some(actors) = barrier.all_stop_actors() { - trace!( + debug!( target: "events::stream::barrier::manager", "remove actors {:?} from senders", actors diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 2c14d6672eb69..43aeb4afba46b 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -193,12 +193,10 @@ impl ManagedBarrierState { /// Collect a `barrier` from the actor with `actor_id`. pub(super) fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) { - tracing::trace!( + tracing::debug!( target: "events::stream::barrier::manager::collect", - "collect_barrier: epoch = {}, actor_id = {}, state = {:#?}", - barrier.epoch.curr, - actor_id, - self + epoch = barrier.epoch.curr, actor_id, state = ?self, + "collect_barrier", ); match self.epoch_barrier_state_map.get_mut(&barrier.epoch.curr) { diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index 4a4b77936b800..916dd93d7a32b 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -28,29 +28,6 @@ use tracing_subscriber::{filter, EnvFilter}; const PGWIRE_QUERY_LOG: &str = "pgwire_query_log"; const SLOW_QUERY_LOG: &str = "risingwave_frontend_slow_query_log"; -/// Configure log targets for some `RisingWave` crates. -/// -/// Other RisingWave crates will follow the default level (`DEBUG` or `INFO` according to -/// the `debug_assertions` and `is_ci` flag). -fn configure_risingwave_targets_fmt(targets: filter::Targets) -> filter::Targets { - targets - // force a lower level for important logs - .with_target("risingwave_stream", Level::DEBUG) - .with_target("risingwave_storage", Level::DEBUG) - // force a higher level for noisy logs - .with_target("risingwave_sqlparser", Level::INFO) - .with_target("pgwire", Level::INFO) - .with_target(PGWIRE_QUERY_LOG, Level::OFF) - // force a higher level for foyer logs - .with_target("foyer", Level::WARN) - .with_target("foyer_common", Level::WARN) - .with_target("foyer_intrusive", Level::WARN) - .with_target("foyer_memory", Level::WARN) - .with_target("foyer_storage", Level::WARN) - // disable events that are too verbose - .with_target("events", Level::ERROR) -} - pub struct LoggerSettings { /// The name of the service. name: String, @@ -122,9 +99,12 @@ impl LoggerSettings { /// Overrides default level and tracing targets of the fmt layer (formatting and /// logging to `stdout` or `stderr`). /// +/// Note that only verbosity levels below or equal to `DEBUG` are effective in +/// release builds. +/// /// e.g., /// ```bash -/// RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" +/// RUST_LOG="info,risingwave_stream=debug,events=debug" /// ``` /// /// ### `RW_QUERY_LOG_PATH` @@ -159,7 +139,20 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { // Default filter for logging to stdout and tracing. let default_filter = { - let mut filter = filter::Targets::new() + let mut filter = filter::Targets::new(); + + // Configure levels for some RisingWave crates. + // Other RisingWave crates like `stream` and `storage` will follow the default level. + filter = filter + .with_target("risingwave_sqlparser", Level::INFO) + .with_target("pgwire", Level::INFO) + .with_target(PGWIRE_QUERY_LOG, Level::OFF) + // debug-purposed events are disabled unless `RUST_LOG` overrides + .with_target("events", Level::OFF); + + // Configure levels for external crates. + filter = filter + .with_target("foyer", Level::WARN) .with_target("aws_sdk_ec2", Level::INFO) .with_target("aws_sdk_s3", Level::INFO) .with_target("aws_config", Level::WARN) @@ -175,10 +168,8 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { .with_target("reqwest", Level::WARN) .with_target("sled", Level::INFO); - filter = configure_risingwave_targets_fmt(filter); - - // For all other crates - filter = filter.with_default(match Deployment::current() { + // For all other crates, apply default level depending on the deployment and `debug_assertions` flag. + let default_level = match deployment { Deployment::Ci => Level::INFO, _ => { if cfg!(debug_assertions) { @@ -187,22 +178,23 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { Level::INFO } } - }); + }; + filter = filter.with_default(default_level); - // Overrides from settings + // Overrides from settings. filter = filter.with_targets(settings.targets); if let Some(default_level) = settings.default_level { filter = filter.with_default(default_level); } - // Overrides from env var + // Overrides from env var. if let Ok(rust_log) = std::env::var(EnvFilter::DEFAULT_ENV) && !rust_log.is_empty() { - let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`"); - if let Some(default_level) = rust_log_targets.default_level() { - filter = filter.with_default(default_level); - } - filter = filter.with_targets(rust_log_targets) - }; + let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`"); + if let Some(default_level) = rust_log_targets.default_level() { + filter = filter.with_default(default_level); + } + filter = filter.with_targets(rust_log_targets) + }; filter };