diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index a72b3ff1d22d..c59bbc6555d3 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -31,7 +31,7 @@ use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::RegionEngineRef; use store_api::storage::{RegionId, ScanRequest}; -use table::table::scan::{ReadFromRegion, StreamScanAdapter}; +use table::table::scan::ReadFromRegion; use crate::error::{GetRegionMetadataSnafu, Result}; diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 9139c8a63b72..5978509f66a7 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -29,7 +29,7 @@ use datafusion_physical_expr::PhysicalSortExpr; use store_api::region_engine::SinglePartitionScanner; use store_api::storage::ScanRequest; -use crate::table::scan::{ReadFromRegion, StreamScanAdapter}; +use crate::table::scan::ReadFromRegion; use crate::table::{TableRef, TableType}; /// Adapt greptime's [TableRef] to DataFusion's [TableProvider]. diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index cdfd272c405e..0e205d0ff705 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -13,12 +13,10 @@ // limitations under the License. use std::any::Any; -use std::fmt::{self, Debug, Formatter}; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; -use common_query::error::ExecuteRepeatedlySnafu; use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream}; use common_telemetry::tracing::Span; use common_telemetry::tracing_context::TracingContext; @@ -30,108 +28,13 @@ use datafusion::physical_plan::{ RecordBatchStream as DfRecordBatchStream, }; use datafusion_common::DataFusionError; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr}; +use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortExpr}; use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef; -use datatypes::schema::SchemaRef; use futures::{Stream, StreamExt}; -use snafu::OptionExt; use store_api::region_engine::RegionScannerRef; use crate::table::metrics::MemoryUsageMetrics; -/// Adapt greptime's [SendableRecordBatchStream] to [ExecutionPlan]. -pub struct StreamScanAdapter { - stream: Mutex>, - schema: SchemaRef, - output_ordering: Option>, - metric: ExecutionPlanMetricsSet, - properties: PlanProperties, -} - -impl Debug for StreamScanAdapter { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("StreamScanAdapter") - .field("stream", &"") - .finish() - } -} - -impl StreamScanAdapter { - pub fn new(stream: SendableRecordBatchStream) -> Self { - let schema = stream.schema(); - let properties = PlanProperties::new( - EquivalenceProperties::new(schema.arrow_schema().clone()), - Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, - ); - Self { - stream: Mutex::new(Some(stream)), - schema, - output_ordering: None, - metric: ExecutionPlanMetricsSet::new(), - properties, - } - } - - pub fn with_output_ordering(mut self, output_ordering: Vec) -> Self { - self.output_ordering = Some(output_ordering); - self - } -} - -impl ExecutionPlan for StreamScanAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> ArrowSchemaRef { - self.schema.arrow_schema().clone() - } - - fn properties(&self) -> &PlanProperties { - &self.properties - } - - fn children(&self) -> Vec> { - vec![] - } - - fn with_new_children( - self: Arc, - _children: Vec>, - ) -> DfResult> { - Ok(self) - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> DfResult { - let tracing_context = TracingContext::from_json(context.session_id().as_str()); - let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter")); - - let mut stream = self.stream.lock().unwrap(); - let stream = stream.take().context(ExecuteRepeatedlySnafu)?; - let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition); - Ok(Box::pin(StreamWithMetricWrapper { - stream, - metric: mem_usage_metrics, - span, - })) - } - - fn metrics(&self) -> Option { - Some(self.metric.clone_inner()) - } -} - -impl DisplayAs for StreamScanAdapter { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{:?}", self) - } -} - /// A plan to read multiple partitions from a region of a table. #[derive(Debug)] pub struct ReadFromRegion { @@ -271,12 +174,15 @@ impl DfRecordBatchStream for StreamWithMetricWrapper { #[cfg(test)] mod test { + use std::sync::Arc; + use common_recordbatch::{RecordBatch, RecordBatches}; use datafusion::prelude::SessionContext; use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::Int32Vector; use futures::TryStreamExt; + use store_api::region_engine::SinglePartitionScanner; use super::*; @@ -304,9 +210,10 @@ mod test { RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap(); let stream = recordbatches.as_stream(); - let scan = StreamScanAdapter::new(stream); + let scanner = Arc::new(SinglePartitionScanner::new(stream)); + let plan = ReadFromRegion::new(scanner); let actual: SchemaRef = Arc::new( - scan.properties + plan.properties .eq_properties .schema() .clone() @@ -315,12 +222,12 @@ mod test { ); assert_eq!(actual, schema); - let stream = scan.execute(0, ctx.task_ctx()).unwrap(); + let stream = plan.execute(0, ctx.task_ctx()).unwrap(); let recordbatches = stream.try_collect::>().await.unwrap(); assert_eq!(batch1.df_record_batch(), &recordbatches[0]); assert_eq!(batch2.df_record_batch(), &recordbatches[1]); - let result = scan.execute(0, ctx.task_ctx()); + let result = plan.execute(0, ctx.task_ctx()); assert!(result.is_err()); match result { Err(e) => assert!(e