Skip to content

Commit

Permalink
feat(frontend):support create table as select (#6798)
Browse files Browse the repository at this point in the history
add create-table-as and related test

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
ZENOTME and mergify[bot] authored Dec 9, 2022
1 parent bb2cbd9 commit 5d0ebdf
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 4 deletions.
95 changes: 95 additions & 0 deletions e2e_test/ddl/table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,98 @@ create table t (v1 int not null);

statement error
create table t (v1 varchar collate "en_US");

# Test create-table-as
statement ok
create table t (v1 int,v2 int);

statement ok
insert into t values (1,1);

statement ok
insert into t values (1,1);

statement ok
insert into t values (1,1);

statement ok
flush

statement ok
create table t1 as select * from t;

statement ok
flush;

query I
select * from t1;
----
1 1
1 1
1 1

statement ok
drop table t1;

statement ok
drop table t;

statement ok
create table t AS SELECT * FROM generate_series(0, 5,1) tbl(i);

statement ok
flush;

query I
select * from t order by i;
----
0
1
2
3
4
5

statement ok
drop table t;

statement ok
create table t (v1 int);

statement ok
insert into t values (1);

statement ok
insert into t values (2);

statement error
create table n1 as select sum(v1) from t;

statement error
create table n1(v1) as select sum(v1) from t;

statement ok
create table n1 as select sum(v1) as sum from t;

statement ok
flush;

query I
select * from n1;
----
3

statement error
create table n1 (v2 int);

statement error
create table n1 as select * from t;

statement ok
create table if not exists n1 (v2 int);

statement ok
drop table n1;

statement ok
drop table t;
18 changes: 18 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,24 @@ pub(crate) fn gen_create_table_plan(
constraints: Vec<TableConstraint>,
) -> Result<(PlanRef, ProstSource, ProstTable)> {
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns)?;
gen_create_table_plan_without_bind(
session,
context,
table_name,
column_descs,
pk_column_id_from_columns,
constraints,
)
}

pub(crate) fn gen_create_table_plan_without_bind(
session: &SessionImpl,
context: OptimizerContextRef,
table_name: ObjectName,
column_descs: Vec<ColumnDesc>,
pk_column_id_from_columns: Option<ColumnId>,
constraints: Vec<TableConstraint>,
) -> Result<(PlanRef, ProstSource, ProstTable)> {
let (columns, pk_column_ids, row_id_index) =
bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?;
let row_id_index = row_id_index.map(|index| ProstColumnIndex { index: index as _ });
Expand Down
125 changes: 125 additions & 0 deletions src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ColumnDesc, Field};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_sqlparser::ast::{ObjectName, Query, Statement};

use super::{HandlerArgs, RwPgResponse};
use crate::binder::{BoundSetExpr, BoundStatement};
use crate::handler::create_table::gen_create_table_plan_without_bind;
use crate::handler::query::handle_query;
use crate::{build_graph, Binder, OptimizerContext};

/// Used in `handle_create_as` to convert filed to column desc
fn convert_field_to_column_desc(field: &Field, column_id: i32) -> ColumnDesc {
let field_descs = field
.sub_fields
.iter()
.map(|field| convert_field_to_column_desc(field, 0))
.collect();

ColumnDesc {
data_type: field.data_type(),
name: field.name.clone(),
column_id: column_id.into(),
field_descs,
type_name: "".to_string(),
}
}

pub async fn handle_create_as(
handler_args: HandlerArgs,
table_name: ObjectName,
if_not_exists: bool,
query: Box<Query>,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

if let Err(e) = session.check_relation_name_duplicated(table_name.clone()) {
if if_not_exists {
return Ok(PgResponse::empty_result_with_notice(
StatementType::CREATE_TABLE,
format!("relation \"{}\" already exists, skipping", table_name),
));
} else {
return Err(e);
}
}

// Generate catalog descs from query
let column_descs: Vec<_> = {
let mut binder = Binder::new(&session);
let bound = binder.bind(Statement::Query(query.clone()))?;
if let BoundStatement::Query(query) = bound {
// Check if all expressions have an alias
if let BoundSetExpr::Select(select) = &query.body {
if select.aliases.iter().any(Option::is_none) {
return Err(ErrorCode::BindError(
"An alias must be specified for an expression".to_string(),
)
.into());
}
}

// Create ColumnCatelog by Field
query
.schema()
.fields()
.iter()
.enumerate()
.map(|(column_id, field)| convert_field_to_column_desc(field, column_id as i32))
.collect()
} else {
unreachable!()
}
};

let (graph, source, table) = {
let context = OptimizerContext::new_with_handler_args(handler_args.clone());
let (plan, source, table) = gen_create_table_plan_without_bind(
&session,
context.into(),
table_name.clone(),
column_descs,
None,
vec![],
)?;
let graph = build_graph(plan);

(graph, source, table)
};

tracing::trace!(
"name={}, graph=\n{}",
table_name,
serde_json::to_string_pretty(&graph).unwrap()
);

let catalog_writer = session.env().catalog_writer();

// TODO(Yuanxin): `source` will contain either an external source or nothing. Rewrite
// `create_table` accordingly.
catalog_writer.create_table(source, table, graph).await?;

// Generate insert
let insert = Statement::Insert {
table_name,
columns: vec![],
source: query,
};

handle_query(handler_args, insert, false).await
}
6 changes: 4 additions & 2 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod create_schema;
pub mod create_sink;
pub mod create_source;
pub mod create_table;
pub mod create_table_as;
pub mod create_user;
mod create_view;
mod describe;
Expand Down Expand Up @@ -141,8 +142,9 @@ pub async fn handle(
)
.into());
}
if query.is_some() {
return Err(ErrorCode::NotImplemented("CREATE AS".to_string(), 6215.into()).into());
if let Some(query) = query {
return create_table_as::handle_create_as(handler_args, name, if_not_exists, query)
.await;
}
create_table::handle_create_table(
handler_args,
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ pub async fn distribute_execute(
}

#[expect(clippy::unused_async)]
async fn local_execute(
pub async fn local_execute(
session: Arc<SessionImpl>,
query: Query,
pinned_snapshot: HummockSnapshotGuard,
Expand All @@ -254,7 +254,7 @@ async fn local_execute(
Ok(execution.stream_rows())
}

async fn flush_for_write(session: &SessionImpl, stmt_type: StatementType) -> Result<()> {
pub async fn flush_for_write(session: &SessionImpl, stmt_type: StatementType) -> Result<()> {
match stmt_type {
StatementType::INSERT | StatementType::DELETE | StatementType::UPDATE => {
let client = session.env().meta_client();
Expand Down

0 comments on commit 5d0ebdf

Please sign in to comment.