diff --git a/Cargo.lock b/Cargo.lock
index da7ff9461400..3c320e4c78ac 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7036,6 +7036,7 @@ dependencies = [
  "datatypes",
  "futures",
  "greptime-proto",
+ "metrics",
  "promql-parser",
  "prost",
  "query",
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index faa73f659c0a..4b32ec877e43 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -45,6 +45,8 @@ pub const WRITE_STALL_TOTAL: &str = "mito.write.stall_total";
 pub const WRITE_REJECT_TOTAL: &str = "mito.write.reject_total";
 /// Elapsed time of each write stage.
 pub const WRITE_STAGE_ELAPSED: &str = "mito.write.stage_elapsed";
+/// Counter of rows to write.
+pub const WRITE_ROWS_TOTAL: &str = "mito.write.rows_total";
 // ------ End of write related metrics
 
 // Compaction metrics
diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs
index 8e83a6442a1f..5270ceec326d 100644
--- a/src/mito2/src/region_write_ctx.rs
+++ b/src/mito2/src/region_write_ctx.rs
@@ -15,7 +15,7 @@
 use std::mem;
 use std::sync::Arc;
 
-use api::v1::{Mutation, Rows, WalEntry};
+use api::v1::{Mutation, OpType, Rows, WalEntry};
 use common_query::Output;
 use snafu::ResultExt;
 use store_api::logstore::LogStore;
@@ -92,6 +92,14 @@ pub(crate) struct RegionWriteCtx {
     ///
     /// The i-th notify is for i-th mutation.
     notifiers: Vec<WriteNotify>,
+    /// Whether the write operation failed; if so, we should not write to the mutable memtable.
+    failed: bool,
+
+    // Metrics:
+    /// Rows to put.
+    pub(crate) put_num: usize,
+    /// Rows to delete.
+    pub(crate) delete_num: usize,
 }
 
 impl RegionWriteCtx {
@@ -112,6 +120,9 @@ impl RegionWriteCtx {
             next_entry_id: last_entry_id + 1,
             wal_entry: WalEntry::default(),
             notifiers: Vec::new(),
+            failed: false,
+            put_num: 0,
+            delete_num: 0,
         }
     }
 
@@ -130,6 +141,13 @@ impl RegionWriteCtx {
 
         // Increase sequence number.
         self.next_sequence += num_rows as u64;
+
+        // Update metrics.
+        match OpType::from_i32(op_type) {
+            Some(OpType::Delete) => self.delete_num += num_rows,
+            Some(OpType::Put) => self.put_num += num_rows,
+            None => (),
+        }
     }
 
     /// Encode and add WAL entry to the writer.
@@ -153,6 +171,9 @@ impl RegionWriteCtx {
         for notify in &mut self.notifiers {
             notify.err = Some(err.clone());
         }
+
+        // Fail the whole write operation.
+        self.failed = true;
     }
 
     /// Updates next entry id.
@@ -164,6 +185,10 @@ impl RegionWriteCtx {
     pub(crate) fn write_memtable(&mut self) {
         debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());
 
+        if self.failed {
+            return;
+        }
+
         let mutable = &self.version.memtables.mutable;
         // Takes mutations from the wal entry.
         let mutations = mem::take(&mut self.wal_entry.mutations);
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index de6b6a5c5ea0..873ef726fb99 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -24,7 +24,10 @@ use store_api::metadata::RegionMetadata;
 use store_api::storage::RegionId;
 
 use crate::error::{RejectWriteSnafu, Result};
-use crate::metrics::{STAGE_LABEL, WRITE_REJECT_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL};
+use crate::metrics::{
+    STAGE_LABEL, TYPE_LABEL, WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED,
+    WRITE_STALL_TOTAL,
+};
 use crate::region_write_ctx::RegionWriteCtx;
 use crate::request::{SenderWriteRequest, WriteRequest};
 use crate::worker::RegionWorkerLoop;
@@ -80,13 +83,19 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             }
         }
 
+        let (mut put_rows, mut delete_rows) = (0, 0);
         // Write to memtables.
         {
             let _timer = timer!(WRITE_STAGE_ELAPSED, &[(STAGE_LABEL, "write_memtable")]);
             for mut region_ctx in region_ctxs.into_values() {
                 region_ctx.write_memtable();
+                put_rows += region_ctx.put_num;
+                delete_rows += region_ctx.delete_num;
             }
         }
+
+        counter!(WRITE_ROWS_TOTAL, put_rows as u64, TYPE_LABEL => "put");
+        counter!(WRITE_ROWS_TOTAL, delete_rows as u64, TYPE_LABEL => "delete");
     }
 }
diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml
index 00f55ce296c5..990197a34c4b 100644
--- a/src/promql/Cargo.toml
+++ b/src/promql/Cargo.toml
@@ -17,6 +17,7 @@ datafusion.workspace = true
 datatypes = { workspace = true }
 futures = "0.3"
 greptime-proto.workspace = true
+metrics = { workspace = true }
 promql-parser = "0.1.1"
 prost.workspace = true
 session = { workspace = true }
diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs
index 3487fe578678..fdf2c9cdf9bc 100644
--- a/src/promql/src/extension_plan/series_divide.rs
+++ b/src/promql/src/extension_plan/series_divide.rs
@@ -38,6 +38,7 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{DeserializeSnafu, Result};
+use crate::metrics::PROMQL_SERIES_COUNT;
 
 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct SeriesDivide {
@@ -204,6 +205,7 @@ impl ExecutionPlan for SeriesDivideExec {
             schema,
             input,
             metric: baseline_metric,
+            num_series: 0,
         }))
     }
 
@@ -239,6 +241,7 @@ pub struct SeriesDivideStream {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     metric: BaselineMetrics,
+    num_series: usize,
 }
 
 impl RecordBatchStream for SeriesDivideStream {
@@ -259,6 +262,7 @@ impl Stream for SeriesDivideStream {
                         Some(Ok(batch)) => batch,
                         None => {
                             self.buffer = None;
+                            self.num_series += 1;
                             return Poll::Ready(Some(Ok(batch)));
                         }
                         error => return Poll::Ready(error),
@@ -271,12 +275,16 @@ impl Stream for SeriesDivideStream {
                     let result_batch = batch.slice(0, same_length);
                     let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length);
                     self.buffer = Some(remaining_batch);
+                    self.num_series += 1;
                     return Poll::Ready(Some(Ok(result_batch)));
                 }
             } else {
                 let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                     Some(Ok(batch)) => batch,
-                    None => return Poll::Ready(None),
+                    None => {
+                        metrics::histogram!(PROMQL_SERIES_COUNT, self.num_series as f64);
+                        return Poll::Ready(None);
+                    }
                     error => return Poll::Ready(error),
                 };
                 self.buffer = Some(batch);
diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs
index 1f636dcb5394..9514a015380b 100644
--- a/src/promql/src/lib.rs
+++ b/src/promql/src/lib.rs
@@ -18,5 +18,6 @@
 pub mod error;
 pub mod extension_plan;
 pub mod functions;
+mod metrics;
 pub mod planner;
 pub mod range_array;
diff --git a/src/promql/src/metrics.rs b/src/promql/src/metrics.rs
new file mode 100644
index 000000000000..b8bebf7a43f3
--- /dev/null
+++ b/src/promql/src/metrics.rs
@@ -0,0 +1,16 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/// Histogram of the number of series processed per query.
+pub static PROMQL_SERIES_COUNT: &str = "promql.series_count";
diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs
index ce07b9e5dde2..cc06f84d82f0 100644
--- a/src/query/src/dist_plan/merge_scan.rs
+++ b/src/query/src/dist_plan/merge_scan.rs
@@ -14,6 +14,7 @@
 
 use std::any::Any;
 use std::sync::Arc;
+use std::time::Duration;
 
 use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use async_stream::stream;
@@ -39,8 +40,12 @@ use futures_util::StreamExt;
 use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
 use snafu::ResultExt;
 use store_api::storage::RegionId;
+use tokio::time::Instant;
 
 use crate::error::ConvertSchemaSnafu;
+use crate::metrics::{
+    METRIC_MERGE_SCAN_ERRORS_TOTAL, METRIC_MERGE_SCAN_POLL_ELAPSED, METRIC_MERGE_SCAN_REGIONS,
+};
 use crate::region_query::RegionQueryHandlerRef;
 
 #[derive(Debug, Hash, PartialEq, Eq, Clone)]
@@ -161,6 +166,7 @@ impl MergeScanExec {
         let trace_id = trace_id().unwrap_or_default();
 
         let stream = Box::pin(stream!({
+            metrics::histogram!(METRIC_MERGE_SCAN_REGIONS, regions.len() as f64);
             let _finish_timer = metric.finish_time().timer();
             let mut ready_timer = metric.ready_time().timer();
             let mut first_consume_timer = Some(metric.first_consume_time().timer());
@@ -178,12 +184,21 @@ impl MergeScanExec {
                 let mut stream = region_query_handler
                     .do_get(request)
                     .await
-                    .map_err(BoxedError::new)
+                    .map_err(|e| {
+                        metrics::increment_counter!(METRIC_MERGE_SCAN_ERRORS_TOTAL);
+                        BoxedError::new(e)
+                    })
                     .context(ExternalSnafu)?;
 
                 ready_timer.stop();
 
+                let mut poll_duration = Duration::new(0, 0);
+
+                let mut poll_timer = Instant::now();
                 while let Some(batch) = stream.next().await {
+                    let poll_elapsed = poll_timer.elapsed();
+                    poll_duration += poll_elapsed;
+
                     let batch = batch?;
                     // reconstruct batch using `self.schema`
                     // to remove metadata and correct column name
@@ -193,7 +208,10 @@ impl MergeScanExec {
                         first_consume_timer.stop();
                     }
                     yield Ok(batch);
+                    // reset poll timer
+                    poll_timer = Instant::now();
                 }
+                metrics::histogram!(METRIC_MERGE_SCAN_POLL_ELAPSED, poll_duration.as_secs_f64());
             }
         }));
 
diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs
index 489e7fb62a56..7efac4e4141b 100644
--- a/src/query/src/metrics.rs
+++ b/src/query/src/metrics.rs
@@ -18,3 +18,6 @@ pub static METRIC_OPTIMIZE_LOGICAL_ELAPSED: &str = "query.optimize_logicalplan_e
 pub static METRIC_OPTIMIZE_PHYSICAL_ELAPSED: &str = "query.optimize_physicalplan_elapsed";
 pub static METRIC_CREATE_PHYSICAL_ELAPSED: &str = "query.create_physicalplan_elapsed";
 pub static METRIC_EXEC_PLAN_ELAPSED: &str = "query.execute_plan_elapsed";
+pub static METRIC_MERGE_SCAN_POLL_ELAPSED: &str = "query.merge_scan.poll_elapsed";
+pub static METRIC_MERGE_SCAN_REGIONS: &str = "query.merge_scan.regions";
+pub static METRIC_MERGE_SCAN_ERRORS_TOTAL: &str = "query.merge_scan.errors_total";
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index 9d608f2077da..04251b81aeed 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -30,7 +30,6 @@ use crate::storage::{ColumnId, RegionId, ScanRequest};
 
 #[derive(Debug, IntoStaticStr)]
 pub enum RegionRequest {
-    // TODO: rename to InsertRequest
     Put(RegionPutRequest),
     Delete(RegionDeleteRequest),
     Create(RegionCreateRequest),