diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs index 09d4d411bbfc..626a2c01fad9 100644 --- a/src/common/query/src/lib.rs +++ b/src/common/query/src/lib.rs @@ -12,6 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod columnar_value; +pub mod error; +mod function; +pub mod logical_plan; +pub mod physical_plan; +pub mod prelude; +mod signature; + use std::fmt::{Debug, Display, Formatter}; use std::sync::Arc; @@ -20,14 +28,6 @@ use api::greptime_proto::v1::AddColumnLocation as Location; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use physical_plan::PhysicalPlan; use serde::{Deserialize, Serialize}; - -pub mod columnar_value; -pub mod error; -mod function; -pub mod logical_plan; -pub mod physical_plan; -pub mod prelude; -mod signature; use sqlparser_derive::{Visit, VisitMut}; /// new Output struct with output data(previously Output) and output meta diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 5f5cc45abbd2..35d32e20843e 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Display; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; @@ -22,7 +23,10 @@ use datafusion::arrow::compute::cast; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::error::Result as DfResult; use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue}; -use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream as DfRecordBatchStream}; +use datafusion::physical_plan::{ + accept, displayable, ExecutionPlan, ExecutionPlanVisitor, + RecordBatchStream as DfRecordBatchStream, +}; use datafusion_common::arrow::error::ArrowError; use datafusion_common::DataFusionError; use datatypes::schema::{Schema, SchemaRef}; @@ -228,7 +232,7 @@ impl RecordBatchStream for RecordBatchStreamAdapter { fn metrics(&self) -> Option { match &self.metrics_2 { - Metrics::Resolved(metrics) => Some(*metrics), + Metrics::Resolved(metrics) => Some(metrics.clone()), Metrics::Unavailable | Metrics::Unresolved(_) => None, } } @@ -259,11 +263,9 @@ impl Stream for RecordBatchStreamAdapter { } Poll::Ready(None) => { if let Metrics::Unresolved(df_plan) = &self.metrics_2 { - let mut metrics_holder = RecordBatchMetrics::default(); - collect_metrics(df_plan, &mut metrics_holder); - if metrics_holder.elapsed_compute != 0 || metrics_holder.memory_usage != 0 { - self.metrics_2 = Metrics::Resolved(metrics_holder); - } + let mut metric_collector = MetricCollector::default(); + accept(df_plan.as_ref(), &mut metric_collector).unwrap(); + self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics); } Poll::Ready(None) } @@ -276,28 +278,110 @@ impl Stream for RecordBatchStreamAdapter { } } -fn collect_metrics(df_plan: &Arc, result: &mut RecordBatchMetrics) { - if let Some(metrics) = df_plan.metrics() { - metrics.iter().for_each(|m| match m.value() { - MetricValue::ElapsedCompute(ec) => result.elapsed_compute += ec.value(), - MetricValue::CurrentMemoryUsage(m) => result.memory_usage += m.value(), - _ => {} - }); +/// An [ExecutionPlanVisitor] to collect metrics from a [ExecutionPlan]. +#[derive(Default)] +pub struct MetricCollector { + current_level: usize, + pub record_batch_metrics: RecordBatchMetrics, +} + +impl ExecutionPlanVisitor for MetricCollector { + type Error = !; + + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> std::result::Result { + // skip if no metric available + let Some(metric) = plan.metrics() else { + self.record_batch_metrics.plan_metrics.push(PlanMetrics { + plan: plan.name().to_string(), + level: self.current_level, + metrics: vec![], + }); + return Ok(true); + }; + + // scrape plan metrics + let metric = metric + .aggregate_by_name() + .sorted_for_display() + .timestamps_removed(); + let mut plan_metric = PlanMetrics { + plan: displayable(plan).one_line().to_string(), + level: self.current_level, + metrics: Vec::with_capacity(metric.iter().size_hint().0), + }; + for m in metric.iter() { + plan_metric + .metrics + .push((m.value().name().to_string(), m.value().as_usize())); + + // aggregate high-level metrics + match m.value() { + MetricValue::ElapsedCompute(ec) => { + self.record_batch_metrics.elapsed_compute += ec.value() + } + MetricValue::CurrentMemoryUsage(m) => { + self.record_batch_metrics.memory_usage += m.value() + } + _ => {} + } + } + self.record_batch_metrics.plan_metrics.push(plan_metric); + + self.current_level += 1; + Ok(true) } - for child in df_plan.children() { - collect_metrics(&child, result); + fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> std::result::Result { + // the last minus will underflow + self.current_level = self.current_level.wrapping_sub(1); + Ok(true) } } /// [`RecordBatchMetrics`] carrys metrics value /// from datanode to frontend through gRPC -#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone, Copy)] +#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)] pub struct RecordBatchMetrics { - // cpu consumption in nanoseconds + // High-level aggregated metrics + /// CPU consumption in nanoseconds pub elapsed_compute: usize, - // memory used by the plan in bytes + /// Memory used by the plan in bytes pub memory_usage: usize, + // Detailed per-plan metrics + /// An ordered list of plan metrics, from top to bottom in post-order. + pub plan_metrics: Vec, +} + +/// Only display `plan_metrics` with indent ` ` (2 spaces). +impl Display for RecordBatchMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for metric in &self.plan_metrics { + write!( + f, + "{:indent$}{} metrics=[", + " ", + metric.plan.trim_end(), + indent = metric.level * 2, + )?; + for (label, value) in &metric.metrics { + write!(f, "{}: {}, ", label, value)?; + } + writeln!(f, "]")?; + } + + Ok(()) + } +} + +#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)] +pub struct PlanMetrics { + /// The plan name + pub plan: String, + /// The level of the plan, starts from 0 + pub level: usize, + /// An ordered key-value list of metrics. + /// Key is metric label and value is metric value. + pub metrics: Vec<(String, usize)>, } enum AsyncRecordBatchStreamAdapterState { diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index ff3135d9e4e8..0016e02e94ed 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(never_type)] + pub mod adapter; pub mod error; pub mod filter; @@ -260,7 +262,7 @@ impl> + Unpin> RecordBatchStream } fn metrics(&self) -> Option { - self.metrics.load().as_ref().map(|s| *s.as_ref()) + self.metrics.load().as_ref().map(|s| s.as_ref().clone()) } } diff --git a/src/query/src/analyze.rs b/src/query/src/analyze.rs new file mode 100644 index 000000000000..402f7cb29d42 --- /dev/null +++ b/src/query/src/analyze.rs @@ -0,0 +1,229 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Customized `ANALYZE` plan that aware of [MergeScanExec]. +//! +//! The code skeleton is taken from `datafusion/physical-plan/src/analyze.rs` + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{StringBuilder, UInt32Builder}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use common_query::{DfPhysicalPlan, DfPhysicalPlanRef}; +use common_recordbatch::adapter::{MetricCollector, RecordBatchMetrics}; +use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream}; +use datafusion::error::Result as DfResult; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + accept, DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, +}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion_common::{internal_err, DataFusionError}; +use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning}; +use futures::StreamExt; + +use crate::dist_plan::MergeScanExec; + +const STAGE: &str = "stage"; +const NODE: &str = "node"; +const PLAN: &str = "plan"; + +#[derive(Debug)] +pub struct DistAnalyzeExec { + input: DfPhysicalPlanRef, + schema: SchemaRef, + properties: PlanProperties, +} + +impl DistAnalyzeExec { + /// Create a new DistAnalyzeExec + pub fn new(input: DfPhysicalPlanRef) -> Self { + let schema = SchemaRef::new(Schema::new(vec![ + Field::new(STAGE, DataType::UInt32, true), + Field::new(NODE, DataType::UInt32, true), + Field::new(PLAN, DataType::Utf8, true), + ])); + let properties = Self::compute_properties(&input, schema.clone()); + Self { + input, + schema, + properties, + } + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties(input: &DfPhysicalPlanRef, schema: SchemaRef) -> PlanProperties { + let eq_properties = EquivalenceProperties::new(schema); + let output_partitioning = Partitioning::UnknownPartitioning(1); + let exec_mode = input.execution_mode(); + PlanProperties::new(eq_properties, output_partitioning, exec_mode) + } +} + +impl DisplayAs for DistAnalyzeExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "DistAnalyzeExec",) + } + } + } +} + +impl DfPhysicalPlan for DistAnalyzeExec { + fn name(&self) -> &'static str { + "DistAnalyzeExec" + } + + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec { + vec![self.input.clone()] + } + + /// AnalyzeExec is handled specially so this value is ignored + fn required_input_distribution(&self) -> Vec { + vec![] + } + + fn with_new_children( + self: Arc, + mut children: Vec, + ) -> DfResult { + Ok(Arc::new(Self::new(children.pop().unwrap()))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DfResult { + if 0 != partition { + return internal_err!("AnalyzeExec invalid partition. Expected 0, got {partition}"); + } + + // Wrap the input plan using `CoalescePartitionsExec` to poll multiple + // partitions in parallel + let coalesce_partition_plan = CoalescePartitionsExec::new(self.input.clone()); + + // Create future that computes thefinal output + let captured_input = self.input.clone(); + let captured_schema = self.schema.clone(); + + // Finish the input stream and create the output + let mut input_stream = coalesce_partition_plan.execute(0, context)?; + let output = async move { + let mut total_rows = 0; + while let Some(batch) = input_stream.next().await.transpose()? { + total_rows += batch.num_rows(); + } + + create_output_batch(total_rows, captured_input, captured_schema) + }; + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + futures::stream::once(output), + ))) + } +} + +/// Build the result [`DfRecordBatch`] of `ANALYZE` +struct AnalyzeOutputBuilder { + stage_builder: UInt32Builder, + node_builder: UInt32Builder, + plan_builder: StringBuilder, + schema: SchemaRef, +} + +impl AnalyzeOutputBuilder { + fn new(schema: SchemaRef) -> Self { + Self { + stage_builder: UInt32Builder::with_capacity(4), + node_builder: UInt32Builder::with_capacity(4), + plan_builder: StringBuilder::with_capacity(1, 1024), + schema, + } + } + + fn append_metric(&mut self, stage: u32, node: u32, metric: RecordBatchMetrics) { + self.stage_builder.append_value(stage); + self.node_builder.append_value(node); + self.plan_builder.append_value(metric.to_string()); + } + + fn append_total_rows(&mut self, total_rows: usize) { + self.stage_builder.append_null(); + self.node_builder.append_null(); + self.plan_builder + .append_value(format!("Total rows: {}", total_rows)); + } + + fn finish(mut self) -> DfResult { + DfRecordBatch::try_new( + self.schema, + vec![ + Arc::new(self.stage_builder.finish()), + Arc::new(self.node_builder.finish()), + Arc::new(self.plan_builder.finish()), + ], + ) + .map_err(DataFusionError::from) + } +} + +/// Creates the output of AnalyzeExec as a RecordBatch +fn create_output_batch( + total_rows: usize, + input: DfPhysicalPlanRef, + schema: SchemaRef, +) -> DfResult { + let mut builder = AnalyzeOutputBuilder::new(schema); + + // Treat the current stage as stage 0. Fetch its metrics + let mut collector = MetricCollector::default(); + // Safety: metric collector won't return error + accept(input.as_ref(), &mut collector).unwrap(); + let stage_0_metrics = collector.record_batch_metrics; + + // Append the metrics of the current stage + builder.append_metric(0, 0, stage_0_metrics); + + // Find merge scan and append its sub_stage_metrics + input.apply(&mut |plan| { + if let Some(merge_scan) = plan.as_any().downcast_ref::() { + let sub_stage_metrics = merge_scan.sub_stage_metrics(); + for (node, metric) in sub_stage_metrics.into_iter().enumerate() { + builder.append_metric(1, node as _, metric); + } + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + })?; + + // Write total rows + builder.append_total_rows(total_rows); + + builder.finish() +} diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index f646d6b90ce3..32fa4cdd0b69 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -44,6 +44,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use table::requests::{DeleteRequest, InsertRequest}; use table::TableRef; +use crate::analyze::DistAnalyzeExec; use crate::dataframe::DataFrame; pub use crate::datafusion::planner::DfContextProviderAdapter; use crate::error::{ @@ -407,9 +408,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine { .optimize(new_plan, config) .context(DataFusionSnafu)?; } - Arc::new(analyze_plan.clone()) - .with_new_children(vec![new_plan]) - .unwrap() + Arc::new(DistAnalyzeExec::new(new_plan)) } else { let mut new_plan = df_plan; for optimizer in state.physical_optimizers() { diff --git a/src/query/src/dist_plan.rs b/src/query/src/dist_plan.rs index 830f3b14cf72..6ab93d4e1dba 100644 --- a/src/query/src/dist_plan.rs +++ b/src/query/src/dist_plan.rs @@ -18,5 +18,5 @@ mod merge_scan; mod planner; pub use analyzer::DistPlannerAnalyzer; -pub use merge_scan::MergeScanLogicalPlan; +pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan}; pub use planner::DistExtensionPlanner; diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 6845e26bc10c..29ae1a083fa1 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; @@ -24,7 +24,7 @@ use common_error::ext::BoxedError; use common_meta::table_name::TableName; use common_plugins::GREPTIME_EXEC_READ_COST; use common_query::physical_plan::TaskContext; -use common_recordbatch::adapter::DfRecordBatchStreamAdapter; +use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics}; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{ DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream, @@ -128,6 +128,8 @@ pub struct MergeScanExec { region_query_handler: RegionQueryHandlerRef, metric: ExecutionPlanMetricsSet, properties: PlanProperties, + /// Metrics from sub stages + sub_stage_metrics: Arc>>, query_ctx: QueryContextRef, } @@ -166,6 +168,7 @@ impl MergeScanExec { arrow_schema: arrow_schema_without_metadata, region_query_handler, metric: ExecutionPlanMetricsSet::new(), + sub_stage_metrics: Arc::default(), properties, query_ctx, }) @@ -185,6 +188,7 @@ impl MergeScanExec { let timezone = self.query_ctx.timezone().to_string(); let extensions = self.query_ctx.extensions(); + let sub_sgate_metrics_moved = self.sub_stage_metrics.clone(); let stream = Box::pin(stream!({ MERGE_SCAN_REGIONS.observe(regions.len() as f64); let _finish_timer = metric.finish_time().timer(); @@ -236,6 +240,8 @@ impl MergeScanExec { // reset poll timer poll_timer = Instant::now(); } + + // process metrics after all data is drained. if let Some(metrics) = stream.metrics() { let (c, s) = parse_catalog_and_schema_from_db_string(&dbname); let value = read_meter!( @@ -247,6 +253,9 @@ impl MergeScanExec { } ); metric.record_greptime_exec_cost(value as usize); + + // record metrics from sub sgates + sub_sgate_metrics_moved.lock().unwrap().push(metrics); } MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64()); @@ -279,6 +288,10 @@ impl MergeScanExec { let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?; Ok(Arc::new(schema)) } + + pub fn sub_stage_metrics(&self) -> Vec { + self.sub_stage_metrics.lock().unwrap().clone() + } } impl ExecutionPlan for MergeScanExec { diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 7d325668bad9..9b6413e4ed92 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -15,6 +15,7 @@ #![feature(let_chains)] #![feature(int_roundings)] +mod analyze; pub mod dataframe; pub mod datafusion; pub mod dist_plan; diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 4eb907063e28..22ce06e9208d 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -49,7 +49,6 @@ impl Debug for StreamScanAdapter { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("StreamScanAdapter") .field("stream", &"") - .field("schema", &self.schema.arrow_schema().fields) .finish() } } diff --git a/tests/cases/distributed/explain/analyze.result b/tests/cases/distributed/explain/analyze.result new file mode 100644 index 000000000000..1ce443adfff6 --- /dev/null +++ b/tests/cases/distributed/explain/analyze.result @@ -0,0 +1,46 @@ +CREATE TABLE system_metrics ( + host STRING, + idc STRING, + cpu_util DOUBLE, + memory_util DOUBLE, + disk_util DOUBLE, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP(), + PRIMARY KEY(host, idc), + TIME INDEX(ts) +); + +Affected Rows: 0 + +INSERT INTO system_metrics +VALUES + ("host1", "idc_a", 11.8, 10.3, 10.3, 1667446797450), + ("host2", "idc_a", 80.0, 70.3, 90.0, 1667446797450), + ("host1", "idc_b", 50.0, 66.7, 40.6, 1667446797450); + +Affected Rows: 3 + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +explain analyze SELECT count(*) FROM system_metrics; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[COUNT(greptime.public.system_REDACTED +|_|_|_CoalescePartitionsExec REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(greptime.public.system_REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_| +|_|_| Total rows: 1_| ++-+-+-+ + +drop table system_metrics; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/explain/analyze.sql b/tests/cases/distributed/explain/analyze.sql new file mode 100644 index 000000000000..1dfbc7166e97 --- /dev/null +++ b/tests/cases/distributed/explain/analyze.sql @@ -0,0 +1,25 @@ +CREATE TABLE system_metrics ( + host STRING, + idc STRING, + cpu_util DOUBLE, + memory_util DOUBLE, + disk_util DOUBLE, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP(), + PRIMARY KEY(host, idc), + TIME INDEX(ts) +); + +INSERT INTO system_metrics +VALUES + ("host1", "idc_a", 11.8, 10.3, 10.3, 1667446797450), + ("host2", "idc_a", 80.0, 70.3, 90.0, 1667446797450), + ("host1", "idc_b", 50.0, 66.7, 40.6, 1667446797450); + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +explain analyze SELECT count(*) FROM system_metrics; + +drop table system_metrics; diff --git a/tests/cases/distributed/optimizer/order_by.result b/tests/cases/distributed/optimizer/order_by.result index 40e73f6410ea..88ce54ce5de9 100644 --- a/tests/cases/distributed/optimizer/order_by.result +++ b/tests/cases/distributed/optimizer/order_by.result @@ -1,61 +1,61 @@ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers; -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | StreamScanAdapter { stream: "" } | +| | | ++---------------+-------------------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number desc; -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+---------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 DESC] | +| | StreamScanAdapter { stream: "" } | +| | | ++---------------+---------------------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number asc; -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+---------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | +| | StreamScanAdapter { stream: "" } | +| | | ++---------------+---------------------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number desc limit 10; -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-----------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | +| | StreamScanAdapter { stream: "" } | +| | | ++---------------+-----------------------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number asc limit 10; -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-----------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | +| | StreamScanAdapter { stream: "" } | +| | | ++---------------+-----------------------------------------------------------------+ diff --git a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result index 4c5adc8032b6..4484d96c699e 100644 --- a/tests/cases/standalone/common/range/nest.result +++ b/tests/cases/standalone/common/range/nest.result @@ -68,13 +68,16 @@ EXPLAIN SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s'; -- SQLNESS REPLACE (metrics.*) REDACTED EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s'; -+-+-+ -| plan_type_| plan_| -+-+-+ -| Plan with Metrics | RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts, REDACTED -|_|_MergeScanExec: REDACTED -|_|_| -+-+-+ ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ DROP TABLE host; diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result index fcc3d50bbde7..8b5b09dc803a 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result @@ -15,16 +15,25 @@ Affected Rows: 3 -- SQLNESS REPLACE (peers.*) REDACTED TQL ANALYZE (0, 10, '5s') test; -+-+-+ -| plan_type_| plan_| -+-+-+ -| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED -|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED -|_|_PromSeriesDivideExec: tags=["k"], REDACTED -|_|_SortExec: expr=[k@2 ASC NULLS LAST], REDACTED -|_|_MergeScanExec: REDACTED -|_|_| -+-+-+ ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED +|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED +|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED +|_|_|_SortExec: expr=[k@2 ASC NULLS LAST] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ -- 'lookback' parameter is not fully supported, the test has to be updated -- analyze at 0s, 5s and 10s. No point at 0s. @@ -35,16 +44,25 @@ TQL ANALYZE (0, 10, '5s') test; -- SQLNESS REPLACE (peers.*) REDACTED TQL ANALYZE (0, 10, '1s', '2s') test; -+-+-+ -| plan_type_| plan_| -+-+-+ -| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j], REDACTED -|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED -|_|_PromSeriesDivideExec: tags=["k"], REDACTED -|_|_SortExec: expr=[k@2 ASC NULLS LAST], REDACTED -|_|_MergeScanExec: REDACTED -|_|_| -+-+-+ ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED +|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED +|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED +|_|_|_SortExec: expr=[k@2 ASC NULLS LAST] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ -- analyze at 0s, 5s and 10s. No point at 0s. -- SQLNESS REPLACE (metrics.*) REDACTED @@ -54,16 +72,25 @@ TQL ANALYZE (0, 10, '1s', '2s') test; -- SQLNESS REPLACE (peers.*) REDACTED TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '5s') test; -+-+-+ -| plan_type_| plan_| -+-+-+ -| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED -|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED -|_|_PromSeriesDivideExec: tags=["k"], REDACTED -|_|_SortExec: expr=[k@2 ASC NULLS LAST], REDACTED -|_|_MergeScanExec: REDACTED -|_|_| -+-+-+ ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED +|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED +|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED +|_|_|_SortExec: expr=[k@2 ASC NULLS LAST] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ -- analyze verbose at 0s, 5s and 10s. No point at 0s. -- SQLNESS REPLACE (-+) - @@ -75,24 +102,25 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp -- SQLNESS REPLACE (Duration.*) REDACTED TQL ANALYZE VERBOSE (0, 10, '5s') test; -+-+-+ -| plan_type_| plan_| -+-+-+ -| Plan with Metrics_| PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED -|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED -|_|_PromSeriesDivideExec: tags=["k"], REDACTED -|_|_SortExec: expr=[k@2 ASC NULLS LAST], REDACTED -|_|_MergeScanExec: REDACTED -|_|_| -| Plan with Full Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED -|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED -|_|_PromSeriesDivideExec: tags=["k"], REDACTED -|_|_SortExec: expr=[k@2 ASC NULLS LAST], REDACTED -|_|_MergeScanExec: REDACTED -|_|_| -| Output Rows_| 4_| -| REDACTED -+-+-+ ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED +|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED +|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED +|_|_|_SortExec: expr=[k@2 ASC NULLS LAST] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ DROP TABLE test; diff --git a/tests/cases/standalone/optimizer/order_by.result b/tests/cases/standalone/optimizer/order_by.result index eea30ecf01f5..574f753e4073 100644 --- a/tests/cases/standalone/optimizer/order_by.result +++ b/tests/cases/standalone/optimizer/order_by.result @@ -1,56 +1,56 @@ explain select * from numbers; -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | StreamScanAdapter { stream: "" } | +| | | ++---------------+-------------------------------------------------------------+ explain select * from numbers order by number desc; -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+---------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 DESC] | +| | StreamScanAdapter { stream: "" } | +| | | ++---------------+---------------------------------------------------------------+ explain select * from numbers order by number asc; -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+---------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | +| | StreamScanAdapter { stream: "" } | +| | | ++---------------+---------------------------------------------------------------+ explain select * from numbers order by number desc limit 10; -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-----------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | +| | StreamScanAdapter { stream: "" } | +| | | ++---------------+-----------------------------------------------------------------+ explain select * from numbers order by number asc limit 10; -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-----------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | +| | StreamScanAdapter { stream: "" } | +| | | ++---------------+-----------------------------------------------------------------+