chore: tweak log level in production (#12739) (#12770)
Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
github-actions[bot] and BugenZhao authored Oct 11, 2023
1 parent 8584408 commit c1d72f1
Showing 15 changed files with 55 additions and 65 deletions.
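Most of the hunks below make the same one-line change: event logs emitted under an `events::*` target move from `tracing::trace!` to `tracing::debug!`, because (per the doc comment this commit adds in `logger.rs`) only verbosity levels at or below `DEBUG` are effective in release builds, while these targets stay disabled by default in the fmt-layer filter. A minimal, self-contained sketch of the pattern — the subscriber setup, the per-target opt-in, and the `peer_addr` value are illustrative assumptions, not code from this commit:

```rust
use tracing::Level;
use tracing_subscriber::filter::Targets;
use tracing_subscriber::prelude::*;

fn main() {
    // Keep `events::*` targets quiet unless explicitly opted into, mirroring the
    // `.with_target("events", Level::OFF)` default added in `logger.rs` below.
    // The opt-in for `events::compute::exchange` is only for this demo run.
    let filter = Targets::new()
        .with_default(Level::INFO)
        .with_target("events::compute::exchange", Level::DEBUG);

    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer().with_filter(filter))
        .init();

    // Illustrative value; in the real code this is the RPC peer address.
    let peer_addr = "127.0.0.1:5688";

    // DEBUG instead of TRACE: still silent under the production defaults, but not
    // statically disabled in release builds where only levels up to DEBUG take effect.
    tracing::debug!(target: "events::compute::exchange", peer_addr = %peer_addr, "serve exchange RPC");
}
```

Without the explicit `events::compute::exchange` opt-in, the `debug!` event is filtered out, which is the intended production default.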
2 changes: 1 addition & 1 deletion src/batch/src/task/task_manager.rs
@@ -194,7 +194,7 @@ impl BatchManager {
pb_task_output_id: &PbTaskOutputId,
) -> Result<()> {
let task_id = TaskOutputId::try_from(pb_task_output_id)?;
- tracing::trace!(target: "events::compute::exchange", peer_addr = %peer_addr, from = ?task_id, "serve exchange RPC");
+ tracing::debug!(target: "events::compute::exchange", peer_addr = %peer_addr, from = ?task_id, "serve exchange RPC");
let mut task_output = self.take_output(pb_task_output_id)?;
self.runtime.spawn(async move {
let mut writer = GrpcExchangeWriter::new(tx.clone());
2 changes: 1 addition & 1 deletion src/compute/src/rpc/service/exchange_service.rs
@@ -138,7 +138,7 @@ impl ExchangeServiceImpl {
up_down_actor_ids: (u32, u32),
up_down_fragment_ids: (u32, u32),
) {
- tracing::trace!(target: "events::compute::exchange", peer_addr = %peer_addr, "serve stream exchange RPC");
+ tracing::debug!(target: "events::compute::exchange", peer_addr = %peer_addr, "serve stream exchange RPC");
let up_actor_id = up_down_actor_ids.0.to_string();
let up_fragment_id = up_down_fragment_ids.0.to_string();
let down_fragment_id = up_down_fragment_ids.1.to_string();
8 changes: 4 additions & 4 deletions src/meta/src/barrier/mod.rs
@@ -796,9 +796,9 @@ impl GlobalBarrierManager {
actor_ids_to_send,
actor_ids_to_collect,
};
- tracing::trace!(
+ tracing::debug!(
target: "events::meta::barrier::inject_barrier",
- "inject barrier request: {:?}", request
+ ?request, "inject barrier request"
);

// This RPC returns only if this worker node has injected this barrier.
@@ -838,9 +838,9 @@ impl GlobalBarrierManager {
prev_epoch,
tracing_context,
};
- tracing::trace!(
+ tracing::debug!(
target: "events::meta::barrier::barrier_complete",
- "barrier complete request: {:?}", request
+ ?request, "barrier complete"
);

// This RPC returns only if this worker node has collected this barrier.
2 changes: 1 addition & 1 deletion src/meta/src/manager/cluster.rs
@@ -314,7 +314,7 @@ impl ClusterManager {
worker_id: WorkerId,
info: Vec<heartbeat_request::extra_info::Info>,
) -> MetaResult<()> {
- tracing::trace!(target: "events::meta::server_heartbeat", worker_id = worker_id, "receive heartbeat");
+ tracing::debug!(target: "events::meta::server_heartbeat", worker_id, "receive heartbeat");
let mut core = self.core.write().await;
for worker in core.workers.values_mut() {
if worker.worker_id() == worker_id {
2 changes: 1 addition & 1 deletion src/rpc_client/src/meta_client.rs
@@ -664,7 +664,7 @@ impl MetaClient {
extra_info.push(info);
}
}
- tracing::trace!(target: "events::meta::client_heartbeat", "heartbeat");
+ tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
match tokio::time::timeout(
// TODO: decide better min_interval for timeout
min_interval * 3,
@@ -235,7 +235,7 @@ impl SstableIterator {
idx: usize,
seek_key: Option<FullKey<&[u8]>>,
) -> HummockResult<()> {
- tracing::trace!(
+ tracing::debug!(
target: "events::storage::sstable::block_seek",
"table iterator seek: sstable_object_id = {}, block_id = {}",
self.sst.value().id,
2 changes: 1 addition & 1 deletion src/storage/src/table/mod.rs
@@ -162,7 +162,7 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu
vnode
};

- tracing::trace!(target: "events::storage::storage_table", "compute vnode: {:?} key {:?} => {}", row, indices, vnode);
+ tracing::debug!(target: "events::storage::storage_table", "compute vnode: {:?} key {:?} => {}", row, indices, vnode);

vnode
}
2 changes: 1 addition & 1 deletion src/stream/src/executor/dispatch.rs
@@ -651,7 +651,7 @@ impl Dispatcher for HashDataDispatcher {
// get hash value of every line by its key
let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys);

- tracing::trace!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);
+ tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
.take(num_outputs)
6 changes: 3 additions & 3 deletions src/stream/src/executor/lookup/impl_.rs
@@ -322,7 +322,7 @@ impl<S: StateStore> LookupExecutor<S> {
.lookup_one_row(&row, self.last_barrier.as_ref().unwrap().epoch)
.await?
{
- tracing::trace!(target: "events::stream::lookup::put", "{:?} {:?}", row, matched_row);
+ tracing::debug!(target: "events::stream::lookup::put", "{:?} {:?}", row, matched_row);

if let Some(chunk) = builder.append_row(*op, row, &matched_row) {
yield Message::Chunk(chunk);
@@ -388,7 +388,7 @@ impl<S: StateStore> LookupExecutor<S> {
.with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();

- tracing::trace!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row);
+ tracing::debug!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row);

let mut all_rows = VecWithKvSize::new();
// Drop the stream.
@@ -427,7 +427,7 @@ impl<S: StateStore> LookupExecutor<S> {
}
}

- tracing::trace!(target: "events::stream::lookup::result", "{:?} => {:?}", lookup_row, all_rows.inner());
+ tracing::debug!(target: "events::stream::lookup::result", "{:?} => {:?}", lookup_row, all_rows.inner());

self.lookup_cache.batch_update(lookup_row, all_rows.clone());

2 changes: 1 addition & 1 deletion src/stream/src/executor/merge.rs
@@ -138,7 +138,7 @@ impl MergeExecutor {
.inc_by(chunk.cardinality() as _);
}
Message::Barrier(barrier) => {
- tracing::trace!(
+ tracing::debug!(
target: "events::stream::barrier::path",
actor_id = actor_id,
"receiver receives barrier from path: {:?}",
2 changes: 1 addition & 1 deletion src/stream/src/executor/receiver.rs
@@ -144,7 +144,7 @@ impl Executor for ReceiverExecutor {
.inc_by(chunk.cardinality() as _);
}
Message::Barrier(barrier) => {
- tracing::trace!(
+ tracing::debug!(
target: "events::stream::barrier::path",
actor_id = actor_id,
"receiver receives barrier from path: {:?}",
6 changes: 3 additions & 3 deletions src/stream/src/executor/wrapper/trace.rs
@@ -59,7 +59,7 @@ pub async fn trace(
.with_label_values(&[&actor_id_str, &fragment_id_str, &info.identity])
.inc_by(chunk.cardinality() as u64);
}
- tracing::trace!(
+ tracing::debug!(
target: "events::stream::message::chunk",
cardinality = chunk.cardinality(),
capacity = chunk.capacity(),
@@ -68,14 +68,14 @@
}
}
Message::Watermark(watermark) => {
- tracing::trace!(
+ tracing::debug!(
target: "events::stream::message::watermark",
value = ?watermark.val,
col_idx = watermark.col_idx,
);
}
Message::Barrier(barrier) => {
- tracing::trace!(
+ tracing::debug!(
target: "events::stream::message::barrier",
prev_epoch = barrier.epoch.prev,
curr_epoch = barrier.epoch.curr,
6 changes: 3 additions & 3 deletions src/stream/src/task/barrier_manager.rs
@@ -101,7 +101,7 @@ impl LocalBarrierManager {

/// Register sender for source actors, used to send barriers.
pub fn register_sender(&mut self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
- tracing::trace!(
+ tracing::debug!(
target: "events::stream::barrier::manager",
actor_id = actor_id,
"register sender"
@@ -132,7 +132,7 @@ impl LocalBarrierManager {
}
};
let to_collect: HashSet<ActorId> = actor_ids_to_collect.into_iter().collect();
- trace!(
+ debug!(
target: "events::stream::barrier::manager::send",
"send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}",
barrier,
@@ -172,7 +172,7 @@ impl LocalBarrierManager {

// Actors to stop should still accept this barrier, but won't get sent to in next times.
if let Some(actors) = barrier.all_stop_actors() {
- trace!(
+ debug!(
target: "events::stream::barrier::manager",
"remove actors {:?} from senders",
actors
8 changes: 3 additions & 5 deletions src/stream/src/task/barrier_manager/managed_state.rs
@@ -193,12 +193,10 @@ impl ManagedBarrierState {

/// Collect a `barrier` from the actor with `actor_id`.
pub(super) fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) {
- tracing::trace!(
+ tracing::debug!(
target: "events::stream::barrier::manager::collect",
- "collect_barrier: epoch = {}, actor_id = {}, state = {:#?}",
- barrier.epoch.curr,
- actor_id,
- self
+ epoch = barrier.epoch.curr, actor_id, state = ?self,
+ "collect_barrier",
);

match self.epoch_barrier_state_map.get_mut(&barrier.epoch.curr) {
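Besides the level change, the `collect_barrier` call above also replaces positional format arguments with structured fields. A standalone sketch (not the commit's code; the `state` map is an illustrative stand-in for `self`) of the field-capture forms used across these hunks — bare shorthand as in `cluster.rs`, `?` for `Debug`, and explicit `name = value`:

```rust
use std::collections::HashMap;

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    // Illustrative values only.
    let actor_id: u32 = 42;
    let epoch: u64 = 3;
    let state: HashMap<u64, &str> = HashMap::from([(epoch, "issued")]);

    // Bare `epoch` / `actor_id` are shorthand for `epoch = epoch` etc. (as in `cluster.rs`),
    // `?state` records the value with its `Debug` impl, and the trailing string is the
    // message, matching the shape of the new `collect_barrier` call.
    tracing::debug!(
        target: "events::stream::barrier::manager::collect",
        epoch, actor_id, state = ?state,
        "collect_barrier",
    );
}
```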
68 changes: 30 additions & 38 deletions src/utils/runtime/src/logger.rs
@@ -28,29 +28,6 @@ use tracing_subscriber::{filter, EnvFilter};
const PGWIRE_QUERY_LOG: &str = "pgwire_query_log";
const SLOW_QUERY_LOG: &str = "risingwave_frontend_slow_query_log";

- /// Configure log targets for some `RisingWave` crates.
- ///
- /// Other RisingWave crates will follow the default level (`DEBUG` or `INFO` according to
- /// the `debug_assertions` and `is_ci` flag).
- fn configure_risingwave_targets_fmt(targets: filter::Targets) -> filter::Targets {
- targets
- // force a lower level for important logs
- .with_target("risingwave_stream", Level::DEBUG)
- .with_target("risingwave_storage", Level::DEBUG)
- // force a higher level for noisy logs
- .with_target("risingwave_sqlparser", Level::INFO)
- .with_target("pgwire", Level::INFO)
- .with_target(PGWIRE_QUERY_LOG, Level::OFF)
- // force a higher level for foyer logs
- .with_target("foyer", Level::WARN)
- .with_target("foyer_common", Level::WARN)
- .with_target("foyer_intrusive", Level::WARN)
- .with_target("foyer_memory", Level::WARN)
- .with_target("foyer_storage", Level::WARN)
- // disable events that are too verbose
- .with_target("events", Level::ERROR)
- }
-
pub struct LoggerSettings {
/// The name of the service.
name: String,
@@ -122,9 +99,12 @@ impl LoggerSettings {
/// Overrides default level and tracing targets of the fmt layer (formatting and
/// logging to `stdout` or `stderr`).
///
+ /// Note that only verbosity levels below or equal to `DEBUG` are effective in
+ /// release builds.
+ ///
/// e.g.,
/// ```bash
- /// RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info"
+ /// RUST_LOG="info,risingwave_stream=debug,events=debug"
/// ```
///
/// ### `RW_QUERY_LOG_PATH`
@@ -159,7 +139,20 @@ pub fn init_risingwave_logger(settings: LoggerSettings) {

// Default filter for logging to stdout and tracing.
let default_filter = {
- let mut filter = filter::Targets::new()
+ let mut filter = filter::Targets::new();
+
+ // Configure levels for some RisingWave crates.
+ // Other RisingWave crates like `stream` and `storage` will follow the default level.
+ filter = filter
+ .with_target("risingwave_sqlparser", Level::INFO)
+ .with_target("pgwire", Level::INFO)
+ .with_target(PGWIRE_QUERY_LOG, Level::OFF)
+ // debug-purposed events are disabled unless `RUST_LOG` overrides
+ .with_target("events", Level::OFF);
+
+ // Configure levels for external crates.
+ filter = filter
+ .with_target("foyer", Level::WARN)
.with_target("aws_sdk_ec2", Level::INFO)
.with_target("aws_sdk_s3", Level::INFO)
.with_target("aws_config", Level::WARN)
@@ -175,10 +168,8 @@ pub fn init_risingwave_logger(settings: LoggerSettings) {
.with_target("reqwest", Level::WARN)
.with_target("sled", Level::INFO);

- filter = configure_risingwave_targets_fmt(filter);
-
- // For all other crates
- filter = filter.with_default(match Deployment::current() {
+ // For all other crates, apply default level depending on the deployment and `debug_assertions` flag.
+ let default_level = match deployment {
Deployment::Ci => Level::INFO,
_ => {
if cfg!(debug_assertions) {
@@ -187,22 +178,23 @@ pub fn init_risingwave_logger(settings: LoggerSettings) {
Level::INFO
}
}
- });
+ };
+ filter = filter.with_default(default_level);

- // Overrides from settings
+ // Overrides from settings.
filter = filter.with_targets(settings.targets);
if let Some(default_level) = settings.default_level {
filter = filter.with_default(default_level);
}

- // Overrides from env var
+ // Overrides from env var.
if let Ok(rust_log) = std::env::var(EnvFilter::DEFAULT_ENV) && !rust_log.is_empty() {
- let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`");
- if let Some(default_level) = rust_log_targets.default_level() {
- filter = filter.with_default(default_level);
- }
- filter = filter.with_targets(rust_log_targets)
- };
+ let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`");
+ if let Some(default_level) = rust_log_targets.default_level() {
+ filter = filter.with_default(default_level);
+ }
+ filter = filter.with_targets(rust_log_targets)
+ };

filter
};
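The new `default_filter` above layers three sources of configuration, with later calls taking precedence: built-in per-target defaults, overrides from `settings.targets`, and finally the `RUST_LOG` environment variable. A condensed, self-contained sketch of that layering — `build_filter` and `user_overrides` are hypothetical names standing in for the settings targets, not RisingWave APIs:

```rust
use tracing::Level;
use tracing_subscriber::filter::Targets;

// Hypothetical helper; `user_overrides` plays the role of `settings.targets`.
fn build_filter(user_overrides: Targets) -> Targets {
    // Built-in defaults: quiet noisy targets, keep debug-purposed events off.
    let mut filter = Targets::new()
        .with_target("events", Level::OFF)
        .with_target("foyer", Level::WARN)
        .with_default(Level::INFO);

    // Overrides from settings: `with_targets` replaces levels for matching targets.
    filter = filter.with_targets(user_overrides);

    // Overrides from `RUST_LOG`, applied last so it wins over everything else.
    if let Ok(rust_log) = std::env::var("RUST_LOG") {
        if !rust_log.is_empty() {
            let rust_log_targets: Targets =
                rust_log.parse().expect("failed to parse `RUST_LOG`");
            if let Some(default_level) = rust_log_targets.default_level() {
                filter = filter.with_default(default_level);
            }
            filter = filter.with_targets(rust_log_targets);
        }
    }

    filter
}

fn main() {
    let filter = build_filter(Targets::new().with_target("risingwave_sqlparser", Level::INFO));
    println!("{filter:?}");
}
```

Because the `events` target now defaults to `OFF`, an override such as `RUST_LOG="info,events=debug"` (as in the updated doc comment) is what re-enables the debug-purposed event logs at runtime.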
