Skip to content

Commit

Permalink
feat: add compaction metrics (#2560)
Browse files Browse the repository at this point in the history
* feat: add compaction metrics

* feat: add compaction request total count

* fix: CR comments
  • Loading branch information
v0y4g3r authored Oct 10, 2023
1 parent ed725d0 commit 6b39f59
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 29 deletions.
13 changes: 10 additions & 3 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ mod twcs;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use common_telemetry::{debug, error};
use common_telemetry::{debug, error, timer};
pub use picker::CompactionPickerRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
Expand All @@ -32,6 +33,7 @@ use crate::compaction::twcs::TwcsPicker;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, STAGE_LABEL};
use crate::region::options::CompactionOptions;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
Expand All @@ -47,6 +49,8 @@ pub struct CompactionRequest {
/// Waiters of the compaction request.
pub(crate) waiters: Vec<OutputTx>,
pub(crate) file_purger: FilePurgerRef,
/// Start time of compaction task.
pub(crate) start_time: Instant,
}

impl CompactionRequest {
Expand Down Expand Up @@ -175,11 +179,14 @@ impl CompactionScheduler {
"Pick compaction strategy {:?} for region: {}",
picker, region_id
);

let pick_timer = timer!(COMPACTION_STAGE_ELAPSED, &[(STAGE_LABEL, "pick")]);
let Some(mut task) = picker.pick(request) else {
// Nothing to compact, remove it from the region status map.
self.region_status.remove(&region_id);
return Ok(());
};
drop(pick_timer);

// Submit the compaction task.
self.scheduler
Expand All @@ -188,10 +195,8 @@ impl CompactionScheduler {
}))
.map_err(|e| {
error!(e; "Failed to submit compaction request for region {}", region_id);

// If failed to submit the job, we need to remove the region from the scheduler.
self.region_status.remove(&region_id);

e
})
}
Expand Down Expand Up @@ -295,12 +300,14 @@ impl CompactionStatus {
waiter: OptionOutputTx,
) -> CompactionRequest {
let current_version = self.version_control.current().version;
let start_time = Instant::now();
let mut req = CompactionRequest {
current_version,
access_layer: self.access_layer.clone(),
request_sender: request_sender.clone(),
waiters: Vec::new(),
file_purger: self.file_purger.clone(),
start_time,
};

if let Some(pending) = self.pending_compaction.take() {
Expand Down
14 changes: 12 additions & 2 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use common_base::readable_size::ReadableSize;
use common_query::Output;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, timer};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use metrics::increment_counter;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
Expand All @@ -34,6 +35,7 @@ use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::CompactionRequest;
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED, STAGE_LABEL};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
Expand Down Expand Up @@ -118,6 +120,7 @@ impl Picker for TwcsPicker {
request_sender,
waiters,
file_purger,
start_time,
} = req;

let region_metadata = current_version.metadata.clone();
Expand Down Expand Up @@ -170,6 +173,7 @@ impl Picker for TwcsPicker {
request_sender,
waiters,
file_purger,
start_time,
};
Some(Box::new(task))
}
Expand Down Expand Up @@ -228,6 +232,8 @@ pub(crate) struct TwcsCompactionTask {
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Senders that are used to notify waiters waiting for pending compaction tasks.
pub waiters: Vec<OutputTx>,
/// Start time of compaction task
pub start_time: Instant,
}

impl Debug for TwcsCompactionTask {
Expand Down Expand Up @@ -310,8 +316,10 @@ impl TwcsCompactionTask {

async fn handle_compaction(&mut self) -> error::Result<(Vec<FileMeta>, Vec<FileMeta>)> {
self.mark_files_compacting(true);
let merge_timer = timer!(COMPACTION_STAGE_ELAPSED, &[(STAGE_LABEL, "merge")]);
let (output, mut compacted) = self.merge_ssts().await.map_err(|e| {
error!(e; "Failed to compact region: {}", self.region_id);
merge_timer.discard();
e
})?;
compacted.extend(self.expired_ssts.iter().map(FileHandle::meta));
Expand All @@ -320,6 +328,7 @@ impl TwcsCompactionTask {

/// Handles compaction failure, notifies all waiters.
fn on_failure(&mut self, err: Arc<error::Error>) {
increment_counter!(COMPACTION_FAILURE_COUNT);
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
Expand Down Expand Up @@ -357,6 +366,7 @@ impl CompactionTask for TwcsCompactionTask {
compaction_time_window: self
.compaction_time_window
.map(|seconds| Duration::from_secs(seconds as u64)),
start_time: self.start_time,
})
}
Err(e) => {
Expand Down
15 changes: 13 additions & 2 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

/// Stage label.
pub const STAGE_LABEL: &str = "stage";

/// Global write buffer size in bytes.
pub const WRITE_BUFFER_BYTES: &str = "mito.write_buffer_bytes";
/// Type label.
Expand Down Expand Up @@ -42,8 +45,16 @@ pub const WRITE_STALL_TOTAL: &str = "mito.write.stall_total";
pub const WRITE_REJECT_TOTAL: &str = "mito.write.reject_total";
/// Elapsed time of each write stage.
pub const WRITE_STAGE_ELAPSED: &str = "mito.write.stage_elapsed";
/// Stage label.
pub const STAGE_LABEL: &str = "stage";
/// Counter of rows to write.
pub const WRITE_ROWS_TOTAL: &str = "mito.write.rows_total";
// ------ End of write related metrics

// Compaction metrics
/// Timer of different stages in compaction.
pub const COMPACTION_STAGE_ELAPSED: &str = "mito.compaction.stage_elapsed";
/// Timer of whole compaction task.
pub const COMPACTION_ELAPSED_TOTAL: &str = "mito.compaction.total_elapsed";
/// Counter of all requested compaction task.
pub const COMPACTION_REQUEST_COUNT: &str = "mito.compaction.requests_total";
/// Counter of failed compaction task.
pub const COMPACTION_FAILURE_COUNT: &str = "mito.compaction.failure_total";
9 changes: 8 additions & 1 deletion src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use api::helper::{
is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type,
Expand All @@ -29,6 +29,7 @@ use common_telemetry::metric::Timer;
use common_telemetry::tracing::log::info;
use common_telemetry::warn;
use datatypes::prelude::DataType;
use metrics::histogram;
use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
Expand All @@ -45,6 +46,7 @@ use crate::error::{
InvalidRequestSnafu, Result,
};
use crate::memtable::MemtableId;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::wal::EntryId;
Expand Down Expand Up @@ -646,10 +648,15 @@ pub(crate) struct CompactionFinished {
pub(crate) file_purger: FilePurgerRef,
/// Inferred Compaction time window.
pub(crate) compaction_time_window: Option<Duration>,
/// Start time of compaction task.
pub(crate) start_time: Instant,
}

impl CompactionFinished {
pub fn on_success(self) {
// only update compaction time on success
histogram!(COMPACTION_ELAPSED_TOTAL, self.start_time.elapsed());

for sender in self.senders {
sender.send(Ok(AffectedRows(0)));
}
Expand Down
50 changes: 29 additions & 21 deletions src/mito2/src/worker/handle_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_telemetry::{error, info};
use common_telemetry::{error, info, timer};
use metrics::increment_counter;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_REQUEST_COUNT, COMPACTION_STAGE_ELAPSED, STAGE_LABEL};
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;

Expand All @@ -30,7 +32,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};

increment_counter!(COMPACTION_REQUEST_COUNT);
if let Err(e) = self.compaction_scheduler.schedule_compaction(
region.region_id,
&region.version_control,
Expand All @@ -57,27 +59,33 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};

// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.compaction_outputs),
files_to_remove: std::mem::take(&mut request.compacted_files),
compaction_time_window: request.compaction_time_window,
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
error!(e; "Failed to update manifest, region: {}", region_id);
request.on_failure(e);
return;
}
{
let manifest_timer =
timer!(COMPACTION_STAGE_ELAPSED, &[(STAGE_LABEL, "write_manifest")]);
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.compaction_outputs),
files_to_remove: std::mem::take(&mut request.compacted_files),
compaction_time_window: request.compaction_time_window,
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
error!(e; "Failed to update manifest, region: {}", region_id);
manifest_timer.discard();
request.on_failure(e);
return;
}

// Apply edit to region's version.
region
.version_control
.apply_edit(edit, &[], region.file_purger.clone());
// Apply edit to region's version.
region
.version_control
.apply_edit(edit, &[], region.file_purger.clone());
}
// compaction finished.
request.on_success();

// Schedule next compaction if necessary.
self.compaction_scheduler.on_compaction_finished(region_id);
}
Expand Down

0 comments on commit 6b39f59

Please sign in to comment.