From 63646949f83b2a13dc30d2480dcbaf8aa550fabc Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Mon, 11 Sep 2023 17:07:39 +0800 Subject: [PATCH 1/3] chore: try custom metrics --- src/table/src/table.rs | 1 + src/table/src/table/metrics.rs | 39 ++++++++++++++++++++++++++++++++++ src/table/src/table/scan.rs | 20 +++++++++++------ 3 files changed, 54 insertions(+), 6 deletions(-) create mode 100644 src/table/src/table/metrics.rs diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 65a110658a40..099481a9077b 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod adapter; +mod metrics; pub mod numbers; pub mod scan; diff --git a/src/table/src/table/metrics.rs b/src/table/src/table/metrics.rs new file mode 100644 index 000000000000..e468b06e9d0d --- /dev/null +++ b/src/table/src/table/metrics.rs @@ -0,0 +1,39 @@ +use datafusion::physical_plan::metrics::{ + ExecutionPlanMetricsSet, Gauge, MetricBuilder, Timestamp, +}; + +#[derive(Debug, Clone)] +pub struct MemoryUsageMetrics { + end_time: Timestamp, + mem_used: Gauge, +} + +impl MemoryUsageMetrics { + /// Create a new BaselineMetric structure, and set `start_time` to now + pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + let start_time = MetricBuilder::new(metrics).start_timestamp(partition); + start_time.record(); + + Self { + end_time: MetricBuilder::new(metrics).end_timestamp(partition), + mem_used: MetricBuilder::new(metrics).gauge("mem_used", partition), + } + } + + pub fn record_mem_usage(&self, mem_used: usize) { + self.mem_used.add(mem_used); + } + + /// If not previously recorded `done()`, record + pub fn try_done(&self) { + if self.end_time.value().is_none() { + self.end_time.record() + } + } +} + +impl Drop for MemoryUsageMetrics { + fn drop(&mut self) { + self.try_done() + } +} diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 01b57879cf6a..c76bd4ed1263 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -24,12 +24,14 @@ use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef}; use common_recordbatch::error::Result as RecordBatchResult; use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datafusion::execution::context::TaskContext; -use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_expr::PhysicalSortExpr; use datatypes::schema::SchemaRef; use futures::{Stream, StreamExt}; use snafu::OptionExt; +use crate::table::metrics::MemoryUsageMetrics; + /// Adapt greptime's [SendableRecordBatchStream] to GreptimeDB's [PhysicalPlan]. pub struct StreamScanAdapter { stream: Mutex>, @@ -97,10 +99,10 @@ impl PhysicalPlan for StreamScanAdapter { ) -> QueryResult { let mut stream = self.stream.lock().unwrap(); let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?; - let baseline_metric = BaselineMetrics::new(&self.metric, partition); + let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition); Ok(Box::pin(StreamWithMetricWrapper { stream, - metric: baseline_metric, + metric: mem_usage_metrics, })) } @@ -111,7 +113,7 @@ impl PhysicalPlan for StreamScanAdapter { pub struct StreamWithMetricWrapper { stream: SendableRecordBatchStream, - metric: BaselineMetrics, + metric: MemoryUsageMetrics, } impl Stream for StreamWithMetricWrapper { @@ -119,10 +121,16 @@ impl Stream for StreamWithMetricWrapper { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - let _timer = this.metric.elapsed_compute().timer(); + // it's calling storage level api + // so we don't record time now let poll = this.stream.poll_next_unpin(cx); if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll { - this.metric.record_output(record_batch.num_rows()); + let batch_mem_size = record_batch + .columns() + .iter() + .map(|vec_ref| vec_ref.memory_size()) + .sum::(); + this.metric.record_mem_usage(batch_mem_size); } poll From 0451ca3ac5600401d4b5ef2ed95e17e27e34ee97 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Mon, 11 Sep 2023 22:36:06 +0800 Subject: [PATCH 2/3] chore: fix header --- src/table/src/table/metrics.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/table/src/table/metrics.rs b/src/table/src/table/metrics.rs index e468b06e9d0d..5708771500c6 100644 --- a/src/table/src/table/metrics.rs +++ b/src/table/src/table/metrics.rs @@ -1,3 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use datafusion::physical_plan::metrics::{ ExecutionPlanMetricsSet, Gauge, MetricBuilder, Timestamp, }; From a027806b684b0ebdb0e1bc536354ed4706b74ce0 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Tue, 12 Sep 2023 10:50:02 +0800 Subject: [PATCH 3/3] chore: minor change --- src/table/src/table/metrics.rs | 19 +++++++++++++++---- src/table/src/table/scan.rs | 7 ++++--- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/table/src/table/metrics.rs b/src/table/src/table/metrics.rs index 5708771500c6..e24f0ff90a6b 100644 --- a/src/table/src/table/metrics.rs +++ b/src/table/src/table/metrics.rs @@ -13,24 +13,31 @@ // limitations under the License. use datafusion::physical_plan::metrics::{ - ExecutionPlanMetricsSet, Gauge, MetricBuilder, Timestamp, + Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Timestamp, }; +/// This metrics struct is used to record and hold memory usage +/// of result batch in [`crate::table::scan::StreamWithMetricWrapper`] +/// during query execution, indicating size of the dataset. #[derive(Debug, Clone)] pub struct MemoryUsageMetrics { end_time: Timestamp, + // used memory in bytes mem_used: Gauge, + // number of rows in output + output_rows: Count, } impl MemoryUsageMetrics { - /// Create a new BaselineMetric structure, and set `start_time` to now + /// Create a new MemoryUsageMetrics structure, and set `start_time` to now pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { let start_time = MetricBuilder::new(metrics).start_timestamp(partition); start_time.record(); Self { end_time: MetricBuilder::new(metrics).end_timestamp(partition), - mem_used: MetricBuilder::new(metrics).gauge("mem_used", partition), + mem_used: MetricBuilder::new(metrics).mem_used(partition), + output_rows: MetricBuilder::new(metrics).output_rows(partition), } } @@ -38,7 +45,11 @@ impl MemoryUsageMetrics { self.mem_used.add(mem_used); } - /// If not previously recorded `done()`, record + pub fn record_output(&self, num_rows: usize) { + self.output_rows.add(num_rows); + } + + /// Record the end time of the query pub fn try_done(&self) { if self.end_time.value().is_none() { self.end_time.record() diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index c76bd4ed1263..6e04f92d0e1e 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -121,16 +121,17 @@ impl Stream for StreamWithMetricWrapper { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - // it's calling storage level api - // so we don't record time now let poll = this.stream.poll_next_unpin(cx); - if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll { + if let Poll::Ready(Some(Ok(record_batch))) = &poll { let batch_mem_size = record_batch .columns() .iter() .map(|vec_ref| vec_ref.memory_size()) .sum::(); + // we don't record elapsed time here + // since it's calling storage api involving I/O ops this.metric.record_mem_usage(batch_mem_size); + this.metric.record_output(record_batch.num_rows()); } poll