Skip to content

Commit

Permalink
feat: impl parser and operator for CREATE FLOW (#3806)
Browse files Browse the repository at this point in the history
* feat: impl parser for CREATE TASK

Signed-off-by: Ruihang Xia <[email protected]>

* finish parser

Signed-off-by: Ruihang Xia <[email protected]>

* wip expr

Signed-off-by: Ruihang Xia <[email protected]>

* finish expr

Signed-off-by: Ruihang Xia <[email protected]>

* rename output to sink

Signed-off-by: Ruihang Xia <[email protected]>

* fix parser

Signed-off-by: Ruihang Xia <[email protected]>

* remove debug code

Signed-off-by: Ruihang Xia <[email protected]>

* upload lock file

Signed-off-by: Ruihang Xia <[email protected]>

* rename symbol

Signed-off-by: Ruihang Xia <[email protected]>

* Apply suggestions from code review

Co-authored-by: Jeremyhi <[email protected]>

* remove other task word

Signed-off-by: Ruihang Xia <[email protected]>

* task name to flow name

Signed-off-by: Ruihang Xia <[email protected]>

* one more comment

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Jeremyhi <[email protected]>
  • Loading branch information
waynexia and fengjiachun authored Apr 28, 2024
1 parent 1bbde15 commit 7ef18c0
Show file tree
Hide file tree
Showing 13 changed files with 387 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlowTask(_)) => "ddl.create_flow_task",
Some(Expr::CreateFlowTask(_)) => "ddl.create_flow",
Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task",
None => "ddl.empty",
}
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
datafusion-common.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
/// Create flow task
pub struct CreateFlowTask {
pub catalog_name: String,
pub task_name: String,
pub flow_name: String,
pub source_table_names: Vec<TableName>,
pub sink_table_name: TableName,
pub or_replace: bool,
Expand Down Expand Up @@ -758,7 +758,7 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {

Ok(CreateFlowTask {
catalog_name,
task_name,
flow_name: task_name,
source_table_names: source_table_names.into_iter().map(Into::into).collect(),
sink_table_name: sink_table_name
.context(error::InvalidProtoMsgSnafu {
Expand All @@ -779,7 +779,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
fn from(
CreateFlowTask {
catalog_name,
task_name,
flow_name: task_name,
source_table_names,
sink_table_name,
or_replace,
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ pub fn check_permission(
Statement::CreateExternalTable(stmt) => {
validate_param(&stmt.name, query_ctx)?;
}
Statement::CreateFlow(stmt) => {
// TODO: should also validate source table name here?
validate_param(&stmt.sink_table_name, query_ctx)?;
}
Statement::Alter(stmt) => {
validate_param(stmt.table_name(), query_ctx)?;
}
Expand Down
11 changes: 10 additions & 1 deletion src/operator/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ pub enum Error {
source: sql::error::Error,
},

#[snafu(display("Failed to convert identifier: {}", ident))]
ConvertIdentifier {
ident: String,
location: Location,
#[snafu(source)]
error: datafusion::error::DataFusionError,
},

#[snafu(display("Failed to convert value to sql value: {}", value))]
ConvertSqlValue {
value: Value,
Expand Down Expand Up @@ -568,7 +576,8 @@ impl ErrorExt for Error {
| Error::InferFileTableSchema { .. }
| Error::SchemaIncompatible { .. }
| Error::UnsupportedRegionRequest { .. }
| Error::InvalidTableName { .. } => StatusCode::InvalidArguments,
| Error::InvalidTableName { .. }
| Error::ConvertIdentifier { .. } => StatusCode::InvalidArguments,

Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,

Expand Down
79 changes: 74 additions & 5 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
use api::v1::{
AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension,
CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType,
CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType,
TableName,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
use common_time::Timezone;
use datafusion::sql::planner::object_name_to_table_reference;
use datatypes::schema::{ColumnSchema, COMMENT_KEY};
use file_engine::FileOptions;
use query::sql::{
Expand All @@ -34,16 +36,17 @@ use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{ColumnDef, ColumnOption, TableConstraint};
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX};
use sql::statements::create::{CreateExternalTable, CreateFlow, CreateTable, TIME_INDEX};
use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def};
use sql::util::extract_tables_from_query;
use table::requests::{TableOptions, FILE_TABLE_META_KEY};
use table::table_reference::TableReference;

use crate::error::{
BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu,
InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result,
SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu,
InferFileTableSchemaSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu,
PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
};

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -487,6 +490,72 @@ pub(crate) fn to_alter_expr(
})
}

/// Builds a [`CreateFlowTaskExpr`] from a parsed `CREATE FLOW` statement.
///
/// The sink table name and every table returned by
/// `extract_tables_from_query` for the flow's query are converted into
/// [`TableName`]s. Whenever a name carries no catalog or schema part, the
/// current catalog / schema of `query_ctx` is used instead.
///
/// An absent `expire_when` or `comment` is encoded as an empty string, and
/// `task_options` is always emitted empty.
///
/// # Errors
///
/// Returns a `ConvertIdentifier` error when the sink table name or one of the
/// source table names cannot be turned into a table reference.
pub fn to_create_flow_task_expr(
    create_flow: CreateFlow,
    query_ctx: QueryContextRef,
) -> Result<CreateFlowTaskExpr> {
    // Completes a possibly partial table reference with the current catalog
    // and schema of the query context.
    let qualify = |catalog: Option<&str>, schema: Option<&str>, table: &str| TableName {
        catalog_name: catalog.unwrap_or(query_ctx.current_catalog()).to_string(),
        schema_name: schema.unwrap_or(query_ctx.current_schema()).to_string(),
        table_name: table.to_string(),
    };

    // The sink table is resolved first so that its conversion error takes
    // precedence over any error from the source tables.
    let sink_ref =
        object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
            .with_context(|_| ConvertIdentifierSnafu {
                ident: create_flow.sink_table_name.to_string(),
            })?;
    let sink_table_name = qualify(sink_ref.catalog(), sink_ref.schema(), sink_ref.table());

    // Resolve every table referenced by the flow's query, stopping at the
    // first name that fails to convert.
    let mut source_table_names = Vec::new();
    for name in extract_tables_from_query(&create_flow.query) {
        let source_ref = object_name_to_table_reference(name.clone().into(), true)
            .with_context(|_| ConvertIdentifierSnafu {
                ident: name.to_string(),
            })?;
        source_table_names.push(qualify(
            source_ref.catalog(),
            source_ref.schema(),
            source_ref.table(),
        ));
    }

    let task_name = create_flow.flow_name.to_string();
    let sql = create_flow.query.to_string();
    // TODO(ruihang): change this field to optional in proto
    let expire_when = create_flow
        .expire_when
        .map(|e| e.to_string())
        .unwrap_or_default();
    // TODO(ruihang): change this field to optional in proto
    let comment = create_flow.comment.unwrap_or_default();

    Ok(CreateFlowTaskExpr {
        catalog_name: query_ctx.current_catalog().to_string(),
        task_name,
        source_table_names,
        sink_table_name: Some(sink_table_name),
        create_if_not_exists: create_flow.if_not_exists,
        or_replace: create_flow.or_replace,
        expire_when,
        comment,
        sql,
        task_options: HashMap::new(),
    })
}

#[cfg(test)]
mod tests {
use datatypes::value::Value;
Expand Down
4 changes: 4 additions & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ impl StatementExecutor {
let _ = self.create_external_table(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::CreateFlow(stmt) => {
self.create_flow(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
Expand Down
15 changes: 14 additions & 1 deletion src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use sql::statements::alter::AlterTable;
use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions};
use sql::statements::create::{
CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions,
};
use sql::statements::sql_value_to_value;
use sqlparser::ast::{Expr, Ident, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
Expand Down Expand Up @@ -320,6 +322,17 @@ impl StatementExecutor {
.collect())
}

/// Handles a `CREATE FLOW` statement.
///
/// For now this only converts the statement into its expression form via
/// `expr_factory::to_create_flow_task_expr` and discards the result; any
/// conversion error is returned to the caller.
#[tracing::instrument(skip_all)]
pub async fn create_flow(&self, stmt: CreateFlow, query_ctx: QueryContextRef) -> Result<()> {
    // TODO(ruihang): do some verification

    // TODO: invoke procedure
    // Until the procedure is wired in, the built expression is dropped.
    expr_factory::to_create_flow_task_expr(stmt, query_ctx).map(|_expr| ())
}

#[tracing::instrument(skip_all)]
pub async fn alter_logical_tables(&self, alter_table_exprs: Vec<AlterExpr>) -> Result<Output> {
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
Expand Down
Loading

0 comments on commit 7ef18c0

Please sign in to comment.