diff --git a/src/table/src/table.rs b/src/table/src/table.rs
index 65a110658a40..099481a9077b 100644
--- a/src/table/src/table.rs
+++ b/src/table/src/table.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 pub mod adapter;
+mod metrics;
 pub mod numbers;
 pub mod scan;
 
diff --git a/src/table/src/table/metrics.rs b/src/table/src/table/metrics.rs
new file mode 100644
index 000000000000..e24f0ff90a6b
--- /dev/null
+++ b/src/table/src/table/metrics.rs
@@ -0,0 +1,64 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use datafusion::physical_plan::metrics::{
+    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Timestamp,
+};
+
+/// Metrics that record and hold the memory usage of result batches in
+/// [`crate::table::scan::StreamWithMetricWrapper`] during query execution,
+/// indicating the size of the result set.
+#[derive(Debug, Clone)]
+pub struct MemoryUsageMetrics {
+    end_time: Timestamp,
+    // Used memory in bytes.
+    mem_used: Gauge,
+    // Number of output rows.
+    output_rows: Count,
+}
+
+impl MemoryUsageMetrics {
+    /// Creates a new `MemoryUsageMetrics`, recording its start timestamp as now.
+    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
+        start_time.record();
+
+        Self {
+            end_time: MetricBuilder::new(metrics).end_timestamp(partition),
+            mem_used: MetricBuilder::new(metrics).mem_used(partition),
+            output_rows: MetricBuilder::new(metrics).output_rows(partition),
+        }
+    }
+
+    pub fn record_mem_usage(&self, mem_used: usize) {
+        self.mem_used.add(mem_used);
+    }
+
+    pub fn record_output(&self, num_rows: usize) {
+        self.output_rows.add(num_rows);
+    }
+
+    /// Records the end time of the query if it has not been recorded yet.
+    pub fn try_done(&self) {
+        if self.end_time.value().is_none() {
+            self.end_time.record()
+        }
+    }
+}
+
+impl Drop for MemoryUsageMetrics {
+    fn drop(&mut self) {
+        self.try_done()
+    }
+}
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index 01b57879cf6a..6e04f92d0e1e 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -24,12 +24,14 @@ use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
 use common_recordbatch::error::Result as RecordBatchResult;
 use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion_physical_expr::PhysicalSortExpr;
 use datatypes::schema::SchemaRef;
 use futures::{Stream, StreamExt};
 use snafu::OptionExt;
 
+use crate::table::metrics::MemoryUsageMetrics;
+
 /// Adapt greptime's [SendableRecordBatchStream] to GreptimeDB's [PhysicalPlan].
 pub struct StreamScanAdapter {
     stream: Mutex<Option<SendableRecordBatchStream>>,
@@ -97,10 +99,10 @@ impl PhysicalPlan for StreamScanAdapter {
     ) -> QueryResult<SendableRecordBatchStream> {
         let mut stream = self.stream.lock().unwrap();
         let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
-        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
+        let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
         Ok(Box::pin(StreamWithMetricWrapper {
             stream,
-            metric: baseline_metric,
+            metric: mem_usage_metrics,
         }))
     }
 
@@ -111,7 +113,7 @@
 
 pub struct StreamWithMetricWrapper {
     stream: SendableRecordBatchStream,
-    metric: BaselineMetrics,
+    metric: MemoryUsageMetrics,
 }
 
 impl Stream for StreamWithMetricWrapper {
@@ -119,9 +121,16 @@ impl Stream for StreamWithMetricWrapper {
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
-        let _timer = this.metric.elapsed_compute().timer();
         let poll = this.stream.poll_next_unpin(cx);
-        if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll {
+        if let Poll::Ready(Some(Ok(record_batch))) = &poll {
+            let batch_mem_size = record_batch
+                .columns()
+                .iter()
+                .map(|vec_ref| vec_ref.memory_size())
+                .sum::<usize>();
+            // We don't record elapsed compute time here, since polling the
+            // inner stream calls the storage API and involves I/O.
+            this.metric.record_mem_usage(batch_mem_size);
             this.metric.record_output(record_batch.num_rows());
         }