Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add compaction metrics #2560

Merged
merged 4 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ mod twcs;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use common_telemetry::{debug, error};
use common_telemetry::{debug, error, timer};
pub use picker::CompactionPickerRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
Expand All @@ -32,6 +33,7 @@ use crate::compaction::twcs::TwcsPicker;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, STAGE_LABEL};
use crate::region::options::CompactionOptions;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
Expand All @@ -47,6 +49,8 @@ pub struct CompactionRequest {
/// Waiters of the compaction request.
pub(crate) waiters: Vec<OutputTx>,
pub(crate) file_purger: FilePurgerRef,
/// Start time of compaction task.
pub(crate) start_time: Instant,
}

impl CompactionRequest {
Expand Down Expand Up @@ -175,11 +179,14 @@ impl CompactionScheduler {
"Pick compaction strategy {:?} for region: {}",
picker, region_id
);

let pick_timer = timer!(COMPACTION_STAGE_ELAPSED, &[(STAGE_LABEL, "pick")]);
let Some(mut task) = picker.pick(request) else {
// Nothing to compact, remove it from the region status map.
self.region_status.remove(&region_id);
return Ok(());
};
drop(pick_timer);

// Submit the compaction task.
self.scheduler
Expand All @@ -188,10 +195,8 @@ impl CompactionScheduler {
}))
.map_err(|e| {
error!(e; "Failed to submit compaction request for region {}", region_id);

// If failed to submit the job, we need to remove the region from the scheduler.
self.region_status.remove(&region_id);

e
})
}
Expand Down Expand Up @@ -295,12 +300,14 @@ impl CompactionStatus {
waiter: OptionOutputTx,
) -> CompactionRequest {
let current_version = self.version_control.current().version;
let start_time = Instant::now();
let mut req = CompactionRequest {
current_version,
access_layer: self.access_layer.clone(),
request_sender: request_sender.clone(),
waiters: Vec::new(),
file_purger: self.file_purger.clone(),
start_time,
};

if let Some(pending) = self.pending_compaction.take() {
Expand Down
14 changes: 12 additions & 2 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use common_base::readable_size::ReadableSize;
use common_query::Output;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, timer};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use metrics::increment_counter;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
Expand All @@ -34,6 +35,7 @@ use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::CompactionRequest;
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED, STAGE_LABEL};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
Expand Down Expand Up @@ -118,6 +120,7 @@ impl Picker for TwcsPicker {
request_sender,
waiters,
file_purger,
start_time,
} = req;

let region_metadata = current_version.metadata.clone();
Expand Down Expand Up @@ -170,6 +173,7 @@ impl Picker for TwcsPicker {
request_sender,
waiters,
file_purger,
start_time,
};
Some(Box::new(task))
}
Expand Down Expand Up @@ -228,6 +232,8 @@ pub(crate) struct TwcsCompactionTask {
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Senders that are used to notify waiters waiting for pending compaction tasks.
pub waiters: Vec<OutputTx>,
/// Start time of compaction task.
pub start_time: Instant,
}

impl Debug for TwcsCompactionTask {
Expand Down Expand Up @@ -310,8 +316,10 @@ impl TwcsCompactionTask {

async fn handle_compaction(&mut self) -> error::Result<(Vec<FileMeta>, Vec<FileMeta>)> {
self.mark_files_compacting(true);
let merge_timer = timer!(COMPACTION_STAGE_ELAPSED, &[(STAGE_LABEL, "merge")]);
let (output, mut compacted) = self.merge_ssts().await.map_err(|e| {
error!(e; "Failed to compact region: {}", self.region_id);
merge_timer.discard();
e
})?;
compacted.extend(self.expired_ssts.iter().map(FileHandle::meta));
Expand All @@ -320,6 +328,7 @@ impl TwcsCompactionTask {

/// Handles compaction failure, notifies all waiters.
fn on_failure(&mut self, err: Arc<error::Error>) {
increment_counter!(COMPACTION_FAILURE_COUNT);
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
Expand Down Expand Up @@ -357,6 +366,7 @@ impl CompactionTask for TwcsCompactionTask {
compaction_time_window: self
.compaction_time_window
.map(|seconds| Duration::from_secs(seconds as u64)),
start_time: self.start_time,
})
}
Err(e) => {
Expand Down
15 changes: 13 additions & 2 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

/// Stage label.
pub const STAGE_LABEL: &str = "stage";

/// Global write buffer size in bytes.
pub const WRITE_BUFFER_BYTES: &str = "mito.write_buffer_bytes";
/// Type label.
Expand Down Expand Up @@ -42,6 +45,14 @@ pub const WRITE_STALL_TOTAL: &str = "mito.write.stall_total";
pub const WRITE_REJECT_TOTAL: &str = "mito.write.reject_total";
/// Elapsed time of each write stage.
pub const WRITE_STAGE_ELAPSED: &str = "mito.write.stage_elapsed";
/// Stage label.
pub const STAGE_LABEL: &str = "stage";
// ------ End of write related metrics

// Compaction metrics
/// Elapsed time of each compaction stage. The stage name is attached through
/// the `STAGE_LABEL` label (e.g. `pick`, `merge`, `write_manifest`).
pub const COMPACTION_STAGE_ELAPSED: &str = "mito.compaction.stage_elapsed";
/// Elapsed time of a whole compaction task, measured from the creation of the
/// compaction request and recorded only when the task succeeds.
pub const COMPACTION_ELAPSED_TOTAL: &str = "mito.compaction.total_elapsed";
/// Counter of all requested compaction tasks.
pub const COMPACTION_REQUEST_COUNT: &str = "mito.compaction.requests_total";
/// Counter of failed compaction tasks.
pub const COMPACTION_FAILURE_COUNT: &str = "mito.compaction.failure_total";
9 changes: 8 additions & 1 deletion src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use api::helper::{
is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type,
Expand All @@ -29,6 +29,7 @@ use common_telemetry::metric::Timer;
use common_telemetry::tracing::log::info;
use common_telemetry::warn;
use datatypes::prelude::DataType;
use metrics::register_histogram;
use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
Expand All @@ -45,6 +46,7 @@ use crate::error::{
InvalidRequestSnafu, Result,
};
use crate::memtable::MemtableId;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::wal::EntryId;
Expand Down Expand Up @@ -646,10 +648,15 @@ pub(crate) struct CompactionFinished {
pub(crate) file_purger: FilePurgerRef,
/// Inferred Compaction time window.
pub(crate) compaction_time_window: Option<Duration>,
/// Start time of compaction task.
pub(crate) start_time: Instant,
}

impl CompactionFinished {
pub fn on_success(self) {
// Only record the total compaction elapsed time on success.
register_histogram!(COMPACTION_ELAPSED_TOTAL).record(self.start_time.elapsed());
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved

for sender in self.senders {
sender.send(Ok(AffectedRows(0)));
}
Expand Down
50 changes: 29 additions & 21 deletions src/mito2/src/worker/handle_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_telemetry::{error, info};
use common_telemetry::{error, info, timer};
use metrics::increment_counter;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_REQUEST_COUNT, COMPACTION_STAGE_ELAPSED, STAGE_LABEL};
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;

Expand All @@ -30,7 +32,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};

increment_counter!(COMPACTION_REQUEST_COUNT);
if let Err(e) = self.compaction_scheduler.schedule_compaction(
region.region_id,
&region.version_control,
Expand All @@ -57,27 +59,33 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};

// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.compaction_outputs),
files_to_remove: std::mem::take(&mut request.compacted_files),
compaction_time_window: request.compaction_time_window,
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
error!(e; "Failed to update manifest, region: {}", region_id);
request.on_failure(e);
return;
}
{
let manifest_timer =
timer!(COMPACTION_STAGE_ELAPSED, &[(STAGE_LABEL, "write_manifest")]);
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.compaction_outputs),
files_to_remove: std::mem::take(&mut request.compacted_files),
compaction_time_window: request.compaction_time_window,
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
error!(e; "Failed to update manifest, region: {}", region_id);
manifest_timer.discard();
request.on_failure(e);
return;
}

// Apply edit to region's version.
region
.version_control
.apply_edit(edit, &[], region.file_purger.clone());
// Apply edit to region's version.
region
.version_control
.apply_edit(edit, &[], region.file_purger.clone());
}
// compaction finished.
request.on_success();

// Schedule next compaction if necessary.
self.compaction_scheduler.on_compaction_finished(region_id);
}
Expand Down
Loading