Skip to content

Commit

Permalink
feat: Implement SHOW CREATE FLOW
Browse files Browse the repository at this point in the history
  • Loading branch information
irenjj committed May 25, 2024
1 parent 3477fde commit d7c53d4
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/meta/src/key/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod flow_info;
pub mod flow_info;
pub(crate) mod flow_name;
pub(crate) mod flownode_flow;
pub(crate) mod table_flow;
Expand Down
20 changes: 20 additions & 0 deletions src/common/meta/src/key/flow/flow_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,26 @@ impl FlowInfoValue {
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids
}

pub fn flow_name(&self) -> &String {
&self.flow_name
}

pub fn sink_table_name(&self) -> &TableName {
&self.sink_table_name
}

pub fn raw_sql(&self) -> &String {
&self.raw_sql
}

pub fn expire_when(&self) -> &String {
&self.expire_when
}

pub fn comment(&self) -> &String {
&self.comment
}
}

pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ pub fn check_permission(
Statement::ShowCreateTable(stmt) => {
validate_param(&stmt.table_name, query_ctx)?;
}
Statement::ShowCreateFlow(stmt) => {
validate_param(&stmt.flow_name, query_ctx)?;
}
Statement::CreateExternalTable(stmt) => {
validate_param(&stmt.name, query_ctx)?;
}
Expand Down
37 changes: 37 additions & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,43 @@ impl StatementExecutor {
self.show_create_table(table_name, table_ref, query_ctx)
.await
}
Statement::ShowCreateFlow(show) => {
let obj_name = &show.flow_name;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[table] => (query_ctx.current_catalog().to_string(), table.value.clone()),
[catalog, table] => (catalog.value.clone(), table.value.clone()),
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"expect flow name to be <schema>.<table> or <table>, actual: {obj_name}",
),
}
.fail()
}
};

let flow_name_val = self
.flow_metadata_manager
.flow_name_manager()
.get(&catalog_name, &flow_name)
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::FlowNotFoundSnafu {
flow_name: &flow_name,
})?;

let flow_val = self
.flow_metadata_manager
.flow_info_manager()
.get(flow_name_val.flow_id())
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::FlowNotFoundSnafu {
flow_name: &flow_name,
})?;

self.show_create_flow(obj_name, flow_val).await
}
Statement::SetVariables(set_var) => {
let var_name = set_var.variable.to_string().to_uppercase();
match var_name.as_str() {
Expand Down
11 changes: 11 additions & 0 deletions src/operator/src/statement/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use sqlparser::ast::ObjectName;
use partition::manager::PartitionInfo;
use partition::partition::PartitionBound;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -95,6 +97,15 @@ impl StatementExecutor {
.context(error::ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn show_create_flow(
&self,
obj_name: &ObjectName,
flow_val: FlowInfoValue,
) -> Result<Output> {
query::sql::show_create_flow(obj_name, flow_val).context(error::ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
Expand Down
1 change: 1 addition & 0 deletions src/query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ regex.workspace = true
session.workspace = true
snafu.workspace = true
sql.workspace = true
sqlparser.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
Expand Down
34 changes: 34 additions & 0 deletions src/query/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use common_datasource::file_format::{infer_schemas, FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::Output;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
Expand All @@ -40,6 +41,7 @@ use common_time::Timestamp;
use datafusion::common::ScalarValue;
use datafusion::prelude::SessionContext;
use datafusion_expr::{case, col, lit, Expr};
use sqlparser::ast::ObjectName;
use datatypes::prelude::*;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
use datatypes::vectors::StringVector;
Expand Down Expand Up @@ -134,6 +136,13 @@ static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
]))
});

static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
]))
});

fn null() -> Expr {
lit(ScalarValue::Null)
}
Expand Down Expand Up @@ -590,6 +599,31 @@ pub fn show_create_table(
Ok(Output::new_with_record_batches(records))
}

pub fn show_create_flow(obj_name: &ObjectName, flow_val: FlowInfoValue) -> Result<Output> {
let mut sql = String::new();
sql.push_str(&format!(
"CREATE OR REPLACE TASK IF NOT EXISTS {} ",
&obj_name
));
sql.push_str(&format!("OUTPUT AS {} ", flow_val.sink_table_name()));
if !flow_val.expire_when().is_empty() {
sql.push_str(&format!("EXPIRE WHEN {} ", flow_val.expire_when()));
}
if !flow_val.comment().is_empty() {
sql.push_str(&format!("COMMENT '{}' ", flow_val.comment()));
}
sql.push_str(&format!("AS `{}`", flow_val.raw_sql()));

let columns = vec![
Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
Arc::new(StringVector::from(vec![sql])) as _,
];
let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;

Ok(Output::new_with_record_batches(records))
}

pub fn describe_table(table: TableRef) -> Result<Output> {
let table_info = table.table_info();
let columns_schemas = table_info.meta.schema.column_schemas();
Expand Down
4 changes: 4 additions & 0 deletions src/sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ pub enum Error {
#[snafu(display("Invalid table name: {}", name))]
InvalidTableName { name: String },

#[snafu(display("Invalid flow name: {}", name))]
InvalidFlowName { name: String },

#[snafu(display("Invalid default constraint, column: {}", column))]
InvalidDefault {
column: String,
Expand Down Expand Up @@ -238,6 +241,7 @@ impl ErrorExt for Error {
| InvalidDatabaseOption { .. }
| ColumnTypeMismatch { .. }
| InvalidTableName { .. }
| InvalidFlowName { .. }
| InvalidSqlValue { .. }
| TimestampOverflow { .. }
| InvalidTableOption { .. }
Expand Down
27 changes: 25 additions & 2 deletions src/sql/src/parsers/show_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ use snafu::{ensure, ResultExt};
use sqlparser::keywords::Keyword;
use sqlparser::tokenizer::Token;

use crate::error::{self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result};
use crate::error::{
self, InvalidDatabaseNameSnafu, InvalidFlowNameSnafu, InvalidTableNameSnafu, Result,
};
use crate::parser::ParserContext;
use crate::statements::show::{
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables,
ShowVariables,
};
use crate::statements::statement::Statement;

Expand Down Expand Up @@ -61,6 +64,8 @@ impl<'a> ParserContext<'a> {
} else if self.consume_token("CREATE") {
if self.consume_token("TABLE") {
self.parse_show_create_table()
} else if self.consume_token("FLOW") {
self.parse_show_create_flow()
} else {
self.unsupported(self.peek_token_as_string())
}
Expand Down Expand Up @@ -106,6 +111,24 @@ impl<'a> ParserContext<'a> {
Ok(Statement::ShowCreateTable(ShowCreateTable { table_name }))
}

fn parse_show_create_flow(&mut self) -> Result<Statement> {
let raw_flow_name = self
.parse_object_name()
.with_context(|_| error::UnexpectedSnafu {
sql: self.sql,
expected: "a flow name",
actual: self.peek_token_as_string(),
})?;
let flow_name = Self::canonicalize_object_name(raw_flow_name);
ensure!(
!flow_name.0.is_empty(),
InvalidFlowNameSnafu {
name: flow_name.to_string(),
}
);
Ok(Statement::ShowCreateFlow(ShowCreateFlow { flow_name }))
}

fn parse_show_table_name(&mut self) -> Result<String> {
self.parser.next_token();
let table_name = self
Expand Down
42 changes: 42 additions & 0 deletions src/sql/src/statements/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ impl Display for ShowCreateTable {
}
}

/// SQL structure for `SHOW CREATE FLOW`.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
pub struct ShowCreateFlow {
pub flow_name: ObjectName,
}

impl Display for ShowCreateFlow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let flow_name = &self.flow_name;
write!(f, r#"SHOW CREATE FLOW {flow_name}"#)
}
}

/// SQL structure for `SHOW VARIABLES xxx`.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
pub struct ShowVariables {
Expand Down Expand Up @@ -231,6 +244,35 @@ mod tests {
.is_err());
}

#[test]
pub fn test_show_create_flow() {
let sql = "SHOW CREATE FLOW test";
let stmts: Vec<Statement> =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::ShowCreateFlow { .. });
match &stmts[0] {
Statement::ShowCreateFlow(show) => {
let flow_name = show.flow_name.to_string();
assert_eq!(flow_name, "test");
}
_ => {
unreachable!();
}
}
}
#[test]
pub fn test_show_create_missing_flow() {
let sql = "SHOW CREATE FLOW";
assert!(ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default()
)
.is_err());
}

#[test]
fn test_display_show_variables() {
let sql = r"show variables v1;";
Expand Down
6 changes: 5 additions & 1 deletion src/sql/src/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use crate::statements::insert::Insert;
use crate::statements::query::Query;
use crate::statements::set_variables::SetVariables;
use crate::statements::show::{
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables,
ShowVariables,
};
use crate::statements::tql::Tql;
use crate::statements::truncate::TruncateTable;
Expand Down Expand Up @@ -80,6 +81,8 @@ pub enum Statement {
ShowIndex(ShowIndex),
// SHOW CREATE TABLE
ShowCreateTable(ShowCreateTable),
// SHOW CREATE FLOW
ShowCreateFlow(ShowCreateFlow),
// DESCRIBE TABLE
DescribeTable(DescribeTable),
// EXPLAIN QUERY
Expand Down Expand Up @@ -115,6 +118,7 @@ impl Display for Statement {
Statement::ShowColumns(s) => s.fmt(f),
Statement::ShowIndex(s) => s.fmt(f),
Statement::ShowCreateTable(s) => s.fmt(f),
Statement::ShowCreateFlow(s) => s.fmt(f),
Statement::DescribeTable(s) => s.fmt(f),
Statement::Explain(s) => s.fmt(f),
Statement::Copy(s) => s.fmt(f),
Expand Down

0 comments on commit d7c53d4

Please sign in to comment.