Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dbeaver mysql compatibility, use statement and information_schema.tables #4218

Merged
merged 12 commits into from
Jul 3, 2024
27 changes: 26 additions & 1 deletion src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
Expand All @@ -43,6 +43,10 @@ pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
pub const DATA_LENGTH: &str = "data_length";
pub const INDEX_LENGTH: &str = "index_length";
pub const MAX_DATA_LENGTH: &str = "max_data_length";
pub const AVG_ROW_LENGTH: &str = "avg_row_length";
const TABLE_ID: &str = "table_id";
const ENGINE: &str = "engine";
const INIT_CAPACITY: usize = 42;
Expand All @@ -69,6 +73,10 @@ impl InformationSchemaTables {
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(DATA_LENGTH, ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
]))
}
Expand Down Expand Up @@ -131,6 +139,10 @@ struct InformationSchemaTablesBuilder {
table_names: StringVectorBuilder,
table_types: StringVectorBuilder,
table_ids: UInt32VectorBuilder,
data_length: Int64VectorBuilder,
max_data_length: Int64VectorBuilder,
index_length: Int64VectorBuilder,
avg_row_length: Int64VectorBuilder,
engines: StringVectorBuilder,
}

Expand All @@ -149,6 +161,10 @@ impl InformationSchemaTablesBuilder {
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
data_length: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
max_data_length: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
index_length: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
avg_row_length: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
Expand Down Expand Up @@ -215,6 +231,11 @@ impl InformationSchemaTablesBuilder {
self.table_names.push(Some(table_name));
self.table_types.push(Some(table_type));
self.table_ids.push(table_id);
// TODO(sunng87): use real data for these fields
self.data_length.push(Some(0));
self.max_data_length.push(Some(0));
self.index_length.push(Some(0));
self.avg_row_length.push(Some(0));
self.engines.push(engine);
}

Expand All @@ -225,6 +246,10 @@ impl InformationSchemaTablesBuilder {
Arc::new(self.table_names.finish()),
Arc::new(self.table_types.finish()),
Arc::new(self.table_ids.finish()),
Arc::new(self.data_length.finish()),
Arc::new(self.max_data_length.finish()),
Arc::new(self.index_length.finish()),
Arc::new(self.avg_row_length.finish()),
Arc::new(self.engines.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl DfTableSourceProvider {
disallow_cross_catalog_query,
resolved_tables: HashMap::new(),
default_catalog: query_ctx.current_catalog().to_owned(),
default_schema: query_ctx.current_schema().to_owned(),
default_schema: query_ctx.current_schema(),
plan_decoder,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/src/system/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Function for DatabaseFunction {
fn eval(&self, func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let db = func_ctx.query_ctx.current_schema();

Ok(Arc::new(StringVector::from_slice(&[db])) as _)
Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ pub fn check_permission(
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
| Statement::DropDatabase(_)
| Statement::DropFlow(_) => {}
| Statement::DropFlow(_)
| Statement::Use(_) => {}
Statement::ShowCreateTable(stmt) => {
validate_param(&stmt.table_name, query_ctx)?;
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl Instance {
let table_name = prom_store::table_name(query)?;

let output = self
.handle_remote_query(&ctx, catalog_name, schema_name, &table_name, query)
.handle_remote_query(&ctx, catalog_name, &schema_name, &table_name, query)
.await
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu)?;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ mod python {
.script_manager
.insert_and_compile(
query_ctx.current_catalog(),
query_ctx.current_schema(),
&query_ctx.current_schema(),
name,
script,
)
Expand Down Expand Up @@ -266,7 +266,7 @@ mod python {
self.script_manager
.execute(
query_ctx.current_catalog(),
query_ctx.current_schema(),
&query_ctx.current_schema(),
name,
params,
)
Expand Down
2 changes: 1 addition & 1 deletion src/operator/src/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl Deleter {
for req in &mut requests.deletes {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
let table = self.get_table(catalog, &schema, &req.table_name).await?;
let key_column_names = self.key_column_names(&table)?;

let rows = req.rows.as_mut().unwrap();
Expand Down
10 changes: 6 additions & 4 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,9 @@ pub fn to_create_flow_task_expr(
.to_string();
let schema = sink_table_ref
.schema()
.unwrap_or(query_ctx.current_schema())
.to_string();
.map(|s| s.to_owned())
.unwrap_or(query_ctx.current_schema());

let sink_table_name = TableName {
catalog_name: catalog,
schema_name: schema,
Expand All @@ -571,8 +572,9 @@ pub fn to_create_flow_task_expr(
.to_string();
let schema = reference
.schema()
.unwrap_or(query_ctx.current_schema())
.to_string();
.map(|s| s.to_owned())
.unwrap_or(query_ctx.current_schema());

let table_name = TableName {
catalog_name: catalog,
schema_name: schema,
Expand Down
18 changes: 9 additions & 9 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl Inserter {
for req in &requests.inserts {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
let table = self.get_table(catalog, &schema, &req.table_name).await?;
match table {
Some(table) => {
let table_info = table.table_info();
Expand Down Expand Up @@ -525,14 +525,14 @@ impl Inserter {

// check if exist
if self
.get_table(catalog_name, schema_name, &physical_table)
.get_table(catalog_name, &schema_name, &physical_table)
.await?
.is_some()
{
return Ok(());
}

let table_reference = TableReference::full(catalog_name, schema_name, &physical_table);
let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
info!("Physical metric table `{table_reference}` does not exist, try creating table");

// schema with timestamp and field column
Expand Down Expand Up @@ -621,8 +621,8 @@ impl Inserter {
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);
let schema = ctx.current_schema();
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);

let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
Expand Down Expand Up @@ -652,8 +652,8 @@ impl Inserter {
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);
let schema = ctx.current_schema();
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
// SAFETY: `req.rows` is guaranteed to be `Some` by `handle_log_inserts`.
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
Expand Down Expand Up @@ -692,7 +692,7 @@ impl Inserter {
let create_table_exprs = create_tables
.iter()
.map(|req| {
let table_ref = TableReference::full(catalog_name, schema_name, &req.table_name);
let table_ref = TableReference::full(catalog_name, &schema_name, &req.table_name);
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?;

Expand All @@ -707,7 +707,7 @@ impl Inserter {
.collect::<Result<Vec<_>>>()?;

let res = statement_executor
.create_logical_tables(catalog_name, schema_name, &create_table_exprs, ctx.clone())
.create_logical_tables(catalog_name, &schema_name, &create_table_exprs, ctx.clone())
.await;

match res {
Expand Down
4 changes: 2 additions & 2 deletions src/operator/src/req_convert/delete/row_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ impl<'a> RowToRegion<'a> {
let catalog_name = self.ctx.current_catalog();
let schema_name = self.ctx.current_schema();
self.catalog_manager
.table(catalog_name, schema_name, table_name)
.table(catalog_name, &schema_name, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
catalog_name,
schema_name,
&schema_name,
table_name,
),
})
Expand Down
2 changes: 1 addition & 1 deletion src/operator/src/req_convert/insert/stmt_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl<'a> StatementToRegion<'a> {
match &obj_name.0[..] {
[table] => Ok((
self.ctx.current_catalog().to_owned(),
self.ctx.current_schema().to_owned(),
self.ctx.current_schema(),
table.value.clone(),
)),
[schema, table] => Ok((
Expand Down
21 changes: 19 additions & 2 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod tql;
use std::sync::Arc;

use catalog::CatalogManagerRef;
use client::RecordBatches;
use common_error::ext::BoxedError;
use common_meta::cache::TableRouteCacheRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
Expand All @@ -42,7 +43,7 @@ use query::plan::LogicalPlan;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use sql::statements::OptionMap;
Expand All @@ -56,7 +57,7 @@ use table::TableRef;
use self::set::{set_bytea_output, set_datestyle, set_timezone, validate_client_encoding};
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
};
use crate::insert::InserterRef;
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
Expand Down Expand Up @@ -307,9 +308,25 @@ impl StatementExecutor {
}
Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
Statement::ShowStatus(_) => self.show_status(query_ctx).await,
Statement::Use(db) => self.use_database(db, query_ctx).await,
}
}

/// Handles the SQL `USE <database>` statement for the current session.
///
/// Verifies that schema `db` exists in the session's current catalog and,
/// if so, switches the query context's current schema to it.
///
/// # Errors
/// - Propagates catalog-manager failures via `CatalogSnafu`.
/// - Returns `SchemaNotFound` when `db` does not exist, instead of silently
///   switching to a nonexistent schema.
pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
let catalog = query_ctx.current_catalog();
ensure!(
self.catalog_manager
.schema_exists(catalog, db.as_ref())
.await
.context(CatalogSnafu)?,
SchemaNotFoundSnafu { schema_info: &db }
);

// Mutates the shared session context: subsequent statements resolve
// unqualified table names against `db`.
query_ctx.set_current_schema(&db);

// Return an empty result set rather than a plain OK — MySQL-compatible
// clients (e.g. DBeaver) expect a well-formed response to `USE`.
Ok(Output::new_with_record_batches(RecordBatches::empty()))
}

pub async fn plan(
&self,
stmt: QueryStatement,
Expand Down
6 changes: 4 additions & 2 deletions src/pipeline/src/manager/pipeline_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,10 @@ impl PipelineOperator {
content_type: &str,
pipeline: &str,
) -> Result<PipelineInfo> {
let schema = ctx.current_schema();
self.get_pipeline_table_from_cache(ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.insert_and_compile(ctx.current_schema(), name, content_type, pipeline)
.insert_and_compile(&schema, name, content_type, pipeline)
.await
}
}
Expand Down Expand Up @@ -185,11 +186,12 @@ impl PipelineOperator {
name: &str,
version: PipelineVersion,
) -> Result<Arc<Pipeline<GreptimeTransformer>>> {
let schema = query_ctx.current_schema();
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;
self.get_pipeline_table_from_cache(query_ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.get_pipeline(query_ctx.current_schema(), name, version)
.get_pipeline(&schema, name, version)
.await
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl DatafusionQueryEngine {
.start_timer();

let default_catalog = &query_ctx.current_catalog().to_owned();
let default_schema = &query_ctx.current_schema().to_owned();
let default_schema = &query_ctx.current_schema();
let table_name = dml.table_name.resolve(default_catalog, default_schema);
let table = self.find_table(&table_name).await?;

Expand Down
6 changes: 3 additions & 3 deletions src/query/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ pub async fn show_columns(
let schema_name = if let Some(database) = stmt.database {
database
} else {
query_ctx.current_schema().to_owned()
query_ctx.current_schema()
};

let projects = if stmt.full {
Expand Down Expand Up @@ -372,7 +372,7 @@ pub async fn show_index(
let schema_name = if let Some(database) = stmt.database {
database
} else {
query_ctx.current_schema().to_owned()
query_ctx.current_schema()
};

let select = vec![
Expand Down Expand Up @@ -454,7 +454,7 @@ pub async fn show_tables(
let schema_name = if let Some(database) = stmt.database {
database
} else {
query_ctx.current_schema().to_owned()
query_ctx.current_schema()
};

// (dennis): MySQL rename `table_name` to `Tables_in_{schema}`, but we use `Tables` instead.
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/grpc/greptime_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub(crate) async fn auth(
Identity::UserId(&username, None),
Password::PlainText(password.into()),
query_ctx.current_catalog(),
query_ctx.current_schema(),
&query_ctx.current_schema(),
)
.await
.context(AuthSnafu),
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ async fn validate_schema(
query_ctx: QueryContextRef,
) -> Option<(StatusCode, String)> {
match sql_handler
.is_valid_schema(query_ctx.current_catalog(), query_ctx.current_schema())
.is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
.await
{
Ok(true) => None,
Expand Down
Loading