From e4101925607726481962000486b32c926d09a0f1 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 26 Apr 2024 19:55:44 +0800 Subject: [PATCH] fix: push down order hint of the query again (#3797) * feat: add dummy catalog list to query * chore: fix compiler errors * feat: use query's dummy catalog * chore: remove error * feat: match dummy provider in the order hint * docs: revert config change * Apply suggestions from code review --------- Co-authored-by: Ruihang Xia --- src/datanode/src/error.rs | 15 +- src/datanode/src/region_server.rs | 211 ++-------------------- src/query/src/dummy_catalog.rs | 248 ++++++++++++++++++++++++++ src/query/src/error.rs | 14 ++ src/query/src/lib.rs | 7 +- src/query/src/optimizer.rs | 10 +- src/query/src/optimizer/order_hint.rs | 43 +++-- src/query/src/optimizer/test_util.rs | 144 +++++++++++++++ 8 files changed, 448 insertions(+), 244 deletions(-) create mode 100644 src/query/src/dummy_catalog.rs create mode 100644 src/query/src/optimizer/test_util.rs diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index d7873812cf5d..008f7d0ae4d9 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -248,18 +248,6 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Failed to get metadata from engine {} for region_id {}", - engine, - region_id, - ))] - GetRegionMetadata { - engine: String, - region_id: RegionId, - location: Location, - source: BoxedError, - }, - #[snafu(display("Failed to build region requests"))] BuildRegionRequests { location: Location, @@ -337,8 +325,7 @@ impl ErrorExt for Error { | IncorrectInternalState { .. } | ShutdownInstance { .. } | RegionEngineNotFound { .. } - | UnsupportedOutput { .. } - | GetRegionMetadata { .. } => StatusCode::Internal, + | UnsupportedOutput { .. } => StatusCode::Internal, RegionNotFound { .. } => StatusCode::RegionNotFound, RegionNotReady { .. } => StatusCode::RegionNotReady, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 1af0fda462cf..23d21200af30 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Deref; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; use api::region::RegionResponse; use api::v1::region::{region_request, QueryRequest, RegionResponse as RegionResponseV1}; @@ -26,46 +25,36 @@ use async_trait::async_trait; use bytes::Bytes; use common_error::ext::BoxedError; use common_error::status_code::StatusCode; -use common_query::logical_plan::Expr; -use common_query::physical_plan::DfPhysicalPlanAdapter; -use common_query::{DfPhysicalPlan, OutputData}; +use common_query::OutputData; use common_recordbatch::SendableRecordBatchStream; use common_runtime::Runtime; use common_telemetry::tracing::{self, info_span}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{info, warn}; use dashmap::DashMap; -use datafusion::catalog::schema::SchemaProvider; -use datafusion::catalog::{CatalogProvider, CatalogProviderList}; -use datafusion::datasource::TableProvider; -use datafusion::error::Result as DfResult; -use datafusion::execution::context::SessionState; -use datafusion_common::DataFusionError; -use datafusion_expr::{Expr as DfExpr, TableProviderFilterPushDown, TableType}; -use datatypes::arrow::datatypes::SchemaRef; use futures_util::future::try_join_all; use metric_engine::engine::MetricEngine; use prost::Message; +pub use query::dummy_catalog::{ + DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef, +}; use query::QueryEngineRef; use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult}; use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::region_server::RegionServerHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; -use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY}; use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse}; use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest}; -use store_api::storage::{RegionId, ScanRequest}; +use store_api::storage::RegionId; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; -use table::table::scan::StreamScanAdapter; use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, - FindLogicalRegionsSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu, - RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, - UnsupportedOutputSnafu, + FindLogicalRegionsSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, + RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; @@ -640,7 +629,8 @@ impl RegionServerInner { let table_provider = self .table_provider_factory .create(region_id, region_status.into_engine()) - .await?; + .await + .context(ExecuteLogicalPlanSnafu)?; let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider)); let query_engine_ctx = self.query_engine.engine_context(ctx.clone()); @@ -713,187 +703,6 @@ enum RegionChange { Deregisters, } -/// Resolve to the given region (specified by [RegionId]) unconditionally. -#[derive(Clone)] -struct DummyCatalogList { - catalog: DummyCatalogProvider, -} - -impl DummyCatalogList { - fn with_table_provider(table_provider: Arc) -> Self { - let schema_provider = DummySchemaProvider { - table: table_provider, - }; - let catalog_provider = DummyCatalogProvider { - schema: schema_provider, - }; - Self { - catalog: catalog_provider, - } - } -} - -impl CatalogProviderList for DummyCatalogList { - fn as_any(&self) -> &dyn Any { - self - } - - fn register_catalog( - &self, - _name: String, - _catalog: Arc, - ) -> Option> { - None - } - - fn catalog_names(&self) -> Vec { - vec![] - } - - fn catalog(&self, _name: &str) -> Option> { - Some(Arc::new(self.catalog.clone())) - } -} - -/// For [DummyCatalogList]. -#[derive(Clone)] -struct DummyCatalogProvider { - schema: DummySchemaProvider, -} - -impl CatalogProvider for DummyCatalogProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec { - vec![] - } - - fn schema(&self, _name: &str) -> Option> { - Some(Arc::new(self.schema.clone())) - } -} - -/// For [DummyCatalogList]. -#[derive(Clone)] -struct DummySchemaProvider { - table: Arc, -} - -#[async_trait] -impl SchemaProvider for DummySchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn table_names(&self) -> Vec { - vec![] - } - - async fn table(&self, _name: &str) -> DfResult>> { - Ok(Some(self.table.clone())) - } - - fn table_exist(&self, _name: &str) -> bool { - true - } -} - -/// For [TableProvider](TableProvider) and [DummyCatalogList] -#[derive(Clone)] -struct DummyTableProvider { - region_id: RegionId, - engine: RegionEngineRef, - metadata: RegionMetadataRef, - /// Keeping a mutable request makes it possible to change in the optimize phase. - scan_request: Arc>, -} - -#[async_trait] -impl TableProvider for DummyTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.metadata.schema.arrow_schema().clone() - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - _state: &SessionState, - projection: Option<&Vec>, - filters: &[DfExpr], - limit: Option, - ) -> DfResult> { - let mut request = self.scan_request.lock().unwrap().clone(); - request.projection = match projection { - Some(x) if !x.is_empty() => Some(x.clone()), - _ => None, - }; - request.filters = filters.iter().map(|e| Expr::from(e.clone())).collect(); - request.limit = limit; - - let stream = self - .engine - .handle_query(self.region_id, request) - .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; - Ok(Arc::new(DfPhysicalPlanAdapter(Arc::new( - StreamScanAdapter::new(stream), - )))) - } - - fn supports_filters_pushdown( - &self, - filters: &[&DfExpr], - ) -> DfResult> { - Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) - } -} - -pub struct DummyTableProviderFactory; - -#[async_trait] -impl TableProviderFactory for DummyTableProviderFactory { - async fn create( - &self, - region_id: RegionId, - engine: RegionEngineRef, - ) -> Result> { - let metadata = - engine - .get_metadata(region_id) - .await - .with_context(|_| GetRegionMetadataSnafu { - engine: engine.name(), - region_id, - })?; - Ok(Arc::new(DummyTableProvider { - region_id, - engine, - metadata, - scan_request: Default::default(), - })) - } -} - -#[async_trait] -pub trait TableProviderFactory: Send + Sync { - async fn create( - &self, - region_id: RegionId, - engine: RegionEngineRef, - ) -> Result>; -} - -pub type TableProviderFactoryRef = Arc; - #[cfg(test)] mod tests { diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs new file mode 100644 index 000000000000..23e26005d9c1 --- /dev/null +++ b/src/query/src/dummy_catalog.rs @@ -0,0 +1,248 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Dummy catalog for region server. + +use std::any::Any; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use common_query::physical_plan::DfPhysicalPlanAdapter; +use common_query::DfPhysicalPlan; +use common_recordbatch::OrderOption; +use datafusion::catalog::schema::SchemaProvider; +use datafusion::catalog::{CatalogProvider, CatalogProviderList}; +use datafusion::datasource::TableProvider; +use datafusion::execution::context::SessionState; +use datafusion_common::DataFusionError; +use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datatypes::arrow::datatypes::SchemaRef; +use snafu::ResultExt; +use store_api::metadata::RegionMetadataRef; +use store_api::region_engine::RegionEngineRef; +use store_api::storage::{RegionId, ScanRequest}; +use table::table::scan::StreamScanAdapter; + +use crate::error::{GetRegionMetadataSnafu, Result}; + +/// Resolve to the given region (specified by [RegionId]) unconditionally. +#[derive(Clone)] +pub struct DummyCatalogList { + catalog: DummyCatalogProvider, +} + +impl DummyCatalogList { + /// Creates a new catalog list with the given table provider. + pub fn with_table_provider(table_provider: Arc) -> Self { + let schema_provider = DummySchemaProvider { + table: table_provider, + }; + let catalog_provider = DummyCatalogProvider { + schema: schema_provider, + }; + Self { + catalog: catalog_provider, + } + } +} + +impl CatalogProviderList for DummyCatalogList { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + _name: String, + _catalog: Arc, + ) -> Option> { + None + } + + fn catalog_names(&self) -> Vec { + vec![] + } + + fn catalog(&self, _name: &str) -> Option> { + Some(Arc::new(self.catalog.clone())) + } +} + +/// A dummy catalog provider for [DummyCatalogList]. +#[derive(Clone)] +struct DummyCatalogProvider { + schema: DummySchemaProvider, +} + +impl CatalogProvider for DummyCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + vec![] + } + + fn schema(&self, _name: &str) -> Option> { + Some(Arc::new(self.schema.clone())) + } +} + +/// A dummy schema provider for [DummyCatalogList]. +#[derive(Clone)] +struct DummySchemaProvider { + table: Arc, +} + +#[async_trait] +impl SchemaProvider for DummySchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + vec![] + } + + async fn table( + &self, + _name: &str, + ) -> datafusion::error::Result>> { + Ok(Some(self.table.clone())) + } + + fn table_exist(&self, _name: &str) -> bool { + true + } +} + +/// For [TableProvider] and [DummyCatalogList] +#[derive(Clone)] +pub struct DummyTableProvider { + region_id: RegionId, + engine: RegionEngineRef, + metadata: RegionMetadataRef, + /// Keeping a mutable request makes it possible to change in the optimize phase. + scan_request: Arc>, +} + +#[async_trait] +impl TableProvider for DummyTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.metadata.schema.arrow_schema().clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> datafusion::error::Result> { + let mut request = self.scan_request.lock().unwrap().clone(); + request.projection = match projection { + Some(x) if !x.is_empty() => Some(x.clone()), + _ => None, + }; + request.filters = filters + .iter() + .map(|e| common_query::logical_plan::Expr::from(e.clone())) + .collect(); + request.limit = limit; + + let stream = self + .engine + .handle_query(self.region_id, request) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + Ok(Arc::new(DfPhysicalPlanAdapter(Arc::new( + StreamScanAdapter::new(stream), + )))) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion::error::Result> { + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } +} + +impl DummyTableProvider { + /// Creates a new provider. + pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self { + Self { + region_id, + engine, + metadata, + scan_request: Default::default(), + } + } + + /// Sets the ordering hint of the query to the provider. + pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) { + self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec()); + } + + /// Gets the scan request of the provider. + #[cfg(test)] + pub fn scan_request(&self) -> ScanRequest { + self.scan_request.lock().unwrap().clone() + } +} + +pub struct DummyTableProviderFactory; + +#[async_trait] +impl TableProviderFactory for DummyTableProviderFactory { + async fn create( + &self, + region_id: RegionId, + engine: RegionEngineRef, + ) -> Result> { + let metadata = + engine + .get_metadata(region_id) + .await + .with_context(|_| GetRegionMetadataSnafu { + engine: engine.name(), + region_id, + })?; + Ok(Arc::new(DummyTableProvider { + region_id, + engine, + metadata, + scan_request: Default::default(), + })) + } +} + +#[async_trait] +pub trait TableProviderFactory: Send + Sync { + async fn create( + &self, + region_id: RegionId, + engine: RegionEngineRef, + ) -> Result>; +} + +pub type TableProviderFactoryRef = Arc; diff --git a/src/query/src/error.rs b/src/query/src/error.rs index f8fcb13abf09..bb3bc0ff4b97 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -22,6 +22,7 @@ use datafusion::error::DataFusionError; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; use snafu::{Location, Snafu}; +use store_api::storage::RegionId; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -244,6 +245,18 @@ pub enum Error { #[snafu(display("Range Query: {}", msg))] RangeQuery { msg: String, location: Location }, + + #[snafu(display( + "Failed to get metadata from engine {} for region_id {}", + engine, + region_id, + ))] + GetRegionMetadata { + engine: String, + region_id: RegionId, + location: Location, + source: BoxedError, + }, } impl ErrorExt for Error { @@ -296,6 +309,7 @@ impl ErrorExt for Error { RegionQuery { source, .. } => source.status_code(), TableMutation { source, .. } => source.status_code(), MissingTableMutationHandler { .. } => StatusCode::Unexpected, + GetRegionMetadata { .. } => StatusCode::Internal, } } diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 234353e393e9..7d325668bad9 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -18,6 +18,7 @@ pub mod dataframe; pub mod datafusion; pub mod dist_plan; +pub mod dummy_catalog; pub mod error; pub mod executor; pub mod logical_optimizer; @@ -34,10 +35,10 @@ mod range_select; pub mod region_query; pub mod sql; +#[cfg(test)] +mod tests; + pub use crate::datafusion::DfContextProviderAdapter; pub use crate::query_engine::{ QueryEngine, QueryEngineContext, QueryEngineFactory, QueryEngineRef, }; - -#[cfg(test)] -mod tests; diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index fc99f8be12b8..da95851b87a6 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -12,6 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod order_hint; +pub mod string_normalization; +#[cfg(test)] +mod test_util; +pub mod type_conversion; + use datafusion_common::config::ConfigOptions; use datafusion_common::Result; use datafusion_expr::LogicalPlan; @@ -30,7 +36,3 @@ pub trait ExtensionAnalyzerRule { config: &ConfigOptions, ) -> Result; } - -pub mod order_hint; -pub mod string_normalization; -pub mod type_conversion; diff --git a/src/query/src/optimizer/order_hint.rs b/src/query/src/optimizer/order_hint.rs index 6c027568a244..e2bf9105bb35 100644 --- a/src/query/src/optimizer/order_hint.rs +++ b/src/query/src/optimizer/order_hint.rs @@ -20,7 +20,8 @@ use datafusion_common::Result as DataFusionResult; use datafusion_expr::expr::Sort; use datafusion_expr::{Expr, LogicalPlan}; use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; -use table::table::adapter::DfTableProviderAdapter; + +use crate::dummy_catalog::DummyTableProvider; /// This rule will pass the nearest order requirement to the leaf table /// scan node as ordering hint. @@ -66,10 +67,11 @@ impl OrderHintRule { .as_any() .downcast_ref::() { + // The provider in the region server is [DummyTableProvider]. if let Some(adapter) = source .table_provider .as_any() - .downcast_ref::() + .downcast_ref::() { let mut opts = Vec::with_capacity(order_expr.len()); for sort in order_expr { @@ -129,41 +131,38 @@ mod test { use datafusion_expr::{col, LogicalPlanBuilder}; use datafusion_optimizer::OptimizerContext; - use table::table::numbers::NumbersTable; + use store_api::storage::RegionId; use super::*; + use crate::optimizer::test_util::mock_table_provider; #[test] - #[allow(clippy::bool_assert_comparison)] fn set_order_hint() { - let numbers_table = NumbersTable::table(0); - let adapter = Arc::new(DfTableProviderAdapter::new(numbers_table)); - let table_source = Arc::new(DefaultTableSource::new(adapter.clone())); - - let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![]) + let provider = Arc::new(mock_table_provider(RegionId::new(1, 1))); + let table_source = Arc::new(DefaultTableSource::new(provider.clone())); + let plan = LogicalPlanBuilder::scan("t", table_source, None) .unwrap() - .sort(vec![col("number").sort(true, false)]) + .sort(vec![col("ts").sort(true, false)]) .unwrap() - .sort(vec![col("number").sort(false, true)]) + .sort(vec![col("ts").sort(false, true)]) .unwrap() .build() .unwrap(); let context = OptimizerContext::default(); - let _ = OrderHintRule.try_optimize(&plan, &context).unwrap(); + OrderHintRule.try_optimize(&plan, &context).unwrap(); // should read the first (with `.sort(true, false)`) sort option - let scan_req = adapter.get_scan_req(); - assert_eq!("number", &scan_req.output_ordering.clone().unwrap()[0].name); + let scan_req = provider.scan_request(); assert_eq!( - true, - !scan_req.output_ordering.clone().unwrap()[0] - .options - .descending // the previous parameter is `asc` - ); - assert_eq!( - false, - scan_req.output_ordering.unwrap()[0].options.nulls_first + OrderOption { + name: "ts".to_string(), + options: SortOptions { + descending: false, + nulls_first: false + } + }, + scan_req.output_ordering.as_ref().unwrap()[0] ); } } diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs new file mode 100644 index 000000000000..ea18e54a09b4 --- /dev/null +++ b/src/query/src/optimizer/test_util.rs @@ -0,0 +1,144 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utils for testing the optimizer. + +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; + +use api::region::RegionResponse; +use api::v1::SemanticType; +use async_trait::async_trait; +use common_error::ext::{BoxedError, PlainError}; +use common_error::status_code::StatusCode; +use common_recordbatch::SendableRecordBatchStream; +use datatypes::schema::ColumnSchema; +use store_api::metadata::{ + ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, +}; +use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; +use store_api::region_request::RegionRequest; +use store_api::storage::{ConcreteDataType, RegionId, ScanRequest}; + +use crate::dummy_catalog::DummyTableProvider; + +/// A mock region engine that can be used for testing the optimizer. +pub(crate) struct MetaRegionEngine { + metadatas: HashMap, +} + +impl MetaRegionEngine { + /// Creates a engine with the given metadata. + pub(crate) fn with_metadata(metadata: RegionMetadataRef) -> Self { + let mut metadatas = HashMap::new(); + metadatas.insert(metadata.region_id, metadata); + + Self { metadatas } + } +} + +#[async_trait] +impl RegionEngine for MetaRegionEngine { + fn name(&self) -> &str { + "MetaRegionEngine" + } + + async fn handle_request( + &self, + _region_id: RegionId, + _request: RegionRequest, + ) -> Result { + unimplemented!() + } + + async fn handle_query( + &self, + _region_id: RegionId, + _request: ScanRequest, + ) -> Result { + unimplemented!() + } + + async fn get_metadata(&self, region_id: RegionId) -> Result { + self.metadatas.get(®ion_id).cloned().ok_or_else(|| { + BoxedError::new(PlainError::new( + "Region not found".to_string(), + StatusCode::RegionNotFound, + )) + }) + } + + async fn region_disk_usage(&self, _region_id: RegionId) -> Option { + None + } + + async fn stop(&self) -> Result<(), BoxedError> { + Ok(()) + } + + fn set_writable(&self, _region_id: RegionId, _writable: bool) -> Result<(), BoxedError> { + unimplemented!() + } + + async fn set_readonly_gracefully( + &self, + _region_id: RegionId, + ) -> Result { + unimplemented!() + } + + fn role(&self, _region_id: RegionId) -> Option { + None + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +/// Mock a [DummyTableProvider] with a single region. +pub(crate) fn mock_table_provider(region_id: RegionId) -> DummyTableProvider { + let metadata = Arc::new(mock_region_metadata(region_id)); + let engine = Arc::new(MetaRegionEngine::with_metadata(metadata.clone())); + DummyTableProvider::new(region_id, engine, metadata) +} + +/// Returns a mock region metadata. +/// The schema is: `k0: string, ts: timestamp, v0: float64` +fn mock_region_metadata(region_id: RegionId) -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(region_id); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("v0", ConcreteDataType::float64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 3, + }) + .primary_key(vec![1]); + builder.build().unwrap() +}