From 179c8c716c4e33476afaaefebb856b2dd6b1cea4 Mon Sep 17 00:00:00 2001
From: Yingwen <realevenyag@gmail.com>
Date: Mon, 20 May 2024 19:52:00 +0800
Subject: [PATCH] feat: Adds `RegionScanner` trait (#3948)

* feat: define region scanner

* feat: single partition scanner

* feat: use single partition scanner

* feat: implement ExecutionPlan wip

* feat: mito engine returns single partition scanner

* feat: implement DisplayAs for region server

* feat: dummy table provider use handle_partitioned_query()

* test: update sqlness test

* feat: table provider use ReadFromRegion

* refactor: remove StreamScanAdapter

* chore: update lock

* style: fix clippy

* refactor: remove handle_query from the RegionEngine trait

* chore: address CR comments

* refactor: rename methods

* refactor: rename ReadFromRegion to RegionScanExec
---
 Cargo.lock                                    |   1 +
 Cargo.toml                                    |   1 +
 src/datanode/src/tests.rs                     |   5 +-
 src/file-engine/src/engine.rs                 |  30 +++--
 src/metric-engine/src/engine.rs               |  26 ++--
 src/metric-engine/src/engine/read.rs          |   4 +-
 src/metric-engine/src/metadata_region.rs      |   8 +-
 src/mito2/src/engine.rs                       |  33 +++++-
 src/mito2/src/engine/alter_test.rs            |   2 +-
 src/mito2/src/engine/append_mode_test.rs      |   4 +-
 src/mito2/src/engine/basic_test.rs            |  18 +--
 src/mito2/src/engine/catchup_test.rs          |   6 +-
 src/mito2/src/engine/create_test.rs           |   2 +-
 src/mito2/src/engine/filter_deleted_test.rs   |   2 +-
 src/mito2/src/engine/open_test.rs             |   4 +-
 src/mito2/src/engine/parallel_test.rs         |   2 +-
 src/mito2/src/engine/projection_test.rs       |   2 +-
 src/mito2/src/engine/prune_test.rs            |   6 +-
 src/mito2/src/engine/truncate_test.rs         |  10 +-
 src/mito2/src/read/scan_region.rs             |   9 ++
 src/mito2/src/read/unordered_scan.rs          |  16 +--
 src/query/src/dummy_catalog.rs                |   6 +-
 src/query/src/optimizer/test_util.rs          |   5 +-
 src/store-api/Cargo.toml                      |   1 +
 src/store-api/src/region_engine.rs            | 112 +++++++++++++++++-
 src/table/src/table/adapter.rs                |  10 +-
 src/table/src/table/scan.rs                   |  79 ++++++------
 .../cases/distributed/explain/analyze.result  |   2 +-
 .../distributed/optimizer/order_by.result     |  82 ++++++-------
 .../cases/standalone/common/range/nest.result |   2 +-
 .../common/tql-explain-analyze/analyze.result |   8 +-
 .../standalone/optimizer/order_by.result      |  82 ++++++-------
 32 files changed, 371 insertions(+), 209 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a4f63ea2a72b..84941ae3a226 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10169,6 +10169,7 @@ dependencies = [
  "common-query",
  "common-recordbatch",
  "common-wal",
+ "datafusion-physical-plan 37.0.0",
  "datatypes",
  "derive_builder 0.12.0",
  "futures",
diff --git a/Cargo.toml b/Cargo.toml
index 13a62f6682c3..f3f99f9c63bf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -110,6 +110,7 @@ datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev
 datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
 datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
 datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
+datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
 datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
 datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
 derive_builder = "0.12"
diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs
index 04af03ec857d..327e1be46256 100644
--- a/src/datanode/src/tests.rs
+++ b/src/datanode/src/tests.rs
@@ -23,7 +23,6 @@ use common_function::function::FunctionRef;
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_query::prelude::ScalarUdf;
 use common_query::Output;
-use common_recordbatch::SendableRecordBatchStream;
 use common_runtime::Runtime;
 use query::dataframe::DataFrame;
 use query::plan::LogicalPlan;
@@ -32,7 +31,7 @@ use query::query_engine::DescribeResult;
 use query::{QueryEngine, QueryEngineContext};
 use session::context::QueryContextRef;
 use store_api::metadata::RegionMetadataRef;
-use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
+use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
 use store_api::region_request::{AffectedRows, RegionRequest};
 use store_api::storage::{RegionId, ScanRequest};
 use table::TableRef;
@@ -193,7 +192,7 @@ impl RegionEngine for MockRegionEngine {
         &self,
         _region_id: RegionId,
         _request: ScanRequest,
-    ) -> Result<SendableRecordBatchStream, BoxedError> {
+    ) -> Result<RegionScannerRef, BoxedError> {
         unimplemented!()
     }
 
diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs
index fa4f2c5a3f0d..e0a3a6ebdc42 100644
--- a/src/file-engine/src/engine.rs
+++ b/src/file-engine/src/engine.rs
@@ -25,7 +25,9 @@ use common_telemetry::{error, info};
 use object_store::ObjectStore;
 use snafu::{ensure, OptionExt};
 use store_api::metadata::RegionMetadataRef;
-use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
+use store_api::region_engine::{
+    RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner,
+};
 use store_api::region_request::{
     AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
     RegionRequest,
@@ -49,6 +51,20 @@ impl FileRegionEngine {
             inner: Arc::new(EngineInner::new(object_store)),
         }
     }
+
+    async fn handle_query(
+        &self,
+        region_id: RegionId,
+        request: ScanRequest,
+    ) -> Result<SendableRecordBatchStream, BoxedError> {
+        self.inner
+            .get_region(region_id)
+            .await
+            .context(RegionNotFoundSnafu { region_id })
+            .map_err(BoxedError::new)?
+            .query(request)
+            .map_err(BoxedError::new)
+    }
 }
 
 #[async_trait]
@@ -72,14 +88,10 @@ impl RegionEngine for FileRegionEngine {
         &self,
         region_id: RegionId,
         request: ScanRequest,
-    ) -> Result<SendableRecordBatchStream, BoxedError> {
-        self.inner
-            .get_region(region_id)
-            .await
-            .context(RegionNotFoundSnafu { region_id })
-            .map_err(BoxedError::new)?
-            .query(request)
-            .map_err(BoxedError::new)
+    ) -> Result<RegionScannerRef, BoxedError> {
+        let stream = self.handle_query(region_id, request).await?;
+        let scanner = Arc::new(SinglePartitionScanner::new(stream));
+        Ok(scanner)
     }
 
     async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs
index 1cf36f5661a0..20c7e0d050ae 100644
--- a/src/metric-engine/src/engine.rs
+++ b/src/metric-engine/src/engine.rs
@@ -35,7 +35,9 @@ use common_recordbatch::SendableRecordBatchStream;
 use mito2::engine::MitoEngine;
 use store_api::metadata::RegionMetadataRef;
 use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
-use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
+use store_api::region_engine::{
+    RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner,
+};
 use store_api::region_request::RegionRequest;
 use store_api::storage::{RegionId, ScanRequest};
 
@@ -155,16 +157,14 @@ impl RegionEngine for MetricEngine {
         })
     }
 
-    /// Handles substrait query and return a stream of record batches
     async fn handle_query(
         &self,
         region_id: RegionId,
         request: ScanRequest,
-    ) -> Result<SendableRecordBatchStream, BoxedError> {
-        self.inner
-            .read_region(region_id, request)
-            .await
-            .map_err(BoxedError::new)
+    ) -> Result<RegionScannerRef, BoxedError> {
+        let stream = self.handle_query(region_id, request).await?;
+        let scanner = Arc::new(SinglePartitionScanner::new(stream));
+        Ok(scanner)
     }
 
     /// Retrieves region's metadata.
@@ -251,6 +251,18 @@ impl MetricEngine {
             .logical_regions(physical_region_id)
             .await
     }
+
+    /// Handles substrait query and return a stream of record batches
+    async fn handle_query(
+        &self,
+        region_id: RegionId,
+        request: ScanRequest,
+    ) -> Result<SendableRecordBatchStream, BoxedError> {
+        self.inner
+            .read_region(region_id, request)
+            .await
+            .map_err(BoxedError::new)
+    }
 }
 
 struct MetricEngineInner {
diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs
index 6093d41cd2be..ed4d6b6e4f7a 100644
--- a/src/metric-engine/src/engine/read.rs
+++ b/src/metric-engine/src/engine/read.rs
@@ -62,7 +62,7 @@ impl MetricEngineInner {
             .start_timer();
 
         self.mito
-            .handle_query(region_id, request)
+            .scan_to_stream(region_id, request)
             .await
             .context(MitoReadOperationSnafu)
     }
@@ -82,7 +82,7 @@ impl MetricEngineInner {
             .transform_request(physical_region_id, logical_region_id, request)
             .await?;
         self.mito
-            .handle_query(data_region_id, request)
+            .scan_to_stream(data_region_id, request)
             .await
             .context(MitoReadOperationSnafu)
     }
diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs
index 73ad6e9b3286..6c869c6e3f2f 100644
--- a/src/metric-engine/src/metadata_region.rs
+++ b/src/metric-engine/src/metadata_region.rs
@@ -300,7 +300,7 @@ impl MetadataRegion {
         let scan_req = Self::build_read_request(key);
         let record_batch_stream = self
             .mito
-            .handle_query(region_id, scan_req)
+            .scan_to_stream(region_id, scan_req)
             .await
             .context(MitoReadOperationSnafu)?;
         let scan_result = collect(record_batch_stream)
@@ -317,7 +317,7 @@ impl MetadataRegion {
         let scan_req = Self::build_read_request(key);
         let record_batch_stream = self
             .mito
-            .handle_query(region_id, scan_req)
+            .scan_to_stream(region_id, scan_req)
             .await
             .context(MitoReadOperationSnafu)?;
         let scan_result = collect(record_batch_stream)
@@ -351,7 +351,7 @@ impl MetadataRegion {
         };
         let record_batch_stream = self
             .mito
-            .handle_query(region_id, scan_req)
+            .scan_to_stream(region_id, scan_req)
             .await
             .context(MitoReadOperationSnafu)?;
         let scan_result = collect(record_batch_stream)
@@ -590,7 +590,7 @@ mod test {
         let scan_req = MetadataRegion::build_read_request("test_key");
         let record_batch_stream = metadata_region
             .mito
-            .handle_query(region_id, scan_req)
+            .scan_to_stream(region_id, scan_req)
             .await
             .unwrap();
         let scan_result = collect(record_batch_stream).await.unwrap();
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 71954678ad04..e0223a5585ee 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -62,7 +62,7 @@ use object_store::manager::ObjectStoreManagerRef;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::logstore::LogStore;
 use store_api::metadata::RegionMetadataRef;
-use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
+use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
 use store_api::region_request::{AffectedRows, RegionRequest};
 use store_api::storage::{RegionId, ScanRequest};
 use tokio::sync::oneshot;
@@ -115,11 +115,35 @@ impl MitoEngine {
         Ok(region.region_usage().await)
     }
 
+    /// Handle substrait query and return a stream of record batches
+    #[tracing::instrument(skip_all)]
+    pub async fn scan_to_stream(
+        &self,
+        region_id: RegionId,
+        request: ScanRequest,
+    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
+        self.scanner(region_id, request)
+            .map_err(BoxedError::new)?
+            .scan()
+            .await
+            .map_err(BoxedError::new)
+    }
+
     /// Returns a scanner to scan for `request`.
     fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
         self.scan_region(region_id, request)?.scanner()
     }
 
+    /// Returns a region scanner to scan the region for `request`.
+    async fn region_scanner(
+        &self,
+        region_id: RegionId,
+        request: ScanRequest,
+    ) -> Result<RegionScannerRef> {
+        let scanner = self.scanner(region_id, request)?;
+        scanner.region_scanner().await
+    }
+
     /// Scans a region.
     fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
         self.inner.handle_query(region_id, request)
@@ -312,16 +336,13 @@ impl RegionEngine for MitoEngine {
             .map_err(BoxedError::new)
     }
 
-    /// Handle substrait query and return a stream of record batches
     #[tracing::instrument(skip_all)]
     async fn handle_query(
         &self,
         region_id: RegionId,
         request: ScanRequest,
-    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
-        self.scanner(region_id, request)
-            .map_err(BoxedError::new)?
-            .scan()
+    ) -> Result<RegionScannerRef, BoxedError> {
+        self.region_scanner(region_id, request)
             .await
             .map_err(BoxedError::new)
     }
diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs
index ba25cc1a638e..29f1ecc18895 100644
--- a/src/mito2/src/engine/alter_test.rs
+++ b/src/mito2/src/engine/alter_test.rs
@@ -245,7 +245,7 @@ async fn test_put_after_alter() {
 |       | b     | 2.0     | 1970-01-01T00:00:02 |
 +-------+-------+---------+---------------------+";
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     assert_eq!(expected, batches.pretty_print().unwrap());
 }
diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs
index d1fc41739034..85509094fed7 100644
--- a/src/mito2/src/engine/append_mode_test.rs
+++ b/src/mito2/src/engine/append_mode_test.rs
@@ -63,7 +63,7 @@ async fn test_append_mode_write_query() {
     put_rows(&engine, region_id, rows).await;
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
@@ -183,7 +183,7 @@ async fn test_append_mode_compaction() {
     // Reopens the region.
     reopen_region(&engine, region_id, region_dir, false, region_opts).await;
     let stream = engine
-        .handle_query(region_id, ScanRequest::default())
+        .scan_to_stream(region_id, ScanRequest::default())
         .await
         .unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index a0f6b6df441b..dbe33ff37f89 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -128,7 +128,7 @@ async fn test_region_replay() {
     assert_eq!(0, result.affected_rows);
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     assert_eq!(42, batches.iter().map(|b| b.num_rows()).sum::<usize>());
 
@@ -166,7 +166,7 @@ async fn test_write_query_region() {
     put_rows(&engine, region_id, rows).await;
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
@@ -227,7 +227,7 @@ async fn test_different_order() {
     put_rows(&engine, region_id, rows).await;
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+-------+---------+---------+---------------------+
@@ -289,7 +289,7 @@ async fn test_different_order_and_type() {
     put_rows(&engine, region_id, rows).await;
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+-------+---------+---------+---------------------+
@@ -341,7 +341,7 @@ async fn test_put_delete() {
     delete_rows(&engine, region_id, rows).await;
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
@@ -383,7 +383,7 @@ async fn test_delete_not_null_fields() {
     delete_rows(&engine, region_id, rows).await;
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
@@ -398,7 +398,7 @@ async fn test_delete_not_null_fields() {
     // Reopen and scan again.
     reopen_region(&engine, region_id, region_dir, false, HashMap::new()).await;
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     assert_eq!(expected, batches.pretty_print().unwrap());
 }
@@ -447,7 +447,7 @@ async fn test_put_overwrite() {
     put_rows(&engine, region_id, rows).await;
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
@@ -688,7 +688,7 @@ async fn test_cache_null_primary_key() {
     put_rows(&engine, region_id, rows).await;
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+-------+---------+---------------------+
diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs
index b9779b2ea1c4..3d0a04017ef2 100644
--- a/src/mito2/src/engine/catchup_test.rs
+++ b/src/mito2/src/engine/catchup_test.rs
@@ -104,7 +104,7 @@ async fn test_catchup_with_last_entry_id() {
     // Scans
     let request = ScanRequest::default();
     let stream = follower_engine
-        .handle_query(region_id, request)
+        .scan_to_stream(region_id, request)
         .await
         .unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
@@ -264,7 +264,7 @@ async fn test_catchup_without_last_entry_id() {
 
     let request = ScanRequest::default();
     let stream = follower_engine
-        .handle_query(region_id, request)
+        .scan_to_stream(region_id, request)
         .await
         .unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
@@ -367,7 +367,7 @@ async fn test_catchup_with_manifest_update() {
 
     let request = ScanRequest::default();
     let stream = follower_engine
-        .handle_query(region_id, request)
+        .scan_to_stream(region_id, request)
         .await
         .unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs
index 602eea30bde1..9ce3c53b7661 100644
--- a/src/mito2/src/engine/create_test.rs
+++ b/src/mito2/src/engine/create_test.rs
@@ -231,7 +231,7 @@ async fn test_engine_create_with_memtable_opts() {
     put_rows(&engine, region_id, rows).await;
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs
index c3c35f9ba0c8..4d123a89b8ba 100644
--- a/src/mito2/src/engine/filter_deleted_test.rs
+++ b/src/mito2/src/engine/filter_deleted_test.rs
@@ -69,7 +69,7 @@ async fn test_scan_without_filtering_deleted() {
 
     // scan
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs
index dc0590cdd079..3cf4a21e561a 100644
--- a/src/mito2/src/engine/open_test.rs
+++ b/src/mito2/src/engine/open_test.rs
@@ -276,7 +276,7 @@ async fn test_open_region_skip_wal_replay() {
         .unwrap();
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
@@ -305,7 +305,7 @@ async fn test_open_region_skip_wal_replay() {
         .unwrap();
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs
index c6c97f68d269..cc5d98291230 100644
--- a/src/mito2/src/engine/parallel_test.rs
+++ b/src/mito2/src/engine/parallel_test.rs
@@ -57,7 +57,7 @@ async fn scan_in_parallel(
         .unwrap();
 
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs
index af08acfba0d0..6e31a56c8b37 100644
--- a/src/mito2/src/engine/projection_test.rs
+++ b/src/mito2/src/engine/projection_test.rs
@@ -79,7 +79,7 @@ async fn test_scan_projection() {
         output_ordering: None,
         limit: None,
     };
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs
index 27a66d68c6d2..7c20dd8a5d10 100644
--- a/src/mito2/src/engine/prune_test.rs
+++ b/src/mito2/src/engine/prune_test.rs
@@ -53,7 +53,7 @@ async fn check_prune_row_groups(expr: DfExpr, expected: &str) {
     flush_region(&engine, region_id, Some(5)).await;
 
     let stream = engine
-        .handle_query(
+        .scan_to_stream(
             region_id,
             ScanRequest {
                 filters: vec![Expr::from(expr)],
@@ -186,7 +186,7 @@ async fn test_prune_memtable() {
     .await;
 
     let stream = engine
-        .handle_query(
+        .scan_to_stream(
             region_id,
             ScanRequest {
                 filters: vec![time_range_expr(0, 20)],
@@ -238,7 +238,7 @@ async fn test_prune_memtable_complex_expr() {
     let filters = vec![time_range_expr(4, 7), Expr::from(col("tag_0").lt(lit("6")))];
 
     let stream = engine
-        .handle_query(
+        .scan_to_stream(
             region_id,
             ScanRequest {
                 filters,
diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs
index b9336afad3d1..91c08f3b3e78 100644
--- a/src/mito2/src/engine/truncate_test.rs
+++ b/src/mito2/src/engine/truncate_test.rs
@@ -55,7 +55,7 @@ async fn test_engine_truncate_region_basic() {
 
     // Scan the region.
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
@@ -75,7 +75,7 @@ async fn test_engine_truncate_region_basic() {
 
     // Scan the region.
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "++\n++";
     assert_eq!(expected, batches.pretty_print().unwrap());
@@ -104,7 +104,7 @@ async fn test_engine_put_data_after_truncate() {
 
     // Scan the region
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
@@ -131,7 +131,7 @@ async fn test_engine_put_data_after_truncate() {
 
     // Scan the region.
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "\
 +-------+---------+---------------------+
@@ -261,7 +261,7 @@ async fn test_engine_truncate_reopen() {
 
     // Scan the region.
     let request = ScanRequest::default();
-    let stream = engine.handle_query(region_id, request).await.unwrap();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
     let batches = RecordBatches::try_collect(stream).await.unwrap();
     let expected = "++\n++";
     assert_eq!(expected, batches.pretty_print().unwrap());
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index a33765479b11..f619744dc0c9 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -20,6 +20,7 @@ use std::time::Instant;
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::{debug, error, warn};
 use common_time::range::TimestampRange;
+use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner};
 use store_api::storage::ScanRequest;
 use table::predicate::{Predicate, TimeRangePredicateBuilder};
 use tokio::sync::{mpsc, Semaphore};
@@ -57,6 +58,14 @@ impl Scanner {
             Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
         }
     }
+
+    /// Returns a [RegionScanner] to scan the region.
+    pub(crate) async fn region_scanner(&self) -> Result<RegionScannerRef> {
+        let stream = self.scan().await?;
+        let scanner = SinglePartitionScanner::new(stream);
+
+        Ok(Arc::new(scanner))
+    }
 }
 
 #[cfg(test)]
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index f725d83817ac..5042764ede8c 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -198,6 +198,14 @@ impl UnorderedScan {
     }
 }
 
+#[cfg(test)]
+impl UnorderedScan {
+    /// Returns the input.
+    pub(crate) fn input(&self) -> &ScanInput {
+        &self.input
+    }
+}
+
 /// Metrics for [UnorderedScan].
 #[derive(Debug, Default)]
 struct Metrics {
@@ -216,11 +224,3 @@ struct Metrics {
     /// Number of rows returned.
     num_rows: usize,
 }
-
-#[cfg(test)]
-impl UnorderedScan {
-    /// Returns the input.
-    pub(crate) fn input(&self) -> &ScanInput {
-        &self.input
-    }
-}
diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs
index 71ec0d4ac7e0..c39b6bad59ee 100644
--- a/src/query/src/dummy_catalog.rs
+++ b/src/query/src/dummy_catalog.rs
@@ -31,7 +31,7 @@ use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::RegionEngineRef;
 use store_api::storage::{RegionId, ScanRequest};
-use table::table::scan::StreamScanAdapter;
+use table::table::scan::RegionScanExec;
 
 use crate::error::{GetRegionMetadataSnafu, Result};
 
@@ -168,12 +168,12 @@ impl TableProvider for DummyTableProvider {
             .collect();
         request.limit = limit;
 
-        let stream = self
+        let scanner = self
             .engine
             .handle_query(self.region_id, request)
             .await
             .map_err(|e| DataFusionError::External(Box::new(e)))?;
-        Ok(Arc::new(StreamScanAdapter::new(stream)))
+        Ok(Arc::new(RegionScanExec::new(scanner)))
     }
 
     fn supports_filters_pushdown(
diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs
index ea18e54a09b4..4ffc3e28e08f 100644
--- a/src/query/src/optimizer/test_util.rs
+++ b/src/query/src/optimizer/test_util.rs
@@ -23,12 +23,11 @@ use api::v1::SemanticType;
 use async_trait::async_trait;
 use common_error::ext::{BoxedError, PlainError};
 use common_error::status_code::StatusCode;
-use common_recordbatch::SendableRecordBatchStream;
 use datatypes::schema::ColumnSchema;
 use store_api::metadata::{
     ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
 };
-use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
+use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
 use store_api::region_request::RegionRequest;
 use store_api::storage::{ConcreteDataType, RegionId, ScanRequest};
 
@@ -67,7 +66,7 @@ impl RegionEngine for MetaRegionEngine {
         &self,
         _region_id: RegionId,
         _request: ScanRequest,
-    ) -> Result<SendableRecordBatchStream, BoxedError> {
+    ) -> Result<RegionScannerRef, BoxedError> {
         unimplemented!()
     }
 
diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml
index 2c9a42a9d8e4..7068f6686d1b 100644
--- a/src/store-api/Cargo.toml
+++ b/src/store-api/Cargo.toml
@@ -17,6 +17,7 @@ common-macro.workspace = true
 common-query.workspace = true
 common-recordbatch.workspace = true
 common-wal.workspace = true
+datafusion-physical-plan.workspace = true
 datatypes.workspace = true
 derive_builder.workspace = true
 futures.workspace = true
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index 7e461617aa21..f4b5aec37e7d 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -15,15 +15,19 @@
 //! Region Engine's definition
 
 use std::any::Any;
-use std::fmt::Display;
-use std::sync::Arc;
+use std::fmt::{Debug, Display};
+use std::sync::{Arc, Mutex};
 
 use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
 use api::region::RegionResponse;
 use async_trait::async_trait;
 use common_error::ext::BoxedError;
+use common_query::error::ExecuteRepeatedlySnafu;
 use common_recordbatch::SendableRecordBatchStream;
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
+use datatypes::schema::SchemaRef;
 use serde::{Deserialize, Serialize};
+use snafu::OptionExt;
 
 use crate::logstore::entry;
 use crate::metadata::RegionMetadataRef;
@@ -120,6 +124,57 @@ impl From<PbRegionRole> for RegionRole {
     }
 }
 
+/// Output partition properties of the [RegionScanner].
+#[derive(Debug)]
+pub enum ScannerPartitioning {
+    /// Unknown partitioning scheme with a known number of partitions
+    Unknown(usize),
+}
+
+impl ScannerPartitioning {
+    /// Returns the number of partitions.
+    pub fn num_partitions(&self) -> usize {
+        match self {
+            ScannerPartitioning::Unknown(num_partitions) => *num_partitions,
+        }
+    }
+}
+
+/// Properties of the [RegionScanner].
+#[derive(Debug)]
+pub struct ScannerProperties {
+    /// Partitions to scan.
+    partitioning: ScannerPartitioning,
+}
+
+impl ScannerProperties {
+    /// Creates a new [ScannerProperties] with the given partitioning.
+    pub fn new(partitioning: ScannerPartitioning) -> Self {
+        Self { partitioning }
+    }
+
+    /// Returns properties of partitions to scan.
+    pub fn partitioning(&self) -> &ScannerPartitioning {
+        &self.partitioning
+    }
+}
+
+/// A scanner that provides a way to scan the region concurrently.
+/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
+/// You can use this trait to implement an [ExecutionPlan](datafusion_physical_plan::ExecutionPlan).
+pub trait RegionScanner: Debug + DisplayAs + Send + Sync {
+    /// Returns the properties of the scanner.
+    fn properties(&self) -> &ScannerProperties;
+
+    /// Returns the schema of the record batches.
+    fn schema(&self) -> SchemaRef;
+
+    /// Scans the partition and returns a stream of record batches.
+    fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError>;
+}
+
+pub type RegionScannerRef = Arc<dyn RegionScanner>;
+
 #[async_trait]
 pub trait RegionEngine: Send + Sync {
     /// Name of this engine
@@ -132,12 +187,12 @@ pub trait RegionEngine: Send + Sync {
         request: RegionRequest,
     ) -> Result<RegionResponse, BoxedError>;
 
-    /// Handles substrait query and return a stream of record batches
+    /// Handles query and return a scanner that can be used to scan the region concurrently.
     async fn handle_query(
         &self,
         region_id: RegionId,
         request: ScanRequest,
-    ) -> Result<SendableRecordBatchStream, BoxedError>;
+    ) -> Result<RegionScannerRef, BoxedError>;
 
     /// Retrieves region's metadata.
     async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
@@ -172,3 +227,52 @@ pub trait RegionEngine: Send + Sync {
 }
 
 pub type RegionEngineRef = Arc<dyn RegionEngine>;
+
+/// A [RegionScanner] that only scans a single partition.
+pub struct SinglePartitionScanner {
+    stream: Mutex<Option<SendableRecordBatchStream>>,
+    schema: SchemaRef,
+    properties: ScannerProperties,
+}
+
+impl SinglePartitionScanner {
+    /// Creates a new [SinglePartitionScanner] with the given stream.
+    pub fn new(stream: SendableRecordBatchStream) -> Self {
+        let schema = stream.schema();
+        Self {
+            stream: Mutex::new(Some(stream)),
+            schema,
+            properties: ScannerProperties::new(ScannerPartitioning::Unknown(1)),
+        }
+    }
+}
+
+impl Debug for SinglePartitionScanner {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
+    }
+}
+
+impl RegionScanner for SinglePartitionScanner {
+    fn properties(&self) -> &ScannerProperties {
+        &self.properties
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn scan_partition(&self, _partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
+        let mut stream = self.stream.lock().unwrap();
+        stream
+            .take()
+            .context(ExecuteRepeatedlySnafu)
+            .map_err(BoxedError::new)
+    }
+}
+
+impl DisplayAs for SinglePartitionScanner {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs
index ffc6618a548a..d00988958531 100644
--- a/src/table/src/table/adapter.rs
+++ b/src/table/src/table/adapter.rs
@@ -26,9 +26,10 @@ use datafusion_expr::expr::Expr as DfExpr;
 use datafusion_expr::TableProviderFilterPushDown as DfTableProviderFilterPushDown;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
+use store_api::region_engine::SinglePartitionScanner;
 use store_api::storage::ScanRequest;
 
-use crate::table::scan::StreamScanAdapter;
+use crate::table::scan::RegionScanExec;
 use crate::table::{TableRef, TableType};
 
 /// Adapt greptime's [TableRef] to DataFusion's [TableProvider].
@@ -110,11 +111,12 @@ impl TableProvider for DfTableProviderAdapter {
                 .collect::<Vec<_>>()
         });
 
-        let mut stream_adapter = StreamScanAdapter::new(stream);
+        let scanner = Arc::new(SinglePartitionScanner::new(stream));
+        let mut plan = RegionScanExec::new(scanner);
         if let Some(sort_expr) = sort_expr {
-            stream_adapter = stream_adapter.with_output_ordering(sort_expr);
+            plan = plan.with_output_ordering(sort_expr);
         }
-        Ok(Arc::new(stream_adapter))
+        Ok(Arc::new(plan))
     }
 
     fn supports_filters_pushdown(
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index 4c09c8904781..c612ff3746cb 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -13,12 +13,10 @@
 // limitations under the License.
 
 use std::any::Any;
-use std::fmt::{self, Debug, Formatter};
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use common_query::error::ExecuteRepeatedlySnafu;
 use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream};
 use common_telemetry::tracing::Span;
 use common_telemetry::tracing_context::TracingContext;
@@ -32,59 +30,54 @@ use datafusion::physical_plan::{
 use datafusion_common::DataFusionError;
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
 use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
-use datatypes::schema::SchemaRef;
 use futures::{Stream, StreamExt};
-use snafu::OptionExt;
+use store_api::region_engine::RegionScannerRef;
 
 use crate::table::metrics::MemoryUsageMetrics;
 
-/// Adapt greptime's [SendableRecordBatchStream] to [ExecutionPlan].
-pub struct StreamScanAdapter {
-    stream: Mutex<Option<SendableRecordBatchStream>>,
-    schema: SchemaRef,
+/// A plan to read multiple partitions from a region of a table.
+#[derive(Debug)]
+pub struct RegionScanExec {
+    scanner: RegionScannerRef,
+    arrow_schema: ArrowSchemaRef,
+    /// The expected output ordering for the plan.
     output_ordering: Option<Vec<PhysicalSortExpr>>,
     metric: ExecutionPlanMetricsSet,
     properties: PlanProperties,
 }
 
-impl Debug for StreamScanAdapter {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("StreamScanAdapter")
-            .field("stream", &"<SendableRecordBatchStream>")
-            .finish()
-    }
-}
-
-impl StreamScanAdapter {
-    pub fn new(stream: SendableRecordBatchStream) -> Self {
-        let schema = stream.schema();
+impl RegionScanExec {
+    pub fn new(scanner: RegionScannerRef) -> Self {
+        let arrow_schema = scanner.schema().arrow_schema().clone();
+        let scanner_props = scanner.properties();
         let properties = PlanProperties::new(
-            EquivalenceProperties::new(schema.arrow_schema().clone()),
-            Partitioning::UnknownPartitioning(1),
+            EquivalenceProperties::new(arrow_schema.clone()),
+            Partitioning::UnknownPartitioning(scanner_props.partitioning().num_partitions()),
             ExecutionMode::Bounded,
         );
         Self {
-            stream: Mutex::new(Some(stream)),
-            schema,
+            scanner,
+            arrow_schema,
             output_ordering: None,
             metric: ExecutionPlanMetricsSet::new(),
             properties,
         }
     }
 
+    /// Set the expected output ordering for the plan.
     pub fn with_output_ordering(mut self, output_ordering: Vec<PhysicalSortExpr>) -> Self {
         self.output_ordering = Some(output_ordering);
         self
     }
 }
 
-impl ExecutionPlan for StreamScanAdapter {
+impl ExecutionPlan for RegionScanExec {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
     fn schema(&self) -> ArrowSchemaRef {
-        self.schema.arrow_schema().clone()
+        self.arrow_schema.clone()
     }
 
     fn properties(&self) -> &PlanProperties {
@@ -98,7 +91,7 @@ impl ExecutionPlan for StreamScanAdapter {
     fn with_new_children(
         self: Arc<Self>,
         _children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> DfResult<Arc<dyn ExecutionPlan>> {
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
         Ok(self)
     }
 
@@ -106,12 +99,15 @@ impl ExecutionPlan for StreamScanAdapter {
         &self,
         partition: usize,
         context: Arc<TaskContext>,
-    ) -> DfResult<DfSendableRecordBatchStream> {
+    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
         let tracing_context = TracingContext::from_json(context.session_id().as_str());
-        let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter"));
+        let span =
+            tracing_context.attach(common_telemetry::tracing::info_span!("read_from_region"));
 
-        let mut stream = self.stream.lock().unwrap();
-        let stream = stream.take().context(ExecuteRepeatedlySnafu)?;
+        let stream = self
+            .scanner
+            .scan_partition(partition)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
         let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
         Ok(Box::pin(StreamWithMetricWrapper {
             stream,
@@ -125,9 +121,10 @@ impl ExecutionPlan for StreamScanAdapter {
     }
 }
 
-impl DisplayAs for StreamScanAdapter {
-    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{:?}", self)
+impl DisplayAs for RegionScanExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        // The scanner contains all information needed to display the plan.
+        write!(f, "{:?}", self.scanner)
     }
 }
 
@@ -177,12 +174,15 @@ impl DfRecordBatchStream for StreamWithMetricWrapper {
 
 #[cfg(test)]
 mod test {
+    use std::sync::Arc;
+
     use common_recordbatch::{RecordBatch, RecordBatches};
     use datafusion::prelude::SessionContext;
     use datatypes::data_type::ConcreteDataType;
-    use datatypes::schema::{ColumnSchema, Schema};
+    use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
     use datatypes::vectors::Int32Vector;
     use futures::TryStreamExt;
+    use store_api::region_engine::SinglePartitionScanner;
 
     use super::*;
 
@@ -210,9 +210,10 @@ mod test {
             RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
         let stream = recordbatches.as_stream();
 
-        let scan = StreamScanAdapter::new(stream);
+        let scanner = Arc::new(SinglePartitionScanner::new(stream));
+        let plan = RegionScanExec::new(scanner);
         let actual: SchemaRef = Arc::new(
-            scan.properties
+            plan.properties
                 .eq_properties
                 .schema()
                 .clone()
@@ -221,12 +222,12 @@ mod test {
         );
         assert_eq!(actual, schema);
 
-        let stream = scan.execute(0, ctx.task_ctx()).unwrap();
+        let stream = plan.execute(0, ctx.task_ctx()).unwrap();
         let recordbatches = stream.try_collect::<Vec<_>>().await.unwrap();
         assert_eq!(batch1.df_record_batch(), &recordbatches[0]);
         assert_eq!(batch2.df_record_batch(), &recordbatches[1]);
 
-        let result = scan.execute(0, ctx.task_ctx());
+        let result = plan.execute(0, ctx.task_ctx());
         assert!(result.is_err());
         match result {
             Err(e) => assert!(e
diff --git a/tests/cases/distributed/explain/analyze.result b/tests/cases/distributed/explain/analyze.result
index 1ce443adfff6..762f70bdace4 100644
--- a/tests/cases/distributed/explain/analyze.result
+++ b/tests/cases/distributed/explain/analyze.result
@@ -35,7 +35,7 @@ explain analyze SELECT count(*) FROM system_metrics;
 |_|_|_CoalescePartitionsExec REDACTED
 |_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(greptime.public.system_REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
-|_|_|_StreamScanAdapter { stream: "<SendableRecordBatchStream>" } REDACTED
+|_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
 |_|_|_|
 |_|_| Total rows: 1_|
 +-+-+-+
diff --git a/tests/cases/distributed/optimizer/order_by.result b/tests/cases/distributed/optimizer/order_by.result
index 88ce54ce5de9..5c03ac107c16 100644
--- a/tests/cases/distributed/optimizer/order_by.result
+++ b/tests/cases/distributed/optimizer/order_by.result
@@ -1,61 +1,61 @@
 -- SQLNESS REPLACE (peers.*) REDACTED
 explain select * from numbers;
 
-+---------------+-------------------------------------------------------------+
-| plan_type     | plan                                                        |
-+---------------+-------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                            |
-| physical_plan | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
-|               |                                                             |
-+---------------+-------------------------------------------------------------+
++---------------+-----------------------------------------------------+
+| plan_type     | plan                                                |
++---------------+-----------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                    |
+| physical_plan | SinglePartitionScanner: <SendableRecordBatchStream> |
+|               |                                                     |
++---------------+-----------------------------------------------------+
 
 -- SQLNESS REPLACE (peers.*) REDACTED
 explain select * from numbers order by number desc;
 
-+---------------+---------------------------------------------------------------+
-| plan_type     | plan                                                          |
-+---------------+---------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                              |
-| physical_plan | SortExec: expr=[number@0 DESC]                                |
-|               |   StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
-|               |                                                               |
-+---------------+---------------------------------------------------------------+
++---------------+-------------------------------------------------------+
+| plan_type     | plan                                                  |
++---------------+-------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                      |
+| physical_plan | SortExec: expr=[number@0 DESC]                        |
+|               |   SinglePartitionScanner: <SendableRecordBatchStream> |
+|               |                                                       |
++---------------+-------------------------------------------------------+
 
 -- SQLNESS REPLACE (peers.*) REDACTED
 explain select * from numbers order by number asc;
 
-+---------------+---------------------------------------------------------------+
-| plan_type     | plan                                                          |
-+---------------+---------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                              |
-| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST]                      |
-|               |   StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
-|               |                                                               |
-+---------------+---------------------------------------------------------------+
++---------------+-------------------------------------------------------+
+| plan_type     | plan                                                  |
++---------------+-------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                      |
+| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST]              |
+|               |   SinglePartitionScanner: <SendableRecordBatchStream> |
+|               |                                                       |
++---------------+-------------------------------------------------------+
 
 -- SQLNESS REPLACE (peers.*) REDACTED
 explain select * from numbers order by number desc limit 10;
 
-+---------------+-----------------------------------------------------------------+
-| plan_type     | plan                                                            |
-+---------------+-----------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                                |
-| physical_plan | GlobalLimitExec: skip=0, fetch=10                               |
-|               |   SortExec: TopK(fetch=10), expr=[number@0 DESC]                |
-|               |     StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
-|               |                                                                 |
-+---------------+-----------------------------------------------------------------+
++---------------+---------------------------------------------------------+
+| plan_type     | plan                                                    |
++---------------+---------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                        |
+| physical_plan | GlobalLimitExec: skip=0, fetch=10                       |
+|               |   SortExec: TopK(fetch=10), expr=[number@0 DESC]        |
+|               |     SinglePartitionScanner: <SendableRecordBatchStream> |
+|               |                                                         |
++---------------+---------------------------------------------------------+
 
 -- SQLNESS REPLACE (peers.*) REDACTED
 explain select * from numbers order by number asc limit 10;
 
-+---------------+-----------------------------------------------------------------+
-| plan_type     | plan                                                            |
-+---------------+-----------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                                |
-| physical_plan | GlobalLimitExec: skip=0, fetch=10                               |
-|               |   SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST]      |
-|               |     StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
-|               |                                                                 |
-+---------------+-----------------------------------------------------------------+
++---------------+------------------------------------------------------------+
+| plan_type     | plan                                                       |
++---------------+------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                           |
+| physical_plan | GlobalLimitExec: skip=0, fetch=10                          |
+|               |   SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] |
+|               |     SinglePartitionScanner: <SendableRecordBatchStream>    |
+|               |                                                            |
++---------------+------------------------------------------------------------+
 
diff --git a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result
index 4484d96c699e..5ccc155b05fe 100644
--- a/tests/cases/standalone/common/range/nest.result
+++ b/tests/cases/standalone/common/range/nest.result
@@ -74,7 +74,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
 | 0_| 0_|_RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts REDACTED
 |_|_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_StreamScanAdapter { stream: "<SendableRecordBatchStream>" } REDACTED
+| 1_| 0_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
 |_|_|_|
 |_|_| Total rows: 10_|
 +-+-+-+
diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result
index 8b5b09dc803a..e8e388b916c0 100644
--- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result
+++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result
@@ -30,7 +30,7 @@ TQL ANALYZE (0, 10, '5s') test;
 |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
 |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
-|_|_|_StreamScanAdapter { stream: "<SendableRecordBatchStream>" } REDACTED
+|_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
@@ -59,7 +59,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
 |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
 |_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
-|_|_|_StreamScanAdapter { stream: "<SendableRecordBatchStream>" } REDACTED
+|_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
@@ -87,7 +87,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
 |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
 |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
-|_|_|_StreamScanAdapter { stream: "<SendableRecordBatchStream>" } REDACTED
+|_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
@@ -117,7 +117,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
 |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
 |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
-|_|_|_StreamScanAdapter { stream: "<SendableRecordBatchStream>" } REDACTED
+|_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
diff --git a/tests/cases/standalone/optimizer/order_by.result b/tests/cases/standalone/optimizer/order_by.result
index 574f753e4073..49996d130e78 100644
--- a/tests/cases/standalone/optimizer/order_by.result
+++ b/tests/cases/standalone/optimizer/order_by.result
@@ -1,56 +1,56 @@
 explain select * from numbers;
 
-+---------------+-------------------------------------------------------------+
-| plan_type     | plan                                                        |
-+---------------+-------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                            |
-| physical_plan | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
-|               |                                                             |
-+---------------+-------------------------------------------------------------+
++---------------+-----------------------------------------------------+
+| plan_type     | plan                                                |
++---------------+-----------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                    |
+| physical_plan | SinglePartitionScanner: <SendableRecordBatchStream> |
+|               |                                                     |
++---------------+-----------------------------------------------------+
 
 explain select * from numbers order by number desc;
 
-+---------------+---------------------------------------------------------------+
-| plan_type     | plan                                                          |
-+---------------+---------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                              |
-| physical_plan | SortExec: expr=[number@0 DESC]                                |
-|               |   StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
-|               |                                                               |
-+---------------+---------------------------------------------------------------+
++---------------+-------------------------------------------------------+
+| plan_type     | plan                                                  |
++---------------+-------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                      |
+| physical_plan | SortExec: expr=[number@0 DESC]                        |
+|               |   SinglePartitionScanner: <SendableRecordBatchStream> |
+|               |                                                       |
++---------------+-------------------------------------------------------+
 
 explain select * from numbers order by number asc;
 
-+---------------+---------------------------------------------------------------+
-| plan_type     | plan                                                          |
-+---------------+---------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                              |
-| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST]                      |
-|               |   StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
-|               |                                                               |
-+---------------+---------------------------------------------------------------+
++---------------+-------------------------------------------------------+
+| plan_type     | plan                                                  |
++---------------+-------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                      |
+| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST]              |
+|               |   SinglePartitionScanner: <SendableRecordBatchStream> |
+|               |                                                       |
++---------------+-------------------------------------------------------+
 
 explain select * from numbers order by number desc limit 10;
 
-+---------------+-----------------------------------------------------------------+
-| plan_type     | plan                                                            |
-+---------------+-----------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                                |
-| physical_plan | GlobalLimitExec: skip=0, fetch=10                               |
-|               |   SortExec: TopK(fetch=10), expr=[number@0 DESC]                |
-|               |     StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
-|               |                                                                 |
-+---------------+-----------------------------------------------------------------+
++---------------+---------------------------------------------------------+
+| plan_type     | plan                                                    |
++---------------+---------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                        |
+| physical_plan | GlobalLimitExec: skip=0, fetch=10                       |
+|               |   SortExec: TopK(fetch=10), expr=[number@0 DESC]        |
+|               |     SinglePartitionScanner: <SendableRecordBatchStream> |
+|               |                                                         |
++---------------+---------------------------------------------------------+
 
 explain select * from numbers order by number asc limit 10;
 
-+---------------+-----------------------------------------------------------------+
-| plan_type     | plan                                                            |
-+---------------+-----------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                                |
-| physical_plan | GlobalLimitExec: skip=0, fetch=10                               |
-|               |   SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST]      |
-|               |     StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
-|               |                                                                 |
-+---------------+-----------------------------------------------------------------+
++---------------+------------------------------------------------------------+
+| plan_type     | plan                                                       |
++---------------+------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                           |
+| physical_plan | GlobalLimitExec: skip=0, fetch=10                          |
+|               |   SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] |
+|               |     SinglePartitionScanner: <SendableRecordBatchStream>    |
+|               |                                                            |
++---------------+------------------------------------------------------------+