From 8dafa39e2e80c480313b9451b1e4117e934f028b Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 11 May 2024 19:50:32 +0800 Subject: [PATCH] feat: table provider use ReadFromRegion --- src/table/src/table/adapter.rs | 10 ++- .../distributed/optimizer/order_by.result | 82 +++++++++---------- .../standalone/optimizer/order_by.result | 82 +++++++++---------- 3 files changed, 88 insertions(+), 86 deletions(-) diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index ffc6618a548a..9139c8a63b72 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -26,9 +26,10 @@ use datafusion_expr::expr::Expr as DfExpr; use datafusion_expr::TableProviderFilterPushDown as DfTableProviderFilterPushDown; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; +use store_api::region_engine::SinglePartitionScanner; use store_api::storage::ScanRequest; -use crate::table::scan::StreamScanAdapter; +use crate::table::scan::{ReadFromRegion, StreamScanAdapter}; use crate::table::{TableRef, TableType}; /// Adapt greptime's [TableRef] to DataFusion's [TableProvider]. @@ -110,11 +111,12 @@ impl TableProvider for DfTableProviderAdapter { .collect::>() }); - let mut stream_adapter = StreamScanAdapter::new(stream); + let scanner = Arc::new(SinglePartitionScanner::new(stream)); + let mut plan = ReadFromRegion::new(scanner); if let Some(sort_expr) = sort_expr { - stream_adapter = stream_adapter.with_output_ordering(sort_expr); + plan = plan.with_output_ordering(sort_expr); } - Ok(Arc::new(stream_adapter)) + Ok(Arc::new(plan)) } fn supports_filters_pushdown( diff --git a/tests/cases/distributed/optimizer/order_by.result b/tests/cases/distributed/optimizer/order_by.result index 88ce54ce5de9..5c03ac107c16 100644 --- a/tests/cases/distributed/optimizer/order_by.result +++ b/tests/cases/distributed/optimizer/order_by.result @@ -1,61 +1,61 @@ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers; -+---------------+-------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | StreamScanAdapter { stream: "" } | -| | | -+---------------+-------------------------------------------------------------+ ++---------------+-----------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SinglePartitionScanner: | +| | | ++---------------+-----------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number desc; -+---------------+---------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+---------------------------------------------------------------+ ++---------------+-------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 DESC] | +| | SinglePartitionScanner: | +| | | ++---------------+-------------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number asc; -+---------------+---------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+---------------------------------------------------------------+ ++---------------+-------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | +| | SinglePartitionScanner: | +| | | ++---------------+-------------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number desc limit 10; -+---------------+-----------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+-----------------------------------------------------------------+ ++---------------+---------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | +| | SinglePartitionScanner: | +| | | ++---------------+---------------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number asc limit 10; -+---------------+-----------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+-----------------------------------------------------------------+ ++---------------+------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | +| | SinglePartitionScanner: | +| | | ++---------------+------------------------------------------------------------+ diff --git a/tests/cases/standalone/optimizer/order_by.result b/tests/cases/standalone/optimizer/order_by.result index 574f753e4073..49996d130e78 100644 --- a/tests/cases/standalone/optimizer/order_by.result +++ b/tests/cases/standalone/optimizer/order_by.result @@ -1,56 +1,56 @@ explain select * from numbers; -+---------------+-------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | StreamScanAdapter { stream: "" } | -| | | -+---------------+-------------------------------------------------------------+ ++---------------+-----------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SinglePartitionScanner: | +| | | ++---------------+-----------------------------------------------------+ explain select * from numbers order by number desc; -+---------------+---------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+---------------------------------------------------------------+ ++---------------+-------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 DESC] | +| | SinglePartitionScanner: | +| | | ++---------------+-------------------------------------------------------+ explain select * from numbers order by number asc; -+---------------+---------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+---------------------------------------------------------------+ ++---------------+-------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | +| | SinglePartitionScanner: | +| | | ++---------------+-------------------------------------------------------+ explain select * from numbers order by number desc limit 10; -+---------------+-----------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+-----------------------------------------------------------------+ ++---------------+---------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | +| | SinglePartitionScanner: | +| | | ++---------------+---------------------------------------------------------+ explain select * from numbers order by number asc limit 10; -+---------------+-----------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+-----------------------------------------------------------------+ ++---------------+------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | +| | SinglePartitionScanner: | +| | | ++---------------+------------------------------------------------------------+