diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 50e128702dcec..ce26d1cf1b66a 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -27,7 +27,7 @@ use risingwave_hummock_sdk::{info_in_release, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::version_update_payload::Payload; use tokio::spawn; use tokio::sync::{mpsc, oneshot}; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use super::refiller::{CacheRefillConfig, CacheRefiller}; use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType}; @@ -238,7 +238,7 @@ impl HummockEventHandler { epoch: HummockEpoch, newly_uploaded_sstables: Vec, ) { - info_in_release!("epoch has been synced: {}.", epoch); + debug!("epoch has been synced: {}.", epoch); if !newly_uploaded_sstables.is_empty() { newly_uploaded_sstables .into_iter() @@ -311,7 +311,7 @@ impl HummockEventHandler { new_sync_epoch: HummockEpoch, sync_result_sender: oneshot::Sender>, ) { - info_in_release!("receive await sync epoch: {}", new_sync_epoch); + debug!("receive await sync epoch: {}", new_sync_epoch); // The epoch to sync has been committed already. if new_sync_epoch <= self.uploader.max_committed_epoch() { send_sync_result( @@ -326,7 +326,7 @@ impl HummockEventHandler { } // The epoch has been synced if new_sync_epoch <= self.uploader.max_synced_epoch() { - info_in_release!( + debug!( "epoch {} has been synced. Current max_sync_epoch {}", new_sync_epoch, self.uploader.max_synced_epoch() @@ -345,7 +345,7 @@ impl HummockEventHandler { return; } - info_in_release!( + debug!( "awaiting for epoch to be synced: {}, max_synced_epoch: {}", new_sync_epoch, self.uploader.max_synced_epoch() @@ -473,7 +473,7 @@ impl HummockEventHandler { self.pinned_version.load().max_committed_epoch(), )); - info_in_release!( + debug!( "update to hummock version: {}, epoch: {}", new_pinned_version.id(), new_pinned_version.max_committed_epoch() diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index b3a96f109d233..cb62969580c34 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, VecDeque}; -use std::fmt::{Debug, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::future::Future; use std::mem::swap; use std::ops::DerefMut; @@ -28,9 +28,9 @@ use more_asserts::{assert_ge, assert_gt, assert_le}; use prometheus::core::{AtomicU64, GenericGauge}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::EPOCH_LEN; -use risingwave_hummock_sdk::{info_in_release, CompactionGroupId, HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo}; use tokio::task::JoinHandle; -use tracing::error; +use tracing::{debug, error, info}; use crate::hummock::compactor::{merge_imms_in_memory, CompactionExecutor}; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; @@ -61,6 +61,16 @@ pub struct UploadTaskInfo { pub compaction_group_index: Arc>, } +impl Display for UploadTaskInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("UploadTaskInfo") + .field("task_size", &self.task_size) + .field("epochs", &self.epochs) + .field("len(imm_ids)", &self.imm_ids.len()) + .finish() + } +} + impl Debug for UploadTaskInfo { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("UploadTaskInfo") @@ -152,6 +162,9 @@ impl Debug for UploadingTask { } impl UploadingTask { + // INFO logs will be enabled for task with size exceeding 50MB. + const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20); + fn new(payload: UploadTaskPayload, context: &UploaderContext) -> Self { assert!(!payload.is_empty()); let mut epochs = payload @@ -175,7 +188,11 @@ impl UploadingTask { .buffer_tracker .global_upload_task_size() .add(task_size as u64); - info_in_release!("start upload task: {:?}", task_info); + if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE { + info!("start upload task: {:?}", task_info); + } else { + debug!("start upload task: {:?}", task_info); + } let join_handle = (context.spawn_upload_task)(payload.clone(), task_info.clone()); Self { payload, @@ -190,7 +207,13 @@ impl UploadingTask { fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll> { Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) { Ok(task_result) => task_result - .inspect(|_| info_in_release!("upload task finish {:?}", self.task_info)) + .inspect(|_| { + if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE { + info!("upload task finish {:?}", self.task_info) + } else { + debug!("upload task finish {:?}", self.task_info) + } + }) .map(|ssts| { StagingSstableInfo::new( ssts, @@ -227,6 +250,10 @@ impl UploadingTask { } } } + + pub fn get_task_info(&self) -> &UploadTaskInfo { + &self.task_info + } } impl Future for UploadingTask { @@ -296,6 +323,7 @@ impl UnsealedEpochData { .stats .spill_task_size_from_unsealed .inc_by(task.task_info.task_size as u64); + info!("Spill unsealed data. Task: {}", task.get_task_info()); self.spilled_data.add_task(task); } } @@ -447,6 +475,7 @@ impl SealedData { .stats .spill_task_size_from_sealed .inc_by(task.task_info.task_size as u64); + info!("Spill sealed data. Task: {}", task.get_task_info()); } self.spilled_data.add_task(task); } @@ -662,7 +691,7 @@ impl HummockUploader { } pub(crate) fn seal_epoch(&mut self, epoch: HummockEpoch) { - info_in_release!("epoch {} is sealed", epoch); + debug!("epoch {} is sealed", epoch); assert!( epoch > self.max_sealed_epoch, "sealing a sealed epoch {}. {}", @@ -684,10 +713,10 @@ impl HummockUploader { .expect("we have checked non-empty"); self.sealed_data.seal_new_epoch(epoch, unsealed_data); } else { - info_in_release!("epoch {} to seal has no data", epoch); + debug!("epoch {} to seal has no data", epoch); } } else { - info_in_release!("epoch {} to seal has no data", epoch); + debug!("epoch {} to seal has no data", epoch); } } @@ -746,7 +775,7 @@ impl HummockUploader { } pub(crate) fn start_sync_epoch(&mut self, epoch: HummockEpoch) { - info_in_release!("start sync epoch: {}", epoch); + debug!("start sync epoch: {}", epoch); assert!( epoch > self.max_syncing_epoch, "the epoch {} has started syncing already: {}",