diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs
index d2d83e4d9d8a..e3c54ee4d462 100644
--- a/src/catalog/src/information_schema.rs
+++ b/src/catalog/src/information_schema.rs
@@ -19,9 +19,9 @@ mod partitions;
 mod predicate;
 mod region_peers;
 mod runtime_metrics;
-mod schemata;
+pub mod schemata;
 mod table_names;
-mod tables;
+pub mod tables;
 
 use std::collections::HashMap;
 use std::sync::{Arc, Weak};
diff --git a/src/catalog/src/information_schema/schemata.rs b/src/catalog/src/information_schema/schemata.rs
index 9f435878658e..7cc9e9290ad5 100644
--- a/src/catalog/src/information_schema/schemata.rs
+++ b/src/catalog/src/information_schema/schemata.rs
@@ -37,8 +37,8 @@ use crate::error::{
 use crate::information_schema::{InformationTable, Predicates};
 use crate::CatalogManager;
 
-const CATALOG_NAME: &str = "catalog_name";
-const SCHEMA_NAME: &str = "schema_name";
+pub const CATALOG_NAME: &str = "catalog_name";
+pub const SCHEMA_NAME: &str = "schema_name";
 const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name";
 const DEFAULT_COLLATION_NAME: &str = "default_collation_name";
 const INIT_CAPACITY: usize = 42;
diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs
index f55abce61f1f..4103a17b87ba 100644
--- a/src/catalog/src/information_schema/tables.rs
+++ b/src/catalog/src/information_schema/tables.rs
@@ -39,10 +39,10 @@ use crate::error::{
 use crate::information_schema::{InformationTable, Predicates};
 use crate::CatalogManager;
 
-const TABLE_CATALOG: &str = "table_catalog";
-const TABLE_SCHEMA: &str = "table_schema";
-const TABLE_NAME: &str = "table_name";
-const TABLE_TYPE: &str = "table_type";
+pub const TABLE_CATALOG: &str = "table_catalog";
+pub const TABLE_SCHEMA: &str = "table_schema";
+pub const TABLE_NAME: &str = "table_name";
+pub const TABLE_TYPE: &str = "table_type";
 const TABLE_ID: &str = "table_id";
 const ENGINE: &str = "engine";
 const INIT_CAPACITY: usize = 42;
diff --git a/src/operator/src/statement/show.rs b/src/operator/src/statement/show.rs
index d73737cc967b..a768a07c761a 100644
--- a/src/operator/src/statement/show.rs
+++ b/src/operator/src/statement/show.rs
@@ -33,7 +33,7 @@ impl StatementExecutor {
         stmt: ShowDatabases,
         query_ctx: QueryContextRef,
     ) -> Result<Output> {
-        query::sql::show_databases(stmt, self.catalog_manager.clone(), query_ctx)
+        query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
             .await
             .context(ExecuteStatementSnafu)
     }
@@ -44,7 +44,7 @@ impl StatementExecutor {
         stmt: ShowTables,
         query_ctx: QueryContextRef,
     ) -> Result<Output> {
-        query::sql::show_tables(stmt, self.catalog_manager.clone(), query_ctx)
+        query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
             .await
             .context(ExecuteStatementSnafu)
     }
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index 03945da66c9d..c8bb0d1bda33 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -30,32 +30,26 @@ use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlan, PhysicalP
 use common_query::prelude::ScalarUdf;
 use common_query::Output;
 use common_recordbatch::adapter::RecordBatchStreamAdapter;
-use common_recordbatch::{
-    EmptyRecordBatchStream, RecordBatch, RecordBatches, SendableRecordBatchStream,
-};
+use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
 use common_telemetry::tracing;
-use datafusion::common::Column;
 use datafusion::physical_plan::analyze::AnalyzeExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use datafusion_common::{ResolvedTableReference, ScalarValue};
-use datafusion_expr::{DmlStatement, Expr as DfExpr, LogicalPlan as DfLogicalPlan, WriteOp};
+use datafusion_common::ResolvedTableReference;
+use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
 use datatypes::prelude::VectorRef;
-use datatypes::schema::Schema;
 use futures_util::StreamExt;
 use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
-use sql::ast::{BinaryOperator, Expr, Value};
 use table::requests::{DeleteRequest, InsertRequest};
 use table::TableRef;
 
 use crate::dataframe::DataFrame;
 pub use crate::datafusion::planner::DfContextProviderAdapter;
 use crate::error::{
-    CatalogSnafu, CreateRecordBatchSnafu, CreateSchemaSnafu, DataFusionSnafu,
-    MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result,
-    TableMutationSnafu, TableNotFoundSnafu, UnimplementedSnafu, UnsupportedExprSnafu,
+    CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, MissingTableMutationHandlerSnafu,
+    MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
+    TableNotFoundSnafu, UnsupportedExprSnafu,
 };
 use crate::executor::QueryExecutor;
 use crate::logical_optimizer::LogicalOptimizer;
@@ -456,78 +450,6 @@ impl QueryExecutor for DatafusionQueryEngine {
     }
 }
 
-fn convert_filter_to_df_filter(filter: Expr) -> Result<DfExpr> {
-    match filter {
-        Expr::BinaryOp { left, op, right } => {
-            let left = convert_filter_to_df_filter(*left)?;
-            let right = convert_filter_to_df_filter(*right)?;
-            match op {
-                BinaryOperator::Eq => Ok(left.eq(right)),
-                _ => UnimplementedSnafu {
-                    operation: format!("convert BinaryOperator into datafusion Expr, op: {op}"),
-                }
-                .fail(),
-            }
-        }
-        Expr::Value(value) => match value {
-            Value::SingleQuotedString(v) => Ok(DfExpr::Literal(ScalarValue::Utf8(Some(v)))),
-            _ => UnimplementedSnafu {
-                operation: format!("convert Expr::Value into datafusion Expr, value: {value}"),
-            }
-            .fail(),
-        },
-        Expr::Identifier(ident) => Ok(DfExpr::Column(Column::from_name(ident.value))),
-        _ => UnimplementedSnafu {
-            operation: format!("convert Expr into datafusion Expr, Expr: {filter}"),
-        }
-        .fail(),
-    }
-}
-
-/// Creates a table in memory and executes a show statement on the table.
-pub async fn execute_show_with_filter(
-    record_batch: RecordBatch,
-    filter: Option<Expr>,
-) -> Result<Output> {
-    let table_name = "table_name";
-    let column_schemas = record_batch.schema.column_schemas().to_vec();
-    let context = SessionContext::new();
-    context
-        .register_batch(table_name, record_batch.into_df_record_batch())
-        .context(error::DatafusionSnafu)
-        .map_err(BoxedError::new)
-        .context(QueryExecutionSnafu)?;
-    let mut dataframe = context
-        .sql(&format!("SELECT * FROM {table_name}"))
-        .await
-        .context(error::DatafusionSnafu)
-        .map_err(BoxedError::new)
-        .context(QueryExecutionSnafu)?;
-    if let Some(filter) = filter {
-        let filter = convert_filter_to_df_filter(filter)?;
-        dataframe = dataframe
-            .filter(filter)
-            .context(error::DatafusionSnafu)
-            .map_err(BoxedError::new)
-            .context(QueryExecutionSnafu)?
-    }
-    let df_batches = dataframe
-        .collect()
-        .await
-        .context(error::DatafusionSnafu)
-        .map_err(BoxedError::new)
-        .context(QueryExecutionSnafu)?;
-    let mut batches = Vec::with_capacity(df_batches.len());
-    let schema = Arc::new(Schema::try_new(column_schemas).context(CreateSchemaSnafu)?);
-    for df_batch in df_batches.into_iter() {
-        let batch = RecordBatch::try_from_df_record_batch(schema.clone(), df_batch)
-            .context(CreateRecordBatchSnafu)?;
-        batches.push(batch);
-    }
-    let record_batches = RecordBatches::try_new(schema, batches).context(CreateRecordBatchSnafu)?;
-    Ok(Output::RecordBatches(record_batches))
-}
-
 #[cfg(test)]
 mod tests {
     use std::borrow::Cow::Borrowed;
@@ -536,17 +458,12 @@ mod tests {
     use catalog::RegisterTableRequest;
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
     use common_query::Output;
-    use common_recordbatch::{util, RecordBatch};
+    use common_recordbatch::util;
     use datafusion::prelude::{col, lit};
-    use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder};
-    use datatypes::schema::{ColumnSchema, Schema};
-    use datatypes::types::StringType;
-    use datatypes::vectors::{Helper, StringVectorBuilder, UInt32Vector, UInt64Vector, VectorRef};
+    use datatypes::prelude::ConcreteDataType;
+    use datatypes::schema::ColumnSchema;
+    use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
     use session::context::QueryContext;
-    use sql::dialect::GreptimeDbDialect;
-    use sql::parser::{ParseOptions, ParserContext};
-    use sql::statements::show::{ShowKind, ShowTables};
-    use sql::statements::statement::Statement;
     use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
 
     use super::*;
@@ -691,71 +608,4 @@ mod tests {
         );
         assert_eq!("Limit: skip=0, fetch=20\n  Aggregate: groupBy=[[]], aggr=[[SUM(CAST(numbers.number AS UInt64))]]\n    TableScan: numbers projection=[number]", format!("{}", logical_plan.display_indent()));
     }
-
-    #[tokio::test]
-    async fn test_show_tables() {
-        // No filter
-        let column_schemas = vec![ColumnSchema::new(
-            "Tables",
-            ConcreteDataType::String(StringType),
-            false,
-        )];
-        let schema = Arc::new(Schema::new(column_schemas));
-        let mut builder = StringVectorBuilder::with_capacity(3);
-        builder.push(Some("monitor"));
-        builder.push(Some("system_metrics"));
-        let columns = vec![builder.to_vector()];
-        let record_batch = RecordBatch::new(schema, columns).unwrap();
-        let output = execute_show_with_filter(record_batch, None).await.unwrap();
-        let Output::RecordBatches(record_batches) = output else {
-            unreachable!()
-        };
-        let expected = "\
-+----------------+
-| Tables         |
-+----------------+
-| monitor        |
-| system_metrics |
-+----------------+";
-        assert_eq!(record_batches.pretty_print().unwrap(), expected);
-
-        // Filter
-        let column_schemas = vec![ColumnSchema::new(
-            "Tables",
-            ConcreteDataType::String(StringType),
-            false,
-        )];
-        let schema = Arc::new(Schema::new(column_schemas));
-        let mut builder = StringVectorBuilder::with_capacity(3);
-        builder.push(Some("monitor"));
-        builder.push(Some("system_metrics"));
-        let columns = vec![builder.to_vector()];
-        let record_batch = RecordBatch::new(schema, columns).unwrap();
-        let statement = ParserContext::create_with_dialect(
-            "SHOW TABLES WHERE \"Tables\"='monitor'",
-            &GreptimeDbDialect {},
-            ParseOptions::default(),
-        )
-        .unwrap()[0]
-            .clone();
-        let Statement::ShowTables(ShowTables { kind, .. }) = statement else {
-            unreachable!()
-        };
-        let ShowKind::Where(filter) = kind else {
-            unreachable!()
-        };
-        let output = execute_show_with_filter(record_batch, Some(filter))
-            .await
-            .unwrap();
-        let Output::RecordBatches(record_batches) = output else {
-            unreachable!()
-        };
-        let expected = "\
-+---------+
-| Tables  |
-+---------+
-| monitor |
-+---------+";
-        assert_eq!(record_batches.pretty_print().unwrap(), expected);
-    }
 }
diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs
index 46865df5c2ee..67ec4c8a3c10 100644
--- a/src/query/src/datafusion/planner.rs
+++ b/src/query/src/datafusion/planner.rs
@@ -49,12 +49,17 @@ impl DfContextProviderAdapter {
     pub(crate) async fn try_new(
         engine_state: Arc<QueryEngineState>,
         session_state: SessionState,
-        df_stmt: &DfStatement,
+        df_stmt: Option<&DfStatement>,
         query_ctx: QueryContextRef,
     ) -> Result<Self> {
-        let table_names = session_state
-            .resolve_table_references(df_stmt)
-            .context(DataFusionSnafu)?;
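+        // Without a statement (e.g. a bare expression) there are no table references to resolve.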
+        let table_names = if let Some(df_stmt) = df_stmt {
+            session_state
+                .resolve_table_references(df_stmt)
+                .context(DataFusionSnafu)?
+        } else {
+            vec![]
+        };
 
         let mut table_provider = DfTableSourceProvider::new(
             engine_state.catalog_manager().clone(),
diff --git a/src/query/src/error.rs b/src/query/src/error.rs
index c646a9ae659c..f8fcb13abf09 100644
--- a/src/query/src/error.rs
+++ b/src/query/src/error.rs
@@ -54,24 +54,12 @@ pub enum Error {
     #[snafu(display("Table not found: {}", table))]
     TableNotFound { table: String, location: Location },
 
-    #[snafu(display("Failed to do vector computation"))]
-    VectorComputation {
-        source: datatypes::error::Error,
-        location: Location,
-    },
-
     #[snafu(display("Failed to create RecordBatch"))]
     CreateRecordBatch {
         source: common_recordbatch::error::Error,
         location: Location,
     },
 
-    #[snafu(display("Failed to create Schema"))]
-    CreateSchema {
-        source: datatypes::error::Error,
-        location: Location,
-    },
-
     #[snafu(display("Failure during query execution"))]
     QueryExecution {
         source: BoxedError,
@@ -291,9 +279,7 @@ impl ErrorExt for Error {
 
             QueryAccessDenied { .. } => StatusCode::AccessDenied,
             Catalog { source, .. } => source.status_code(),
-            VectorComputation { source, .. } | ConvertDatafusionSchema { source, .. } => {
-                source.status_code()
-            }
+            ConvertDatafusionSchema { source, .. } => source.status_code(),
             CreateRecordBatch { source, .. } => source.status_code(),
             QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(),
             DataFusion { error, .. } => match error {
@@ -306,7 +292,6 @@ impl ErrorExt for Error {
             Sql { source, .. } => source.status_code(),
             PlanSql { .. } => StatusCode::PlanQuery,
             ConvertSqlType { source, .. } | ConvertSqlValue { source, .. } => source.status_code(),
-            CreateSchema { source, .. } => source.status_code(),
 
             RegionQuery { source, .. } => source.status_code(),
             TableMutation { source, .. } => source.status_code(),
diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs
index cf3f47761192..eb3cb255d6f1 100644
--- a/src/query/src/planner.rs
+++ b/src/query/src/planner.rs
@@ -12,18 +12,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::any::Any;
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use catalog::table_source::DfTableSourceProvider;
 use common_error::ext::BoxedError;
 use common_telemetry::tracing;
+use datafusion::common::DFSchema;
 use datafusion::execution::context::SessionState;
+use datafusion::sql::planner::PlannerContext;
+use datafusion_expr::Expr as DfExpr;
 use datafusion_sql::planner::{ParserOptions, SqlToRel};
 use promql::planner::PromPlanner;
 use promql_parser::parser::EvalStmt;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
+use sql::ast::Expr as SqlExpr;
 use sql::statements::statement::Statement;
 
 use crate::error::{DataFusionSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu};
@@ -36,6 +41,9 @@ use crate::{DfContextProviderAdapter, QueryEngineContext};
 #[async_trait]
 pub trait LogicalPlanner: Send + Sync {
     async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
+
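+    /// Returns the planner as [`Any`] so callers can downcast it to a concrete implementation.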
+    fn as_any(&self) -> &dyn Any;
 }
 
 pub struct DfLogicalPlanner {
@@ -65,7 +73,7 @@ impl DfLogicalPlanner {
         let context_provider = DfContextProviderAdapter::try_new(
             self.engine_state.clone(),
             self.session_state.clone(),
-            &df_stmt,
+            Some(&df_stmt),
             query_ctx.clone(),
         )
         .await?;
@@ -95,6 +103,37 @@
         Ok(LogicalPlan::DfPlan(plan))
     }
 
+    /// Generates a DataFusion logical expression from a SQL expression.
+    #[tracing::instrument(skip_all)]
+    pub(crate) async fn sql_to_expr(
+        &self,
+        sql: SqlExpr,
+        schema: &DFSchema,
+        normalize_ident: bool,
+        query_ctx: QueryContextRef,
+    ) -> Result<DfExpr> {
+        let context_provider = DfContextProviderAdapter::try_new(
+            self.engine_state.clone(),
+            self.session_state.clone(),
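+            // A bare expression carries no table references to resolve.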
+            None,
+            query_ctx,
+        )
+        .await?;
+
+        let config_options = self.session_state.config().options();
+        let parser_options = ParserOptions {
+            enable_ident_normalization: normalize_ident,
+            parse_float_as_decimal: config_options.sql_parser.parse_float_as_decimal,
+        };
+
+        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
+
+        sql_to_rel
+            .sql_to_expr(sql.into(), schema, &mut PlannerContext::new())
+            .context(DataFusionSnafu)
+    }
+
     #[tracing::instrument(skip_all)]
     async fn plan_pql(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
         let table_provider = DfTableSourceProvider::new(
@@ -119,4 +158,8 @@ impl LogicalPlanner for DfLogicalPlanner {
             QueryStatement::Promql(stmt) => self.plan_pql(stmt, query_ctx).await,
         }
     }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }
diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs
index 34e3f17fdd35..cc4462830eb3 100644
--- a/src/query/src/sql.rs
+++ b/src/query/src/sql.rs
@@ -17,22 +17,28 @@ mod show_create_table;
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use catalog::information_schema::{schemata, tables, SCHEMATA, TABLES};
 use catalog::CatalogManagerRef;
 use common_catalog::consts::{
-    SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX,
+    INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
+    SEMANTIC_TYPE_TIME_INDEX,
 };
+use common_catalog::format_full_table_name;
 use common_datasource::file_format::{infer_schemas, FileFormat, Format};
 use common_datasource::lister::{Lister, Source};
 use common_datasource::object_store::build_backend;
 use common_datasource::util::find_dir_and_filename;
 use common_query::prelude::GREPTIME_TIMESTAMP;
 use common_query::Output;
-use common_recordbatch::{RecordBatch, RecordBatches};
+use common_recordbatch::adapter::RecordBatchStreamAdapter;
+use common_recordbatch::RecordBatches;
 use common_time::timezone::get_timezone;
 use common_time::Timestamp;
+use datafusion::prelude::SessionContext;
+use datafusion_expr::{col, lit, Expr};
 use datatypes::prelude::*;
 use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
-use datatypes::vectors::{Helper, StringVector};
+use datatypes::vectors::StringVector;
 use object_store::ObjectStore;
 use once_cell::sync::Lazy;
 use regex::Regex;
@@ -44,11 +50,14 @@ use sql::statements::show::{ShowDatabases, ShowKind, ShowTables, ShowVariables};
 use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
 use table::TableRef;
 
-use crate::datafusion::execute_show_with_filter;
+use crate::dataframe::DataFrame;
 use crate::error::{self, Result, UnsupportedVariableSnafu};
+use crate::planner::DfLogicalPlanner;
+use crate::QueryEngineRef;
 
-const SCHEMAS_COLUMN: &str = "Schemas";
+const SCHEMAS_COLUMN: &str = "Database";
 const TABLES_COLUMN: &str = "Tables";
+const TABLE_TYPE_COLUMN: &str = "Table_type";
 const COLUMN_NAME_COLUMN: &str = "Column";
 const COLUMN_TYPE_COLUMN: &str = "Type";
 const COLUMN_KEY_COLUMN: &str = "Key";
@@ -100,49 +109,148 @@ static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
 
 pub async fn show_databases(
     stmt: ShowDatabases,
-    catalog_manager: CatalogManagerRef,
+    query_engine: &QueryEngineRef,
+    catalog_manager: &CatalogManagerRef,
     query_ctx: QueryContextRef,
 ) -> Result<Output> {
-    let mut databases = catalog_manager
-        .schema_names(query_ctx.current_catalog())
+    let projects = vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)];
+
+    let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
+    let like_field = Some(schemata::SCHEMA_NAME);
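+    // Sort by schema name, ascending with nulls first.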
+    let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
+
+    query_from_information_schema_table(
+        query_engine,
+        catalog_manager,
+        query_ctx,
+        SCHEMATA,
+        projects,
+        filters,
+        like_field,
+        sort,
+        stmt.kind,
+    )
+    .await
+}
+
+/// Converts a `SHOW` statement execution into a query against a table in `information_schema`.
+/// - `table_name`: the table name in `information_schema`,
+/// - `projects`: query projection, a list of `(column, renamed_column)`,
+/// - `filters`: filter expressions for the query,
+/// - `like_field`: the field to filter by the predicate `ShowKind::Like`,
+/// - `sort`: sort the results by the specified sorting expressions,
+/// - `kind`: the show kind.
+#[allow(clippy::too_many_arguments)]
+async fn query_from_information_schema_table(
+    query_engine: &QueryEngineRef,
+    catalog_manager: &CatalogManagerRef,
+    query_ctx: QueryContextRef,
+    table_name: &str,
+    projects: Vec<(&str, &str)>,
+    filters: Vec<Expr>,
+    like_field: Option<&str>,
+    sort: Vec<Expr>,
+    kind: ShowKind,
+) -> Result<Output> {
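+    // Look up the backing table in `information_schema` under the current catalog.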
+    let table = catalog_manager
+        .table(
+            query_ctx.current_catalog(),
+            INFORMATION_SCHEMA_NAME,
+            table_name,
+        )
         .await
-        .context(error::CatalogSnafu)?;
+        .context(error::CatalogSnafu)?
+        .with_context(|| error::TableNotFoundSnafu {
+            table: format_full_table_name(
+                query_ctx.current_catalog(),
+                INFORMATION_SCHEMA_NAME,
+                table_name,
+            ),
+        })?;
 
-    // TODO(dennis): Specify the order of the results in catalog manager API
-    databases.sort();
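+    // Read the table as a DataFusion dataframe so filters and sorting are pushed into the query plan.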
+    let DataFrame::DataFusion(dataframe) = query_engine.read_table(table)?;
 
-    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
-        SCHEMAS_COLUMN,
-        ConcreteDataType::string_datatype(),
-        false,
-    )]));
-    match stmt.kind {
-        ShowKind::All => {
-            let databases = Arc::new(StringVector::from(databases)) as _;
-            let records = RecordBatches::try_from_columns(schema, vec![databases])
-                .context(error::CreateRecordBatchSnafu)?;
-            Ok(Output::RecordBatches(records))
+    // Apply filters
+    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
+        df.filter(expr).context(error::PlanSqlSnafu)
+    })?;
+
+    // Apply the `like` predicate if present
+    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
+        dataframe
+            .filter(col(field).like(lit(ident.value.clone())))
+            .context(error::PlanSqlSnafu)?
+    } else {
+        dataframe
+    };
+
+    // Apply sorting and select only the projected columns
+    let dataframe = dataframe
+        .sort(sort)
+        .context(error::PlanSqlSnafu)?
+        .select_columns(&projects.iter().map(|(c, _)| *c).collect::<Vec<_>>())
+        .context(error::PlanSqlSnafu)?;
+
+    // Rename the projected columns to the names displayed by the SHOW statement
+    let dataframe = projects
+        .into_iter()
+        .try_fold(dataframe, |df, (column, renamed_column)| {
+            df.with_column_renamed(column, renamed_column)
+                .context(error::PlanSqlSnafu)
+        })?;
+
+    let dataframe = match kind {
+        ShowKind::All | ShowKind::Like(_) => {
+            // Like kind is processed above
+            dataframe
         }
         ShowKind::Where(filter) => {
-            let columns = vec![Arc::new(StringVector::from(databases)) as _];
-            let record_batch =
-                RecordBatch::new(schema, columns).context(error::CreateRecordBatchSnafu)?;
-            let result = execute_show_with_filter(record_batch, Some(filter)).await?;
-            Ok(result)
-        }
-        ShowKind::Like(ident) => {
-            let databases = Helper::like_utf8(databases, &ident.value)
-                .context(error::VectorComputationSnafu)?;
-            let records = RecordBatches::try_from_columns(schema, vec![databases])
-                .context(error::CreateRecordBatchSnafu)?;
-            Ok(Output::RecordBatches(records))
+            // Register the results as a view for the `where` clause,
+            // which is evaluated against the column names displayed by the SHOW statement.
+            let view = dataframe.into_view();
+            let dataframe = SessionContext::new_with_state(
+                query_engine
+                    .engine_context(query_ctx.clone())
+                    .state()
+                    .clone(),
+            )
+            .read_table(view)
+            .context(error::DataFusionSnafu)?;
+
+            let planner = query_engine.planner();
+            let planner = planner
+                .as_any()
+                .downcast_ref::<DfLogicalPlanner>()
+                .expect("Must be the datafusion planner");
+
+            let filter = planner
+                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
+                .await?;
+
+            // Apply the `where` clause filters
+            dataframe.filter(filter).context(error::PlanSqlSnafu)?
         }
-    }
+    };
+
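+    // Execute the plan and stream the results instead of materializing them in memory.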
+    let stream = dataframe
+        .execute_stream()
+        .await
+        .context(error::DataFusionSnafu)?;
+
+    Ok(Output::Stream(
+        Box::pin(RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?),
+        None,
+    ))
 }
 
 pub async fn show_tables(
     stmt: ShowTables,
-    catalog_manager: CatalogManagerRef,
+    query_engine: &QueryEngineRef,
+    catalog_manager: &CatalogManagerRef,
     query_ctx: QueryContextRef,
 ) -> Result<Output> {
     let schema_name = if let Some(database) = stmt.database {
@@ -150,85 +258,37 @@ pub async fn show_tables(
     } else {
         query_ctx.current_schema().to_owned()
     };
-    // TODO(sunng87): move this function into query_ctx
-    let mut tables = catalog_manager
-        .table_names(query_ctx.current_catalog(), &schema_name)
-        .await
-        .context(error::CatalogSnafu)?;
-
-    // TODO(dennis): Specify the order of the results in schema provider API
-    tables.sort();
-
-    let table_types: Option<Arc<dyn Vector>> = {
-        if stmt.full {
-            Some(
-                get_table_types(
-                    &tables,
-                    catalog_manager.clone(),
-                    query_ctx.clone(),
-                    &schema_name,
-                )
-                .await?,
-            )
-        } else {
-            None
-        }
-    };
-
-    let mut column_schema = vec![ColumnSchema::new(
-        TABLES_COLUMN,
-        ConcreteDataType::string_datatype(),
-        false,
-    )];
-    if table_types.is_some() {
-        column_schema.push(ColumnSchema::new(
-            "Table_type",
-            ConcreteDataType::string_datatype(),
-            false,
-        ));
-    }
-
-    let schema = Arc::new(Schema::new(column_schema));
-
-    match stmt.kind {
-        ShowKind::All => {
-            let tables = Arc::new(StringVector::from(tables)) as _;
-            let mut columns = vec![tables];
-            if let Some(table_types) = table_types {
-                columns.push(table_types)
-            }
-
-            let records = RecordBatches::try_from_columns(schema, columns)
-                .context(error::CreateRecordBatchSnafu)?;
-            Ok(Output::RecordBatches(records))
-        }
-        ShowKind::Where(filter) => {
-            let mut columns = vec![Arc::new(StringVector::from(tables)) as _];
-            if let Some(table_types) = table_types {
-                columns.push(table_types)
-            }
-            let record_batch =
-                RecordBatch::new(schema, columns).context(error::CreateRecordBatchSnafu)?;
-            let result = execute_show_with_filter(record_batch, Some(filter)).await?;
-            Ok(result)
-        }
-        ShowKind::Like(ident) => {
-            let (tables, filter) = Helper::like_utf8_filter(tables, &ident.value)
-                .context(error::VectorComputationSnafu)?;
-            let mut columns = vec![tables];
-
-            if let Some(table_types) = table_types {
-                let table_types = table_types
-                    .filter(&filter)
-                    .context(error::VectorComputationSnafu)?;
-                columns.push(table_types)
-            }
 
-            let records = RecordBatches::try_from_columns(schema, columns)
-                .context(error::CreateRecordBatchSnafu)?;
-            Ok(Output::RecordBatches(records))
-        }
-    }
+    // (dennis): MySQL renames `table_name` to `Tables_in_{schema}`, but we use `Tables` instead.
+    // We keep it this way for now, since our dashboard may depend on it.
+    let projects = if stmt.full {
+        vec![
+            (tables::TABLE_NAME, TABLES_COLUMN),
+            (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
+        ]
+    } else {
+        vec![(tables::TABLE_NAME, TABLES_COLUMN)]
+    };
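+    // Restrict the query to the target schema within the current catalog.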
+    let filters = vec![
+        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
+        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
+    ];
+    let like_field = Some(tables::TABLE_NAME);
+    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
+
+    query_from_information_schema_table(
+        query_engine,
+        catalog_manager,
+        query_ctx,
+        TABLES,
+        projects,
+        filters,
+        like_field,
+        sort,
+        stmt.kind,
+    )
+    .await
 }
 
 pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
@@ -513,25 +573,6 @@ fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn
     )
 }
 
-async fn get_table_types(
-    tables: &[String],
-    catalog_manager: CatalogManagerRef,
-    query_ctx: QueryContextRef,
-    schema_name: &str,
-) -> Result<Arc<dyn Vector>> {
-    let mut table_types = Vec::with_capacity(tables.len());
-    for table_name in tables {
-        if let Some(table) = catalog_manager
-            .table(query_ctx.current_catalog(), schema_name, table_name)
-            .await
-            .context(error::CatalogSnafu)?
-        {
-            table_types.push(table.table_type().to_string());
-        }
-    }
-    Ok(Arc::new(StringVector::from(table_types)) as _)
-}
-
 #[cfg(test)]
 mod test {
     use std::sync::Arc;
diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs
index 8136b182f40f..fae444ba7c0f 100644
--- a/tests-integration/src/tests/instance_test.rs
+++ b/tests-integration/src/tests/instance_test.rs
@@ -440,39 +440,27 @@ async fn test_execute_query(instance: Arc<dyn MockInstance>) {
 async fn test_execute_show_databases_tables(instance: Arc<dyn MockInstance>) {
     let instance = instance.frontend();
 
+    let expected = "\
++--------------------+
+| Database           |
++--------------------+
+| greptime_private   |
+| information_schema |
+| public             |
++--------------------+\
+";
     let output = execute_sql(&instance, "show databases").await;
-    match output {
-        Output::RecordBatches(databases) => {
-            let databases = databases.take();
-            assert_eq!(1, databases[0].num_columns());
-            assert_eq!(databases[0].column(0).len(), 3);
-
-            assert_eq!(
-                *databases[0].column(0),
-                Arc::new(StringVector::from(vec![
-                    Some("greptime_private"),
-                    Some("information_schema"),
-                    Some("public")
-                ])) as VectorRef
-            );
-        }
-        _ => unreachable!(),
-    }
+    check_unordered_output_stream(output, expected).await;
 
     let output = execute_sql(&instance, "show databases like '%bl%'").await;
-    match output {
-        Output::RecordBatches(databases) => {
-            let databases = databases.take();
-            assert_eq!(1, databases[0].num_columns());
-            assert_eq!(databases[0].column(0).len(), 1);
-
-            assert_eq!(
-                *databases[0].column(0),
-                Arc::new(StringVector::from(vec![Some("public")])) as VectorRef
-            );
-        }
-        _ => unreachable!(),
-    }
+    let expected = "\
++----------+
+| Database |
++----------+
+| public   |
++----------+\
+";
+    check_unordered_output_stream(output, expected).await;
 
     let expected = "\
 +---------+
@@ -500,21 +488,41 @@ async fn test_execute_show_databases_tables(instance: Arc<dyn MockInstance>) {
 ";
     check_unordered_output_stream(output, expected).await;
 
+    let output = execute_sql(&instance, "SHOW FULL TABLES WHERE Table_type != 'VIEW'").await;
+    let expected = "\
++---------+-----------------+
+| Tables  | Table_type      |
++---------+-----------------+
+| demo    | BASE TABLE      |
+| numbers | LOCAL TEMPORARY |
++---------+-----------------+\
+";
+    check_unordered_output_stream(output, expected).await;
+
+    let output = execute_sql(
+        &instance,
+        "SHOW FULL TABLES WHERE Table_type = 'BASE TABLE'",
+    )
+    .await;
+    let expected = "\
++--------+------------+
+| Tables | Table_type |
++--------+------------+
+| demo   | BASE TABLE |
++--------+------------+\
+";
+    check_unordered_output_stream(output, expected).await;
+
     // show tables like [string]
     let output = execute_sql(&instance, "show tables like 'de%'").await;
-    match output {
-        Output::RecordBatches(databases) => {
-            let databases = databases.take();
-            assert_eq!(1, databases[0].num_columns());
-            assert_eq!(databases[0].column(0).len(), 1);
-
-            assert_eq!(
-                *databases[0].column(0),
-                Arc::new(StringVector::from(vec![Some("demo")])) as VectorRef
-            );
-        }
-        _ => unreachable!(),
-    }
+    let expected = "\
++--------+
+| Tables |
++--------+
+| demo   |
++--------+\
+";
+    check_unordered_output_stream(output, expected).await;
 }
 
 #[apply(both_instances_cases)]
diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs
index c58284494617..b6d47a3249a0 100644
--- a/tests-integration/src/tests/test_util.rs
+++ b/tests-integration/src/tests/test_util.rs
@@ -336,7 +336,13 @@ pub(crate) async fn check_unordered_output_stream(output: Output, expected: &str
     };
     let pretty_print = sort_table(&recordbatches.pretty_print().unwrap());
     let expected = sort_table(expected);
-    assert_eq!(pretty_print, expected);
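+    // On failure, print the original (unsorted) output to ease debugging.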
+    assert_eq!(
+        pretty_print,
+        expected,
+        "\n{}",
+        recordbatches.pretty_print().unwrap()
+    );
 }
 
 pub fn prepare_path(p: &str) -> String {
diff --git a/tests/cases/standalone/common/catalog/schema.result b/tests/cases/standalone/common/catalog/schema.result
index 17e4a68536b4..8a385b7e171f 100644
--- a/tests/cases/standalone/common/catalog/schema.result
+++ b/tests/cases/standalone/common/catalog/schema.result
@@ -13,16 +13,16 @@ Affected Rows: 1
 SHOW DATABASES LIKE '%public%';
 
 +--------------------+
-| Schemas            |
+| Database           |
 +--------------------+
 | public             |
 | test_public_schema |
 +--------------------+
 
-SHOW DATABASES WHERE Schemas='test_public_schema';
+SHOW DATABASES WHERE Database = 'test_public_schema';
 
 +--------------------+
-| Schemas            |
+| Database           |
 +--------------------+
 | test_public_schema |
 +--------------------+
@@ -81,6 +81,14 @@ SHOW TABLES;
 | hello  |
 +--------+
 
+SHOW FULL TABLES WHERE Table_type != 'VIEW';
+
++--------+------------+
+| Tables | Table_type |
++--------+------------+
+| hello  | BASE TABLE |
++--------+------------+
+
 DROP TABLE hello;
 
 Affected Rows: 0
@@ -91,10 +99,8 @@ Error: 4001(TableNotFound), Table not found: greptime.test_public_schema.hello
 
 SHOW TABLES FROM test_public_schema;
 
-+--------+
-| Tables |
-+--------+
-+--------+
+++
+++
 
 SHOW TABLES FROM public;
 
@@ -104,7 +110,7 @@ SHOW TABLES FROM public;
 | numbers |
 +---------+
 
-SHOW TABLES FROM public WHERE Tables='numbers';
+SHOW TABLES FROM public WHERE Tables = 'numbers';
 
 +---------+
 | Tables  |
diff --git a/tests/cases/standalone/common/catalog/schema.sql b/tests/cases/standalone/common/catalog/schema.sql
index f829ad3317d5..240c495ea41a 100644
--- a/tests/cases/standalone/common/catalog/schema.sql
+++ b/tests/cases/standalone/common/catalog/schema.sql
@@ -6,7 +6,7 @@ CREATE SCHEMA IF NOT EXISTS test_public_schema;
 
 SHOW DATABASES LIKE '%public%';
 
-SHOW DATABASES WHERE Schemas='test_public_schema';
+SHOW DATABASES WHERE Database = 'test_public_schema';
 
 USE test_public_schema;
 
@@ -26,6 +26,8 @@ SELECT * FROM hello;
 
 SHOW TABLES;
 
+SHOW FULL TABLES WHERE Table_type != 'VIEW';
+
 DROP TABLE hello;
 
 DROP TABLE hello;
@@ -34,7 +36,7 @@ SHOW TABLES FROM test_public_schema;
 
 SHOW TABLES FROM public;
 
-SHOW TABLES FROM public WHERE Tables='numbers';
+SHOW TABLES FROM public WHERE Tables = 'numbers';
 
 DROP SCHEMA test_public_schema;
 
diff --git a/tests/cases/standalone/common/create/create_database.result b/tests/cases/standalone/common/create/create_database.result
index a843ece56a7b..780aca34c32a 100644
--- a/tests/cases/standalone/common/create/create_database.result
+++ b/tests/cases/standalone/common/create/create_database.result
@@ -13,7 +13,7 @@ Error: 1002(Unexpected), Unexpected, violated: Invalid database name: ㊙️data
 show databases;
 
 +--------------------+
-| Schemas            |
+| Database           |
 +--------------------+
 | greptime_private   |
 | illegal-database   |
diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result
index f4cee31855dc..0417ffb85d26 100644
--- a/tests/cases/standalone/common/show/show_databases_tables.result
+++ b/tests/cases/standalone/common/show/show_databases_tables.result
@@ -1,7 +1,7 @@
 show databases;
 
 +-----------------------+
-| Schemas               |
+| Database              |
 +-----------------------+
 | greptime_private      |
 | illegal-database      |