fix: push down order hint of the query again (#3797)
* feat: add dummy catalog list to query

* chore: fix compiler errors

* feat: use query's dummy catalog

* chore: remove error

* feat: match dummy provider in the order hint

* docs: revert config change

* Apply suggestions from code review

---------

Co-authored-by: Ruihang Xia <[email protected]>
evenyag and waynexia authored Apr 26, 2024
1 parent eb3d2ca commit e410192
Showing 8 changed files with 448 additions and 244 deletions.
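
The thrust of the change: the dummy catalog types that used to be private to `region_server.rs` now live in `query::dummy_catalog`, so the order-hint rule in the query crate can recognize the `DummyTableProvider` again and write the required ordering into its shared `ScanRequest` before the physical scan is built. The provider keeps that request behind an `Arc<Mutex<...>>` precisely so the optimize phase can change it. Below is a minimal, self-contained sketch of that match-and-mutate pattern; the `ScanRequest` stand-in, its `order_hint` field, and the `push_down_order_hint` helper are illustrative only, not the real `store_api` or `query` APIs.

```rust
use std::any::Any;
use std::sync::{Arc, Mutex};

/// Illustrative stand-in for the engine's scan request; the real
/// `store_api::storage::ScanRequest` carries more fields.
#[derive(Debug, Default)]
struct ScanRequest {
    /// Columns the region engine is asked to return in order (hypothetical field name).
    order_hint: Option<Vec<String>>,
}

/// Stripped-down version of the `DummyTableProvider` in this diff:
/// the mutable request is what the optimize phase writes into.
struct DummyTableProvider {
    scan_request: Arc<Mutex<ScanRequest>>,
}

impl DummyTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// What the order-hint rule does conceptually: if the table provider is the
/// dummy provider, push the requested ordering into its scan request and
/// report success; any other provider is left untouched.
fn push_down_order_hint(provider: &dyn Any, columns: Vec<String>) -> bool {
    match provider.downcast_ref::<DummyTableProvider>() {
        Some(dummy) => {
            dummy.scan_request.lock().unwrap().order_hint = Some(columns);
            true
        }
        None => false,
    }
}

fn main() {
    let provider = DummyTableProvider {
        scan_request: Arc::new(Mutex::new(ScanRequest::default())),
    };
    let pushed = push_down_order_hint(provider.as_any(), vec!["ts".to_string()]);
    println!(
        "pushed: {pushed}, request: {:?}",
        provider.scan_request.lock().unwrap()
    );
}
```

In the actual code, the rule and the provider can only agree on the provider type if both see it, which is what the `pub use query::dummy_catalog::{...}` re-export in `region_server.rs` below restores.
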
15 changes: 1 addition & 14 deletions src/datanode/src/error.rs
@@ -248,18 +248,6 @@ pub enum Error {
location: Location,
},

-#[snafu(display(
-"Failed to get metadata from engine {} for region_id {}",
-engine,
-region_id,
-))]
-GetRegionMetadata {
-engine: String,
-region_id: RegionId,
-location: Location,
-source: BoxedError,
-},
-
#[snafu(display("Failed to build region requests"))]
BuildRegionRequests {
location: Location,
@@ -337,8 +325,7 @@ impl ErrorExt for Error {
| IncorrectInternalState { .. }
| ShutdownInstance { .. }
| RegionEngineNotFound { .. }
-| UnsupportedOutput { .. }
-| GetRegionMetadata { .. } => StatusCode::Internal,
+| UnsupportedOutput { .. } => StatusCode::Internal,

RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,
211 changes: 10 additions & 201 deletions src/datanode/src/region_server.rs
@@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::{Arc, RwLock};

use api::region::RegionResponse;
use api::v1::region::{region_request, QueryRequest, RegionResponse as RegionResponseV1};
@@ -26,46 +25,36 @@ use async_trait::async_trait;
use bytes::Bytes;
use common_error::ext::BoxedError;
use common_error::status_code::StatusCode;
-use common_query::logical_plan::Expr;
-use common_query::physical_plan::DfPhysicalPlanAdapter;
-use common_query::{DfPhysicalPlan, OutputData};
+use common_query::OutputData;
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, warn};
use dashmap::DashMap;
-use datafusion::catalog::schema::SchemaProvider;
-use datafusion::catalog::{CatalogProvider, CatalogProviderList};
-use datafusion::datasource::TableProvider;
-use datafusion::error::Result as DfResult;
-use datafusion::execution::context::SessionState;
-use datafusion_common::DataFusionError;
-use datafusion_expr::{Expr as DfExpr, TableProviderFilterPushDown, TableType};
-use datatypes::arrow::datatypes::SchemaRef;
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
use prost::Message;
+pub use query::dummy_catalog::{
+DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
+};
use query::QueryEngineRef;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
-use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
-use store_api::storage::{RegionId, ScanRequest};
+use store_api::storage::RegionId;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
-use table::table::scan::StreamScanAdapter;
use tonic::{Request, Response, Result as TonicResult};

use crate::error::{
self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
-FindLogicalRegionsSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu,
-RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu,
-UnsupportedOutputSnafu,
+FindLogicalRegionsSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu,
+RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;

@@ -640,7 +629,8 @@ impl RegionServerInner {
let table_provider = self
.table_provider_factory
.create(region_id, region_status.into_engine())
-.await?;
+.await
+.context(ExecuteLogicalPlanSnafu)?;

let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider));
let query_engine_ctx = self.query_engine.engine_context(ctx.clone());
@@ -713,187 +703,6 @@ enum RegionChange {
Deregisters,
}

-/// Resolve to the given region (specified by [RegionId]) unconditionally.
-#[derive(Clone)]
-struct DummyCatalogList {
-catalog: DummyCatalogProvider,
-}
-
-impl DummyCatalogList {
-fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
-let schema_provider = DummySchemaProvider {
-table: table_provider,
-};
-let catalog_provider = DummyCatalogProvider {
-schema: schema_provider,
-};
-Self {
-catalog: catalog_provider,
-}
-}
-}
-
-impl CatalogProviderList for DummyCatalogList {
-fn as_any(&self) -> &dyn Any {
-self
-}
-
-fn register_catalog(
-&self,
-_name: String,
-_catalog: Arc<dyn CatalogProvider>,
-) -> Option<Arc<dyn CatalogProvider>> {
-None
-}
-
-fn catalog_names(&self) -> Vec<String> {
-vec![]
-}
-
-fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
-Some(Arc::new(self.catalog.clone()))
-}
-}
-
-/// For [DummyCatalogList].
-#[derive(Clone)]
-struct DummyCatalogProvider {
-schema: DummySchemaProvider,
-}
-
-impl CatalogProvider for DummyCatalogProvider {
-fn as_any(&self) -> &dyn Any {
-self
-}
-
-fn schema_names(&self) -> Vec<String> {
-vec![]
-}
-
-fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
-Some(Arc::new(self.schema.clone()))
-}
-}
-
-/// For [DummyCatalogList].
-#[derive(Clone)]
-struct DummySchemaProvider {
-table: Arc<dyn TableProvider>,
-}
-
-#[async_trait]
-impl SchemaProvider for DummySchemaProvider {
-fn as_any(&self) -> &dyn Any {
-self
-}
-
-fn table_names(&self) -> Vec<String> {
-vec![]
-}
-
-async fn table(&self, _name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
-Ok(Some(self.table.clone()))
-}
-
-fn table_exist(&self, _name: &str) -> bool {
-true
-}
-}
-
-/// For [TableProvider](TableProvider) and [DummyCatalogList]
-#[derive(Clone)]
-struct DummyTableProvider {
-region_id: RegionId,
-engine: RegionEngineRef,
-metadata: RegionMetadataRef,
-/// Keeping a mutable request makes it possible to change in the optimize phase.
-scan_request: Arc<Mutex<ScanRequest>>,
-}
-
-#[async_trait]
-impl TableProvider for DummyTableProvider {
-fn as_any(&self) -> &dyn Any {
-self
-}
-
-fn schema(&self) -> SchemaRef {
-self.metadata.schema.arrow_schema().clone()
-}
-
-fn table_type(&self) -> TableType {
-TableType::Base
-}
-
-async fn scan(
-&self,
-_state: &SessionState,
-projection: Option<&Vec<usize>>,
-filters: &[DfExpr],
-limit: Option<usize>,
-) -> DfResult<Arc<dyn DfPhysicalPlan>> {
-let mut request = self.scan_request.lock().unwrap().clone();
-request.projection = match projection {
-Some(x) if !x.is_empty() => Some(x.clone()),
-_ => None,
-};
-request.filters = filters.iter().map(|e| Expr::from(e.clone())).collect();
-request.limit = limit;
-
-let stream = self
-.engine
-.handle_query(self.region_id, request)
-.await
-.map_err(|e| DataFusionError::External(Box::new(e)))?;
-Ok(Arc::new(DfPhysicalPlanAdapter(Arc::new(
-StreamScanAdapter::new(stream),
-))))
-}
-
-fn supports_filters_pushdown(
-&self,
-filters: &[&DfExpr],
-) -> DfResult<Vec<TableProviderFilterPushDown>> {
-Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
-}
-}
-
-pub struct DummyTableProviderFactory;
-
-#[async_trait]
-impl TableProviderFactory for DummyTableProviderFactory {
-async fn create(
-&self,
-region_id: RegionId,
-engine: RegionEngineRef,
-) -> Result<Arc<dyn TableProvider>> {
-let metadata =
-engine
-.get_metadata(region_id)
-.await
-.with_context(|_| GetRegionMetadataSnafu {
-engine: engine.name(),
-region_id,
-})?;
-Ok(Arc::new(DummyTableProvider {
-region_id,
-engine,
-metadata,
-scan_request: Default::default(),
-}))
-}
-}
-
-#[async_trait]
-pub trait TableProviderFactory: Send + Sync {
-async fn create(
-&self,
-region_id: RegionId,
-engine: RegionEngineRef,
-) -> Result<Arc<dyn TableProvider>>;
-}
-
-pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;
-
#[cfg(test)]
mod tests {

