Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl parser and operator for CREATE FLOW #3806

Merged
merged 15 commits into from
Apr 28, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
datafusion-common.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,10 @@ pub fn check_permission(
Statement::CreateExternalTable(stmt) => {
validate_param(&stmt.name, query_ctx)?;
}
Statement::CreateFlow(stmt) => {
// TODO: should also validate source table name here?
validate_param(&stmt.sink_table_name, query_ctx)?;
}
Statement::Alter(stmt) => {
validate_param(stmt.table_name(), query_ctx)?;
}
Expand Down
11 changes: 10 additions & 1 deletion src/operator/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ pub enum Error {
source: sql::error::Error,
},

#[snafu(display("Failed to convert identifier: {}", ident))]
ConvertIdentifier {
ident: String,
location: Location,
#[snafu(source)]
error: datafusion::error::DataFusionError,
},

#[snafu(display("Failed to convert value to sql value: {}", value))]
ConvertSqlValue {
value: Value,
Expand Down Expand Up @@ -568,7 +576,8 @@ impl ErrorExt for Error {
| Error::InferFileTableSchema { .. }
| Error::SchemaIncompatible { .. }
| Error::UnsupportedRegionRequest { .. }
| Error::InvalidTableName { .. } => StatusCode::InvalidArguments,
| Error::InvalidTableName { .. }
| Error::ConvertIdentifier { .. } => StatusCode::InvalidArguments,

Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,

Expand Down
79 changes: 74 additions & 5 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
use api::v1::{
AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension,
CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType,
CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType,
TableName,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
use common_time::Timezone;
use datafusion::sql::planner::object_name_to_table_reference;
use datatypes::schema::{ColumnSchema, COMMENT_KEY};
use file_engine::FileOptions;
use query::sql::{
Expand All @@ -34,16 +36,17 @@ use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{ColumnDef, ColumnOption, TableConstraint};
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX};
use sql::statements::create::{CreateExternalTable, CreateFlow, CreateTable, TIME_INDEX};
use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def};
use sql::util::extract_tables_from_query;
use table::requests::{TableOptions, FILE_TABLE_META_KEY};
use table::table_reference::TableReference;

use crate::error::{
BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu,
InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result,
SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu,
InferFileTableSchemaSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu,
PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
};

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -487,6 +490,72 @@ pub(crate) fn to_alter_expr(
})
}

pub fn to_create_flow_task_expr(
create_task: CreateFlow,
waynexia marked this conversation as resolved.
Show resolved Hide resolved
query_ctx: QueryContextRef,
) -> Result<CreateFlowTaskExpr> {
// retrieve sink table name
let sink_table_ref =
object_name_to_table_reference(create_task.sink_table_name.clone().into(), true)
.with_context(|_| ConvertIdentifierSnafu {
ident: create_task.sink_table_name.to_string(),
})?;
let catalog = sink_table_ref
.catalog()
.unwrap_or(query_ctx.current_catalog())
.to_string();
let schema = sink_table_ref
.schema()
.unwrap_or(query_ctx.current_schema())
.to_string();
let sink_table_name = TableName {
catalog_name: catalog,
schema_name: schema,
table_name: sink_table_ref.table().to_string(),
};

let source_table_names = extract_tables_from_query(&create_task.query)
.map(|name| {
let reference = object_name_to_table_reference(name.clone().into(), true)
.with_context(|_| ConvertIdentifierSnafu {
ident: name.to_string(),
})?;
let catalog = reference
.catalog()
.unwrap_or(query_ctx.current_catalog())
.to_string();
let schema = reference
.schema()
.unwrap_or(query_ctx.current_schema())
.to_string();
let table_name = TableName {
catalog_name: catalog,
schema_name: schema,
table_name: reference.table().to_string(),
};
Ok(table_name)
})
.collect::<Result<Vec<_>>>()?;

Ok(CreateFlowTaskExpr {
catalog_name: query_ctx.current_catalog().to_string(),
task_name: create_task.task_name.to_string(),
source_table_names,
sink_table_name: Some(sink_table_name),
create_if_not_exists: create_task.if_not_exists,
or_replace: create_task.or_replace,
// TODO: change this field to optional in proto
waynexia marked this conversation as resolved.
Show resolved Hide resolved
expire_when: create_task
.expire_when
.map(|e| e.to_string())
.unwrap_or_default(),
// TODO: change this field to optional in proto
waynexia marked this conversation as resolved.
Show resolved Hide resolved
comment: create_task.comment.unwrap_or_default(),
sql: create_task.query.to_string(),
task_options: HashMap::new(),
})
}

#[cfg(test)]
mod tests {
use datatypes::value::Value;
Expand Down
4 changes: 4 additions & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ impl StatementExecutor {
let _ = self.create_external_table(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::CreateFlow(stmt) => {
self.create_flow_task(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
Expand Down
19 changes: 18 additions & 1 deletion src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use sql::statements::alter::AlterTable;
use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions};
use sql::statements::create::{
CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions,
};
use sql::statements::sql_value_to_value;
use sqlparser::ast::{Expr, Ident, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
Expand Down Expand Up @@ -320,6 +322,21 @@ impl StatementExecutor {
.collect())
}

#[tracing::instrument(skip_all)]
pub async fn create_flow_task(
waynexia marked this conversation as resolved.
Show resolved Hide resolved
&self,
stmt: CreateFlow,
query_ctx: QueryContextRef,
) -> Result<()> {
// TODO: do some verification
waynexia marked this conversation as resolved.
Show resolved Hide resolved

let _expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?;

// TODO: invoke procedure

Ok(())
}

#[tracing::instrument(skip_all)]
pub async fn alter_logical_tables(&self, alter_table_exprs: Vec<AlterExpr>) -> Result<Output> {
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
Expand Down
Loading
Loading