Skip to content

Commit

Permalink
Merge branch 'develop' into feat/compaction-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r authored Oct 10, 2023
2 parents adaceb5 + 8a5ef82 commit 450fe0a
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub const WRITE_STALL_TOTAL: &str = "mito.write.stall_total";
pub const WRITE_REJECT_TOTAL: &str = "mito.write.reject_total";
/// Elapsed time of each write stage.
pub const WRITE_STAGE_ELAPSED: &str = "mito.write.stage_elapsed";
/// Counter of rows to write.
pub const WRITE_ROWS_TOTAL: &str = "mito.write.rows_total";
// ------ End of write related metrics

// Compaction metrics
Expand Down
27 changes: 26 additions & 1 deletion src/mito2/src/region_write_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::mem;
use std::sync::Arc;

use api::v1::{Mutation, Rows, WalEntry};
use api::v1::{Mutation, OpType, Rows, WalEntry};
use common_query::Output;
use snafu::ResultExt;
use store_api::logstore::LogStore;
Expand Down Expand Up @@ -92,6 +92,14 @@ pub(crate) struct RegionWriteCtx {
///
/// The i-th notify is for i-th mutation.
notifiers: Vec<WriteNotify>,
/// The write operation is failed and we should not write to the mutable memtable.
failed: bool,

// Metrics:
/// Rows to put.
pub(crate) put_num: usize,
/// Rows to delete.
pub(crate) delete_num: usize,
}

impl RegionWriteCtx {
Expand All @@ -112,6 +120,9 @@ impl RegionWriteCtx {
next_entry_id: last_entry_id + 1,
wal_entry: WalEntry::default(),
notifiers: Vec::new(),
failed: false,
put_num: 0,
delete_num: 0,
}
}

Expand All @@ -130,6 +141,13 @@ impl RegionWriteCtx {

// Increase sequence number.
self.next_sequence += num_rows as u64;

// Update metrics.
match OpType::from_i32(op_type) {
Some(OpType::Delete) => self.delete_num += num_rows,
Some(OpType::Put) => self.put_num += num_rows,
None => (),
}
}

/// Encode and add WAL entry to the writer.
Expand All @@ -153,6 +171,9 @@ impl RegionWriteCtx {
for notify in &mut self.notifiers {
notify.err = Some(err.clone());
}

// Fail the whole write operation.
self.failed = true;
}

/// Updates next entry id.
Expand All @@ -164,6 +185,10 @@ impl RegionWriteCtx {
pub(crate) fn write_memtable(&mut self) {
debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());

if self.failed {
return;
}

let mutable = &self.version.memtables.mutable;
// Takes mutations from the wal entry.
let mutations = mem::take(&mut self.wal_entry.mutations);
Expand Down
11 changes: 10 additions & 1 deletion src/mito2/src/worker/handle_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;

use crate::error::{RejectWriteSnafu, Result};
use crate::metrics::{STAGE_LABEL, WRITE_REJECT_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL};
use crate::metrics::{
STAGE_LABEL, TYPE_LABEL, WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED,
WRITE_STALL_TOTAL,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
Expand Down Expand Up @@ -80,13 +83,19 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}

let (mut put_rows, mut delete_rows) = (0, 0);
// Write to memtables.
{
let _timer = timer!(WRITE_STAGE_ELAPSED, &[(STAGE_LABEL, "write_memtable")]);
for mut region_ctx in region_ctxs.into_values() {
region_ctx.write_memtable();
put_rows += region_ctx.put_num;
delete_rows += region_ctx.delete_num;
}
}

counter!(WRITE_ROWS_TOTAL, put_rows as u64, TYPE_LABEL => "put");
counter!(WRITE_ROWS_TOTAL, delete_rows as u64, TYPE_LABEL => "delete");
}
}

Expand Down
1 change: 1 addition & 0 deletions src/promql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ datafusion.workspace = true
datatypes = { workspace = true }
futures = "0.3"
greptime-proto.workspace = true
metrics = { workspace = true }
promql-parser = "0.1.1"
prost.workspace = true
session = { workspace = true }
Expand Down
10 changes: 9 additions & 1 deletion src/promql/src/extension_plan/series_divide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::metrics::PROMQL_SERIES_COUNT;

#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SeriesDivide {
Expand Down Expand Up @@ -204,6 +205,7 @@ impl ExecutionPlan for SeriesDivideExec {
schema,
input,
metric: baseline_metric,
num_series: 0,
}))
}

Expand Down Expand Up @@ -239,6 +241,7 @@ pub struct SeriesDivideStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
metric: BaselineMetrics,
num_series: usize,
}

impl RecordBatchStream for SeriesDivideStream {
Expand All @@ -259,6 +262,7 @@ impl Stream for SeriesDivideStream {
Some(Ok(batch)) => batch,
None => {
self.buffer = None;
self.num_series += 1;
return Poll::Ready(Some(Ok(batch)));
}
error => return Poll::Ready(error),
Expand All @@ -271,12 +275,16 @@ impl Stream for SeriesDivideStream {
let result_batch = batch.slice(0, same_length);
let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length);
self.buffer = Some(remaining_batch);
self.num_series += 1;
return Poll::Ready(Some(Ok(result_batch)));
}
} else {
let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
Some(Ok(batch)) => batch,
None => return Poll::Ready(None),
None => {
metrics::histogram!(PROMQL_SERIES_COUNT, self.num_series as f64);
return Poll::Ready(None);
}
error => return Poll::Ready(error),
};
self.buffer = Some(batch);
Expand Down
1 change: 1 addition & 0 deletions src/promql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
pub mod error;
pub mod extension_plan;
pub mod functions;
mod metrics;
pub mod planner;
pub mod range_array;
16 changes: 16 additions & 0 deletions src/promql/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Counter for the number of series processed per query.
pub static PROMQL_SERIES_COUNT: &str = "promql.series_count";
20 changes: 19 additions & 1 deletion src/query/src/dist_plan/merge_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::any::Any;
use std::sync::Arc;
use std::time::Duration;

use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use async_stream::stream;
Expand All @@ -39,8 +40,12 @@ use futures_util::StreamExt;
use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::time::Instant;

use crate::error::ConvertSchemaSnafu;
use crate::metrics::{
METRIC_MERGE_SCAN_ERRORS_TOTAL, METRIC_MERGE_SCAN_POLL_ELAPSED, METRIC_MERGE_SCAN_REGIONS,
};
use crate::region_query::RegionQueryHandlerRef;

#[derive(Debug, Hash, PartialEq, Eq, Clone)]
Expand Down Expand Up @@ -161,6 +166,7 @@ impl MergeScanExec {
let trace_id = trace_id().unwrap_or_default();

let stream = Box::pin(stream!({
metrics::histogram!(METRIC_MERGE_SCAN_REGIONS, regions.len() as f64);
let _finish_timer = metric.finish_time().timer();
let mut ready_timer = metric.ready_time().timer();
let mut first_consume_timer = Some(metric.first_consume_time().timer());
Expand All @@ -178,12 +184,21 @@ impl MergeScanExec {
let mut stream = region_query_handler
.do_get(request)
.await
.map_err(BoxedError::new)
.map_err(|e| {
metrics::increment_counter!(METRIC_MERGE_SCAN_ERRORS_TOTAL);
BoxedError::new(e)
})
.context(ExternalSnafu)?;

ready_timer.stop();

let mut poll_duration = Duration::new(0, 0);

let mut poll_timer = Instant::now();
while let Some(batch) = stream.next().await {
let poll_elapsed = poll_timer.elapsed();
poll_duration += poll_elapsed;

let batch = batch?;
// reconstruct batch using `self.schema`
// to remove metadata and correct column name
Expand All @@ -193,7 +208,10 @@ impl MergeScanExec {
first_consume_timer.stop();
}
yield Ok(batch);
// reset poll timer
poll_timer = Instant::now();
}
metrics::histogram!(METRIC_MERGE_SCAN_POLL_ELAPSED, poll_duration.as_secs_f64());
}
}));

Expand Down
3 changes: 3 additions & 0 deletions src/query/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ pub static METRIC_OPTIMIZE_LOGICAL_ELAPSED: &str = "query.optimize_logicalplan_e
pub static METRIC_OPTIMIZE_PHYSICAL_ELAPSED: &str = "query.optimize_physicalplan_elapsed";
pub static METRIC_CREATE_PHYSICAL_ELAPSED: &str = "query.create_physicalplan_elapsed";
pub static METRIC_EXEC_PLAN_ELAPSED: &str = "query.execute_plan_elapsed";
pub static METRIC_MERGE_SCAN_POLL_ELAPSED: &str = "query.merge_scan.poll_elapsed";
pub static METRIC_MERGE_SCAN_REGIONS: &str = "query.merge_scan.regions";
pub static METRIC_MERGE_SCAN_ERRORS_TOTAL: &str = "query.merge_scan.errors_total";
1 change: 0 additions & 1 deletion src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::storage::{ColumnId, RegionId, ScanRequest};

#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
// TODO: rename to InsertRequest
Put(RegionPutRequest),
Delete(RegionDeleteRequest),
Create(RegionCreateRequest),
Expand Down

0 comments on commit 450fe0a

Please sign in to comment.