Skip to content

Commit

Permalink
chore(storage): only log spilled task and large task in uploader (#13625
Browse files Browse the repository at this point in the history
)
  • Loading branch information
hzxa21 authored Nov 24, 2023
1 parent 937e099 commit 3f5180c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 15 deletions.
12 changes: 6 additions & 6 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_hummock_sdk::{info_in_release, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::version_update_payload::Payload;
use tokio::spawn;
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info, trace, warn};
use tracing::{debug, error, info, trace, warn};

use super::refiller::{CacheRefillConfig, CacheRefiller};
use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
Expand Down Expand Up @@ -238,7 +238,7 @@ impl HummockEventHandler {
epoch: HummockEpoch,
newly_uploaded_sstables: Vec<StagingSstableInfo>,
) {
info_in_release!("epoch has been synced: {}.", epoch);
debug!("epoch has been synced: {}.", epoch);
if !newly_uploaded_sstables.is_empty() {
newly_uploaded_sstables
.into_iter()
Expand Down Expand Up @@ -311,7 +311,7 @@ impl HummockEventHandler {
new_sync_epoch: HummockEpoch,
sync_result_sender: oneshot::Sender<HummockResult<SyncResult>>,
) {
info_in_release!("receive await sync epoch: {}", new_sync_epoch);
debug!("receive await sync epoch: {}", new_sync_epoch);
// The epoch to sync has been committed already.
if new_sync_epoch <= self.uploader.max_committed_epoch() {
send_sync_result(
Expand All @@ -326,7 +326,7 @@ impl HummockEventHandler {
}
// The epoch has been synced
if new_sync_epoch <= self.uploader.max_synced_epoch() {
info_in_release!(
debug!(
"epoch {} has been synced. Current max_sync_epoch {}",
new_sync_epoch,
self.uploader.max_synced_epoch()
Expand All @@ -345,7 +345,7 @@ impl HummockEventHandler {
return;
}

info_in_release!(
debug!(
"awaiting for epoch to be synced: {}, max_synced_epoch: {}",
new_sync_epoch,
self.uploader.max_synced_epoch()
Expand Down Expand Up @@ -473,7 +473,7 @@ impl HummockEventHandler {
self.pinned_version.load().max_committed_epoch(),
));

info_in_release!(
debug!(
"update to hummock version: {}, epoch: {}",
new_pinned_version.id(),
new_pinned_version.max_committed_epoch()
Expand Down
47 changes: 38 additions & 9 deletions src/storage/src/hummock/event_handler/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt::{Debug, Formatter};
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::mem::swap;
use std::ops::DerefMut;
Expand All @@ -28,9 +28,9 @@ use more_asserts::{assert_ge, assert_gt, assert_le};
use prometheus::core::{AtomicU64, GenericGauge};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::EPOCH_LEN;
use risingwave_hummock_sdk::{info_in_release, CompactionGroupId, HummockEpoch, LocalSstableInfo};
use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo};
use tokio::task::JoinHandle;
use tracing::error;
use tracing::{debug, error, info};

use crate::hummock::compactor::{merge_imms_in_memory, CompactionExecutor};
use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
Expand Down Expand Up @@ -61,6 +61,16 @@ pub struct UploadTaskInfo {
pub compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
}

impl Display for UploadTaskInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UploadTaskInfo")
.field("task_size", &self.task_size)
.field("epochs", &self.epochs)
.field("len(imm_ids)", &self.imm_ids.len())
.finish()
}
}

impl Debug for UploadTaskInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UploadTaskInfo")
Expand Down Expand Up @@ -152,6 +162,9 @@ impl Debug for UploadingTask {
}

impl UploadingTask {
// INFO logs will be enabled for task with size exceeding 50MB.
const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);

fn new(payload: UploadTaskPayload, context: &UploaderContext) -> Self {
assert!(!payload.is_empty());
let mut epochs = payload
Expand All @@ -175,7 +188,11 @@ impl UploadingTask {
.buffer_tracker
.global_upload_task_size()
.add(task_size as u64);
info_in_release!("start upload task: {:?}", task_info);
if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
info!("start upload task: {:?}", task_info);
} else {
debug!("start upload task: {:?}", task_info);
}
let join_handle = (context.spawn_upload_task)(payload.clone(), task_info.clone());
Self {
payload,
Expand All @@ -190,7 +207,13 @@ impl UploadingTask {
fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll<HummockResult<StagingSstableInfo>> {
Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
Ok(task_result) => task_result
.inspect(|_| info_in_release!("upload task finish {:?}", self.task_info))
.inspect(|_| {
if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
info!("upload task finish {:?}", self.task_info)
} else {
debug!("upload task finish {:?}", self.task_info)
}
})
.map(|ssts| {
StagingSstableInfo::new(
ssts,
Expand Down Expand Up @@ -227,6 +250,10 @@ impl UploadingTask {
}
}
}

pub fn get_task_info(&self) -> &UploadTaskInfo {
&self.task_info
}
}

impl Future for UploadingTask {
Expand Down Expand Up @@ -296,6 +323,7 @@ impl UnsealedEpochData {
.stats
.spill_task_size_from_unsealed
.inc_by(task.task_info.task_size as u64);
info!("Spill unsealed data. Task: {}", task.get_task_info());
self.spilled_data.add_task(task);
}
}
Expand Down Expand Up @@ -447,6 +475,7 @@ impl SealedData {
.stats
.spill_task_size_from_sealed
.inc_by(task.task_info.task_size as u64);
info!("Spill sealed data. Task: {}", task.get_task_info());
}
self.spilled_data.add_task(task);
}
Expand Down Expand Up @@ -662,7 +691,7 @@ impl HummockUploader {
}

pub(crate) fn seal_epoch(&mut self, epoch: HummockEpoch) {
info_in_release!("epoch {} is sealed", epoch);
debug!("epoch {} is sealed", epoch);
assert!(
epoch > self.max_sealed_epoch,
"sealing a sealed epoch {}. {}",
Expand All @@ -684,10 +713,10 @@ impl HummockUploader {
.expect("we have checked non-empty");
self.sealed_data.seal_new_epoch(epoch, unsealed_data);
} else {
info_in_release!("epoch {} to seal has no data", epoch);
debug!("epoch {} to seal has no data", epoch);
}
} else {
info_in_release!("epoch {} to seal has no data", epoch);
debug!("epoch {} to seal has no data", epoch);
}
}

Expand Down Expand Up @@ -746,7 +775,7 @@ impl HummockUploader {
}

pub(crate) fn start_sync_epoch(&mut self, epoch: HummockEpoch) {
info_in_release!("start sync epoch: {}", epoch);
debug!("start sync epoch: {}", epoch);
assert!(
epoch > self.max_syncing_epoch,
"the epoch {} has started syncing already: {}",
Expand Down

0 comments on commit 3f5180c

Please sign in to comment.